Tuning Messaging Performance
Messaging performance is tuned at the destinations. Performance depends on the destination type, the amount of processing the message listeners require, and the thread pool available to process messages.
Here are the three destination types:
Parallel Destination
- Messages sent here are queued.
- Worker threads from a thread pool deliver messages to registered message listeners, one worker thread per message per message listener. Threads simultaneously deliver the same message to a destination’s message listeners.
Serial Destination
- Messages sent here are queued.
- Worker threads from a thread pool deliver messages to registered message listeners, one worker thread per message.
Synchronous Destination
- Messages sent here are directly delivered to message listeners.
- The thread sending the message also delivers the message to all message listeners.
You can send messages in different ways using the applicable destination types.
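The three delivery models above can be sketched in plain Java. This is only an analogy built on `java.util.concurrent`, not Liferay's implementation: the class name `DeliveryModelsSketch`, the `deliver` method, and the fixed pool of four threads are all assumptions made for illustration. Each listener records the name of the thread that ran it, so you can see which thread did the work under each model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: mimics the three destination types with a JDK thread pool.
public class DeliveryModelsSketch {

    // Delivers one message to listenerCount listeners using the given model
    // and returns the name of the thread that ran each listener.
    static List<String> deliver(String model, int listenerCount) {
        List<String> threadNames = new CopyOnWriteArrayList<>();
        List<Consumer<String>> listeners = new ArrayList<>();

        for (int i = 0; i < listenerCount; i++) {
            listeners.add(
                message -> threadNames.add(Thread.currentThread().getName()));
        }

        ExecutorService pool = Executors.newFixedThreadPool(4);
        String message = "foo";

        switch (model) {
            case "parallel":
                // One pool task per message per listener.
                for (Consumer<String> listener : listeners) {
                    pool.execute(() -> listener.accept(message));
                }
                break;
            case "serial":
                // One pool task per message; it calls each listener in turn.
                pool.execute(
                    () -> listeners.forEach(l -> l.accept(message)));
                break;
            case "synchronous":
                // No queue or pool: the sending thread calls each listener.
                listeners.forEach(l -> l.accept(message));
                break;
            default:
                throw new IllegalArgumentException("Unknown model: " + model);
        }

        // Wait for any queued deliveries to finish before returning.
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        catch (InterruptedException interruptedException) {
            Thread.currentThread().interrupt();
        }

        return threadNames;
    }
}
```

Calling `deliver("synchronous", 3)` records the caller's own thread three times, while `deliver("serial", 3)` records a single pool thread three times. With `"parallel"`, the listeners may run on different pool threads at the same time.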
Destination Type Compatibility
Below is each destination type’s compatibility with asynchronous messaging, default synchronous messaging, and direct synchronous messaging.
Destination Type | Asynchronous Messaging | Default Synchronous Messaging | Direct Synchronous Messaging |
---|---|---|---|
Parallel | ✔ | ✔ | ✘ |
Serial | ✔ | ✔ | ✘ |
Synchronous | ✘ | ✘ | ✔ |
Here you’ll start by examining an example project’s messaging performance. Then you’ll use the API to get destination statistics and to configure the destination. Lastly, you’ll reconfigure the example destination’s settings, re-run the example, and examine the statistics again.
Monitor Messaging in an Example Project
The example project creates a destination, registers message listeners, and lists destination statistics via a Gogo shell command.
Start a new Liferay instance by running
docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.112-ga112
Sign in to Liferay at http://localhost:8080. Use the email address test@liferay.com and the password test. When prompted, change the password to learn.
Then, follow these steps:
-
Download and unzip the example.
curl https://resources.learn.liferay.com/dxp/latest/en/building-applications/core-frameworks/message-bus/liferay-w3r2.zip -O
unzip liferay-w3r2.zip
-
Build and deploy the example project modules.
cd liferay-w3r2
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
Note: This command is the same as copying the module JARs to /opt/liferay/osgi/modules on the Docker container.
-
The Docker container console confirms module startup and reports the destination configuration.
STARTED com.acme.w3r2.charlie.impl_1.0.0 [1390]
STARTED com.acme.w3r2.able.impl_1.0.0 [1388]
[W3R2AbleMessagingConfigurator:27] {_destinationName=acme/w3r2_able, _destinationType=serial, _maximumQueueSize=2147483647, _rejectedExecutionHandler=null, _workersCoreSize=2, _workersMaxSize=5}
STARTED com.acme.w3r2.baker.impl_1.0.0 [1389]
-
Visit the Liferay instance with your browser at http://localhost:8080 and sign in using your credentials.
-
Open the Script console.
-
In the script field, send a message by executing the following Groovy code:
import com.liferay.portal.kernel.messaging.*;

MessageBusUtil.sendMessage(
    "acme/w3r2_able",
    new Message() {
        {
            setPayload("foo");
        }
    });
-
Confirm that message listeners from W3R2BakerMessageListenerManager received the message.
[acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo
[acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo
[acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo
[acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo
[acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo
-
Open the Gogo shell.
-
Get the destination statistics by executing the w3r2:listDestinationStats command in the Gogo shell command field.
w3r2:listDestinationStats
-
Confirm the acme/w3r2_able destination’s 10 listeners and the sent message count.
[pipe-w3r2:listDestinationStats][W3R2CharlieOSGiCommands:29] acme/w3r2_able active thread count 0, current thread count 1, largest thread count 1, max thread pool size 1, message listener count 10, min thread pool size 1, pending message count 0, sent message count 1
The example’s three modules configured a destination, registered 10 message listeners, and provided a Gogo shell command to list the destination’s statistics.
When W3R2AbleMessagingConfigurator in w3r2-able-impl activates, it configures the acme/w3r2_able destination and logs the DestinationConfiguration’s toString() value.
@Activate
private void _activate(BundleContext bundleContext) {
    DestinationConfiguration destinationConfiguration =
        DestinationConfiguration.createSerialDestinationConfiguration(
            "acme/w3r2_able");

    if (_log.isInfoEnabled()) {
        _log.info(destinationConfiguration.toString());
    }

    Destination destination = _destinationFactory.createDestination(
        destinationConfiguration);

    _serviceRegistration = bundleContext.registerService(
        Destination.class, destination,
        MapUtil.singletonDictionary(
            "destination.name", destination.getName()));
}
W3R2CharlieOSGiCommands in the w3r2-charlie-impl module logs the destination statistics using the w3r2:listDestinationStats Gogo shell command it provides. Examine how W3R2CharlieOSGiCommands’s listDestinationStats() method gets destination statistics.
@Component(
    property = {
        "osgi.command.function=listDestinationStats", "osgi.command.scope=w3r2"
    },
    service = W3R2CharlieOSGiCommands.class
)
public class W3R2CharlieOSGiCommands {

    public void listDestinationStats() {
        if (_log.isInfoEnabled()) {
            Destination destination = _messageBus.getDestination(
                "acme/w3r2_able");

            DestinationStatistics destinationStatistics =
                destination.getDestinationStatistics();

            _log.info(
                StringBundler.concat(
                    "acme/w3r2_able active thread count ",
                    destinationStatistics.getActiveThreadCount(),
                    ", current thread count ",
                    destinationStatistics.getCurrentThreadCount(),
                    ", largest thread count ",
                    destinationStatistics.getLargestThreadCount(),
                    ", max thread pool size ",
                    destinationStatistics.getMaxThreadPoolSize(),
                    ", message listener count ",
                    destination.getMessageListenerCount(),
                    ", min thread pool size ",
                    destinationStatistics.getMinThreadPoolSize(),
                    ", pending message count ",
                    destinationStatistics.getPendingMessageCount(),
                    ", sent message count ",
                    destinationStatistics.getSentMessageCount()));
        }
    }

    private static final Log _log = LogFactoryUtil.getLog(
        W3R2CharlieOSGiCommands.class);

    @Reference
    private MessageBus _messageBus;

}
The listDestinationStats() method uses the _messageBus instance to get the Destination and then gets a DestinationStatistics instance from the destination. The destination populates the DestinationStatistics object with the latest statistics. The method logs the following destination information:
- Active thread count
- Current thread count
- Largest thread count
- Maximum thread pool size
- Message listener count
- Minimum (starting) thread pool size
- Pending message count
- Sent message count
You can monitor your message destination using this same API.
Monitoring Messaging
The messaging API facilitates monitoring messaging performance at destinations in the context of their settings. The following tables list API methods to access destination settings and messaging statistics.
Destination Settings:
Destination Setting | API Method |
---|---|
Destination type | Destination#getDestinationType() |
Maximum thread pool size | DestinationConfiguration#getWorkersMaxSize() and DestinationStatistics#getMaxThreadPoolSize() |
Minimum thread pool size | DestinationConfiguration#getWorkersCoreSize() and DestinationStatistics#getMinThreadPoolSize() |
Message queue size | DestinationConfiguration#getMaximumQueueSize() |
Destination Statistics:
Destination Statistic | API Method |
---|---|
Message listener count | Destination#getMessageListenerCount() |
Messages pending count | DestinationStatistics#getPendingMessageCount() |
Messages sent count | DestinationStatistics#getSentMessageCount() |
Current thread count | DestinationStatistics#getCurrentThreadCount() |
Active thread count | DestinationStatistics#getActiveThreadCount() |
Largest thread count | DestinationStatistics#getLargestThreadCount() |
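Always interpret a destination’s statistics in the context of its settings. For example, compare the active thread count with the maximum thread pool size rather than reading it alone.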
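One way to read statistics against settings is to classify a snapshot of the values. The helper below is a minimal sketch, not part of the messaging API: the class name, method name, and the three labels are hypothetical, and the rule of thumb it encodes is an assumption for illustration. Its parameters stand for values you would read from `getActiveThreadCount()`, `getMaxThreadPoolSize()`, and `getPendingMessageCount()`.

```java
// Hypothetical helper: classifies one statistics snapshot against the pool settings.
public class DestinationStatsCheck {

    static String diagnose(
        int activeThreadCount, int maxThreadPoolSize, int pendingMessageCount) {

        // Nothing is waiting, so the destination is keeping up.
        if (pendingMessageCount == 0) {
            return "keeping up";
        }

        // Messages are waiting and every allowed thread is busy.
        if (activeThreadCount >= maxThreadPoolSize) {
            return "saturated";
        }

        // Messages are waiting, but the pool has not reached its maximum.
        return "backlog with spare threads";
    }
}
```

With the values from the example log shown earlier (0 active threads, a maximum pool size of 1, and 0 pending messages), `diagnose(0, 1, 0)` returns `"keeping up"`. A single snapshot can mislead, so it makes sense to sample the statistics several times before reconfiguring anything.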
After examining destination statistics, you can try to improve performance by reconfiguring the destination.
Changing Destination Type
If you’re using a serial destination and messages aren’t reaching some message listeners fast enough, you can increase the maximum thread pool size (explained next) or try switching to the parallel destination type. Message Bus processes parallel destination message listeners simultaneously using threads from the thread pool.
You can switch destination types by replacing your current DestinationConfiguration with one of the type you want. Create a new parallel or serial DestinationConfiguration using the applicable DestinationConfiguration method:
createParallelDestinationConfiguration(String)
createSerialDestinationConfiguration(String)
See Reconfigure the Example Destination for details.
Configuring the Message Queue and Thread Pool
Each serial and parallel destination has a message queue and a dedicated thread pool.
If a message arrives when the queue is full, the destination’s RejectedExecutionHandler handles the message. The default handler discards the message and logs a warning. The default maximum message queue size is Java’s maximum integer value, but you can reduce it if you like.
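The full-queue behavior described above can be reproduced with a plain JDK thread pool. The sketch below is an analogy, not Liferay code: it assumes the destination's handler follows the same contract as `java.util.concurrent.RejectedExecutionHandler`, and the class name `FullQueueSketch`, the one-thread pool, and the one-slot queue are chosen only to make the rejection easy to trigger.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a bounded queue plus a handler that discards and warns.
public class FullQueueSketch {

    // Submits three slow tasks to a 1-thread pool with a 1-slot queue
    // and returns how many of them the handler discarded.
    static int discardedCount() {
        AtomicInteger discarded = new AtomicInteger();

        // Stand-in for a "discard and log a warning" handler.
        RejectedExecutionHandler discardAndWarn = (task, executor) -> {
            discarded.incrementAndGet();
            System.err.println("Queue is full, discarding message");
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
            discardAndWarn);

        CountDownLatch release = new CountDownLatch(1);

        // A listener that stays busy until the latch is released.
        Runnable slowListener = () -> {
            try {
                release.await();
            }
            catch (InterruptedException interruptedException) {
                Thread.currentThread().interrupt();
            }
        };

        pool.execute(slowListener); // runs on the only thread
        pool.execute(slowListener); // waits in the queue
        pool.execute(slowListener); // queue is full: the handler is called

        release.countDown();
        pool.shutdown();

        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        catch (InterruptedException interruptedException) {
            Thread.currentThread().interrupt();
        }

        return discarded.get();
    }
}
```

Here the third task is discarded because the single thread is busy and the single queue slot is taken. A smaller queue bounds memory use but makes discards more likely under load, so the handler you choose matters more as you reduce the queue size.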
The Message Bus draws message listener processing threads from the destination’s thread pool. The pool has a starting size and a maximum size.
You can change the maximum message queue size, rejected execution handler, thread pool starting size (core size), and thread pool maximum size using these DestinationConfiguration methods:
setMaximumQueueSize(int maximumQueueSize)
setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler)
setWorkersCoreSize(int workersCoreSize)
setWorkersMaxSize(int workersMaxSize)
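To build intuition for how a starting size, a maximum size, and a queue size interact, here is a sketch using a standard `java.util.concurrent.ThreadPoolExecutor`. Whether a destination's pool follows exactly these growth rules is an assumption that this article does not confirm, so treat the sketch as an analogy. The class name `PoolSizingSketch` and its `snapshot` method are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: shows how a JDK pool grows from core size toward max size.
public class PoolSizingSketch {

    // Submits taskCount blocking tasks and returns
    // {current thread count, largest thread count, pending task count}
    // while those tasks are still outstanding.
    // Keep taskCount <= maxSize + queueSize, or the default handler throws.
    static int[] snapshot(
        int coreSize, int maxSize, int queueSize, int taskCount) {

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            coreSize, maxSize, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(queueSize));

        CountDownLatch release = new CountDownLatch(1);

        for (int i = 0; i < taskCount; i++) {
            pool.execute(
                () -> {
                    try {
                        release.await();
                    }
                    catch (InterruptedException interruptedException) {
                        Thread.currentThread().interrupt();
                    }
                });
        }

        int[] stats = {
            pool.getPoolSize(), pool.getLargestPoolSize(),
            pool.getQueue().size()
        };

        // Let the tasks finish and release the threads.
        release.countDown();
        pool.shutdown();

        return stats;
    }
}
```

With a core size of 2, a maximum of 4, and a queue of 2, `snapshot(2, 4, 2, 4)` yields 2 threads and 2 pending tasks, while `snapshot(2, 4, 2, 6)` yields 4 threads and 2 pending tasks. In a standard JDK pool, threads beyond the core size are created only once the queue is full, which is worth keeping in mind when the queue is very large.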
Next, reconfigure the example destination.
Reconfigure the Example Destination
Here you’ll reconfigure the example’s acme/w3r2_able destination with these settings:
- Destination type: parallel
- Starting thread pool size: 10
- Maximum thread pool size: 20
Here are the steps:
-
Use a different DestinationConfiguration by replacing W3R2AbleMessagingConfigurator’s _activate(BundleContext) method with this code:
@Activate
private void _activate(BundleContext bundleContext) {
    DestinationConfiguration destinationConfiguration =
        DestinationConfiguration.createParallelDestinationConfiguration(
            "acme/w3r2_able");

    destinationConfiguration.setWorkersCoreSize(10);
    destinationConfiguration.setWorkersMaxSize(20);

    if (_log.isInfoEnabled()) {
        _log.info(destinationConfiguration.toString());
    }

    Destination destination = _destinationFactory.createDestination(
        destinationConfiguration);

    _serviceRegistration = bundleContext.registerService(
        Destination.class, destination,
        MapUtil.singletonDictionary(
            "destination.name", destination.getName()));
}
-
Redeploy the modules.
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
-
The Docker container console confirms the w3r2-able-impl module startup and reports the destination configuration.
STARTED com.acme.w3r2.able.impl_1.0.0 [1388]
[W3R2AbleMessagingConfigurator:27] {_destinationName=acme/w3r2_able, _destinationType=parallel, _maximumQueueSize=2147483647, _rejectedExecutionHandler=null, _workersCoreSize=10, _workersMaxSize=20}
-
Get the message listener module (Acme W3R2 Baker Implementation) ID by running this Gogo shell command:
lb | grep W3R2
Each line starts with the corresponding module’s ID number.
1388|Active | 10|Acme W3R2 Able Implementation (1.0.0)|1.0.0
1389|Active | 10|Acme W3R2 Baker Implementation (1.0.0)|1.0.0
1390|Active | 10|Acme W3R2 Charlie Implementation (1.0.0)|1.0.0
-
Bind the message listeners to the destination replacement by restarting the message listener module with the following Gogo shell commands. Replace the number with your module’s ID:
stop 1389
start 1389
-
Send another message by executing the following Groovy code again in the Script console.
import com.liferay.portal.kernel.messaging.*;

MessageBusUtil.sendMessage(
    "acme/w3r2_able",
    new Message() {
        {
            setPayload("foo");
        }
    });
-
Get the destination statistics by executing the w3r2:listDestinationStats command in the Gogo shell.
w3r2:listDestinationStats
A log message like this one confirms your new settings.
[pipe-w3r2:listDestinationStats][W3R2CharlieOSGiCommands:29] acme/w3r2_able
active thread count 0, current thread count 10, largest thread count 10, max
thread pool size 20, message listener count 10, min thread pool size 10,
pending message count 0, sent message count 2
Now you know how to monitor messaging at a destination and adjust destination settings. You can test different settings to optimize performance.