読者です 読者をやめる 読者になる 読者になる

RxJava2 で並列処理

Java Reactive

前回 と同じような並列処理を RxJava 2.0 で試してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20161226/

はじめに

まずは、map の処理と subscribe の処理を順番に 3回繰り返す処理です。 前回の Reactor との違いは MonoSingle に変わっただけです。

(A) サンプルコード
Single.just("(A)")
    .repeat(3)
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(A) サンプルコードの実行結果
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]
(A)_map, thread: Thread[main,5,main]
(A)_subscribe, thread: Thread[main,5,main]

observeOn の場合

RxJava 2.0 では Reactor の publishOn の代わりに observeOn が使えます。 main スレッドとは別のスレッドで実行しますが、順番に実行する点は変わりません。

(B) observeOn を使った場合
Single.just("(B)")
    .repeat(3)
    .observeOn(Schedulers.computation())
    // 実質的には以下でも同じ
    //.observeOn(Schedulers.single())
    .map(n -> {
        printThread(n + "_map");
        return n;
    })
    .subscribe(n -> printThread(n + "_subscribe"));
(B) observeOn を使った場合の実行結果
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(B)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]

flatMap と subscribeOn を使った並列処理

RxJava 2.0 では Reactor の ParallelFlux 相当の処理は用意されていないようです。

そこで、flatMapsubscribeOn を使ってみました。

(C) flatMap + subscribeOn
Single.just("(C)")
    .repeat(3)
    .flatMap(s ->
        Flowable.just(s)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribeOn(Schedulers.computation())
            //.subscribeOn(Schedulers.newThread())
    )
    .subscribe(n -> printThread(n + "_subscribe"));

ただし、以下の実行結果のように別スレッドで実行してはいるものの、map と subscribe を同じスレッドで実行するわけではありませんでした。

なお、Schedulers.computation() の場合、スレッド数は実行環境のコア数に依存するようです。(newThread は新しいスレッドを使う)

(C) flatMap + subscribeOn の実行結果1
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C) flatMap + subscribeOn の実行結果2
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]

備考

最後に、今回使用したソースを記載しておきます。

build.gradle
apply plugin: 'application'

mainClassName = 'App'

repositories {
    jcenter()
}

dependencies {
    compile 'io.reactivex.rxjava2:rxjava:2.0.3'
}

run {
    standardInput = System.in

    if (project.hasProperty('args')) {
        args project.args
    }
}
src/main/java/App.java
import io.reactivex.Single;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.lang.invoke.MethodHandle;

import static java.lang.invoke.MethodHandles.publicLookup;
import static java.lang.invoke.MethodType.methodType;

public class App {
    public static void main(String... args) throws Throwable {
        MethodHandle mh = publicLookup().findStatic(
            App.class, 
            "sample" + args[0], 
            methodType(void.class)
        );

        mh.invoke();

        System.in.read();
    }

    // (A)
    public static void sampleA() {
        Single.just("(A)")
            .repeat(3)
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (B)
    public static void sampleB() {
        Single.just("(B)")
            .repeat(3)
            .observeOn(Schedulers.computation())
            //.observeOn(Schedulers.single())
            .map(n -> {
                printThread(n + "_map");
                return n;
            })
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    // (C) 
    public static void sampleC() {
        Single.just("(C)")
            .repeat(3)
            .flatMap(s ->
                Flowable.just(s)
                    .map(n -> {
                        printThread(n + "_map");
                        return n;
                    })
                    .subscribeOn(Schedulers.computation())
                    //.subscribeOn(Schedulers.newThread())

            )
            .subscribe(n -> printThread(n + "_subscribe"));
    }

    private static void printThread(String msg) {
        System.out.println(msg + ", thread: " + Thread.currentThread());
    }
}
実行例
> gradle -q run -Pargs=C

(C)_map, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_map, thread: Thread[RxComputationThreadPool-1,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-2,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]
(C)_subscribe, thread: Thread[RxComputationThreadPool-3,5,main]