Reactor で並列処理

Reactor の並列処理を試してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20161208/

はじめに

Reactor で以下のようなコードを実行すると、map の処理と subscribe の処理を順番に 3回繰り返します。(repeat メソッドの戻り値は Flux

(A) サンプルコード
Mono.just("(A)")
    .repeat(3)
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]

ここで、今回は map からの処理を別スレッドで並列実行してみます。

publishOn の場合

publishOn を使うと main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。

(B) publishOn を使った場合
Mono.just("(B)")
    .repeat(3)
    .publishOn(Schedulers.parallel())
    // 実質的に以下と同じ
    //.publishOn(Schedulers.single())
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(B) publishOn を使った場合の実行結果
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]

publishOn へ Schedulers.parallel() を渡しても複数のスレッドを使うわけでは無いようなので、実質的に Schedulers.single() と変わりは無さそうです。

ParallelFlux を使った並列処理

並列で実行するためには parallel メソッドを使って Flux から ParallelFlux を取得し、runOn メソッドで Schedulers.parallel() ※ を指定します。

 ※ この場合、Schedulers.parallel() の代わりに
    Schedulers.single() を使うと並列にならない点に注意
    (単一スレッドで実行する事になる)
(C) parallel + runOn
Mono.just("(C)")
    .repeat(3)
    .parallel()
    .runOn(Schedulers.parallel())
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(C) parallel + runOn の実行結果
(C)_map, thread: Thread[parallel-1,5,main]
(C)_map, thread: Thread[parallel-3,5,main]
(C)_map, thread: Thread[parallel-2,5,main]
(C)_subscribe, thread: Thread[parallel-3,5,main]
(C)_subscribe, thread: Thread[parallel-1,5,main]
(C)_subscribe, thread: Thread[parallel-2,5,main]

3つのスレッドを使って並列に実行できているようです。

なお、runOnコメントアウトして実行すると (A) と同様の結果(main スレッドで順番に実行)になります。

備考

最後に、今回使用したソースを記載しておきます。

build.gradle
apply plugin: 'application'

mainClassName = 'App'

repositories {
    jcenter()
}

dependencies {
    compile 'io.projectreactor:reactor-core:3.0.3.RELEASE'
}

run {
    standardInput = System.in

    if (project.hasProperty('args')) {
        args project.args
    }
}
src/main/java/App.java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.lang.invoke.MethodHandle;

import static java.lang.invoke.MethodHandles.publicLookup;
import static java.lang.invoke.MethodType.methodType;

public class App {
    public static void main(String... args) throws Throwable {
        MethodHandle mh = publicLookup().findStatic(
            App.class, 
            "sample" + args[0], 
            methodType(void.class)
        );

        mh.invoke();

        System.in.read();
    }

    // (A)
    public static void sampleA() {
        Mono.just("(A)")
            .repeat(3)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (B) publishOn を使った場合
    public static void sampleB() {
        Mono.just("(B)")
            .repeat(3)
            .publishOn(Schedulers.parallel())
            //.publishOn(Schedulers.single())
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (C) parallel + runOn
    public static void sampleC() {
        Mono.just("(C)")
            .repeat(3)
            .parallel()
            .runOn(Schedulers.parallel())
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    private static void printThread(String msg) {
        System.out.println(msg + ", thread: " + Thread.currentThread());
    }
}
実行例
> gradle -q run -Pargs=B

(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]
(B)_map, thread: Thread[parallel-1,5,main]
(B)_subscribe, thread: Thread[parallel-1,5,main]