Using Asynchronous Messaging

Message Bus’s asynchronous option provides “fire and forget” behavior: you send a message and continue processing without waiting for a response.

An asynchronous message is sent to a serial or parallel destination.

  • For serial destinations, the Message Bus queues messages and delegates one worker thread per message. The thread processes message listeners sequentially.

  • For parallel destinations, the Message Bus queues messages and delegates one worker thread per message per message listener. The threads process message listeners simultaneously.
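The difference between the two destination types can be sketched with plain java.util.concurrent classes. This is a simplified model under stated assumptions, not Liferay's implementation: DispatchSketch, dispatchSerial, and dispatchParallel are hypothetical names, listeners are modeled as Consumer<String>, and each method waits for its tasks only so the demo prints stable results.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical model for illustration only; not Liferay's Message Bus API.
final class DispatchSketch {

	// Serial: one task per message. That task calls each listener in turn.
	static int dispatchSerial(
		ExecutorService workers, List<Consumer<String>> listeners,
		String message) {

		Future<?> task = workers.submit(
			() -> listeners.forEach(listener -> listener.accept(message)));

		_await(task);

		return 1; // Tasks submitted for this message
	}

	// Parallel: one task per message per listener, so listeners can overlap.
	static int dispatchParallel(
		ExecutorService workers, List<Consumer<String>> listeners,
		String message) {

		List<Future<?>> tasks = new ArrayList<>();

		for (Consumer<String> listener : listeners) {
			tasks.add(workers.submit(() -> listener.accept(message)));
		}

		tasks.forEach(DispatchSketch::_await);

		return tasks.size(); // Tasks submitted for this message
	}

	public static void main(String[] args) {
		List<String> received = Collections.synchronizedList(
			new ArrayList<>());

		List<Consumer<String>> listeners = List.of(
			message -> received.add("first:" + message),
			message -> received.add("second:" + message));

		ExecutorService workers = Executors.newFixedThreadPool(2);

		try {
			System.out.println(
				"Serial tasks: " +
					dispatchSerial(workers, listeners, "hello"));
			System.out.println("Serial order: " + received);
			System.out.println(
				"Parallel tasks: " +
					dispatchParallel(workers, listeners, "hello"));
		}
		finally {
			workers.shutdown();
		}
	}

	// Blocks until the task finishes, only to keep the demo deterministic.
	private static void _await(Future<?> task) {
		try {
			task.get();
		}
		catch (InterruptedException | ExecutionException exception) {
			throw new IllegalStateException(exception);
		}
	}

}
```

In the serial case the listener order is fixed because a single task walks the listener list. In the parallel case the order in which listeners finish is not guaranteed.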

Start by sending a message to a serial destination that another class (a message listener) is listening on.

Send a Message

Start a new Liferay instance by running

docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.112-ga112

Sign in to Liferay at http://localhost:8080. Use the email address test@liferay.com and the password test. When prompted, change the password to learn.

Then send a message using an example project.

  1. Download and unzip the example.

    curl https://resources.learn.liferay.com/dxp/latest/en/building-applications/core-frameworks/message-bus/liferay-n8k5.zip -O
    
    unzip liferay-n8k5.zip
    
  2. Build and deploy the destination module n8k5-able-impl.

    cd liferay-n8k5/n8k5-able-impl
    
    ../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    
    Note

    This command is the same as copying the module JAR to /opt/liferay/osgi/modules on the Docker container.

    The Docker container console shows that the module started.

    STARTED com.acme.n8k5.able.impl_1.0.0
    
  3. Build and deploy the listener module n8k5-charlie-impl.

    cd ../n8k5-charlie-impl
    
    ../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    

    The Docker container console shows that the module started.

    STARTED com.acme.n8k5.charlie.impl_1.0.0
    
  4. Build and deploy the sender module n8k5-baker-impl.

    cd ../n8k5-baker-impl
    
    ../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    

    In the Docker container console, confirm N8K5Baker sent a message, N8K5CharlieMessageListener received a message, and the n8k5-baker-impl module started.

    INFO  [pipe-start 2025][N8K5Baker:24] Sent message to acme/n8k5_able
    INFO  [acme/n8k5_able-4][N8K5CharlieMessageListener:21] Received message payload N8K5Baker#_activate
    STARTED com.acme.n8k5.baker.impl_1.0.0 [2025]
    

N8K5Baker reported sending a message to destination acme/n8k5_able. N8K5CharlieMessageListener received a message with payload N8K5Baker#_activate at destination acme/n8k5_able. Now you can examine the example code.

Project Overview

The example’s three modules each have one class. Each class represents one of the messaging components: destination, sender, or listener.

Example Classes

Class | Description
N8K5AbleMessagingConfigurator in n8k5-able-impl | Creates a message destination named acme/n8k5_able and registers it with the Message Bus.
N8K5Baker in n8k5-baker-impl | Sends a message to the acme/n8k5_able destination.
N8K5CharlieMessageListener in n8k5-charlie-impl | Listens for messages sent to the acme/n8k5_able destination.

Here’s how they interact:

  1. N8K5Baker activates (for example, when the n8k5-baker-impl module starts) and sends a message to the acme/n8k5_able destination.
  2. The Message Bus sends the message to N8K5CharlieMessageListener.
  3. N8K5CharlieMessageListener receives the message.

We’ll examine the destination configuration and sender classes. The listener class N8K5CharlieMessageListener registers the same way Listening for Messages demonstrates.

Examine the Destination Configuration

The n8k5-able-impl module’s N8K5AbleMessagingConfigurator class creates and configures the destination. Here’s the code:

@Component
public class N8K5AbleMessagingConfigurator {

	@Activate
	private void _activate(BundleContext bundleContext) {
		Destination destination = _destinationFactory.createDestination(
			DestinationConfiguration.createSerialDestinationConfiguration(
				"acme/n8k5_able"));

		_serviceRegistration = bundleContext.registerService(
			Destination.class, destination,
			MapUtil.singletonDictionary(
				"destination.name", destination.getName()));
	}

	@Deactivate
	private void _deactivate() {
		if (_serviceRegistration != null) {
			_serviceRegistration.unregister();
		}
	}

	@Reference
	private DestinationFactory _destinationFactory;

	private ServiceRegistration<Destination> _serviceRegistration;

}

Any class can create and configure a destination, but a Component can have injected dependencies, like the DestinationFactory. The _destinationFactory field’s @Reference annotation signals Liferay’s OSGi framework to inject a DestinationFactory instance.

In the _activate method, N8K5AbleMessagingConfigurator uses the DestinationFactory and a DestinationConfiguration to create a serial destination named acme/n8k5_able. Then it uses the OSGi Framework BundleContext to register a service for the Destination. When N8K5AbleMessagingConfigurator deactivates, the _deactivate method unregisters the service.

Examine the Sender

The N8K5Baker class below sends a message with the payload "N8K5Baker#_activate" to the destination named acme/n8k5_able.

@Activate
private void _activate() {
	Message message = new Message();

	message.setPayload("N8K5Baker#_activate");

	_messageBus.sendMessage("acme/n8k5_able", message);
}

@Reference
private MessageBus _messageBus;

As a Component, N8K5Baker uses the @Reference annotation to inject a MessageBus instance.

On component activation, N8K5Baker creates and sends a message via its activation method _activate(). It constructs a Message instance and adds a payload to it. A payload is one of several things you can populate in a Message.

Here are the main message-populating methods:

Method | Description
setPayload(Object) | Adds your Message’s main content.
setResponseDestinationName(String) | Refers to a Destination to receive responses.
setValues(Map<String,Object>) | Provides additional data from a Map.
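To make the roles of these three setters concrete, here is a minimal stand-in that can run without Liferay on the classpath. SketchMessage is a hypothetical class that only mirrors the setters listed above. It is not Liferay's Message class, and the "attempt" key is an invented example value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in that mirrors the three setters listed above.
// It is not Liferay's Message class.
final class SketchMessage {

	// Populates a message the way a sender might before sending it.
	static SketchMessage example() {
		SketchMessage message = new SketchMessage();

		// Main content
		message.setPayload("N8K5Baker#_activate");

		// Where recipients should send replies
		message.setResponseDestinationName("acme/n8k5_baker");

		// Extra data; the "attempt" key is made up for this sketch
		message.setValues(Map.of("attempt", 1));

		return message;
	}

	public static void main(String[] args) {
		SketchMessage message = example();

		System.out.println(message.getPayload());
		System.out.println(message.getResponseDestinationName());
		System.out.println(message.getValues());
	}

	Object getPayload() {
		return _payload;
	}

	String getResponseDestinationName() {
		return _responseDestinationName;
	}

	Map<String, Object> getValues() {
		return _values;
	}

	void setPayload(Object payload) {
		_payload = payload;
	}

	void setResponseDestinationName(String responseDestinationName) {
		_responseDestinationName = responseDestinationName;
	}

	void setValues(Map<String, Object> values) {
		_values = new HashMap<>(values);
	}

	private Object _payload;
	private String _responseDestinationName;
	private Map<String, Object> _values = new HashMap<>();

}
```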

N8K5Baker sends the message to a Destination named acme/n8k5_able by calling MessageBus’s sendMessage(String, Message) method. The MessageBus starts a new thread and sends the Message to MessageListener instances registered to the acme/n8k5_able Destination. N8K5Baker’s thread continues.
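The "sender's thread continues" behavior can be illustrated with a small sketch that uses only the JDK. It assumes a single worker thread in place of the Message Bus, and FireAndForgetSketch is a hypothetical name. The latch that holds the listener back exists only to make the demo's event order repeatable. If sending blocked until the listener finished, this program could not reach the "sender continued" line.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model for illustration only; not Liferay's Message Bus API.
final class FireAndForgetSketch {

	static List<String> run() {
		List<String> events = Collections.synchronizedList(
			new ArrayList<>());

		CountDownLatch senderContinued = new CountDownLatch(1);
		CountDownLatch listenerDone = new CountDownLatch(1);

		ExecutorService workers = Executors.newSingleThreadExecutor();

		try {

			// "Send": hand the work to a worker thread and return at once.
			workers.execute(
				() -> {
					try {

						// Held back only to fix the demo's event order
						senderContinued.await();

						events.add("listener received N8K5Baker#_activate");
					}
					catch (InterruptedException interruptedException) {
						Thread.currentThread().interrupt();
					}
					finally {
						listenerDone.countDown();
					}
				});

			// The sender gets here without waiting for the listener.
			events.add("sender continued");

			senderContinued.countDown();

			// Wait only so the demo can report what happened.
			listenerDone.await(5, TimeUnit.SECONDS);
		}
		catch (InterruptedException interruptedException) {
			Thread.currentThread().interrupt();
		}
		finally {
			workers.shutdown();
		}

		return events;
	}

	public static void main(String[] args) {
		run().forEach(System.out::println);
	}

}
```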

Note

If you want to receive responses to a Message, set a response destination on the Message and register a class, such as N8K5Baker, as a MessageListener to that destination. See Listening for Messages for details.

Add Response Handling

If you want responses from message recipients, set a response destination for their replies:

  1. Register a separate destination for message responses.
  2. Register a class (e.g., the original sender) as a MessageListener on the response destination.
  3. Pass the response destination in the messages.
  4. Add response logic to the MessageListeners.
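Before changing the example modules, it may help to see the four steps as one round trip. The sketch below is a simplified in-memory model, not Liferay code: ResponseSketch, Bus, and Msg are hypothetical names, and registering a listener under a destination name stands in for both creating the destination and registering a MessageListener on it.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical model for illustration only; not Liferay's Message Bus API.
final class ResponseSketch {

	// A payload plus the name of the destination that should get replies
	static final class Msg {

		Msg(String payload, String responseDestinationName) {
			this.payload = payload;
			this.responseDestinationName = responseDestinationName;
		}

		final String payload;
		final String responseDestinationName;

	}

	// A tiny asynchronous bus: destination names mapped to listeners
	static final class Bus {

		void register(String destinationName, Consumer<Msg> listener) {
			_listeners.computeIfAbsent(
				destinationName, key -> new CopyOnWriteArrayList<>()
			).add(listener);
		}

		void sendMessage(String destinationName, Msg msg) {

			// A worker thread delivers the message; the caller moves on.
			_workers.execute(
				() -> _listeners.getOrDefault(
					destinationName, List.of()
				).forEach(
					listener -> listener.accept(msg)
				));
		}

		void shutdown() {
			_workers.shutdown();
		}

		private final Map<String, List<Consumer<Msg>>> _listeners =
			new ConcurrentHashMap<>();
		private final ExecutorService _workers =
			Executors.newSingleThreadExecutor();

	}

	static String roundTrip() {
		Bus bus = new Bus();
		AtomicReference<String> reply = new AtomicReference<>();
		CountDownLatch replied = new CountDownLatch(1);

		try {

			// Steps 1 and 2: the sender listens on a response destination.
			bus.register(
				"acme/n8k5_baker",
				msg -> {
					reply.set(msg.payload);
					replied.countDown();
				});

			// Step 4: the recipient replies to the destination the message names.
			bus.register(
				"acme/n8k5_able",
				msg -> bus.sendMessage(
					msg.responseDestinationName,
					new Msg("N8K5CharlieMessageListener", null)));

			// Step 3: the outgoing message carries the response destination.
			bus.sendMessage(
				"acme/n8k5_able",
				new Msg("N8K5Baker#_activate", "acme/n8k5_baker"));

			replied.await(5, TimeUnit.SECONDS);
		}
		catch (InterruptedException interruptedException) {
			Thread.currentThread().interrupt();
		}
		finally {
			bus.shutdown();
		}

		return reply.get();
	}

	public static void main(String[] args) {
		System.out.println("Reply payload: " + roundTrip());
	}

}
```

The recipient never hard-codes where to reply. It reads the response destination from the incoming message, which is the same idea the steps below apply to the example modules.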

Step 1: Register a Destination for Responses

You can modify N8K5Baker to manage a response destination in the same way that N8K5AbleMessagingConfigurator manages its destination. Change the _activate() method signature to _activate(BundleContext bundleContext) and add code that creates, configures, and registers a service for the acme/n8k5_baker response destination. Add a _deactivate() method that unregisters the service. The _activate(BundleContext bundleContext) and _deactivate() methods should look like this:

@Activate
private void _activate(BundleContext bundleContext) {
   Destination destination = _destinationFactory.createDestination(
      DestinationConfiguration.createSerialDestinationConfiguration(
         "acme/n8k5_baker"));

   _serviceRegistration = bundleContext.registerService(
      Destination.class, destination,
      MapUtil.singletonDictionary(
         "destination.name", destination.getName()));

   Message message = new Message();

   message.setPayload("N8K5Baker#_activate");

   _messageBus.sendMessage("acme/n8k5_able", message);
}

@Deactivate
private void _deactivate() {
   if (_serviceRegistration != null) {
      _serviceRegistration.unregister();
   }
}

@Reference
private DestinationFactory _destinationFactory;

private ServiceRegistration<Destination> _serviceRegistration;

Step 2: Register N8K5Baker as a Listener on the Response Destination

Here are the changes for sender N8K5Baker:

  1. Update the @Component annotation, declaring N8K5Baker a service of type MessageListener.class and mapping N8K5Baker to its response destination via a property "destination.name=acme/n8k5_baker".
  2. Implement the MessageListener interface.
  3. Override the receive(Message) method with message-processing logic.

Here’s what the sender changes look like:

@Component(
	property = "destination.name=acme/n8k5_baker",
	service = MessageListener.class
)
public class N8K5Baker implements MessageListener {

	@Override
	public void receive(Message message) {
		Object payload = message.getPayload();

		_log.info("Received message payload " + payload.toString());
	}

	// Existing methods and fields

	private static final Log _log = LogFactoryUtil.getLog(N8K5Baker.class);

}

Step 3: Pass the Response Destination in the Message

Set acme/n8k5_baker as the response destination in the message N8K5Baker sends. Here’s what it looks like:

@Activate
private void _activate(BundleContext bundleContext) {
   // Destination setup

   Message message = new Message();

   message.setPayload("N8K5Baker#_activate");
   message.setResponseDestinationName("acme/n8k5_baker");

   _messageBus.sendMessage("acme/n8k5_able", message);
}

Step 4: Add Response Logic to the MessageListeners

In each MessageListener’s receive(Message) method, set a response on the incoming message, get the response destination name from it, and use a MessageBus instance to send a new response message to that destination. Here’s what it looks like:

public void receive(Message message) {
   // Message processing

   message.setResponse("N8K5CharlieMessageListener");

   Message responseMessage = new Message();

   responseMessage.setDestinationName(
      message.getResponseDestinationName());
   responseMessage.setPayload("N8K5CharlieMessageListener");
   responseMessage.setResponseId(message.getResponseId());

   _messageBus.sendMessage(
      message.getResponseDestinationName(), responseMessage);
}

// Existing methods and fields

@Reference
private MessageBus _messageBus;

Test Your Changes

Test the changes by redeploying the example project.

cd ../../liferay-n8k5
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)

The output looks like this:

STARTED com.acme.n8k5.charlie.impl_1.0.0 [2020]
STARTED com.acme.n8k5.baker.impl_1.0.0 [2025]
INFO  [acme/n8k5_able-2][N8K5CharlieMessageListener:23] Received message payload N8K5Baker#_activate
INFO  [acme/n8k5_baker-2][N8K5Baker:30] Received message payload N8K5CharlieMessageListener

N8K5CharlieMessageListener receives N8K5Baker’s message and then sends a response message to the response destination. N8K5Baker receives the response message and prints the message payload.

Note

If you want the classes to exchange messages again, you can restart the modules (OSGi bundles) in the Gogo Shell. List the bundles (lb) to get the bundle IDs, stop the bundles (stop <id>), and restart the bundles (start <id>).

Note

In classes that aren’t OSGi Components, you can send messages using MessageBusUtil and Destination, DestinationConfiguration, Message, and MessageListener instances.

You can register Destination services as demonstrated, except you must get the BundleContext a different way (for example, by making these calls: Bundle bundle = FrameworkUtil.getBundle(YourClass.class); BundleContext bundleContext = bundle.getBundleContext()).

Congratulations! You’ve exchanged messages asynchronously between two classes.

What’s Next

Now that you’re familiar with asynchronous messaging, you can tune it for optimal performance. Learn how at Tuning Messaging Performance.

If you want to explore synchronous messaging using default and direct modes, see Using Direct Synchronous Messaging in Previous Versions and Using Default Synchronous Messaging in Previous Versions for details.