Documentation

ダイレクト同期メッセージングの使用

ダイレクト同期メッセージングは、すべてのリスナーがメッセージを受信するまで処理をブロックする最も簡単な方法です。 SynchronousMessageSendersend(String, Message)メソッドを呼び出し、宛先名とメッセージインスタンスを渡します。 SynchronousMessageSenderは、現在のスレッドを使用して、宛先に登録されている各メッセージリスナーで直接メッセージ受信を処理します。 リスナーの処理が完了すると、send(String, Message)メソッドを呼び出したクラスで実行が続行されます。 この例は、ダイレクト同期メッセージングの使用をデモしています。

ダイレクト同期メッセージを送信する

サンプルプロジェクトでは、SynchronousMessageSenderを使用して、2つのリスナーに直接メッセージを送信します。

新しいLiferayインスタンスを実行します。

docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.22-ga22

http://localhost:8080でLiferayにサインインします。メールアドレスtest@liferay.comとパスワードtestを使用してください。プロンプトが表示されたら、パスワードをlearnに変更します。

次に、以下の手順を実行します。

  1. サンプルをダウンロードして解凍します。

    curl https://learn.liferay.com/dxp/latest/ja/building-applications/core-frameworks/message-bus/liferay-x6n5.zip -O
    
    unzip liferay-x6n5.zip
    
  2. サンプルのプロジェクトモジュールをビルドしてデプロイします。

    cd liferay-x6n5
    
    ./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    

    注釈

    このコマンドは、モジュールJARをDockerコンテナの/opt/liferay/osgi/modulesにコピーするのと同じです。

  3. Dockerコンテナコンソールに、モジュールが起動されたことが示されます。

    STARTED com.acme.x6n5.able.impl_1.0.0
    STARTED com.acme.x6n5.baker.impl_1.0.0
    STARTED com.acme.x6n5.charlie.impl_1.0.0
    STARTED com.acme.x6n5.dog.impl_1.0.0
    
  4. ブラウザでhttp://localhost:8080にあるLiferayインスタンスにアクセスし、認証情報を使用してサインインします。

  5. Gogo シェルを開きます。

  6. Gogoシェルコマンドフィールドに、x6n5:sendMessageと入力し、その後にメッセージを入力します。 例:

    x6n5:sendMessage foo
    
  7. 出力が次のようになっていることを確認します。

    INFO  [pipe-x6n5:sendMessage foo][X6N5DogMessageListener:21] Received message payload foo
    INFO  [pipe-x6n5:sendMessage foo][X6N5CharlieMessageListener:21] Received message payload foo
    INFO  [pipe-x6n5:sendMessage foo][X6N5BakerOSGiCommands:28] Response: X6N5CharlieMessageListener
    

スレッドは、メッセージを送信するときにメッセージ送信者(つまり、X6N5BakerOSGiCommands)でブロックされます。 X6N5CharlieMessageListenerおよびX6N5DogMessageListenerでメッセージを処理した後、スレッドはメッセージ送信者で続行されます。

プロジェクトの概要

4つのサンプルモジュールには1つのクラスがあります。 1つのクラスは宛先を管理し、別のクラスはメッセージを送信し、他の2つは宛先に送信されたメッセージをリッスンします。

クラスの例:

クラス

モジュール

説明

X6N5AbleMessagingConfigurator

x6n5-able-impl

acme/x6n5_ableという名前のメッセージ宛先を作成し、メッセージバスに登録します。

X6N5BakerOSGiCommands

x6n5-baker-impl

acme/x6n5_able宛先にメッセージを送信し、応答をログに記録します。

X6N5CharlieMessageListener

x6n5-charlie-impl

acme/x6n5_able宛先に送信されたメッセージをリッスンします。 メッセージペイロードをログに記録し、メッセージに応答を設定します。

X6N5DogMessageListener

x6n5-dog-impl

acme/x6n5_able宛先に送信されたメッセージをリッスンします。 メッセージペイロードをログに記録し、メッセージに応答を設定します。

イベントフローは次のとおりです。

  1. ユーザーがx6n5:sendMessage Gogoシェルコマンドを実行すると、X6N5BakerOSGiCommandsはメッセージペイロードのコマンド引数をacme/x6n5_able宛先に送信します。

  2. 現在のスレッドは、各リスナー(つまり、X6N5CharlieMessageListenerX6N5DogMessageListener)のメッセージ受信を連続して処理します。 リスナーはメッセージペイロードをログに記録し、メッセージに応答を設定します。 処理された最新のリスナーからの応答は、以前の応答に優先します。

  3. 処理はX6N5BakerOSGiCommandsに戻り、メッセージ応答をログに記録します。

これで、宛先コンフィギュレーターから順に、各クラスを調べることができます

宛先コンフィグレーターを調べる

x6n5-able-implモジュールのX6N5AbleMessagingConfiguratorクラスは、acme/x6n5_ableという名前の宛先を作成して構成します。 コードは次のとおりです。

@Component
public class X6N5AbleMessagingConfigurator {

	@Activate
	private void _activate(BundleContext bundleContext) {
		Destination destination = _destinationFactory.createDestination(
			DestinationConfiguration.createSynchronousDestinationConfiguration(
				"acme/x6n5_able"));

		_serviceRegistration = bundleContext.registerService(
			Destination.class, destination,
			MapUtil.singletonDictionary(
				"destination.name", destination.getName()));
	}

	@Deactivate
	private void _deactivate() {
		if (_serviceRegistration != null) {
			_serviceRegistration.unregister();
		}
	}

	@Reference
	private DestinationFactory _destinationFactory;

	private ServiceRegistration<Destination> _serviceRegistration;

}

このコンフィギュレーターは Component クラスです。 これは、 @Reference アノテーションを使用して、DestinationFactoryインスタンスを挿入します。

_activate(BundleContext)メソッドは、 DestinationFactoryDestinationConfiguration を使用して、acme/x6n5_ableという名前の 同期 宛先を作成します。 同期宛先は、同期メッセージング用に最適化されています。 最後に、メソッドはBundleContextを使用して Destination をOSGiサービスに登録します。

X6N5AbleMessagingConfiguratorが無効化されると、その_deactivate()メソッドは宛先サービスの登録を解除します。

送信者を調べる

x6n5-baker-implモジュールのX6N5BakerOSGiCommandsクラスは、メッセージを宛先に送信するOSGiコマンドを提供します。

@Component(
	property = {"osgi.command.function=sendMessage", "osgi.command.scope=x6n5"},
	service = X6N5BakerOSGiCommands.class
)
public class X6N5BakerOSGiCommands {

	public void sendMessage(String payload) throws MessageBusException {
		Message message = new Message();

		message.setPayload(payload);

		Object response = _synchronousMessageSender.send(
			"acme/x6n5_able", message);

		if (_log.isInfoEnabled()) {
			_log.info("Response: " + response);
		}
	}

	private static final Log _log = LogFactoryUtil.getLog(
		X6N5BakerOSGiCommands.class);

	@Reference(target = "(mode=DIRECT)")
	private SynchronousMessageSender _synchronousMessageSender;

}

X6N5BakerOSGiCommandsは、独自のクラスタイプのサービスComponentです。 これは、@Referenceアノテーションを使用して、 ダイレクト モード(アノテーションのtarget = "(mode=DIRECT)"属性で指定)に設定されたSynchronousMessageSenderを挿入します。

注釈

ダイレクトモードでは、SynchronousMessageSender sendメソッドは、現在のスレッドがすべてのリスナーにメッセージを配信するまで、呼び出し元のクラスをブロックします。

X6N5BakerOSGiCommands@Componentプロパティは、sendMessageと呼ばれるGogoシェルコマンド関数をx6n5スコープで定義します。 コマンドはsendMessage(String)メソッドにマップされ、入力Stringを受け取ります。

sendMessage(String)メソッドは、GogoシェルコマンドのStringをペイロードとして使用して Message を作成します。 SynchronousMessageSender send(String, Message)メソッドは、現在のスレッドを使用して、acme/x6n5_able Destination メッセージリスナーにメッセージを配信します。 スレッドがすべての MessageListener でメッセージを処理するまで、実行はX6N5BakerOSGiCommandsでブロックされます。 その後、X6N5BakerOSGiCommands sendMessage(String)メソッドで実行が続行され、メッセージ応答がログに記録されます。

リスナーを調べる

x6n5-charlie-implモジュールのX6N5CharlieMessageListenerクラスとx6n5-dog-implモジュールのX6N5DogMessageListenerクラスは、acme/x6n5_able Destination に送信されたメッセージをリッスンします。 メッセージを聞く に示されている方法と同じ方法で登録されます。

X6N5CharlieMessageListenerクラス:

@Component(
	property = "destination.name=acme/x6n5_able",
	service = MessageListener.class
)
public class X6N5CharlieMessageListener implements MessageListener {

	@Override
	public void receive(Message message) {
		if (_log.isInfoEnabled()) {
			_log.info("Received message payload " + message.getPayload());
		}

		message.setResponse("X6N5CharlieMessageListener");
	}

	private static final Log _log = LogFactoryUtil.getLog(
		X6N5CharlieMessageListener.class);

}

X6N5DogMessageListenerクラス:

@Component(
	property = "destination.name=acme/x6n5_able",
	service = MessageListener.class
)
public class X6N5DogMessageListener implements MessageListener {

	@Override
	public void receive(Message message) {
		if (_log.isInfoEnabled()) {
			_log.info("Received message payload " + message.getPayload());
		}

		message.setResponse("X6N5DogMessageListener");
	}

	private static final Log _log = LogFactoryUtil.getLog(
		X6N5DogMessageListener.class);

}

各リスナーのreceive(Message)メソッドは、メッセージペイロードをログに記録してから、メッセージ応答を独自のクラス名に設定します。

ダイレクト同期メッセージングの使用方法が分かりました。

次のステップ

デフォルト モードを使用した同期メッセージングを検討する場合は、 デフォルトの同期メッセージングの使用 を参照してください。

メッセージを送信した直後に処理を続行する場合は、 非同期メッセージングの使用 を参照してください。