CountDownLatchを改造して、別スレッドの複数並列処理の結果に応じて処理する
Javaのconcurrentパッケージには同期処理を行うための便利な機能が用意されている。
今回注目したのはCountDownLatch.
CountDownLatchを使うと、他の複数スレッドの処理の完了を待ってからメインのスレッドで処理を継続できる。
こんなカンジで。
public void execute() { CountDownLatch latch = new CountDownLatch(3); Runnable otherThreadTask1 = this.doTask(task1, latch); Runnable otherThreadTask2 = this.doTask(task2, latch); Runnable otherThreadTask3 = this.doTask(task3, latch); // 別スレッドでの処理を登録 ExecutorService service = Executors.newFixedThreadPool(3); service.submit(otherThreadTask1); service.submit(otherThreadTask2); service.submit(otherThreadTask3); try { // task1〜3の処理が完了するまで待つ latch.await(); // 以降の処理を行う doSomething(); } catch (InterruptedException iEx) { // 中断処理 } } // 別スレッドで行うタスクを生成する。 public Runnable doTask(Runnable runnable, CountDownLatch latch) { final Runnable mainTask = runnable; final CountDownLatch targetLatch = latch; Runnable task = new Runnable() { public void run() { mainTask.run(); // 処理完了を通知 targetLatch.countDown(); } }; return task; }
CountDownLatchそのもの使い方について触れたい訳ではないので、
この辺の話はこちらにお任せして。
単純に複数スレッドの処理完了を待つのであればこれで問題ないが、実際にはもうちょっと複雑な状況が発生する。
- 別スレッドの処理のうち、どれか1つが完了すれば、await()の待ち処理を解除したい
- でも、別スレッドの処理が全て異常終了することもあるから、このときは全て異常終了するのを待ちたい
こういった状況では、単にCountDownLatchを使用するのでは対応できない。
他スレッドの異常終了を検知するための同期機構が必要になる。
また、正常終了/異常終了の結果をもとに何かしら処理することになるので、
await()を呼び出したスレッドで、これらの結果を取得する同期機構も必要になったり。。
もう少し、具体的にすると、以下のような状況になる。
レプリケートしている複数のサーバに同時に接続して、どこかのサーバに接続成功したら、その接続先に対して処理を行う。全てのサーバに接続できなかった場合は数回リトライして、だめなら処理中断。
並列処理にせずに順番にサーバに接続を試みると、タイムアウトで時間がかかるし、しかも接続先が多くなるので、異常時はそのタイムアウトがサーバ台数発生する、そんなの運用にたえられないよ。
どうせ、どこかは活きているはずだから、そこにすぐ繋げるようにしてよ。。
といった、ちょっと面倒くさい要望。
要は、CountDownLatchのawait()実行後、正常/異常で処理を分岐させたい。
正常処理は1つのcountDown()でawait()ロックを解放させたいが、
異常処理は3つのcountDown()でawait()ロックを解放させたい。
そして、ロック解放後は、正常/異常をCountDownLatchが保持しているはずだから、
そこから正常/異常を判断したい。
標準APIを調べてみても、このようなことができるクラスが見つからなかった(あるいは単なる知識不足か)ので、CountDownLatchで上手いことすれば出来るか確認するために、実装を覗いてみた。
詳細は省略するが、CountDownLatchの同期機構は内部で利用しているAbstractQueuedSynchronizerが担っている。
そしてAbstractQueuedSynchronizerは、保持しているstateパラメータで同期をとっている。
このstateパラメータはint型で、外部から利用できるのはこのパラメータ1つのみ。
なお、具体的な同期メカニズムはjava内部に隠ぺいされていた。
つまり、CountDownLatchの同期機構は1パラメータでロック取得/解放をしている。
ここで目的としている、正常/異常といった複数パラメータの管理はできないことがわかった。
ということで、こういった処理ができるクラスConditionLatchを作ってみた。
方針としては、AbstractQueuedSynchronizerの同期機構を利用して、
1つのstateパラメータで複数の状態を無理やり管理させる。
どうせ、int型の最大値からcountDownさせることは殆どないのだから、その余りそうな箇所を有効活用してみた。
ConditionLatchのコードはGitHubに公開したので、詳細はそちらをみてもらいたい。
https://github.com/kajitiluna/ConditionLatch
このクラスを使用することで、上記の状況は以下のような実装で対応できる。
public void execute() { // 正常処理のcountを1で、異常処理のcountを3でlatchを作成 ConditionLatch<SomeConnection, SomeException> latch = new ConditionLatch<>(1, 3); Runnable task1 = this.connectTask(server1, latch); Runnable task2 = this.connectTask(server2, latch); Runnable task3 = this.connectTask(server3, latch); // 別スレッドでの処理を登録 ExecutorService service = Executors.newFixedThreadPool(3); service.submit(task1); service.submit(task2); service.submit(task3); try { List<SomeConnection> resultList = latch.await(); SomeConnection connection = resultList.get(0); // 接続情報をもとに以降の処理を行う doSomething(connection); } catch (SubmittedFailureResultException sfEx) { // 処理失敗時の処理 } catch (InterruptedException iEx) { // Interrupt時の処理 } } public Runnable connectTask(Server server, ConditionLatch<?, ?> latch) { final Server targetServer = server; final ConditionLatch<?, ?> targetLatch = latch; Runnable task = new Runnable() { @Override public void run() { try { SomeConnection connection = connectServer(targetServer); // 接続できたときに接続情報をlatchに渡す targetLatch.submit(connection); // submit()で正常処理をcountDown } catch (SomeException ex) { // 接続に失敗したとき targetLatch.submitForFail(ex); // submitForFail()で異常処理をcountDown } } }; return task; }
これで、正常/異常で処理を分けるCountDownLatchを簡単に実装できる。
ちなみに、今回は同期パラメータがintであるAbstractQueuedSynchronizerを使用したが、
実装は同じでパラメータ型をlongに変えただけのAbstractQueuedLongSynchronizerというのもある。
広い範囲が必要なら、こちらのクラスを使用するほうがいいかも。