Using Direct Synchronous Messaging in Previous Versions
Synchronous messaging was removed and is no longer supported for Liferay DXP 7.4 U49/Liferay Portal 7.4 GA49 and above.
Direct synchronous messaging is the easiest way to block processing until all listeners receive a message. You call the SynchronousMessageSender’s send(String, Message) method and pass in a destination name and message instance. The SynchronousMessageSender uses the current thread to process message reception directly in each of the destination’s registered message listeners. When listener processing completes, execution continues in the class that called the send(String, Message) method. This example demonstrates using direct synchronous messaging.
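To make the blocking behavior concrete, here is a minimal plain-Java sketch of the idea. It does not use Liferay's API: the DirectSendSketch class, its send and demo methods, and the two stand-in listeners are hypothetical names invented for illustration. It only models the point that listeners run one after another on the caller's thread and that the caller resumes afterward.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of direct synchronous delivery.
// It is not Liferay's SynchronousMessageSender implementation.
class DirectSendSketch {

    // Calls each listener in turn on the calling thread and returns only
    // after the last listener has finished.
    static void send(List<Consumer<String>> listeners, String payload) {
        for (Consumer<String> listener : listeners) {
            listener.accept(payload);
        }
    }

    // Builds two stand-in listeners, sends one payload, and returns the
    // ordered list of recorded events.
    static List<String> demo(String payload) {
        List<String> events = new ArrayList<>();
        Thread caller = Thread.currentThread();

        List<Consumer<String>> listeners = new ArrayList<>();
        listeners.add(p -> events.add(
            "first listener got " + p +
                ", sameThread=" + (Thread.currentThread() == caller)));
        listeners.add(p -> events.add(
            "second listener got " + p +
                ", sameThread=" + (Thread.currentThread() == caller)));

        events.add("before send");
        send(listeners, payload); // the caller waits here
        events.add("after send");

        return events;
    }

    public static void main(String[] args) {
        demo("foo").forEach(System.out::println);
    }
}
```

Because send is an ordinary method call with no hand-off to another thread, the "after send" event can only be recorded once both listeners have returned.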
Send a Direct Synchronous Message
In an example project, you’ll use a SynchronousMessageSender to send a message directly to two listeners.
Start a new Liferay instance by running
docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.48-ga48
Sign in to Liferay at http://localhost:8080. Use the email address test@liferay.com and the password test. When prompted, change the password to learn.
Then, follow these steps:
-
Download and unzip the example.
curl https://resources.learn.liferay.com/dxp/latest/en/building-applications/core-frameworks/message-bus/liferay-x6n5.zip -O
unzip liferay-x6n5.zip
-
Build and deploy the example project modules.
cd liferay-x6n5
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
Note: This command is the same as copying the module JARs to /opt/liferay/osgi/modules on the Docker container.
-
The Docker container console shows that the modules started.
STARTED com.acme.x6n5.able.impl_1.0.0
STARTED com.acme.x6n5.baker.impl_1.0.0
STARTED com.acme.x6n5.charlie.impl_1.0.0
STARTED com.acme.x6n5.dog.impl_1.0.0
-
Visit the Liferay instance with your browser at http://localhost:8080 and sign in using your credentials.
-
Open the Gogo Shell.
-
In the Gogo Shell command field, enter x6n5:sendMessage followed by a message. For example,
x6n5:sendMessage foo
-
Confirm the output looks like this.
INFO [pipe-x6n5:sendMessage foo][X6N5DogMessageListener:21] Received message payload foo
INFO [pipe-x6n5:sendMessage foo][X6N5CharlieMessageListener:21] Received message payload foo
INFO [pipe-x6n5:sendMessage foo][X6N5BakerOSGiCommands:28] Response: X6N5CharlieMessageListener
The thread blocks in the message sender (i.e., X6N5BakerOSGiCommands) when it sends the message. After processing the message in X6N5CharlieMessageListener and X6N5DogMessageListener, the thread continues in the message sender.
Project Overview
Each of the four example modules has one class. One class manages the destination, another sends a message, and the other two listen for messages sent to the destination.
Example Classes:
Class | Module | Description |
---|---|---|
X6N5AbleMessagingConfigurator | x6n5-able-impl | Creates a message destination named acme/x6n5_able and registers it with the Message Bus. |
X6N5BakerOSGiCommands | x6n5-baker-impl | Sends a message to the acme/x6n5_able destination and logs the response. |
X6N5CharlieMessageListener | x6n5-charlie-impl | Listens for messages sent to the acme/x6n5_able destination. It logs the message payload and sets a response on the message. |
X6N5DogMessageListener | x6n5-dog-impl | Listens for messages sent to the acme/x6n5_able destination. It logs the message payload and sets a response on the message. |
Here’s the event flow:
-
When a user executes the x6n5:sendMessage Gogo shell command, X6N5BakerOSGiCommands sends the command arguments in a message payload to the acme/x6n5_able destination.
-
The current thread processes message reception for each listener (i.e., X6N5CharlieMessageListener and X6N5DogMessageListener) in succession. The listeners log the message payload and set a response on the message. The response from the last listener processed supersedes any earlier responses.
-
Processing returns to X6N5BakerOSGiCommands, where it logs the message response.
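The way a later response supersedes an earlier one can be sketched in plain Java. The code below is an illustrative model, not Liferay code: SketchMessage, LastResponseWinsSketch, and the lambda listeners are hypothetical stand-ins. The listener order is fixed by the list here to mirror the sample output, in which X6N5DogMessageListener ran first. This is an assumption of the sketch and says nothing about how Liferay orders listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of several listeners setting a response on one
// shared message. Not Liferay's Message or MessageListener classes.
class LastResponseWinsSketch {

    // Stand-in for a message that carries a payload and a single response.
    static class SketchMessage {
        final String payload;
        Object response;

        SketchMessage(String payload) {
            this.payload = payload;
        }
    }

    // Delivers the message to every listener in list order, then returns
    // whatever response is set once all listeners have run.
    static Object send(
        List<Consumer<SketchMessage>> listeners, SketchMessage message) {

        for (Consumer<SketchMessage> listener : listeners) {
            listener.accept(message);
        }

        return message.response;
    }

    static Object demo(String payload) {
        List<Consumer<SketchMessage>> listeners = new ArrayList<>();

        // Each stand-in listener overwrites the single response field.
        listeners.add(m -> m.response = "X6N5DogMessageListener");
        listeners.add(m -> m.response = "X6N5CharlieMessageListener");

        return send(listeners, new SketchMessage(payload));
    }

    public static void main(String[] args) {
        System.out.println("Response: " + demo("foo"));
    }
}
```

Since the message holds only one response field, each call that sets it replaces the previous value, and the sender sees only the value written by the listener that ran last.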
Now you can examine each class, starting with the destination configurator.
Examine the Destination Configurator
The x6n5-able-impl module’s X6N5AbleMessagingConfigurator class creates and configures a destination named acme/x6n5_able. Here’s the code:
@Component
public class X6N5AbleMessagingConfigurator {

    @Activate
    private void _activate(BundleContext bundleContext) {
        Destination destination = _destinationFactory.createDestination(
            DestinationConfiguration.createSynchronousDestinationConfiguration(
                "acme/x6n5_able"));

        _serviceRegistration = bundleContext.registerService(
            Destination.class, destination,
            MapUtil.singletonDictionary(
                "destination.name", destination.getName()));
    }

    @Deactivate
    private void _deactivate() {
        if (_serviceRegistration != null) {
            _serviceRegistration.unregister();
        }
    }

    @Reference
    private DestinationFactory _destinationFactory;

    private ServiceRegistration<Destination> _serviceRegistration;

}
This configurator is a Component class. It uses the @Reference annotation to inject a DestinationFactory instance.
The _activate(BundleContext) method uses the DestinationFactory and a DestinationConfiguration to create a synchronous destination named acme/x6n5_able. Synchronous destinations are optimized for synchronous messaging. Lastly, the method registers the Destination as an OSGi service using the BundleContext.
When X6N5AbleMessagingConfigurator deactivates, its _deactivate() method unregisters the destination service.
Examine the Sender
The x6n5-baker-impl module’s X6N5BakerOSGiCommands class provides an OSGi Command that sends messages to the destination:
@Component(
    property = {"osgi.command.function=sendMessage", "osgi.command.scope=x6n5"},
    service = X6N5BakerOSGiCommands.class
)
public class X6N5BakerOSGiCommands {

    public void sendMessage(String payload) throws MessageBusException {
        Message message = new Message();

        message.setPayload(payload);

        Object response = _synchronousMessageSender.send(
            "acme/x6n5_able", message);

        System.out.println("Response: " + response);
    }

    @Reference(target = "(mode=DIRECT)")
    private SynchronousMessageSender _synchronousMessageSender;

}
X6N5BakerOSGiCommands is a service Component of its own class type. It uses a @Reference annotation to inject a SynchronousMessageSender that’s set to direct mode (specified by the annotation’s target = "(mode=DIRECT)" attribute).
In direct mode, the SynchronousMessageSender’s send method blocks the calling class until the current thread delivers the message to all listeners.
X6N5BakerOSGiCommands’s @Component properties define a Gogo shell command function called sendMessage in the x6n5 scope. The command maps to the sendMessage(String) method and takes an input String.
The sendMessage(String) method creates a Message with the Gogo shell command’s String as a payload. The SynchronousMessageSender’s send(String, Message) method uses the current thread to deliver the message to the acme/x6n5_able Destination’s message listeners. Execution blocks in the X6N5BakerOSGiCommands class until the thread processes the message in all the MessageListeners. Then execution continues in the X6N5BakerOSGiCommands sendMessage(String) method, where it logs the message response.
Examine the Listeners
The x6n5-charlie-impl module’s X6N5CharlieMessageListener class and x6n5-dog-impl module’s X6N5DogMessageListener class listen for messages sent to the acme/x6n5_able Destination. They register the same way Listening for Messages demonstrates.
X6N5CharlieMessageListener class:
@Component(
    property = "destination.name=acme/x6n5_able",
    service = MessageListener.class
)
public class X6N5CharlieMessageListener implements MessageListener {

    @Override
    public void receive(Message message) {
        System.out.println(
            "Received message payload " + message.getPayload());

        message.setResponse("X6N5CharlieMessageListener");
    }

}
X6N5DogMessageListener class:
@Component(
    property = "destination.name=acme/x6n5_able",
    service = MessageListener.class
)
public class X6N5DogMessageListener implements MessageListener {

    @Override
    public void receive(Message message) {
        System.out.println(
            "Received message payload " + message.getPayload());

        message.setResponse("X6N5DogMessageListener");
    }

}
Each listener’s receive(Message) method logs the message payload and then sets the message response to its own class name.
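Both listeners declare the same destination.name property, which is what ties them to the acme/x6n5_able destination. As a rough illustration of name-based routing, the following plain-Java sketch keeps listeners in a map keyed by destination name. Everything in it (DestinationRoutingSketch, register, send, and the acme/other destination) is hypothetical and is not how Liferay's Message Bus or OSGi service registry is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of routing messages to listeners by destination
// name. Not Liferay's Message Bus implementation.
class DestinationRoutingSketch {

    private final Map<String, List<Consumer<String>>> listenersByDestination =
        new HashMap<>();

    // Associates a listener with a destination name.
    void register(String destinationName, Consumer<String> listener) {
        listenersByDestination
            .computeIfAbsent(destinationName, key -> new ArrayList<>())
            .add(listener);
    }

    // Delivers the payload only to listeners registered under the given
    // destination name and returns how many listeners were called.
    int send(String destinationName, String payload) {
        List<Consumer<String>> listeners =
            listenersByDestination.getOrDefault(
                destinationName,
                Collections.<Consumer<String>>emptyList());

        for (Consumer<String> listener : listeners) {
            listener.accept(payload);
        }

        return listeners.size();
    }

    static List<String> demo() {
        DestinationRoutingSketch bus = new DestinationRoutingSketch();
        List<String> received = new ArrayList<>();

        bus.register("acme/x6n5_able", p -> received.add("charlie:" + p));
        bus.register("acme/x6n5_able", p -> received.add("dog:" + p));

        // A listener on a different, made-up destination is not called.
        bus.register("acme/other", p -> received.add("other:" + p));

        bus.send("acme/x6n5_able", "foo");

        return received;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model, only the two listeners registered under acme/x6n5_able record the payload. The listener registered under the made-up acme/other name is never invoked.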
Congratulations! You know how to use direct synchronous messaging.
What’s Next
If you want to explore synchronous messaging using default mode, see Using Default Synchronous Messaging in Previous Versions.
If you want to continue processing immediately after sending a message, see Using Asynchronous Messaging.