Using Default Synchronous Messaging in Previous Versions
Synchronous messaging was removed and is no longer supported for Liferay DXP 7.4 U49/Liferay Portal 7.4 GA49 and above.
In default synchronous messaging, the sender blocks while a Message Bus thread dispatches the message to registered message listeners. The sender unblocks when either a response message is received or the sender thread times out.
The sender unblocks on the first response message received.
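The blocking behavior can be pictured with a small plain-Java sketch. It does not use the Liferay Message Bus API: the class `BlockingSendSketch`, its `sendAndWait` method, and the `slowListener` helper are hypothetical names, and a single-thread executor stands in for the Message Bus thread. The sketch returns the string "timed out" where a real sender would signal the timeout with an exception.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Plain-Java analogy only; none of these names come from Liferay.
class BlockingSendSketch {

    // Hands the payload to a listener on another thread, then blocks the
    // caller until the listener replies or the timeout expires.
    static String sendAndWait(
        Function<String, String> listener, String payload,
        long timeoutMillis) {

        // Stands in for the Message Bus thread that dispatches the message.
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();

        try {
            Future<String> response = dispatcher.submit(
                () -> listener.apply(payload));

            // The caller blocks here.
            return response.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException timeoutException) {
            return "timed out";
        }
        catch (InterruptedException | ExecutionException exception) {
            return "failed: " + exception;
        }
        finally {
            dispatcher.shutdownNow();
        }
    }

    // A hypothetical listener that needs 500 ms before it replies.
    static String slowListener(String payload) {
        try {
            Thread.sleep(500);
        }
        catch (InterruptedException interruptedException) {
            Thread.currentThread().interrupt();
        }

        return "late reply to " + payload;
    }

    public static void main(String[] args) {
        // The listener replies immediately, so the caller unblocks early.
        System.out.println(sendAndWait(p -> "reply to " + p, "foo", 1000));

        // The listener is slower than the 50 ms timeout.
        System.out.println(
            sendAndWait(BlockingSendSketch::slowListener, "foo", 50));
    }
}
```

Running `main` prints `reply to foo` for the fast listener and `timed out` for the slow one.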
You’ll send a default synchronous message using an example project. Then you’ll modify the example to make the message time out.
Send a Default Synchronous Message
The example project uses a SynchronousMessageSender in default mode to send a message and wait for a reply.
Start a new Liferay instance by running
    docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.48-ga48
Sign in to Liferay at http://localhost:8080. Use the email address test@liferay.com and the password test. When prompted, change the password to learn.
Then, follow these steps:
- Download and unzip the example.
    curl https://resources.learn.liferay.com/dxp/latest/en/building-applications/core-frameworks/message-bus/liferay-m4q7.zip -O
    unzip liferay-m4q7.zip
- Build and deploy the example project modules.
    cd liferay-m4q7
    ./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
  Note: This command is the same as copying the module JARs to /opt/liferay/osgi/modules on the Docker container.
- The Docker container console shows module startup.
    STARTED com.acme.m4q7.able.impl_1.0.0
    STARTED com.acme.m4q7.baker.impl_1.0.0
    STARTED com.acme.m4q7.charlie.impl_1.0.0
- Visit the Liferay instance with your browser at http://localhost:8080 and sign in using your credentials.
- Open the Gogo shell.
- In the Gogo shell command field, enter m4q7:sendMessage followed by a message. For example, m4q7:sendMessage foo
- Confirm the output looks like this.
    INFO [acme/m4q7_able-2][M4Q7CharlieMessageListener:23] Received message payload foo
    INFO [acme/m4q7_baker-2][M4Q7BakerMessageListener:21] Received message payload M4Q7CharlieMessageListener
    INFO [pipe-m4q7:sendMessage foo][M4Q7BakerOSGiCommands:28] Response: M4Q7CharlieMessageListener
At the acme/m4q7_able destination, M4Q7CharlieMessageListener received the Gogo shell message. At the acme/m4q7_baker destination, M4Q7BakerMessageListener received the response message from M4Q7CharlieMessageListener. Lastly, M4Q7BakerOSGiCommands’s sendMessage method logged the response object returned from the message sender.
Project Overview
The three example module classes manage destinations, listen for messages, and send messages.
- m4q7-able-impl module:
  - M4Q7AbleMessagingConfigurator creates a message destination named acme/m4q7_able and registers it with the Message Bus.
- m4q7-baker-impl module:
  - M4Q7BakerOSGiCommands sends a message to the acme/m4q7_able destination and logs the response.
  - M4Q7BakerMessagingConfigurator creates a message destination named acme/m4q7_baker and registers it with the Message Bus.
  - M4Q7BakerMessageListener listens for messages sent to the acme/m4q7_baker destination and logs the message payload.
- m4q7-charlie-impl module:
  - M4Q7CharlieMessageListener listens for messages sent to the acme/m4q7_able destination, logs the message payload, and sends a response message to the original message’s response destination.
Here’s the event flow:
- You invoke the m4q7:sendMessage Gogo shell command, passing in a message.
- M4Q7BakerOSGiCommands’s sendMessage(String) method triggers on the Gogo shell command and sends the command arguments in a message to the acme/m4q7_able destination.
- A Message Bus thread delivers the message to M4Q7CharlieMessageListener.
- M4Q7CharlieMessageListener logs the message payload and sends its own class name in a response message to the original message’s response destination acme/m4q7_baker.
- M4Q7BakerMessageListener receives the response message and logs its payload.
- Processing returns to M4Q7BakerOSGiCommands, where it logs the response to the original message.
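The event flow can be traced with a plain-Java simulation. This is only a sketch and does not use the Liferay Message Bus API: `EventFlowSketch`, `Msg`, `register`, `send`, and the fixed response ID `id-1` are hypothetical, each destination is modeled as a single-thread executor, and the blocked sender is modeled as one more listener on the response destination, which is an assumption about the mechanism rather than a description of Liferay's implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java simulation of the event flow; not the Liferay Message Bus API.
class EventFlowSketch {

    // Hypothetical stand-in for a message and its routing data.
    static final class Msg {
        final String payload;
        final String responseDestination;
        final String responseId;

        Msg(String payload, String responseDestination, String responseId) {
            this.payload = payload;
            this.responseDestination = responseDestination;
            this.responseId = responseId;
        }
    }

    final List<String> log = new CopyOnWriteArrayList<>();
    final Map<String, List<Consumer<Msg>>> listeners = new ConcurrentHashMap<>();
    final Map<String, ExecutorService> threads = new ConcurrentHashMap<>();

    void register(String destination, Consumer<Msg> listener) {
        listeners.computeIfAbsent(
            destination, name -> new CopyOnWriteArrayList<>()).add(listener);
        threads.computeIfAbsent(
            destination, name -> Executors.newSingleThreadExecutor());
    }

    // Delivers on the destination's own thread, in registration order.
    void send(String destination, Msg msg) {
        threads.get(destination).execute(
            () -> listeners.get(destination).forEach(l -> l.accept(msg)));
    }

    static List<String> run(String payload) {
        EventFlowSketch bus = new EventFlowSketch();
        CompletableFuture<String> reply = new CompletableFuture<>();

        // Charlie: logs the payload, then answers on the response destination.
        bus.register("acme/m4q7_able", msg -> {
            bus.log.add("charlie received " + msg.payload);
            bus.send(
                msg.responseDestination,
                new Msg("M4Q7CharlieMessageListener", null, msg.responseId));
        });

        // Baker: logs whatever arrives at the response destination.
        bus.register(
            "acme/m4q7_baker",
            msg -> bus.log.add("baker received " + msg.payload));

        // The blocked sender, modeled as one more listener (an assumption).
        bus.register("acme/m4q7_baker", msg -> {
            if ("id-1".equals(msg.responseId)) {
                reply.complete(msg.payload);
            }
        });

        bus.send("acme/m4q7_able", new Msg(payload, "acme/m4q7_baker", "id-1"));

        try {
            // The sender blocks here until the matching reply arrives.
            bus.log.add("sender got " + reply.get(10, TimeUnit.SECONDS));
        }
        catch (Exception exception) {
            bus.log.add("sender gave up: " + exception);
        }
        finally {
            bus.threads.values().forEach(ExecutorService::shutdown);
        }

        return bus.log;
    }

    public static void main(String[] args) {
        run("foo").forEach(System.out::println);
    }
}
```

The three log entries come out in the same order as the three log lines in the example's console output: the listener on acme/m4q7_able first, the listener on acme/m4q7_baker second, and the sender last.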
Now examine each class, starting with the destination configurators.
Examine the Destination Configurators
The m4q7-able-impl module and m4q7-baker-impl module have destination configurator classes M4Q7AbleMessagingConfigurator and M4Q7BakerMessagingConfigurator, respectively. They each create and configure destinations.
The M4Q7AbleMessagingConfigurator class configures the acme/m4q7_able destination:
@Component
public class M4Q7AbleMessagingConfigurator {

    @Activate
    private void _activate(BundleContext bundleContext) {
        DestinationConfiguration destinationConfiguration =
            DestinationConfiguration.createSerialDestinationConfiguration(
                "acme/m4q7_able");

        Destination destination = _destinationFactory.createDestination(
            destinationConfiguration);

        _serviceRegistration = bundleContext.registerService(
            Destination.class, destination,
            MapUtil.singletonDictionary(
                "destination.name", destination.getName()));
    }

    @Deactivate
    private void _deactivate() {
        if (_serviceRegistration != null) {
            _serviceRegistration.unregister();
        }
    }

    @Reference
    private DestinationFactory _destinationFactory;

    private ServiceRegistration<Destination> _serviceRegistration;

}
The M4Q7BakerMessagingConfigurator class configures the acme/m4q7_baker destination:
@Component
public class M4Q7BakerMessagingConfigurator {

    @Activate
    private void _activate(BundleContext bundleContext) {
        DestinationConfiguration destinationConfiguration =
            DestinationConfiguration.createSerialDestinationConfiguration(
                "acme/m4q7_baker");

        Destination destination = _destinationFactory.createDestination(
            destinationConfiguration);

        _serviceRegistration = bundleContext.registerService(
            Destination.class, destination,
            MapUtil.singletonDictionary(
                "destination.name", destination.getName()));
    }

    @Deactivate
    private void _deactivate() {
        if (_serviceRegistration != null) {
            _serviceRegistration.unregister();
        }
    }

    @Reference
    private DestinationFactory _destinationFactory;

    private ServiceRegistration<Destination> _serviceRegistration;

}
Both configurators are Component classes. They use the @Reference annotation to inject a DestinationFactory instance.
The _activate(BundleContext) method uses the DestinationFactory and a DestinationConfiguration to create a serial destination. It then registers the Destination as an OSGi service using the BundleContext.
Only use serial or parallel destinations with default synchronous messaging. You can create them by calling DestinationConfiguration’s createSerialDestinationConfiguration(String) and createParallelDestinationConfiguration(String) methods.
Don’t use synchronous destinations with default synchronous messaging because they nullify message sender timeouts.
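One way to picture why the destination type matters for timeouts is the plain-Java sketch below. It rests on an assumption that is not stated here: that a synchronous destination runs its listeners on the sender's own thread, while serial and parallel destinations hand delivery to other threads. All names (`DestinationThreadingSketch`, `blockedMillis`, and the two `with...` helpers) are hypothetical, and a `java.util.concurrent.Executor` stands in for a destination.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-Java analogy only; an Executor stands in for a destination.
class DestinationThreadingSketch {

    // Returns how many milliseconds the caller stayed blocked.
    static long blockedMillis(
        Executor destination, long listenerMillis, long timeoutMillis) {

        CompletableFuture<String> reply = new CompletableFuture<>();
        long start = System.nanoTime();

        // "Send": the destination decides which thread runs the listener.
        destination.execute(() -> {
            pause(listenerMillis);
            reply.complete("done");
        });

        // "Wait": the timeout only starts counting once execute() returns.
        try {
            reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException timeoutException) {
            // The timeout was honored; the caller gives up waiting.
        }
        catch (Exception exception) {
            throw new IllegalStateException(exception);
        }

        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // The listener runs on its own thread, like a serial destination.
    static long withSerialLikeDestination() {
        ExecutorService thread = Executors.newSingleThreadExecutor();

        try {
            return blockedMillis(thread, 400, 50);
        }
        finally {
            thread.shutdownNow();
        }
    }

    // The listener runs on the caller's thread (assumed synchronous behavior).
    static long withSynchronousLikeDestination() {
        return blockedMillis(Runnable::run, 400, 50);
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException interruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(
            "serial-like: " + withSerialLikeDestination() + " ms");
        System.out.println(
            "synchronous-like: " + withSynchronousLikeDestination() + " ms");
    }
}
```

With a 400 ms listener and a 50 ms timeout, the serial-like case unblocks shortly after the timeout, while the synchronous-like case stays blocked for the listener's full duration because the wait cannot begin until the listener has finished.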
When the configurators deactivate, their _deactivate() methods unregister the destination services.
Examine the Listeners
The m4q7-charlie-impl module’s M4Q7CharlieMessageListener class listens for messages sent to the acme/m4q7_able Destination. It registers the same way Listening for Messages demonstrates.
M4Q7CharlieMessageListener class:
@Component(
    property = "destination.name=acme/m4q7_able",
    service = MessageListener.class
)
public class M4Q7CharlieMessageListener implements MessageListener {

    @Override
    public void receive(Message message) {
        if (_log.isInfoEnabled()) {
            _log.info("Received message payload " + message.getPayload());
        }

        _messageBus.sendMessage(
            message.getResponseDestinationName(),
            new Message() {
                {
                    setPayload("M4Q7CharlieMessageListener");
                    setResponseId(message.getResponseId());
                }
            });
    }

    private static final Log _log = LogFactoryUtil.getLog(
        M4Q7CharlieMessageListener.class);

    @Reference
    private MessageBus _messageBus;

}
When M4Q7CharlieMessageListener receives a message, its receive(Message) method logs the message payload and sends a response message to the original message’s response destination. The method sets the response message payload to the listener class name and sets the response message ID to the original message’s response ID.
In default synchronous messaging, response messages must use the original message’s response ID and must be sent to the response destination.
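The role of the response ID can be illustrated with a small plain-Java sketch of reply correlation. It is not Liferay's implementation: `ResponseIdSketch`, `expect`, `deliver`, and the ID `id-1` are hypothetical names chosen for illustration. The idea shown is that a blocked sender is released only by a message that carries the ID it is waiting on.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java sketch of reply correlation; not the Liferay implementation.
class ResponseIdSketch {

    private final Map<String, CompletableFuture<Object>> _waiting =
        new ConcurrentHashMap<>();

    // A sender calls this before sending, then blocks on the returned future.
    CompletableFuture<Object> expect(String responseId) {
        CompletableFuture<Object> reply = new CompletableFuture<>();

        _waiting.put(responseId, reply);

        return reply;
    }

    // Called for each message that arrives at the response destination.
    // Returns true only if a sender was waiting for this response ID.
    boolean deliver(String responseId, Object payload) {
        if (responseId == null) {
            return false;
        }

        CompletableFuture<Object> reply = _waiting.remove(responseId);

        if (reply == null) {
            return false;
        }

        reply.complete(payload);

        return true;
    }

    public static void main(String[] args) {
        ResponseIdSketch sketch = new ResponseIdSketch();

        CompletableFuture<Object> reply = sketch.expect("id-1");

        // A response without the original response ID releases nobody.
        System.out.println(sketch.deliver(null, "ignored"));
        System.out.println(reply.isDone());

        // A response carrying the original ID releases the waiting sender.
        System.out.println(
            sketch.deliver("id-1", "M4Q7CharlieMessageListener"));
        System.out.println(reply.getNow(null));
    }
}
```

In this model, a listener that forgets to copy the response ID onto its reply leaves the sender waiting until its timeout expires, even though a reply did arrive.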
The m4q7-baker-impl module’s M4Q7BakerMessageListener class listens for messages sent to acme/m4q7_baker, which is the response destination for M4Q7BakerOSGiCommands’s messages.
M4Q7BakerMessageListener class:
@Component(
    property = "destination.name=acme/m4q7_baker",
    service = MessageListener.class
)
public class M4Q7BakerMessageListener implements MessageListener {

    @Override
    public void receive(Message message) {
        if (_log.isInfoEnabled()) {
            _log.info("Received message payload " + message.getPayload());
        }
    }

    private static final Log _log = LogFactoryUtil.getLog(
        M4Q7BakerMessageListener.class);

}
When M4Q7BakerMessageListener receives a message, its receive(Message) method logs the message payload.
Examine the Sender
The m4q7-baker-impl module’s M4Q7BakerOSGiCommands class provides an OSGi Command that triggers sending the command arguments in a message to the "acme/m4q7_able" destination.
@Component(
    property = {"osgi.command.function=sendMessage", "osgi.command.scope=m4q7"},
    service = M4Q7BakerOSGiCommands.class
)
public class M4Q7BakerOSGiCommands {

    public void sendMessage(String payload) throws MessageBusException {
        Message message = new Message();

        message.setPayload(payload);
        message.setResponseDestinationName("acme/m4q7_baker");

        Object response = _synchronousMessageSender.send(
            "acme/m4q7_able", message, 10000);

        if (_log.isInfoEnabled()) {
            _log.info("Response: " + response);
        }
    }

    private static final Log _log = LogFactoryUtil.getLog(
        M4Q7BakerOSGiCommands.class);

    @Reference(target = "(mode=DEFAULT)")
    private SynchronousMessageSender _synchronousMessageSender;

}
M4Q7BakerOSGiCommands is a service Component of its own class type. It uses an @Reference annotation to inject a SynchronousMessageSender that’s set to default mode (specified by the annotation’s target = "(mode=DEFAULT)" attribute).
In default mode, the SynchronousMessageSender’s send method blocks the calling thread until a response message is received or until the sender times out.
M4Q7BakerOSGiCommands’s @Component properties define a Gogo shell command function called sendMessage in the m4q7 scope. The command takes an input String and maps to M4Q7BakerOSGiCommands’s sendMessage(String) method.
The sendMessage(String) method creates a Message with the Gogo shell command’s String as the payload and "acme/m4q7_baker" as the response destination.
The sendMessage(String) method sends the message by calling SynchronousMessageSender’s send(String, Message, long) method, passing in the "acme/m4q7_able" destination name, the message instance, and a 10000 millisecond timeout. In default mode, the SynchronousMessageSender uses a Message Bus thread to deliver the message to message listeners. Execution blocks in the M4Q7BakerOSGiCommands class until a message that has the original message’s response ID is received at the "acme/m4q7_baker" response destination. When the response is received, execution continues in M4Q7BakerOSGiCommands’s sendMessage(String) method, where it logs the message response. If the timeout expires before a matching response message is received, SynchronousMessageSender’s send(String, Message, long) method throws a MessageBusException.
Now that you’ve seen a message listener reply with a response message, you can test the response timeout.
Demonstrate the Response Timeout
Here’s how to turn off the message response logic to force the timeout.
- In M4Q7CharlieMessageListener’s receive(Message) method, comment out the _messageBus.sendMessage(...) call.
    @Override
    public void receive(Message message) {
        if (_log.isInfoEnabled()) {
            Object payload = message.getPayload();
            _log.info("Received message payload " + payload.toString());
        }
        // _messageBus.sendMessage(
        //     message.getResponseDestinationName(),
        //     new Message() {
        //         {
        //             setPayload("M4Q7CharlieMessageListener");
        //             setResponseId(message.getResponseId());
        //         }
        //     });
    }
- Redeploy the example project.
    ./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
- In the Gogo shell command field, enter m4q7:sendMessage followed by a message. For example, m4q7:sendMessage foo
Confirm the Gogo shell page looks like this:
-
Confirm the message in the Docker console looks like this.
INFO [acme/m4q7_able-2][M4Q7CharlieMessageListener:23] Received message payload foo
M4Q7CharlieMessageListener received the message but never replied to it. The SynchronousMessageSender threw a MessageBusException that was printed in the Gogo shell page.
Congratulations on sending messages synchronously with a timeout.
What’s Next
If you want to explore synchronous messaging using direct mode, see Using Direct Synchronous Messaging in Previous Versions.
If you want to continue processing immediately after sending a message, see Using Asynchronous Messaging.