oo

Tuning Messaging Performance

Messaging performance is tuned at the destinations. Performance depends on the destination type, the amount of processing the message listeners require, and the thread pool available to process messages.

Here are the three destination types:

Parallel Destination

  • Messages sent here are queued.
  • Worker threads from a thread pool deliver messages to registered message listeners, one worker thread per message per message listener. Threads simultaneously deliver the same message to a destination’s message listeners.

Serial Destination

  • Messages sent here are queued.
  • Worker threads from a thread pool deliver messages to registered message listeners, one worker thread per message.

Synchronous Destination

  • Messages sent here are directly delivered to message listeners.
  • The thread sending the message also delivers the message to all message listeners.

You can send messages in different ways using the applicable destination types.

Destination Type Compatibility

Below is each destination type’s compatibility with asynchronous messaging, default synchronous messaging, and direct synchronous messaging.

Destination Type Asynchronous Messaging Default Synchronous Messaging Direct Synchronous Messaging
Parallel yes yes no
Serial yes yes no
Synchronous no no yes

Here you’ll start with examining an example project’s messaging performance. Then you’ll use the API to get destination statistics and to configure the destination. Lastly, you’ll reconfigure the example destination settings, re-run the example, and examine the statistics.

Monitor Messaging in an Example Project

The example project creates a destination, registers message listeners, and lists destination statistics via a Gogo shell command.

Start a new Liferay instance by running

docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.112-ga112

Sign in to Liferay at http://localhost:8080. Use the email address test@liferay.com and the password test. When prompted, change the password to learn.

Then, follow these steps:

  1. Download and unzip the example.

    curl https://resources.learn.liferay.com/dxp/latest/en/building-applications/core-frameworks/message-bus/liferay-w3r2.zip -O
    
    unzip liferay-w3r2.zip
    
  2. Build and deploy the example project modules.

    cd liferay-w3r2
    
    ./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    
    note

    This command is the same as copying the module JARs to /opt/liferay/osgi/modules on the Docker container.

  3. The Docker container console confirms module startup and reports the destination configuration.

    STARTED com.acme.w3r2.charlie.impl_1.0.0 [1390]
    	STARTED com.acme.w3r2.able.impl_1.0.0 [1388]
    	[W3R2AbleMessagingConfigurator:27] {_destinationName=acme/w3r2_able,
    	_destinationType=serial, _maximumQueueSize=2147483647,
    	_rejectedExecutionHandler=null, _workersCoreSize=2, _workersMaxSize=5}
    	STARTED com.acme.w3r2.baker.impl_1.0.0 [1389]
    
  4. Visit the Liferay instance with your browser at http://localhost:8080 and sign in using your credentials.

  5. Open the Script console.

  6. In the script field, send a message by executing the following Groovy code:

    import com.liferay.portal.kernel.messaging.*;
    
    	MessageBusUtil.sendMessage(
    		"acme/w3r2_able",
    		new Message() {
    			{
    				setPayload("foo");
    			}
    		});
    
  7. Confirm that message listeners from W3R2BakerMessageListenerManager received the message.

    	[acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo
    	[acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo
    	[acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo
    	[acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo
    	[acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo
    
  8. Open the Gogo shell.

  9. Get the destination statistics by executing the w3r2:listDestinationStats command in the Gogo shell command field.

    w3r2:listDestinationStats
    
  10. Confirm the acme/w3r2_able destination’s 10 listeners and the sent message count.

    	[pipe-w3r2:listDestinationStats][W3R2CharlieOSGiCommands:29] acme/w3r2_able
    	active thread count 0, current thread count 1, largest thread count 1, max
    	thread pool size 1, message listener count 10, min thread pool size 1, pending
    	message count 0, sent message count 1
    

The example’s three modules configured a destination, registered 10 message listeners, and provided a Gogo shell command to list the destination’s statistics.

When W3R2AbleMessagingConfigurator in w3r2-able-impl activates, it configures the acme/w3r2_able destination and logs the DestinationConfiguration’s toString() value.

@Activate
private void _activate(BundleContext bundleContext) {
	DestinationConfiguration destinationConfiguration =
		DestinationConfiguration.createSerialDestinationConfiguration(
			"acme/w3r2_able");

	if (_log.isInfoEnabled()) {
		_log.info(destinationConfiguration.toString());
	}

	Destination destination = _destinationFactory.createDestination(
		destinationConfiguration);

	_serviceRegistration = bundleContext.registerService(
		Destination.class, destination,
		MapUtil.singletonDictionary(
			"destination.name", destination.getName()));
}

W3R2CharlieOSGiCommands in the w3r2-charlie-impl module logs the destination statistics using the w3r2:listDestinationStats Gogo shell command it provides. Examine how W3R2CharlieOSGiCommands’s listDestinationStats() method gets destination statistics.

@Component(
	property = {
		"osgi.command.function=listDestinationStats", "osgi.command.scope=w3r2"
	},
	service = W3R2CharlieOSGiCommands.class
)
public class W3R2CharlieOSGiCommands {

	public void listDestinationStats() {
		if (_log.isInfoEnabled()) {
			Destination destination = _messageBus.getDestination(
				"acme/w3r2_able");

			DestinationStatistics destinationStatistics =
				destination.getDestinationStatistics();

			_log.info(
				StringBundler.concat(
					"acme/w3r2_able active thread count ",
					destinationStatistics.getActiveThreadCount(),
					", current thread count ",
					destinationStatistics.getCurrentThreadCount(),
					", largest thread count ",
					destinationStatistics.getLargestThreadCount(),
					", max thread pool size ",
					destinationStatistics.getMaxThreadPoolSize(),
					", message listener count ",
					destination.getMessageListenerCount(),
					", min thread pool size ",
					destinationStatistics.getMinThreadPoolSize(),
					", pending message count ",
					destinationStatistics.getPendingMessageCount(),
					", sent message count ",
					destinationStatistics.getSentMessageCount()));
		}
	}

	private static final Log _log = LogFactoryUtil.getLog(
		W3R2CharlieOSGiCommands.class);

	@Reference
	private MessageBus _messageBus;

}

The listDestinationStats() method uses the _messageBus instance to get the Destination and then gets a DestinationStatistics instance from the destination. The destination populates the DestinationStatistics object with the latest statistics. The method logs the following destination information:

  • Active thread count
  • Current thread count
  • Largest thread count
  • Maximum thread pool size
  • Message listener count
  • Minimum (starting) thread pool size
  • Pending message count
  • Sent message count

You can monitor your message destination using this same API.

Monitoring Messaging

The messaging API facilitates monitoring messaging performance at destinations in the context of their settings. The following tables list API methods to access destination settings and messaging statistics.

Destination Settings:

Destination Setting API Method
Destination type Destination#getDestinationType()
Maximum thread pool size DestinationConfiguration#getWorkersMaxSize() and DestinationStatistic#getMaxThreadPoolSize()
Minimum thread pool size DestinationConfiguration#getWorkersCoreSize() and DestinationStatistic#getMinThreadPoolSize()
Message queue size DestinationConfiguration#getMaximumQueueSize()

Destination Statistics:

Destination Statistic API Method
Message listener count Destination#getMessageListenerCount()
Messages pending count DestinationStatistics#getPendingMessageCount()
Messages sent count DestinationStatistics#getSentMessageCount()
Current thread count DestinationStatistics#getCurrentThreadCount()
Active thread count DestinationStatistics#getActiveThreadCount()
Largest thread count DestinationStatistics#getLargestThreadCount()

Make sure to digest a destination’s statistics in the context of its settings.

After examining destination statistics, you can try to improve performance by reconfiguring the destination.

Changing Destination Type

If you’re using a serial destination and messages aren’t reaching some message listeners fast enough, you can increase the maximum thread pool size (explained next) or try switching to the parallel destination type. Message Bus processes parallel destination message listeners simultaneously using threads from the thread pool.

You can switch destination types by replacing your current DestinationConfiguration with one of the type you want. Create a new parallel or serial DestinationConfiguration using the applicable DestinationConfiguration method:

  • createParallelDestinationConfiguration(String)
  • createSerialDestinationConfiguration(String)

See Reconfigure the Example Destination for details.

Configuring the Message Queue and Thread Pool

Each serial and parallel destination has a message queue and a dedicated thread pool.

If a message arrives when the queue is full, the destination’s RejectedExecutionHandler handles the message. The default handler discards the message and logs a warning. The default maximum message queue size is Java’s maximum integer value, but you can reduce it if you like.

The Message Bus draws message listener processing threads from the destination’s thread pool. The pool has a starting size and a maximum size.

You can change the maximum message queue size, rejected execution handler, thread pool starting size (core size), and thread pool maximum size using these DestinationConfiguration methods:

  • setMaximumQueueSize(int maximumQueueSize)
  • setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler)
  • setWorkersCoreSize(int workersCoreSize)
  • setWorkersMaxSize(int workersMaxSize)

Next reconfigure the example destination.

Reconfigure the Example Destination

Here you’ll reconfigure the example’s acme/w3r2_able destination with these settings:

  • Destination type: parallel
  • Starting thread pool size: 10
  • Maximum thread pool size: 20

Here are the steps:

  1. Use a different DestinationConfiguration by replacing W3R2AbleMessagingConfigurator’s _activate(BundleContext) method with this code:

    	@Activate
    	private void _activate(BundleContext bundleContext) {
    		DestinationConfiguration destinationConfiguration =
    			DestinationConfiguration.createParallelDestinationConfiguration(
    				"acme/w3r2_able");
    
    		destinationConfiguration.setWorkersCoreSize(10);
    		destinationConfiguration.setWorkersMaxSize(20);
    
    		if (_log.isInfoEnabled()) {
    			_log.info(destinationConfiguration.toString());
    		}
    
    		Destination destination = _destinationFactory.createDestination(
    			destinationConfiguration);
    
    		_serviceRegistration = bundleContext.registerService(
    			Destination.class, destination,
    			MapUtil.singletonDictionary(
    				"destination.name", destination.getName()));
    	}
    
  2. Redeploy the modules.

    	./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    
  3. The Docker container console confirms the w3r2-able-impl module startup and reports the destination configuration.

    	STARTED com.acme.w3r2.able.impl_1.0.0 [1388]
    	[W3R2AbleMessagingConfigurator:27] {_destinationName=acme/w3r2_able,
    	_destinationType=parallel, _maximumQueueSize=2147483647,
    	_rejectedExecutionHandler=null, _workersCoreSize=10, _workersMaxSize=20}
    
  4. Get the message listener module (Acme W3R2 Baker Implementation) ID by running this Gogo shell command:

    lb | grep W3R2
    

    Each line starts with the corresponding module’s ID number.

    1388|Active     |   10|Acme W3R2 Able Implementation (1.0.0)|1.0.0
    1389|Active     |   10|Acme W3R2 Baker Implementation (1.0.0)|1.0.0
    1390|Active     |   10|Acme W3R2 Charlie Implementation (1.0.0)|1.0.0
    
  5. Bind the message listeners to the destination replacement by restarting the message listener module with the following Gogo shell commands. Replace the number with your module’s ID:

    stop 1389
    
    start 1389
    
  6. Send another message by executing the following Groovy code again in the Script console.

    import com.liferay.portal.kernel.messaging.*;
    
    	MessageBusUtil.sendMessage(
    		"acme/w3r2_able",
    		new Message() {
    			{
    				setPayload("foo");
    			}
    		});
    
  7. Get the destination statistics by executing the w3r2:listDestinationStats command in the Gogo shell.

    	w3r2:listDestinationStats
    

A log message like this one confirms your new settings.

[pipe-w3r2:listDestinationStats][W3R2CharlieOSGiCommands:29] acme/w3r2_able
active thread count 0, current thread count 10, largest thread count 10, max
thread pool size 20, message listener count 10, min thread pool size 10,
pending message count 0, sent message count 2

Now you know how to monitor messaging at a destination and adjust destination settings. You can test different settings to optimize performance.