Reactor で skip・take 処理
「Bacon.js で skip・take 処理」と同様の処理を Reactor を使用し Java 8 で実装してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20150104-2/
はじめに
今回は Gradle を使ってビルド・実行するため、下記のような build.gradle を用意しました。
build.gradle
apply plugin: 'application' repositories { maven { url 'http://repo.spring.io/libs-snapshot' } jcenter() } dependencies { compile 'io.projectreactor:reactor-core:2.0.0.M2' runtime 'org.slf4j:slf4j-nop:1.7.9' } mainClassName = 'sample.Sample1' //mainClassName = 'sample.Sample2'
単純な処理
ストリームへ送信された sample1 ~ sample6 をそのまま出力する処理を実装しました。
Broadcaster
を使えばストリームへ要素を送信する事ができます。
Sample1.java
package sample; import reactor.Environment; import reactor.rx.Promise; import reactor.rx.Streams; import reactor.rx.stream.Broadcaster; public class Sample1 { public static void main(String... args) throws Exception { Environment env = new Environment(); Broadcaster<String> stream = Streams.broadcast(env); Promise<String> promise = stream.observe(System.out::println).next(); Streams.range(1, 6).consume( i -> stream.onNext("sample" + i) ); promise.await(); env.shutdown(); } }
実行結果は下記の通りです。
実行結果
> gradle -q run sample1 sample2 sample3 sample4 sample5 sample6
skip・take 処理
次に、skip と take 処理ですが、Reactor には skip に該当する処理が見当たらなかったので、filter
と AtomicInteger
を使って自前で実装してみました。
なお、下記 (1) の箇所のように next
メソッドを使って Promise
を取得した場合は take した最初の要素しか出力されない点に注意が必要です。
take した全ての要素を出力させるには、下記 (2) の箇所のように最後の要素を取得する tap
メソッド (戻り値は TapAndControls<O>
) を使ったり、全要素をリスト化して取得する toList
メソッド (戻り値は Promise<java.util.List<O>>
) 等を使ったりする必要がありました。
Sample2.java
package sample; import reactor.Environment; import reactor.rx.Promise; import reactor.rx.Stream; import reactor.rx.Streams; import reactor.rx.stream.Broadcaster; import reactor.rx.action.support.TapAndControls; import java.util.concurrent.atomic.AtomicInteger; public class Sample2 { public static void main(String... args) throws Exception { Environment env = new Environment(); skipAndTakeSampleA(env); System.out.println("-----"); skipAndTakeSampleB(env); env.shutdown(); } private static void skipAndTakeSampleA(Environment env) throws Exception { Broadcaster<String> stream = Streams.broadcast(env); // (1) next を使うと take した最初の要素のみ出力 Promise<String> promise = skip(stream, 2) .take(3) .map(s -> "#" + s) .observe(System.out::println) .next(); Streams.range(1, 6).consume( i -> stream.onNext("sampleA-" + i) ); promise.await(); } private static void skipAndTakeSampleB(Environment env) throws Exception { Broadcaster<String> stream = Streams.broadcast(env); // (2) take した全要素を出力するには tap や toList 等を使う必要あり TapAndControls<String> tap = skip(stream, 2) .take(3) .map(s -> "#" + s) .observe(System.out::println) .tap(); Streams.range(1, 6).consume( i -> stream.onNext("sampleB-" + i) ); tap.get(); } // skip 処理 private static <T> Stream<T> skip(Stream<T> st, int num) { AtomicInteger counter = new AtomicInteger(); return st.filter(s -> counter.incrementAndGet() > num); } }
実行結果は下記の通りです。 (build.gradle の mainClassName を変更して実行)
実行結果
> gradle -q run #sampleA-3 ----- #sampleB-3 #sampleB-4 #sampleB-5
build.gradle
・・・ //mainClassName = 'sample.Sample1' mainClassName = 'sample.Sample2'