非同期メッセージングの使用

非同期メッセージングの使用

メッセージバスの非同期オプションは、「ファイア・アンド・フォーゲット」動作を提供します。メッセージを送信し、応答を待たずに処理を続行します。

非同期メッセージは、 シリアル または パラレル 宛先に送信されます。

  • シリアル 宛先の場合、メッセージバスはメッセージをキューに入れ、メッセージごとに1つのワーカースレッドを委任します。 スレッドはメッセージリスナーを順番に処理します。

  • パラレル 宛先の場合、メッセージバスはメッセージをキューに入れ、1つのメッセージリスナーにつきメッセージごとに1つのワーカースレッドを委任します。 スレッドはメッセージリスナーを同時に処理します。

別のクラス(メッセージリスナー)がリッスンしているシリアル宛先にメッセージを送信することから始めます。

メッセージを送る

新しいLiferay インスタンスを起動し、以下を実行します。

docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.55-ga55。

http://localhost:8080でLiferayへのサインインします。 メールアドレス[email protected]_とパスワード_test_を使用してください。 プロンプトが表示されたら、パスワードを _learn_に変更します。

そして、サンプルプロジェクトでメッセージの送信から始めてください。

  1. サンプルプロジェクトをダウンロードし、解凍します。

    curl https://resources.learn.liferay.com/dxp/latest/ja/building-applications/core-frameworks/message-bus/liferay-n8k5.zip -O
    
    unzip liferay-n8k5.zip
    
  2. 宛先モジュールn8k5-able-implをビルドしてデプロイします。

    cd liferay-n8k5/n8k5-able-impl
    
    ../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    
    note

    このコマンドは、モジュールJARをDockerコンテナの/opt/liferay/osgi/modulesにコピーするのと同じです。

    Dockerコンテナコンソールに、モジュールが起動されたことが示されます。

    STARTED com.acme.n8k5.able.impl_1.0.0
    
  3. リスナーモジュールn8k5-charlie-implをビルドしてデプロイします。

    cd ../n8k5-charlie-impl
    
    ../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    

    Dockerコンテナコンソールに、モジュールが起動されたことが示されます。

    STARTED com.acme.n8k5.charlie.impl_1.0.0
    
  4. 送信者モジュールn8k5-baker-implをビルドしてデプロイします。

    cd ../n8k5-baker-impl
    
    ../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    

    Dockerコンテナコンソールで、N8K5Baker がメッセージを送信し、N8K5CharlieMessageListener がメッセージを受信し、n8k5-baker-impl モジュールが起動したことを確認してください。

    INFO  [pipe-start 2025][N8K5Baker:24] Sent message to acme/n8k5_able
    INFO  [acme/n8k5_able-4][N8K5CharlieMessageListener:21] Received message payload N8K5Baker#_activate
    STARTED com.acme.n8k5.baker.impl_1.0.0 [2025]
    

N8K5Bakerは、宛先acme/n8k5_ableにメッセージを送信したことを報告しました。 N8K5CharlieMessageListenerは、宛先acme/n8k5_ableでペイロードN8K5Baker#_activateを含むメッセージを受信しました。 これで、サンプルコードを調べることができます。

プロジェクトの概要

この例の3つのモジュールには、それぞれ1つのクラスがあります。 各クラスは、メッセージングコンポーネントの1つ(宛先、送信者、リスナー)を表します。

クラスの例:

クラス 説明
n8k5-able-impl の N8K5AbleMessagingConfigurator acme/n8k5_ableという名前のメッセージ宛先を作成し、メッセージバスに登録します。
n8k5-baker-impl の N8K5Baker acme/n8k5_able宛先にメッセージを送信します。
n8k5-charlie-impl の N8K5CharlieMessageListener acme/n8k5_able宛先に送信されたメッセージをリッスンします。

これらがどのように相互作用するかを以下に示します。

  1. N8K5Bakerが有効になり(たとえば、n8k5-baker-implモジュールが起動したとき)、acme/n8k5_able宛先にメッセージを送信します。
  2. メッセージバスがメッセージをN8K5CharlieMessageListenerに送信します。
  3. N8K5CharlieMessageListenerがメッセージを受信します。

宛先構成と送信者クラスを調べます。 リスナークラスN8K5CharlieMessageListenerは、 メッセージを聞く に示す方法と同じ方法で登録します。

宛先構成を調べる

n8k5-able-implモジュールのN8K5AbleMessagingConfiguratorクラスは、宛先を作成して構成します。 コードは次のとおりです。

@Component
public class N8K5AbleMessagingConfigurator {

	@Activate
	private void _activate(BundleContext bundleContext) {
		Destination destination = _destinationFactory.createDestination(
			DestinationConfiguration.createSerialDestinationConfiguration(
				"acme/n8k5_able"));

		_serviceRegistration = bundleContext.registerService(
			Destination.class, destination,
			MapUtil.singletonDictionary(
				"destination.name", destination.getName()));
	}

	@Deactivate
	private void _deactivate() {
		if (_serviceRegistration != null) {
			_serviceRegistration.unregister();
		}
	}

	@Reference
	private DestinationFactory _destinationFactory;

	private ServiceRegistration<Destination> _serviceRegistration;

}

どのクラスでも宛先を作成および構成できますが、 Component にはDestinationFactoryのように依存関係を挿入できます。 _destinationFactoryフィールドの @Reference アノテーションは、LiferayのOSGiフレームワークにDestinationFactoryインスタンスを挿入するようにシグナルを送信します。

_activateメソッドでは、N8K5AbleMessagingConfiguratorDestinationFactoryDestinationConfiguration を使用して、acme/n8k5_ableという名前の シリアル 宛先を作成します。 次に、OSGiフレームワークBundleContextを使用して、Destinationに対するサービスを登録します。 N8K5AbleMessagingConfiguratorが無効化されると、_deactivateメソッドはサービスの登録を解除します。

送信者を調べる

以下のN8K5Bakerクラスは、ペイロード"N8K5Baker#_activate"を含むメッセージをacme/n8k5_ableという名前の宛先に送信します。


	@Activate
	private void _activate() {
		Message message = new Message();

		message.setPayload("N8K5Baker");

		_messageBus.sendMessage("acme/n8k5_able", message);
	}

	@Reference
	private MessageBus _messageBus;

コンポーネントとして、N8K5Baker@Referenceアノテーションを使用してMessageBusインスタンスを挿入します。

コンポーネントのアクティブ化時に、N8K5Bakerは、アクティブ化メソッド_activate()を介してメッセージを作成して送信します。 Message インスタンスを構築し、それにペイロードを追加します。 ペイロードは、Messageに入力できるもののうちの1つです。

主なメッセージ入力方法は次のとおりです。

メソッド 説明
setPayload(Object) Messageのメインコンテンツを追加します。
setResponseDestinationName(String) 応答を受信するためのDestinationを参照します。
setValues(Map<String,Object>) Mapから追加データを提供します。

N8K5Bakerは、 MessageBussendMessage(String, Message)メソッドを呼び出して、acme/n8k5_ableという名前の 宛先 にメッセージを送信します。 MessageBusは新しいスレッドを開始し、acme/n8k5_able 宛先に登録されている MessageListener インスタンスにメッセージを送信します。 N8K5Bakerのスレッドが継続します。

note

Messageへの応答を受信したい場合は、Messageに応答先を設定し、 N8K5BakerなどのクラスをMessageListenerとしてその宛先に登録します。 詳細については、メッセージのリッスンを参照してください。

応答処理の追加

メッセージ受信者からの応答が必要な場合は、返信の応答先を設定します。

  1. メッセージ応答用に別の宛先を登録します。
  2. クラス(例えば、元の送信者)をMessageListenerとして応答先に登録します。
  3. メッセージで応答先を渡します。
  4. MessageListenerに応答ロジックを追加します。

ステップ1:応答の宛先を登録する

N8K5AbleDestinationConfiguratorが宛先を管理するのと同じ方法で、応答先を管理するようにN8K5Bakerを変更できます。 _activate()メソッドのシグネチャを_activate(BundleContext bundleContext)に置き換え、acme/n8k5_baker応答先のサービスを作成、構成、および登録するコードを追加します。 サービスの登録を解除する_deactivate()メソッドを追加します。 _activate(BundleContext bundleContext)メソッドと_deactivate()メソッドは次のようになります。

@Activate
private void _activate(BundleContext bundleContext) {
   Destination destination = _destinationFactory.createDestination(
      DestinationConfiguration.createSerialDestinationConfiguration(
         "acme/n8k5_baker"));

   _serviceRegistration = bundleContext.registerService(
      Destination.class, destination,
      MapUtil.singletonDictionary(
         "destination.name", destination.getName()));

   Message message = new Message();

   message.setPayload("N8K5Baker#_activate");

   _messageBus.sendMessage("acme/n8k5_able", message);
}

@Deactivate
private void _deactivate() {
   if (_serviceRegistration != null) {
      _serviceRegistration.unregister();
   }
}

@Reference
private DestinationFactory _destinationFactory;

private ServiceRegistration<Destination> _serviceRegistration;

ステップ2:N8K5Bakerを応答先のリスナーとして登録する

送信者N8K5Bakerの変更点は次のとおりです。

  1. @Componentアノテーションを更新し、N8K5BakerMessageListener.classタイプのサービスとして宣言し、プロパティ"destination.name=acme/n8k5_baker"を介してN8K5Bakerを応答先にマッピングします。
  2. MessageListener インターフェイスを実装します。
  3. receive(Message)メソッドをメッセージ処理ロジックでオーバーライドします。

送信者の変更は次のようになります。

@Component(
    property = "destination.name=acme/n8k5_baker",
    service = MessageListener.class
)
public class N8K5Baker implements MessageListener {

    @Override
    public void receive(Message message) {
        Object payload = message.getPayload();

        _log.info("Received message payload " + payload.toString());
    }

   // Existing methods and fields

   private static final Log _log = LogFactoryUtil.getLog(N8K5Baker.class);
}

ステップ3:メッセージの応答先を渡す

N8K5Bakerが送信するメッセージの応答先としてacme/n8k5_bakerを設定します。 次のようになります。

@Activate
private void _activate(BundleContext bundleContext) {
   // Destination setup

   Message message = new Message();

   message.setPayload("N8K5Baker#_activate");
   message.setResponseDestinationName("acme/n8k5_baker");

   _messageBus.sendMessage("acme/n8k5_able", message);
}

ステップ4:MessageListenerに応答ロジックを追加する

MessageListenerreceive(Message)メソッドで、応答を設定し、メッセージから応答先を取得し、MessageBusインスタンスを使用して応答メッセージを応答先に送信します。 次のようになります。

public void receive(Message message) {
   // Message processing

   message.setResponse("N8K5CharlieMessageListener");

   Message responseMessage = new Message();

   responseMessage.setDestinationName(
      message.getResponseDestinationName());
   responseMessage.setPayload("N8K5CharlieMessageListener");
   responseMessage.setResponseId(message.getResponseId());

   _messageBus.sendMessage(
      message.getResponseDestinationName(), responseMessage);
}

// Existing methods and fields

@Reference
private MessageBus _messageBus;

変更をテストする

サンプルプロジェクトを再デプロイして、変更をテストします。

cd ../../liferay-n8k5.zip
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)

出力は次のようになります。

STARTED com.acme.n8k5.charlie.impl_1.0.0 [2020]
STARTED com.acme.n8k5.baker.impl_1.0.0 [2025]
INFO  [acme/n8k5_able-2][N8K5CharlieMessageListener:23] Received message payload N8K5Baker#_activate
INFO  [acme/n8k5_baker-2][N8K5Baker:30] Received message payload N8K5CharlieMessageListener

N8K5CharlieMessageListenerは、N8K5Bakerのメッセージを受信してから、応答メッセージを応答先に送信します。 N8K5Bakerは応答メッセージを受信し、メッセージペイロードを出力します。

note

クラスでメッセージを再度交換する場合は、Gogo シェルでモジュール(OSGiバンドル)を再起動できます。 バンドルを一覧表示して(lb)バンドルIDを取得し、バンドルを停止して(stop <id>)、バンドルを再起動します(start <id>)。

note

OSGiコンポーネントではないクラスでは、 MessageBusUtil と、DestinationDestinationConfigurationMessage、およびMessageListenerインスタンスを使用してメッセージを送信できます。

示されているようにDestinationサービスを登録できますが、BundleContextを別の方法で取得する必要があります(たとえば、Bundle bundle = FrameworkUtil.getBundle(YourClass.class); BundleContext bundleContext = bundle.getBundleContext()を呼び出しを行うことによって)。

  2つのクラス間で非同期的にメッセージを交換しました。

次のステップ

非同期メッセージングに慣れてきたので、最適なパフォーマンスになるように調整できます。 メッセージングパフォーマンスのチューニング でその方法を学びましょう。

デフォルトダイレクト モードを使用した同期メッセージングを検討したい場合は、 以前のバージョンでダイレクト同期メッセージングを使用以前のバージョンでデフォルト同期メッセージングを使用 に詳細が記載されています。

追加情報