非同期メッセージングの使用
メッセージバスの非同期オプションは、「ファイア・アンド・フォーゲット」動作を提供します。メッセージを送信し、応答を待たずに処理を続行します。
非同期メッセージは、 シリアル または パラレル 宛先に送信されます。
シリアル 宛先の場合、メッセージバスはメッセージをキューに入れ、メッセージごとに1つのワーカースレッドを委任します。 スレッドはメッセージリスナーを順番に処理します。
パラレル 宛先の場合、メッセージバスはメッセージをキューに入れ、1つのメッセージリスナーにつきメッセージごとに1つのワーカースレッドを委任します。 スレッドはメッセージリスナーを同時に処理します。
別のクラス(メッセージリスナー)がリッスンしているシリアル宛先にメッセージを送信することから始めます。
メッセージを送る
新しいLiferay インスタンスを起動し、以下を実行します。
docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.55-ga55。
http://localhost:8080でLiferayへのサインインします。 メールアドレス[email protected]_とパスワード_test_を使用してください。 プロンプトが表示されたら、パスワードを _learn_に変更します。
そして、サンプルプロジェクトでメッセージの送信から始めてください。
サンプルプロジェクトをダウンロードし、解凍します。
curl https://resources.learn.liferay.com/dxp/latest/ja/building-applications/core-frameworks/message-bus/liferay-n8k5.zip -O
unzip liferay-n8k5.zip
宛先モジュール
n8k5-able-impl
をビルドしてデプロイします。cd liferay-n8k5/n8k5-able-impl
../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
noteこのコマンドは、モジュールJARをDockerコンテナの
/opt/liferay/osgi/modules
にコピーするのと同じです。Dockerコンテナコンソールに、モジュールが起動されたことが示されます。
STARTED com.acme.n8k5.able.impl_1.0.0
リスナーモジュール
n8k5-charlie-impl
をビルドしてデプロイします。cd ../n8k5-charlie-impl
../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
Dockerコンテナコンソールに、モジュールが起動されたことが示されます。
STARTED com.acme.n8k5.charlie.impl_1.0.0
送信者モジュール
n8k5-baker-impl
をビルドしてデプロイします。cd ../n8k5-baker-impl
../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
Dockerコンテナコンソールで、
N8K5Baker
がメッセージを送信し、N8K5CharlieMessageListener
がメッセージを受信し、n8k5-baker-impl
モジュールが起動したことを確認してください。INFO [pipe-start 2025][N8K5Baker:24] Sent message to acme/n8k5_able INFO [acme/n8k5_able-4][N8K5CharlieMessageListener:21] Received message payload N8K5Baker#_activate STARTED com.acme.n8k5.baker.impl_1.0.0 [2025]
N8K5Baker
は、宛先acme/n8k5_able
にメッセージを送信したことを報告しました。 N8K5CharlieMessageListener
は、宛先acme/n8k5_able
でペイロードN8K5Baker#_activate
を含むメッセージを受信しました。 これで、サンプルコードを調べることができます。
プロジェクトの概要
この例の3つのモジュールには、それぞれ1つのクラスがあります。 各クラスは、メッセージングコンポーネントの1つ(宛先、送信者、リスナー)を表します。
クラスの例:
クラス | 説明 |
---|---|
n8k5-able-impl の N8K5AbleMessagingConfigurator |
acme/n8k5_able という名前のメッセージ宛先を作成し、メッセージバスに登録します。 |
n8k5-baker-impl の N8K5Baker |
acme/n8k5_able 宛先にメッセージを送信します。 |
n8k5-charlie-impl の N8K5CharlieMessageListener |
acme/n8k5_able 宛先に送信されたメッセージをリッスンします。 |
これらがどのように相互作用するかを以下に示します。
N8K5Baker
が有効になり(たとえば、n8k5-baker-impl
モジュールが起動したとき)、acme/n8k5_able
宛先にメッセージを送信します。- メッセージバスがメッセージを
N8K5CharlieMessageListener
に送信します。 N8K5CharlieMessageListener
がメッセージを受信します。
宛先構成と送信者クラスを調べます。 リスナークラスN8K5CharlieMessageListener
は、 メッセージを聞く に示す方法と同じ方法で登録します。
宛先構成を調べる
n8k5-able-impl
モジュールのN8K5AbleMessagingConfigurator
クラスは、宛先を作成して構成します。 コードは次のとおりです。
@Component
public class N8K5AbleMessagingConfigurator {
@Activate
private void _activate(BundleContext bundleContext) {
Destination destination = _destinationFactory.createDestination(
DestinationConfiguration.createSerialDestinationConfiguration(
"acme/n8k5_able"));
_serviceRegistration = bundleContext.registerService(
Destination.class, destination,
MapUtil.singletonDictionary(
"destination.name", destination.getName()));
}
@Deactivate
private void _deactivate() {
if (_serviceRegistration != null) {
_serviceRegistration.unregister();
}
}
@Reference
private DestinationFactory _destinationFactory;
private ServiceRegistration<Destination> _serviceRegistration;
}
どのクラスでも宛先を作成および構成できますが、 Component
にはDestinationFactory
のように依存関係を挿入できます。 _destinationFactory
フィールドの @Reference
アノテーションは、LiferayのOSGiフレームワークにDestinationFactory
インスタンスを挿入するようにシグナルを送信します。
_activate
メソッドでは、N8K5AbleMessagingConfigurator
は DestinationFactory
と DestinationConfiguration
を使用して、acme/n8k5_able
という名前の シリアル 宛先を作成します。 次に、OSGiフレームワークBundleContext
を使用して、Destination
に対するサービスを登録します。 N8K5AbleMessagingConfigurator
が無効化されると、_deactivate
メソッドはサービスの登録を解除します。
送信者を調べる
以下のN8K5Baker
クラスは、ペイロード"N8K5Baker#_activate"
を含むメッセージをacme/n8k5_able
という名前の宛先に送信します。
@Activate
private void _activate() {
Message message = new Message();
message.setPayload("N8K5Baker");
_messageBus.sendMessage("acme/n8k5_able", message);
}
@Reference
private MessageBus _messageBus;
コンポーネントとして、N8K5Baker
は@Reference
アノテーションを使用してMessageBus
インスタンスを挿入します。
コンポーネントのアクティブ化時に、N8K5Baker
は、アクティブ化メソッド_activate()
を介してメッセージを作成して送信します。 Message
インスタンスを構築し、それにペイロードを追加します。 ペイロードは、Message
に入力できるもののうちの1つです。
主なメッセージ入力方法は次のとおりです。
メソッド | 説明 |
---|---|
setPayload(Object) |
Message のメインコンテンツを追加します。 |
setResponseDestinationName(String) |
応答を受信するためのDestination を参照します。 |
setValues(Map<String,Object>) |
Map から追加データを提供します。 |
N8K5Baker
は、 MessageBus
のsendMessage(String, Message)
メソッドを呼び出して、acme/n8k5_able
という名前の 宛先
にメッセージを送信します。 MessageBus
は新しいスレッドを開始し、acme/n8k5_able
宛先
に登録されている MessageListener
インスタンスにメッセージ
を送信します。 N8K5Baker
のスレッドが継続します。
Message
への応答を受信したい場合は、Message
に応答先を設定し、 N8K5Baker
などのクラスをMessageListener
としてその宛先に登録します。 詳細については、メッセージのリッスンを参照してください。
応答処理の追加
メッセージ受信者からの応答が必要な場合は、返信の応答先を設定します。
- メッセージ応答用に別の宛先を登録します。
- クラス(例えば、元の送信者)を
MessageListener
として応答先に登録します。 - メッセージで応答先を渡します。
MessageListener
に応答ロジックを追加します。
ステップ1:応答の宛先を登録する
N8K5AbleDestinationConfigurator
が宛先を管理するのと同じ方法で、応答先を管理するようにN8K5Baker
を変更できます。 _activate()
メソッドのシグネチャを_activate(BundleContext bundleContext)
に置き換え、acme/n8k5_baker
応答先のサービスを作成、構成、および登録するコードを追加します。 サービスの登録を解除する_deactivate()
メソッドを追加します。 _activate(BundleContext bundleContext)
メソッドと_deactivate()
メソッドは次のようになります。
@Activate
private void _activate(BundleContext bundleContext) {
Destination destination = _destinationFactory.createDestination(
DestinationConfiguration.createSerialDestinationConfiguration(
"acme/n8k5_baker"));
_serviceRegistration = bundleContext.registerService(
Destination.class, destination,
MapUtil.singletonDictionary(
"destination.name", destination.getName()));
Message message = new Message();
message.setPayload("N8K5Baker#_activate");
_messageBus.sendMessage("acme/n8k5_able", message);
}
@Deactivate
private void _deactivate() {
if (_serviceRegistration != null) {
_serviceRegistration.unregister();
}
}
@Reference
private DestinationFactory _destinationFactory;
private ServiceRegistration<Destination> _serviceRegistration;
ステップ2:N8K5Baker
を応答先のリスナーとして登録する
送信者N8K5Baker
の変更点は次のとおりです。
@Component
アノテーションを更新し、N8K5Baker
をMessageListener.class
タイプのサービスとして宣言し、プロパティ"destination.name=acme/n8k5_baker"
を介してN8K5Baker
を応答先にマッピングします。MessageListener
インターフェイスを実装します。receive(Message)
メソッドをメッセージ処理ロジックでオーバーライドします。
送信者の変更は次のようになります。
@Component(
property = "destination.name=acme/n8k5_baker",
service = MessageListener.class
)
public class N8K5Baker implements MessageListener {
@Override
public void receive(Message message) {
Object payload = message.getPayload();
_log.info("Received message payload " + payload.toString());
}
// Existing methods and fields
private static final Log _log = LogFactoryUtil.getLog(N8K5Baker.class);
}
ステップ3:メッセージの応答先を渡す
N8K5Baker
が送信するメッセージの応答先としてacme/n8k5_baker
を設定します。 次のようになります。
@Activate
private void _activate(BundleContext bundleContext) {
// Destination setup
Message message = new Message();
message.setPayload("N8K5Baker#_activate");
message.setResponseDestinationName("acme/n8k5_baker");
_messageBus.sendMessage("acme/n8k5_able", message);
}
ステップ4:MessageListener
に応答ロジックを追加する
MessageListener
のreceive(Message)
メソッドで、応答を設定し、メッセージから応答先を取得し、MessageBus
インスタンスを使用して応答メッセージを応答先に送信します。 次のようになります。
public void receive(Message message) {
// Message processing
message.setResponse("N8K5CharlieMessageListener");
Message responseMessage = new Message();
responseMessage.setDestinationName(
message.getResponseDestinationName());
responseMessage.setPayload("N8K5CharlieMessageListener");
responseMessage.setResponseId(message.getResponseId());
_messageBus.sendMessage(
message.getResponseDestinationName(), responseMessage);
}
// Existing methods and fields
@Reference
private MessageBus _messageBus;
変更をテストする
サンプルプロジェクトを再デプロイして、変更をテストします。
cd ../../liferay-n8k5.zip
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
出力は次のようになります。
STARTED com.acme.n8k5.charlie.impl_1.0.0 [2020]
STARTED com.acme.n8k5.baker.impl_1.0.0 [2025]
INFO [acme/n8k5_able-2][N8K5CharlieMessageListener:23] Received message payload N8K5Baker#_activate
INFO [acme/n8k5_baker-2][N8K5Baker:30] Received message payload N8K5CharlieMessageListener
N8K5CharlieMessageListener
は、N8K5Baker
のメッセージを受信してから、応答メッセージを応答先に送信します。 N8K5Baker
は応答メッセージを受信し、メッセージペイロードを出力します。
クラスでメッセージを再度交換する場合は、Gogo シェルでモジュール(OSGiバンドル)を再起動できます。 バンドルを一覧表示して(lb
)バンドルIDを取得し、バンドルを停止して(stop <id>
)、バンドルを再起動します(start <id>
)。
OSGiコンポーネントではないクラスでは、 MessageBusUtil と、Destination
、DestinationConfiguration
、Message
、およびMessageListener
インスタンスを使用してメッセージを送信できます。
示されているようにDestination
サービスを登録できますが、BundleContext
を別の方法で取得する必要があります(たとえば、Bundle bundle = FrameworkUtil.getBundle(YourClass.class); BundleContext bundleContext = bundle.getBundleContext()
を呼び出しを行うことによって)。
2つのクラス間で非同期的にメッセージを交換しました。
次のステップ
非同期メッセージングに慣れてきたので、最適なパフォーマンスになるように調整できます。 メッセージングパフォーマンスのチューニング でその方法を学びましょう。
デフォルト と ダイレクト モードを使用した同期メッセージングを検討したい場合は、 以前のバージョンでダイレクト同期メッセージングを使用 と 以前のバージョンでデフォルト同期メッセージングを使用 に詳細が記載されています。