Documentation

デフォルトの同期メッセージングの使用

デフォルトの同期メッセージングでは、メッセージバススレッドが登録されたメッセージリスナーにメッセージをディスパッチしている間、送信者はブロックします。 応答メッセージが受信されるか、送信者スレッドがタイムアウトすると、送信者はブロックを解除します。

注釈

送信者は、受信した最初の応答メッセージのブロックを解除します。

サンプルプロジェクトを使用して、デフォルトの同期メッセージを送信します。 次に、例を変更してメッセージをタイムアウトにします。

デフォルトの同期メッセージを送信する

サンプルプロジェクトでは、デフォルトモードでSynchronousMessageSenderを使用してメッセージを送信し、応答を待ちます。

新しいLiferayインスタンスを実行します。

docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.22-ga22

http://localhost:8080でLiferayにサインインします。メールアドレスtest@liferay.comとパスワードtestを使用してください。プロンプトが表示されたら、パスワードをlearnに変更します。

次に、以下の手順を実行します。

  1. サンプルをダウンロードして解凍します。

    curl https://learn.liferay.com/dxp/latest/ja/building-applications/core-frameworks/message-bus/liferay-m4q7.zip -O
    
    unzip liferay-m4q7.zip
    
  2. サンプルのプロジェクトモジュールをビルドしてデプロイします。

    cd liferay-m4q7
    
    ./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    

    注釈

    このコマンドは、モジュールJARをDockerコンテナの/opt/liferay/osgi/modulesにコピーするのと同じです。

  3. Dockerコンテナコンソールにモジュールの起動が表示されます。

    STARTED com.acme.m4q7.able.impl_1.0.0
    STARTED com.acme.m4q7.baker.impl_1.0.0
    STARTED com.acme.m4q7.charlie.impl_1.0.0
    
  4. ブラウザでhttp://localhost:8080にあるLiferayインスタンスにアクセスし、認証情報を使用してサインインします。

  5. Gogoシェルを開きます。

  6. Gogoシェルコマンドフィールドに、m4q7:sendMessageと入力し、その後にメッセージを入力します。 例:

    m4q7:sendMessage foo
    
  7. 出力が次のようになっていることを確認します。

    INFO  [acme/m4q7_able-2][M4Q7CharlieMessageListener:23] Received message payload foo
    INFO  [acme/m4q7_baker-2][M4Q7BakerMessageListener:21] Received message payload M4Q7CharlieMessageListener
    INFO  [pipe-m4q7:sendMessage foo][M4Q7BakerOSGiCommands:28] Response: M4Q7CharlieMessageListener
    

acme/m4q7_able宛先で、M4Q7CharlieMessageListenerがGogoシェルメッセージを受信しました。 acme/m4q7_baker宛先で、M4Q7BakerMessageListenerM4Q7CharlieMessageListenerから応答メッセージを受信しました。 最後に、M4Q7BakerOSGiCommandssendMessageメソッドが、メッセージ送信者から返された応答オブジェクトをログに記録しました。

プロジェクトの概要

3つのサンプルモジュールクラスは、宛先を管理し、メッセージをリッスンし、メッセージを送信します。

m4q7-able-implモジュール: M4Q7AbleMessagingConfiguratorは、acme/m4q7_ableという名前のメッセージ宛先を作成し、それをメッセージバスに登録します。

m4q7-baker-implモジュール:

  • M4Q7BakerOSGiCommandsは、acme/m4q7_able宛先にメッセージを送信し、応答をログに記録します。

  • M4Q7BakerMessagingConfiguratorは、acme/m4q7_bakerという名前のメッセージ宛先を作成し、それをメッセージバスに登録します。

  • M4Q7BakerMessageListenerは、acme/m4q7_baker宛先に送信されたメッセージをリッスンし、メッセージペイロードをログに記録します。

m4q7-charlie-implモジュール: M4Q7CharlieMessageListenerは、acme/m4q7_able宛先に送信されたメッセージをリッスンし、メッセージペイロードをログに記録し、元のメッセージの応答先に応答メッセージを送信します。

イベントフローは次のとおりです。

  1. m4q7:sendMessage Gogoシェルコマンドを呼び出して、メッセージを渡します。

  2. M4Q7BakerOSGiCommandssendMessage(String)メソッドは、Gogoシェルコマンドでトリガーし、メッセージ内のコマンド引数をacme/m4q7_able宛先に送信します。

  3. メッセージバススレッドは、メッセージをM4Q7CharlieMessageListenerに配信します。

  4. M4Q7CharlieMessageListenerは、メッセージペイロードをログに記録し、応答メッセージ内の独自のクラス名を元のメッセージの応答先acme/m4q7_bakerに送信します。

  5. M4Q7BakerMessageListenerは応答メッセージを受信し、そのペイロードをログに記録します。

  6. 処理はM4Q7BakerOSGiCommandsに戻り、元のメッセージへの応答をログに記録します。

次に、宛先コンフィギュレーターから順に、各クラスを調べます。

宛先コンフィグレーターを調べる

m4q7-able-implモジュールとm4q7-baker-implモジュールには、それぞれ宛先コンフィギュレータークラスM4Q7AbleMessagingConfiguratorM4Q7BakerMessagingConfiguratorがあります。 それぞれが宛先を作成して構成します。

M4Q7AbleMessagingConfiguratorクラスは、acme/m4q7_able宛先を構成します。

@Component
public class M4Q7AbleMessagingConfigurator {

	@Activate
	private void _activate(BundleContext bundleContext) {
		DestinationConfiguration destinationConfiguration =
			DestinationConfiguration.createSerialDestinationConfiguration(
				"acme/m4q7_able");

		Destination destination = _destinationFactory.createDestination(
			destinationConfiguration);

		_serviceRegistration = bundleContext.registerService(
			Destination.class, destination,
			MapUtil.singletonDictionary(
				"destination.name", destination.getName()));
	}

	@Deactivate
	private void _deactivate() {
		if (_serviceRegistration != null) {
			_serviceRegistration.unregister();
		}
	}

	@Reference
	private DestinationFactory _destinationFactory;

	private ServiceRegistration<Destination> _serviceRegistration;

}

M4Q7BakerMessagingConfiguratorクラスは、acme/m4q7_baker宛先を構成します。

@Component
public class M4Q7BakerMessagingConfigurator {

	@Activate
	private void _activate(BundleContext bundleContext) {
		DestinationConfiguration destinationConfiguration =
			DestinationConfiguration.createSerialDestinationConfiguration(
				"acme/m4q7_baker");

		Destination destination = _destinationFactory.createDestination(
			destinationConfiguration);

		_serviceRegistration = bundleContext.registerService(
			Destination.class, destination,
			MapUtil.singletonDictionary(
				"destination.name", destination.getName()));
	}

	@Deactivate
	private void _deactivate() {
		if (_serviceRegistration != null) {
			_serviceRegistration.unregister();
		}
	}

	@Reference
	private DestinationFactory _destinationFactory;

	private ServiceRegistration<Destination> _serviceRegistration;

}

どちらのコンフィギュレータも Component クラスです。 これらは @Reference アノテーションを使用して、DestinationFactoryインスタンスを挿入します。

_activate(BundleContext)メソッドは、 DestinationFactoryDestinationConfiguration を使用して シリアル 宛先を作成します。 最後に、_activate(BundleContext)メソッドは、BundleContextを使用して Destination をOSGiサービスに登録します。

警告

デフォルトの同期メッセージングでは、シリアルまたはパラレルの宛先のみを使用してください。 それらは、DestinationConfigurationcreateSerialDestinationConfiguration(String)および createParallelDestinationConfiguration(String)メソッドを呼び出すことで作成できます。

メッセージ送信者のタイムアウトが無効になるため、デフォルトの同期メッセージングでは同期宛先を使用しないでください。

コンフィギュレーターが無効になると、それらの_deactivate()メソッドは宛先サービスの登録を解除します。

リスナーを調べる

m4q7-charlie-implモジュールのM4Q7CharlieMessageListenerクラスは、acme/m4q7_able Destination に送信されたメッセージをリッスンします。 メッセージを聞く に示されている方法と同じ方法で登録されます。

M4Q7CharlieMessageListenerクラス:

@Component(
	property = "destination.name=acme/m4q7_able",
	service = MessageListener.class
)
public class M4Q7CharlieMessageListener implements MessageListener {

	@Override
	public void receive(Message message) {
		if (_log.isInfoEnabled()) {
			_log.info("Received message payload " + message.getPayload());
		}

		_messageBus.sendMessage(
			message.getResponseDestinationName(),
			new Message() {
				{
					setPayload("M4Q7CharlieMessageListener");
					setResponseId(message.getResponseId());
				}
			});
	}

	private static final Log _log = LogFactoryUtil.getLog(
		M4Q7CharlieMessageListener.class);

	@Reference
	private MessageBus _messageBus;

}

M4Q7CharlieMessageListenerがメッセージを受信すると、そのreceive(Message)メソッドはメッセージペイロードをログに記録し、元のメッセージの応答先に応答メッセージを送信します。 このメソッドは、応答メッセージのペイロードをリスナークラス名に設定し、応答メッセージIDを元のメッセージの応答IDに設定します。

重要

デフォルトの同期メッセージングでは、応答メッセージは元のメッセージの応答IDを使用する必要があり、かつ、応答先に送信される必要があります。

m4q7-baker-implモジュールのM4Q7BakerMessageListenerクラスは、M4Q7BakerOSGiCommandsのメッセージの応答先であるacme/m4q7_bakerに送信されたメッセージをリッスンします。

M4Q7BakerMessageListenerクラス:

@Component(
	property = "destination.name=acme/m4q7_baker",
	service = MessageListener.class
)
public class M4Q7BakerMessageListener implements MessageListener {

	@Override
	public void receive(Message message) {
		if (_log.isInfoEnabled()) {
			_log.info("Received message payload " + message.getPayload());
		}
	}

	private static final Log _log = LogFactoryUtil.getLog(
		M4Q7BakerMessageListener.class);

}

M4Q7BakerMessageListenerがメッセージを受信すると、そのreceive(Message)メソッドはメッセージペイロードをログに記録します。

送信者を調べる

m4q7-baker-implモジュールのM4Q7BakerOSGiCommandsクラスは、メッセージ内のコマンド引数を"acme/m4q7_able"宛先に送信するトリガーとなるOSGiコマンドを提供します。

@Component(
	property = {"osgi.command.function=sendMessage", "osgi.command.scope=m4q7"},
	service = M4Q7BakerOSGiCommands.class
)
public class M4Q7BakerOSGiCommands {

	public void sendMessage(String payload) throws MessageBusException {
		Message message = new Message();

		message.setPayload(payload);
		message.setResponseDestinationName("acme/m4q7_baker");

		Object response = _synchronousMessageSender.send(
			"acme/m4q7_able", message, 10000);

		if (_log.isInfoEnabled()) {
			_log.info("Response: " + response);
		}
	}

	private static final Log _log = LogFactoryUtil.getLog(
		M4Q7BakerOSGiCommands.class);

	@Reference(target = "(mode=DEFAULT)")
	private SynchronousMessageSender _synchronousMessageSender;

}

M4Q7BakerOSGiCommandsは、独自のクラスタイプのサービスComponentです。 これは、@Referenceアノテーションを使用して、 デフォルト モード(アノテーションのtarget = "(mode=DEFAULT)"属性で指定)に設定されたSynchronousMessageSenderを挿入します。

注釈

デフォルトモードでは、SynchronousMessageSendersendメソッドは、応答メッセージが受信されるまで、または送信者がタイムアウトするまで、呼び出し元のクラスをブロックします。

M4Q7BakerOSGiCommands@Componentプロパティは、m4q7スコープでsendMessageと呼ばれるGogoシェルコマンド関数を定義します。 このコマンドは入力Stringを受け取り、M4Q7BakerOSGiCommandssendMessage(String)メソッドにマップします。

sendMessage(String)メソッドは、GogoシェルコマンドのStringをペイロードとして、"acme/m4q7_baker"を応答先として Message を作成します。

sendMessage(String)メソッドは、SynchronousMessageSendersend(String, Message, long)メソッドを呼び出してメッセージを送信し、"acme/m4q7_able"宛先名、メッセージインスタンス、および10000ミリ秒のタイムアウトを渡します。 デフォルトモードでは、SynchronousMessageSenderはメッセージバススレッドを使用してメッセージをメッセージリスナーに配信します。 元のメッセージの応答IDを持つメッセージが"acme/m4q7_baker"応答先で受信されるまで、実行がM4Q7BakerOSGiCommandsクラスでブロックされます。 応答を受信すると、M4Q7BakerOSGiCommandsメソッドで実行が続行され、メッセージ応答がログに記録されます。 一致する応答メッセージを受信する前にタイムアウトが期限切れになると、SynchronousMessageSendersend(String, Message, long)メソッドはMessageBusExceptionをスローします。

重要

デフォルトの同期メッセージングでは、応答メッセージは元のメッセージの応答IDを使用する必要があり、かつ、応答先に送信される必要があります。

メッセージリスナーが応答メッセージを返すのを確認したので、応答のタイムアウトをテストできます。

応答タイムアウトのデモを実行する

メッセージ応答ロジックをオフにしてタイムアウトを強制する方法は次のとおりです。

  1. M4Q7CharlieMessageListenerreceive(Message)メソッドで、_messageBus.sendMessage(...)呼び出しをコメントアウトします。

    @Override
    public void receive(Message message) {
        if (_log.isInfoEnabled()) {
            Object payload = message.getPayload();
    
            _log.info("Received message payload " + payload.toString());
        }
    
        // _messageBus.sendMessage(
        //  message.getResponseDestinationName(),
        //  new Message() {
        //      {
        //          setPayload("M4Q7CharlieMessageListener");
        //          setResponseId(message.getResponseId());
        //      }
        //  });
    }
    
  2. サンプルプロジェクトを再デプロイします。

    ./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    
  3. Gogoシェルコマンドフィールドに、m4q7:sendMessageと入力し、その後にメッセージを入力します。 例:

    m4q7:sendMessage foo
    
  4. Gogoシェルページが次のようになっていることを確認します。

    エラー:メッセージに対する返信がありません。

  5. Dockerコンソールのメッセージが次のようになっていることを確認します。

    INFO  [acme/m4q7_able-2][M4Q7CharlieMessageListener:23] Received message payload foo
    

M4Q7CharlieMessageListenerはメッセージを受信しましたが、応答しませんでした。 SynchronousMessageSenderは、Gogoシェルページに出力されたMessageBusExceptionをスローしました。

タイムアウトと同期してメッセージを送信しました。

次のステップ

ダイレクト モードを使用した同期メッセージングを検討する場合は、 ダイレクト同期メッセージングの使用 を参照してください。

メッセージを送信した直後に処理を続行する場合は、 非同期メッセージングの使用 を参照してください。