Reactor で並列処理
Reactor の並列処理を試してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20161208/
はじめに
Reactor で以下のようなコードを実行すると、map の処理と subscribe の処理を順番に 3回繰り返します。(repeat
メソッドの戻り値は Flux
)
(A) サンプルコード
Mono.just("(A)") .repeat(3) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main] (A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main] (A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main]
ここで、今回は map からの処理を別スレッドで並列実行してみます。
publishOn の場合
publishOn
を使うと main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。
(B) publishOn を使った場合
Mono.just("(B)") .repeat(3) .publishOn(Schedulers.parallel()) // 実質的に以下と同じ //.publishOn(Schedulers.single()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(B) publishOn を使った場合の実行結果
(B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main] (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main] (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main]
publishOn へ Schedulers.parallel()
を渡しても複数のスレッドを使うわけでは無いようなので、実質的に Schedulers.single()
と変わりは無さそうです。
ParallelFlux を使った並列処理
並列で実行するためには parallel
メソッドを使って Flux
から ParallelFlux
を取得し、runOn
メソッドで Schedulers.parallel()
※ を指定します。
※ この場合、Schedulers.parallel() の代わりに Schedulers.single() を使うと並列にならない点に注意 (単一スレッドで実行する事になる)
(C) parallel + runOn
Mono.just("(C)") .repeat(3) .parallel() .runOn(Schedulers.parallel()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(C) parallel + runOn の実行結果
(C)_map, thread: Thread[parallel-1,5,main] (C)_map, thread: Thread[parallel-3,5,main] (C)_map, thread: Thread[parallel-2,5,main] (C)_subscribe, thread: Thread[parallel-3,5,main] (C)_subscribe, thread: Thread[parallel-1,5,main] (C)_subscribe, thread: Thread[parallel-2,5,main]
3つのスレッドを使って並列に実行できているようです。
なお、runOn
をコメントアウトして実行すると (A) と同様の結果(main スレッドで順番に実行)になります。
備考
最後に、今回使用したソースを記載しておきます。
build.gradle
apply plugin: 'application' mainClassName = 'App' repositories { jcenter() } dependencies { compile 'io.projectreactor:reactor-core:3.0.3.RELEASE' } run { standardInput = System.in if (project.hasProperty('args')) { args project.args } }
src/main/java/App.java
import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import java.lang.invoke.MethodHandle; import static java.lang.invoke.MethodHandles.publicLookup; import static java.lang.invoke.MethodType.methodType; public class App { public static void main(String... args) throws Throwable { MethodHandle mh = publicLookup().findStatic( App.class, "sample" + args[0], methodType(void.class) ); mh.invoke(); System.in.read(); } // (A) public static void sampleA() { Mono.just("(A)") .repeat(3) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } // (B) publishOn を使った場合 public static void sampleB() { Mono.just("(B)") .repeat(3) .publishOn(Schedulers.parallel()) //.publishOn(Schedulers.single()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } // (C) parallel + runOn public static void sampleC() { Mono.just("(C)") .repeat(3) .parallel() .runOn(Schedulers.parallel()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } private static void printThread(String msg) { System.out.println(msg + ", thread: " + Thread.currentThread()); } }
実行例
> gradle -q run -Pargs=B (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main] (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main] (B)_map, thread: Thread[parallel-1,5,main] (B)_subscribe, thread: Thread[parallel-1,5,main]