RxJava2 で並列処理
前回 と同じような並列処理を RxJava 2.0 で試してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20161226/
はじめに
まずは、map の処理と subscribe の処理を順番に 3回繰り返す処理です。
前回の Reactor との違いは Mono
が Single
に変わっただけです。
(A) サンプルコード
Single.just("(A)") .repeat(3) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main] (A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main] (A)_map, thread: Thread[main,5,main] (A)_subscribe, thread: Thread[main,5,main]
observeOn の場合
RxJava 2.0 では Reactor の publishOn
の代わりに observeOn
が使えます。
main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。
(B) observeOn を使った場合
Single.just("(B)") .repeat(3) .observeOn(Schedulers.computation()) // 実質的には以下でも同じ //.observeOn(Schedulers.single()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe"));
(B) observeOn を使った場合の実行結果
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main] (B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main] (B)_map, thread: Thread[RxComputationThreadPool-1,5,main] (B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main] (B)_map, thread: Thread[RxComputationThreadPool-1,5,main] (B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
flatMap と subscribeOn を使った並列処理
RxJava 2.0 では Reactor の ParallelFlux
相当の処理は用意されていないようです。
そこで、flatMap
と subscribeOn
を使ってみました。
(C) flatMap + subscribeOn
Single.just("(C)") .repeat(3) .flatMap(s -> Flowable.just(s) .map(n -> { printThread(n + "_map"); return n; }) .subscribeOn(Schedulers.computation()) //.subscribeOn(Schedulers.newThread()) ) .subscribe(n -> printThread(n + "_subscribe"));
ただし、以下の実行結果のように別スレッドで実行してはいるものの、map と subscribe を同じスレッドで実行するわけではありませんでした。
なお、Schedulers.computation()
の場合、スレッド数は実行環境のコア数に依存するようです。(newThread は新しいスレッドを使う)
(C) flatMap + subscribeOn の実行結果1
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main] (C)_map, thread: Thread[RxComputationThreadPool-3,5,main] (C)_map, thread: Thread[RxComputationThreadPool-1,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C) flatMap + subscribeOn の実行結果2
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main] (C)_map, thread: Thread[RxComputationThreadPool-3,5,main] (C)_map, thread: Thread[RxComputationThreadPool-2,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
備考
最後に、今回使用したソースを記載しておきます。
build.gradle
apply plugin: 'application' mainClassName = 'App' repositories { jcenter() } dependencies { compile 'io.reactivex.rxjava2:rxjava:2.0.3' } run { standardInput = System.in if (project.hasProperty('args')) { args project.args } }
src/main/java/App.java
import io.reactivex.Single; import io.reactivex.Flowable; import io.reactivex.schedulers.Schedulers; import java.lang.invoke.MethodHandle; import static java.lang.invoke.MethodHandles.publicLookup; import static java.lang.invoke.MethodType.methodType; public class App { public static void main(String... args) throws Throwable { MethodHandle mh = publicLookup().findStatic( App.class, "sample" + args[0], methodType(void.class) ); mh.invoke(); System.in.read(); } // (A) public static void sampleA() { Single.just("(A)") .repeat(3) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } // (B) public static void sampleB() { Single.just("(B)") .repeat(3) .observeOn(Schedulers.computation()) //.observeOn(Schedulers.single()) .map(n -> { printThread(n + "_map"); return n; }) .subscribe(n -> printThread(n + "_subscribe")); } // (C) public static void sampleC() { Single.just("(C)") .repeat(3) .flatMap(s -> Flowable.just(s) .map(n -> { printThread(n + "_map"); return n; }) .subscribeOn(Schedulers.computation()) //.subscribeOn(Schedulers.newThread()) ) .subscribe(n -> printThread(n + "_subscribe")); } private static void printThread(String msg) { System.out.println(msg + ", thread: " + Thread.currentThread()); } }
実行例
> gradle -q run -Pargs=C (C)_map, thread: Thread[RxComputationThreadPool-2,5,main] (C)_map, thread: Thread[RxComputationThreadPool-3,5,main] (C)_map, thread: Thread[RxComputationThreadPool-1,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main] (C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]