Reactor で skip・take 処理

Bacon.js で skip・take 処理」と同様の処理を Reactor を使用し Java 8 で実装してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20150104-2/

はじめに

今回は Gradle を使ってビルド・実行するため、下記のような build.gradle を用意しました。

build.gradle
apply plugin: 'application'

repositories {
    maven { url 'http://repo.spring.io/libs-snapshot' }
    jcenter()
}

dependencies {
    compile 'io.projectreactor:reactor-core:2.0.0.M2'
    runtime 'org.slf4j:slf4j-nop:1.7.9'
}

mainClassName = 'sample.Sample1'
//mainClassName = 'sample.Sample2'

単純な処理

ストリームへ送信された sample1 ~ sample6 をそのまま出力する処理を実装しました。 Broadcaster を使えばストリームへ要素を送信する事ができます。

Sample1.java
package sample;

import reactor.Environment;
import reactor.rx.Promise;
import reactor.rx.Streams;
import reactor.rx.stream.Broadcaster;

public class Sample1 {
    public static void main(String... args) throws Exception {
        Environment env = new Environment();

        Broadcaster<String> stream = Streams.broadcast(env);

        Promise<String> promise = stream.observe(System.out::println).next();

        Streams.range(1, 6).consume( i -> stream.onNext("sample" + i) );

        promise.await();

        env.shutdown();
    }
}

実行結果は下記の通りです。

実行結果
> gradle -q run

sample1
sample2
sample3
sample4
sample5
sample6

skip・take 処理

次に、skip と take 処理ですが、Reactor には skip に該当する処理が見当たらなかったので、filterAtomicInteger を使って自前で実装してみました。

なお、下記 (1) の箇所のように next メソッドを使って Promise を取得した場合は take した最初の要素しか出力されない点に注意が必要です。

take した全ての要素を出力させるには、下記 (2) の箇所のように最後の要素を取得する tap メソッド (戻り値は TapAndControls<O>) を使ったり、全要素をリスト化して取得する toList メソッド (戻り値は Promise<java.util.List<O>>) 等を使ったりする必要がありました。

Sample2.java
package sample;

import reactor.Environment;
import reactor.rx.Promise;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.stream.Broadcaster;
import reactor.rx.action.support.TapAndControls;

import java.util.concurrent.atomic.AtomicInteger;

public class Sample2 {
    public static void main(String... args) throws Exception {
        Environment env = new Environment();

        skipAndTakeSampleA(env);

        System.out.println("-----");

        skipAndTakeSampleB(env);

        env.shutdown();
    }

    private static void skipAndTakeSampleA(Environment env) throws Exception {
        Broadcaster<String> stream = Streams.broadcast(env);

        // (1) next を使うと take した最初の要素のみ出力
        Promise<String> promise = skip(stream, 2)
            .take(3)
            .map(s -> "#" + s)
            .observe(System.out::println)
            .next();

        Streams.range(1, 6).consume( i -> stream.onNext("sampleA-" + i) );

        promise.await();
    }

    private static void skipAndTakeSampleB(Environment env) throws Exception {
        Broadcaster<String> stream = Streams.broadcast(env);

        // (2) take した全要素を出力するには tap や toList 等を使う必要あり
        TapAndControls<String> tap = skip(stream, 2)
            .take(3)
            .map(s -> "#" + s)
            .observe(System.out::println)
            .tap();

        Streams.range(1, 6).consume( i -> stream.onNext("sampleB-" + i) );

        tap.get();
    }

    // skip 処理
    private static <T> Stream<T> skip(Stream<T> st, int num) {
        AtomicInteger counter = new AtomicInteger();

        return st.filter(s -> counter.incrementAndGet() > num);
    }
}

実行結果は下記の通りです。 (build.gradle の mainClassName を変更して実行)

実行結果
> gradle -q run

#sampleA-3
-----
#sampleB-3
#sampleB-4
#sampleB-5
build.gradle
・・・
//mainClassName = 'sample.Sample1'
mainClassName = 'sample.Sample2'