メッセージングパフォーマンスのチューニング
メッセージングパフォーマンスは、宛先で調整されます。 パフォーマンスは、宛先タイプ、メッセージリスナーが必要とする処理の量、およびメッセージの処理に使用できるスレッドプールによって異なります。
3つの宛先タイプは次のとおりです。
パラレル宛先
- ここで送信されたメッセージはキューに入れられます。
- スレッドプールのワーカースレッドは、登録されたメッセージリスナーにメッセージを配信します。1つのメッセージリスナーにつきメッセージごとに1つのワーカースレッドがあります。 スレッドは、同じメッセージを宛先のメッセージリスナーに同時に配信します。
シリアル宛先
- ここで送信されたメッセージはキューに入れられます。
- スレッドプールのワーカースレッドは、登録されたメッセージリスナーにメッセージを配信します(メッセージごとに1つのワーカースレッド)。
同期宛先
- ここで送信されるメッセージは、メッセージリスナーに直接配信されます。
- メッセージを送信するスレッドは、すべてのメッセージリスナーにもメッセージを配信します。
該当する宛先タイプを使用して、さまざまな方法でメッセージを送信できます。
宛先タイプの互換性
以下は、非同期メッセージング、デフォルトの同期メッセージング、および直接同期メッセージングとの各宛先タイプの互換性です。
宛先タイプ | 非同期メッセージング | デフォルトの同期メッセージング | 直接同期メッセージング |
---|---|---|---|
パラレル | はい | はい | いいえ |
シリアル | はい | はい | いいえ |
同期 | いいえ | いいえ | はい |
ここでは、サンプルプロジェクトのメッセージングパフォーマンスを調べることから始めます。 次に、APIを使用して宛先統計を取得し、宛先を設定します。 最後に、サンプルの宛先設定を再構成し、サンプルを再実行して、統計を調べます。
サンプルプロジェクトでメッセージングを監視する
サンプルプロジェクトは、宛先を作成し、メッセージリスナーを登録し、Gogoシェルコマンドを介して宛先統計を一覧表示します。
新しいLiferay インスタンスを起動し、以下を実行します。
docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.55-ga55。
http://localhost:8080でLiferayへのサインインします。 メールアドレス[email protected]_とパスワード_test_を使用してください。 プロンプトが表示されたら、パスワードを _learn_に変更します。
その後、以下の手順を実行してください。
サンプルをダウンロードし、解凍してください。
curl https://resources.learn.liferay.com/dxp/latest/ja/building-applications/core-frameworks/message-bus/liferay-w3r2.zip -O
unzip liferay-w3r2.zip
サンプルのプロジェクトモジュールをビルドしてデプロイします。
cd liferay-w3r2
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
noteこのコマンドは、モジュールJARをDockerコンテナの
/opt/liferay/osgi/modules
にコピーするのと同じです。Dockerコンテナコンソールはモジュールの起動を確認し、宛先の構成を報告します。
STARTED com.acme.w3r2.charlie.impl_1.0.0 [1390] STARTED com.acme.w3r2.able.impl_1.0.0 [1388] [W3R2AbleMessagingConfigurator:27] {_destinationName=acme/w3r2_able, _destinationType=serial, _maximumQueueSize=2147483647, _rejectedExecutionHandler=null, _workersCoreSize=2, _workersMaxSize=5} STARTED com.acme.w3r2.baker.impl_1.0.0 [1389]
ブラウザで
http://localhost:8080
にあるLiferayインスタンスにアクセスし、認証情報を使用してサインインします。スクリプトコンソールを開きます。
スクリプトフィールドで、次のGroovyコードを実行してメッセージを送信します。
import com.liferay.portal.kernel.messaging.*; MessageBusUtil.sendMessage( "acme/w3r2_able", new Message() { { setPayload("foo"); } });
W3R2BakerMessageListenerManager
からのメッセージリスナーがメッセージを受信したことを確認します。[acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo [acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo [acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo [acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo [acme/w3r2_able-2][W3R2BakerMessageListenerManager:30] Received message payload foo
Gogoシェルを開きます。
Gogoシェルコマンドフィールドで
w3r2:listDestinationStats
コマンドを実行して、宛先統計を取得します。w3r2:listDestinationStats
acme/w3r2_able
宛先の10個のリスナーと送信されたメッセージ数を確認します。[pipe-w3r2:listDestinationStats][W3R2CharlieOSGiCommands:29] acme/w3r2_able active thread count 0, current thread count 1, largest thread count 1, max thread pool size 1, message listener count 10, min thread pool size 1, pending message count 0, sent message count 1
この例の3つのモジュールは、宛先を構成し、10個のメッセージリスナーを登録し、宛先の統計を一覧表示するGogoシェルコマンドを提供しました。
w3r2-able-impl
のW3R2AbleMessagingConfigurator
が有効になると、acme/w3r2_able
宛先が構成され、DestinationConfiguration
のtoString()
値がログに記録されます。
@Activate
private void _activate(BundleContext bundleContext) {
DestinationConfiguration destinationConfiguration =
DestinationConfiguration.createSerialDestinationConfiguration(
"acme/w3r2_able");
if (_log.isInfoEnabled()) {
_log.info(destinationConfiguration.toString());
}
Destination destination = _destinationFactory.createDestination(
destinationConfiguration);
_serviceRegistration = bundleContext.registerService(
Destination.class, destination,
MapUtil.singletonDictionary(
"destination.name", destination.getName()));
}
w3r2-charlie-impl
モジュールのW3R2CharlieOSGiCommands
は、それが提供するw3r2:listDestinationStats
Gogoシェルコマンドを使用して宛先統計をログに記録します。 W3R2CharlieOSGiCommands
のlistDestinationStats()
メソッドが宛先統計を取得する方法を調べます。
@Component(
property = {
"osgi.command.function=listDestinationStats", "osgi.command.scope=w3r2"
},
service = W3R2CharlieOSGiCommands.class
)
public class W3R2CharlieOSGiCommands {
public void listDestinationStats() {
if (_log.isInfoEnabled()) {
Destination destination = _messageBus.getDestination(
"acme/w3r2_able");
DestinationStatistics destinationStatistics =
destination.getDestinationStatistics();
_log.info(
StringBundler.concat(
"acme/w3r2_able active thread count ",
destinationStatistics.getActiveThreadCount(),
", current thread count ",
destinationStatistics.getCurrentThreadCount(),
", largest thread count ",
destinationStatistics.getLargestThreadCount(),
", max thread pool size ",
destinationStatistics.getMaxThreadPoolSize(),
", message listener count ",
destination.getMessageListenerCount(),
", min thread pool size ",
destinationStatistics.getMinThreadPoolSize(),
", pending message count ",
destinationStatistics.getPendingMessageCount(),
", sent message count ",
destinationStatistics.getSentMessageCount()));
}
}
private static final Log _log = LogFactoryUtil.getLog(
W3R2CharlieOSGiCommands.class);
@Reference
private MessageBus _messageBus;
}
listDestinationStats()
メソッドは、_messageBus
インスタンスを使用してDestination
を取得してから、宛先からDestinationStatistics
インスタンスを取得します。 宛先は、DestinationStatistics
オブジェクトに最新の統計を入力します。 このメソッドは、次の宛先情報をログに記録します。
- アクティブなスレッド数
- 現在のスレッド数
- 最大スレッド数
- 最大スレッドプールサイズ
- メッセージリスナー数
- 最小(開始)スレッドプールサイズ
- 保留中のメッセージ数
- 送信されたメッセージ数
これと同じAPIを使用してメッセージの宛先を監視できます。
メッセージングの監視
メッセージングAPIは、宛先の設定に応じたメッセージングパフォーマンスの監視を容易にします。 次の表に、宛先設定とメッセージング統計にアクセスするためのAPIメソッドを示します。
宛先設定:
宛先設定 | APIメソッド |
---|---|
宛先タイプ | Destination#getDestinationType() |
最大スレッドプールサイズ | DestinationConfiguration#getWorkersMaxSize() and DestinationStatistic#getMaxThreadPoolSize() |
最小スレッドプールサイズ | DestinationConfiguration#getWorkersCoreSize() and DestinationStatistic#getMinThreadPoolSize() |
メッセージキューのサイズ | DestinationConfiguration#getMaximumQueueSize() |
宛先統計:
宛先統計 | APIメソッド |
---|---|
メッセージリスナー数 | Destination#getMessageListenerCount() |
保留中のメッセージの数 | DestinationStatistics#getPendingMessageCount() |
送信されたメッセージ数 | DestinationStatistics#getSentMessageCount() |
現在のスレッド数 | DestinationStatistics#getCurrentThreadCount() |
アクティブなスレッド数 | DestinationStatistics#getActiveThreadCount() |
最大スレッド数 | DestinationStatistics#getLargestThreadCount() |
宛先の設定に応じて宛先の統計を把握するようにしてください。
宛先統計を調べた後、宛先を再構成することでパフォーマンスの向上を試みることができます。
宛先タイプの変更
シリアル宛先を使用していて、メッセージが一部のメッセージリスナーに十分な速度で到達しない場合は、最大スレッドプールサイズを増やすか(以下で説明します)、パラレル宛先タイプに切り替えてみてください。 メッセージバスは、スレッドプールのスレッドを使用して、パラレル宛先メッセージリスナーを同時に処理します。
現在の DestinationConfiguration
を必要なタイプの1つに置き換えることで、宛先タイプを切り替えることができます。 該当するDestinationConfiguration
メソッドを使用して、新しいパラレルまたはシリアルのDestinationConfiguration
を作成します。
createParallelDestinationConfiguration(String)
createSerialDestinationConfiguration(String)
詳細については、 サンプルの宛先を再構成する を参照してください。
メッセージキューとスレッドプールの構成
各シリアルおよびパラレル宛先には、メッセージキューと専用スレッドプールがあります。
キューがいっぱいになったときにメッセージが到着した場合、宛先のRejectedExecutionHandler
がメッセージを処理します。 デフォルトのハンドラーはメッセージを破棄し、警告をログに記録します。 デフォルトのメッセージキューの最大サイズはJavaの最大整数値ですが、この数は必要に応じて減らすことができます。
メッセージバスは、宛先のスレッドプールからメッセージリスナー処理スレッドを引き出します。 プールには、開始サイズと最大サイズがあります。
次の DestinationConfiguration
メソッドを使用して、メッセージキューの最大サイズ、拒否された実行ハンドラー、スレッドプールの開始サイズ(コアサイズ)、およびスレッドプールの最大サイズを変更できます。
setMaximumQueueSize(int maximumQueueSize)
setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler)
setWorkersCoreSize(int workersCoreSize)
setWorkersMaxSize(int workersMaxSize)
次に、サンプルの宛先を再構成します。
サンプルの宛先の再構成
ここでは、次の設定を使用して、サンプルのacme/w3r2_able
宛先を再構成します。
- 宛先タイプ:
parallel
- 開始スレッドプールサイズ:
10
- 最大スレッドプールサイズ:
20
手順は次のとおりです。
W3R2AbleMessagingConfigurator
の_activate(BundleContext)
メソッドを次のコードに置き換えて、別のDestinationConfiguration
を使用します。@Activate private void _activate(BundleContext bundleContext) { DestinationConfiguration destinationConfiguration = DestinationConfiguration.createParallelDestinationConfiguration( "acme/w3r2_able"); destinationConfiguration.setWorkersCoreSize(10); destinationConfiguration.setWorkersMaxSize(20); if (_log.isInfoEnabled()) { _log.info(destinationConfiguration.toString()); } Destination destination = _destinationFactory.createDestination( destinationConfiguration); _serviceRegistration = bundleContext.registerService( Destination.class, destination, MapUtil.singletonDictionary( "destination.name", destination.getName())); }
モジュールを再デプロイします。
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
Dockerコンテナコンソールは、
w3r2-able-impl
モジュールの起動を確認し、宛先構成を報告します。STARTED com.acme.w3r2.able.impl_1.0.0 [1388] [W3R2AbleMessagingConfigurator:27] {_destinationName=acme/w3r2_able, _destinationType=parallel, _maximumQueueSize=2147483647, _rejectedExecutionHandler=null, _workersCoreSize=10, _workersMaxSize=20}
次のGogoシェルコマンドを実行して、メッセージリスナーモジュール(Acme W3R2 Baker実装)のIDを取得します。
lb | grep W3R2
各行は、対応するモジュールのID番号で始まります。
1388|Active | 10|Acme W3R2 Able Implementation (1.0.0)|1.0.0 1389|Active | 10|Acme W3R2 Baker Implementation (1.0.0)|1.0.0 1390|Active | 10|Acme W3R2 Charlie Implementation (1.0.0)|1.0.0
次のGogoシェルコマンドを使用してメッセージリスナーモジュールを再起動することにより、メッセージリスナーを宛先置換にバインドします。 番号をモジュールのIDに置き換えます。
stop 1389
start 1389
スクリプトコンソールで次のGroovyコードを再度実行して、別のメッセージを送信します。
import com.liferay.portal.kernel.messaging.*; MessageBusUtil.sendMessage( "acme/w3r2_able", new Message() { { setPayload("foo"); } });
Gogoシェルで
w3r2:listDestinationStats
コマンドを実行して、宛先統計を取得します。w3r2:listDestinationStats
次ようなログメッセージは、新しい設定を確認するためのものです。
[pipe-w3r2:listDestinationStats][W3R2CharlieOSGiCommands:29] acme/w3r2_able
active thread count 0, current thread count 10, largest thread count 10, max
thread pool size 20, message listener count 10, min thread pool size 10,
pending message count 0, sent message count 2
これで、宛先でメッセージングを監視し、宛先設定を調整する方法がわかりました。 さまざまな設定をテストして、パフォーマンスを最適化できます。