旧バージョンのデフォルトの同期メッセージングを使用する
同期メッセージは削除され、Liferay DXP 7.4 U49/Liferay Portal 7.4 GA49 以降ではサポートされなくなりました。
デフォルトの同期メッセージングでは、メッセージバススレッドが登録されたメッセージリスナーにメッセージをディスパッチしている間、送信者はブロックします。 応答メッセージが受信されるか、送信者スレッドがタイムアウトすると、送信者はブロックを解除します。
送信者は、受信した最初の応答メッセージのブロックを解除します。
サンプルプロジェクトを使用して、デフォルトの同期メッセージを送信します。 次に、例を変更してメッセージをタイムアウトにします。
デフォルトの同期メッセージを送信する
サンプルプロジェクトでは、デフォルトモードでSynchronousMessageSender
を使用してメッセージを送信し、応答を待ちます。
新しいLiferay インスタンスを起動し、以下を実行します。
docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.55-ga55。
http://localhost:8080でLiferayへのサインインします。 メールアドレス[email protected]_とパスワード_test_を使用してください。 プロンプトが表示されたら、パスワードを _learn_に変更します。
その後、以下の手順を実行してください。
サンプルをダウンロードし、解凍してください。
curl https://resources.learn.liferay.com/dxp/latest/ja/building-applications/core-frameworks/message-bus/liferay-m4q7.zip -O
unzip liferay-m4q7.zip
サンプルのプロジェクトモジュールをビルドしてデプロイします。
cd liferay-m4q7
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
noteこのコマンドは、モジュールJARをDockerコンテナの
/opt/liferay/osgi/modules
にコピーするのと同じです。Dockerコンテナコンソールにモジュールの起動が表示されます。
STARTED com.acme.m4q7.able.impl_1.0.0 STARTED com.acme.m4q7.baker.impl_1.0.0 STARTED com.acme.m4q7.charlie.impl_1.0.0
ブラウザで
http://localhost:8080
にあるLiferayインスタンスにアクセスし、認証情報を使用してサインインします。Gogoシェルを開きます。
Gogoシェルコマンドフィールドに、
m4q7:sendMessage
と入力し、その後にメッセージを入力します。 例:m4q7:sendMessage foo
出力が次のようになっていることを確認します。
INFO [acme/m4q7_able-2][M4Q7CharlieMessageListener:23] Received message payload foo INFO [acme/m4q7_baker-2][M4Q7BakerMessageListener:21] Received message payload M4Q7CharlieMessageListener INFO [pipe-m4q7:sendMessage foo][M4Q7BakerOSGiCommands:28] Response: M4Q7CharlieMessageListener
acme/m4q7_able
宛先で、M4Q7CharlieMessageListener
がGogoシェルメッセージを受信しました。 acme/m4q7_baker
宛先で、M4Q7BakerMessageListener
がM4Q7CharlieMessageListener
から応答メッセージを受信しました。 最後に、M4Q7BakerOSGiCommands
のsendMessage
メソッドが、メッセージ送信者から返された応答オブジェクトをログに記録しました。
プロジェクトの概要
3つのサンプルモジュールクラスは、宛先を管理し、メッセージをリッスンし、メッセージを送信します。
m4q7-able-impl
モジュール: M4Q7AbleMessagingConfigurator
は、acme/m4q7_able
という名前のメッセージ宛先を作成し、それをメッセージバスに登録します。
m4q7-baker-impl
モジュール:
M4Q7BakerOSGiCommands
は、acme/m4q7_able
宛先にメッセージを送信し、応答をログに記録します。M4Q7BakerMessagingConfigurator
は、acme/m4q7_baker
という名前のメッセージ宛先を作成し、それをメッセージバスに登録します。M4Q7BakerMessageListener
は、acme/m4q7_baker
宛先に送信されたメッセージをリッスンし、メッセージペイロードをログに記録します。
m4q7-charlie-impl
モジュール: M4Q7CharlieMessageListener
は、acme/m4q7_able
宛先に送信されたメッセージをリッスンし、メッセージペイロードをログに記録し、元のメッセージの応答先に応答メッセージを送信します。
イベントフローは次のとおりです。
- `m4q7:sendMessage</0Gogoシェルコマンドを呼び出して、メッセージを渡します。
- M4Q7BakerOSGiCommands
の
sendMessage(String)メソッドは、Gogoシェルコマンドでトリガーし、メッセージ内のコマンド引数を
acme/m4q7_able`宛先に送信します。 - メッセージバススレッドは、メッセージを
M4Q7CharlieMessageListener
に配信します。 M4Q7CharlieMessageListener
は、メッセージペイロードをログに記録し、応答メッセージ内の独自のクラス名を元のメッセージの応答先acme/m4q7_baker
に送信します。M4Q7BakerMessageListener
は応答メッセージを受信し、そのペイロードをログに記録します。- 処理は
M4Q7BakerOSGiCommands
に戻り、元のメッセージへの応答をログに記録します。
次に、宛先コンフィギュレーターから順に、各クラスを調べます。
宛先コンフィグレーターを調べる
m4q7-able-impl
モジュールとm4q7-baker-impl
モジュールには、それぞれ宛先コンフィギュレータークラスM4Q7AbleMessagingConfigurator
とM4Q7BakerMessagingConfigurator
があります。 それぞれが宛先を作成して構成します。
M4Q7AbleMessagingConfigurator
クラスは、acme/m4q7_able
宛先を構成します。
@Component
public class M4Q7AbleMessagingConfigurator {
@Activate
private void _activate(BundleContext bundleContext) {
DestinationConfiguration destinationConfiguration =
DestinationConfiguration.createSerialDestinationConfiguration(
"acme/m4q7_able");
Destination destination = _destinationFactory.createDestination(
destinationConfiguration);
_serviceRegistration = bundleContext.registerService(
Destination.class, destination,
MapUtil.singletonDictionary(
"destination.name", destination.getName()));
}
@Deactivate
private void _deactivate() {
if (_serviceRegistration != null) {
_serviceRegistration.unregister();
}
}
@Reference
private DestinationFactory _destinationFactory;
private ServiceRegistration<Destination> _serviceRegistration;
}
M4Q7BakerMessagingConfigurator
クラスは、acme/m4q7_baker
宛先を構成します。
@Component
public class M4Q7BakerMessagingConfigurator {
@Activate
private void _activate(BundleContext bundleContext) {
DestinationConfiguration destinationConfiguration =
DestinationConfiguration.createSerialDestinationConfiguration(
"acme/m4q7_baker");
Destination destination = _destinationFactory.createDestination(
destinationConfiguration);
_serviceRegistration = bundleContext.registerService(
Destination.class, destination,
MapUtil.singletonDictionary(
"destination.name", destination.getName()));
}
@Deactivate
private void _deactivate() {
if (_serviceRegistration != null) {
_serviceRegistration.unregister();
}
}
@Reference
private DestinationFactory _destinationFactory;
private ServiceRegistration<Destination> _serviceRegistration;
}
どちらのコンフィギュレータも Component
クラスです。 これらは @Reference
アノテーションを使用して、DestinationFactory
インスタンスを挿入します。
_activate(BundleContext)
メソッドは、 DestinationFactory
と DestinationConfiguration
を使用して シリアル 宛先を作成します。 最後に、_activate(BundleContext)
メソッドは、BundleContext
を使用して Destination
をOSGiサービスに登録します。
デフォルトの同期メッセージングでは、シリアルまたはパラレルの宛先のみを使用してください。 それらは、DestinationConfiguration
のcreateSerialDestinationConfiguration(String)
および createParallelDestinationConfiguration(String)
メソッドを呼び出すことで作成できます。
メッセージ送信者のタイムアウトが無効になるため、デフォルトの同期メッセージングでは同期宛先を使用しないでください。
コンフィギュレーターが無効になると、それらの_deactivate()
メソッドは宛先サービスの登録を解除します。
リスナーを調べる
m4q7-charlie-impl
モジュールのM4Q7CharlieMessageListener
クラスは、acme/m4q7_able
Destination
に送信されたメッセージをリッスンします。 メッセージを聞く に示されている方法と同じ方法で登録されます。
M4Q7CharlieMessageListener
クラス:
@Component(
property = "destination.name=acme/m4q7_able",
service = MessageListener.class
)
public class M4Q7CharlieMessageListener implements MessageListener {
@Override
public void receive(Message message) {
if (_log.isInfoEnabled()) {
_log.info("Received message payload " + message.getPayload());
}
_messageBus.sendMessage(
message.getResponseDestinationName(),
new Message() {
{
setPayload("M4Q7CharlieMessageListener");
setResponseId(message.getResponseId());
}
});
}
private static final Log _log = LogFactoryUtil.getLog(
M4Q7CharlieMessageListener.class);
@Reference
private MessageBus _messageBus;
}
M4Q7CharlieMessageListener
がメッセージを受信すると、そのreceive(Message)
メソッドはメッセージペイロードをログに記録し、元のメッセージの応答先に応答メッセージを送信します。 このメソッドは、応答メッセージのペイロードをリスナークラス名に設定し、応答メッセージIDを元のメッセージの応答IDに設定します。
デフォルトの同期メッセージングでは、応答メッセージは元のメッセージの応答IDを使用する必要があり、かつ、応答先に送信される必要があります。
m4q7-baker-impl
モジュールのM4Q7BakerMessageListener
クラスは、M4Q7BakerOSGiCommands
のメッセージの応答先であるacme/m4q7_baker
に送信されたメッセージをリッスンします。
M4Q7BakerMessageListener
クラス:
@Component(
property = "destination.name=acme/m4q7_baker",
service = MessageListener.class
)
public class M4Q7BakerMessageListener implements MessageListener {
@Override
public void receive(Message message) {
if (_log.isInfoEnabled()) {
_log.info("Received message payload " + message.getPayload());
}
}
private static final Log _log = LogFactoryUtil.getLog(
M4Q7BakerMessageListener.class);
}
M4Q7BakerMessageListener
がメッセージを受信すると、そのreceive(Message)
メソッドはメッセージペイロードをログに記録します。
送信者を調べる
m4q7-baker-impl
モジュールのM4Q7BakerOSGiCommands
クラスは、メッセージ内のコマンド引数を"acme/m4q7_able"
宛先に送信するトリガーとなるOSGiコマンドを提供します。
@Component(
property = {"osgi.command.function=sendMessage", "osgi.command.scope=m4q7"},
service = M4Q7BakerOSGiCommands.class
)
public class M4Q7BakerOSGiCommands {
public void sendMessage(String payload) throws MessageBusException {
Message message = new Message();
message.setPayload(payload);
message.setResponseDestinationName("acme/m4q7_baker");
Object response = _synchronousMessageSender.send(
"acme/m4q7_able", message, 10000);
if (_log.isInfoEnabled()) {
_log.info("Response: " + response);
}
}
private static final Log _log = LogFactoryUtil.getLog(
M4Q7BakerOSGiCommands.class);
@Reference(target = "(mode=DEFAULT)")
private SynchronousMessageSender _synchronousMessageSender;
}
M4Q7BakerOSGiCommands
は、独自のクラスタイプのサービスComponent
です。 これは、@Reference
アノテーションを使用して、 デフォルト モード(アノテーションのtarget = "(mode=DEFAULT)"
属性で指定)に設定されたSynchronousMessageSender
を挿入します。
デフォルトモードでは、SynchronousMessageSender
のsend
メソッドは、応答メッセージが受信されるまで、または送信者がタイムアウトするまで、呼び出し元のクラスをブロックします。
M4Q7BakerOSGiCommands
の@Component
プロパティは、m4q7
スコープでsendMessage
と呼ばれるGogoシェルコマンド関数を定義します。 このコマンドは入力String
を受け取り、M4Q7BakerOSGiCommands
のsendMessage(String)
メソッドにマッピングします。
sendMessage(String)
メソッドは、GogoシェルコマンドのString
をペイロードとして、"acme/m4q7_baker"
を応答先として Message
を作成します。
sendMessage(String)
メソッドは、SynchronousMessageSender
のsend(String, Message, long)
メソッドを呼び出してメッセージを送信し、"acme/m4q7_able"
宛先名、メッセージインスタンス、および10000
ミリ秒のタイムアウトを渡します。 デフォルトモードでは、SynchronousMessageSender
はメッセージバススレッドを使用してメッセージをメッセージリスナーに配信します。 元のメッセージの応答IDを持つメッセージが"acme/m4q7_baker"
応答先で受信されるまで、実行がM4Q7BakerOSGiCommands
クラスでブロックされます。 レスポンスを受信すると、 M4Q7BakerOSGiCommands
sendMessage(String)
メソッドで実行が継続され、メッセージ応答をログに記録します。 一致する応答メッセージを受信する前にタイムアウトが期限切れになると、SynchronousMessageSender
のsend(String, Message, long)
メソッドはMessageBusException
をスローします。
デフォルトの同期メッセージングでは、応答メッセージは元のメッセージの応答IDを使用する必要があり、かつ、応答先に送信される必要があります。
メッセージリスナーが応答メッセージを返すのを確認したので、応答のタイムアウトをテストできます。
応答タイムアウトのデモを実行する
メッセージ応答ロジックをオフにしてタイムアウトを強制する方法は次のとおりです。
M4Q7CharlieMessageListener
のreceive(Message)
メソッドで、_messageBus.sendMessage(...)
呼び出しをコメントアウトします。@Override public void receive(Message message) { if (_log.isInfoEnabled()) { Object payload = message.getPayload(); _log.info("Received message payload " + payload.toString()); } // _messageBus.sendMessage( // message.getResponseDestinationName(), // new Message() { // { // setPayload("M4Q7CharlieMessageListener"); // setResponseId(message.getResponseId()); // } // }); }
サンプルプロジェクトを再デプロイします。
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
Gogoシェルコマンドフィールドに、
m4q7:sendMessage
と入力し、その後にメッセージを入力します。 例:m4q7:sendMessage foo
Gogoシェルページが次のようになっていることを確認します。
Dockerコンソールのメッセージが次のようになっていることを確認します。
INFO [acme/m4q7_able-2][M4Q7CharlieMessageListener:23] Received message payload foo
M4Q7CharlieMessageListener
はメッセージを受信しましたが、応答しませんでした。 SynchronousMessageSender
は、Gogoシェルページに出力されたMessageBusException
をスローしました。
タイムアウトと同期してメッセージを送信しました。
次のステップ
direct モードを使用した同期メッセージングを検討したい場合は、 旧バージョンでダイレクトシンクロナスメッセージを使用する場合 を参照してください。
メッセージを送信した直後に処理を続行する場合は、 非同期メッセージングの使用 を参照してください。