Bacon.js で skip・take 処理
リアクティブプログラミング用ライブラリの Bacon.js を Node.js 上で使用し、「RxJS で行単位のファイル処理」 で試したような skip・take 処理のサンプルを実装してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20150104/
はじめに
npm でインストールするために下記のような package.json を用意します。 今回は CoffeeScript で実装するので coffee-script も追加しています。
package.json
{ ・・・ "dependencies": { "baconjs": "*", "coffee-script": "*" } }
npm install でインストールします。
インストール
> npm install
Bus の利用
まずは Bus
を使って、ストリームへ push
した内容をそのままログ出力する処理を実装してみます。
Bus は後から要素を push できるストリームですが、Bacon.js には他にも色々とストリームを作る方法が用意されています。 (下記は一部)
- 配列からストリームを作る
Bacon.fromArray
- コールバック関数を使ってストリームを作る
Bacon.fromCallback
,Bacon.fromNodeCallback
(Node.js 用のコールバック関数へ適用) - 任意のストリームを作る
Bacon.fromBinder
sample1.coffee
Bacon = require('baconjs').Bacon bus = new Bacon.Bus() bus.log() [1..6].forEach (i) -> bus.push "sample#{i}" bus.end()
実行結果は下記の通りです。 Bus へ push した sample1 ~ sample6 を順次出力しています。
実行結果
> coffee sample1.coffee sample1 sample2 sample3 sample4 sample5 sample6 <end>
skip・take 処理
次に、下記のような加工処理を追加してみました。
- (1) 2つの要素を無視 (skip)
- (2) 3つの要素だけを取得 (take)
- (3) 先頭に # を付ける (map)
sample2.coffee
Bacon = require('baconjs').Bacon bus = new Bacon.Bus() bus.skip(2).take(3).map((s) -> "##{s}").log() [1..6].forEach (i) -> bus.push "sample#{i}" bus.end()
実行結果は下記の通りです。
実行結果
> coffee sample2.coffee #sample3 #sample4 #sample5 <end>
Reactor で skip・take 処理
「Bacon.js で skip・take 処理」と同様の処理を Reactor を使用し Java 8 で実装してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20150104-2/
はじめに
今回は Gradle を使ってビルド・実行するため、下記のような build.gradle を用意しました。
build.gradle
apply plugin: 'application' repositories { maven { url 'http://repo.spring.io/libs-snapshot' } jcenter() } dependencies { compile 'io.projectreactor:reactor-core:2.0.0.M2' runtime 'org.slf4j:slf4j-nop:1.7.9' } mainClassName = 'sample.Sample1' //mainClassName = 'sample.Sample2'
単純な処理
ストリームへ送信された sample1 ~ sample6 をそのまま出力する処理を実装しました。
Broadcaster
を使えばストリームへ要素を送信する事ができます。
Sample1.java
package sample; import reactor.Environment; import reactor.rx.Promise; import reactor.rx.Streams; import reactor.rx.stream.Broadcaster; public class Sample1 { public static void main(String... args) throws Exception { Environment env = new Environment(); Broadcaster<String> stream = Streams.broadcast(env); Promise<String> promise = stream.observe(System.out::println).next(); Streams.range(1, 6).consume( i -> stream.onNext("sample" + i) ); promise.await(); env.shutdown(); } }
実行結果は下記の通りです。
実行結果
> gradle -q run sample1 sample2 sample3 sample4 sample5 sample6
skip・take 処理
次に、skip と take 処理ですが、Reactor には skip に該当する処理が見当たらなかったので、filter
と AtomicInteger
を使って自前で実装してみました。
なお、下記 (1) の箇所のように next
メソッドを使って Promise
を取得した場合は take した最初の要素しか出力されない点に注意が必要です。
take した全ての要素を出力させるには、下記 (2) の箇所のように最後の要素を取得する tap
メソッド (戻り値は TapAndControls<O>
) を使ったり、全要素をリスト化して取得する toList
メソッド (戻り値は Promise<java.util.List<O>>
) 等を使ったりする必要がありました。
Sample2.java
package sample; import reactor.Environment; import reactor.rx.Promise; import reactor.rx.Stream; import reactor.rx.Streams; import reactor.rx.stream.Broadcaster; import reactor.rx.action.support.TapAndControls; import java.util.concurrent.atomic.AtomicInteger; public class Sample2 { public static void main(String... args) throws Exception { Environment env = new Environment(); skipAndTakeSampleA(env); System.out.println("-----"); skipAndTakeSampleB(env); env.shutdown(); } private static void skipAndTakeSampleA(Environment env) throws Exception { Broadcaster<String> stream = Streams.broadcast(env); // (1) next を使うと take した最初の要素のみ出力 Promise<String> promise = skip(stream, 2) .take(3) .map(s -> "#" + s) .observe(System.out::println) .next(); Streams.range(1, 6).consume( i -> stream.onNext("sampleA-" + i) ); promise.await(); } private static void skipAndTakeSampleB(Environment env) throws Exception { Broadcaster<String> stream = Streams.broadcast(env); // (2) take した全要素を出力するには tap や toList 等を使う必要あり TapAndControls<String> tap = skip(stream, 2) .take(3) .map(s -> "#" + s) .observe(System.out::println) .tap(); Streams.range(1, 6).consume( i -> stream.onNext("sampleB-" + i) ); tap.get(); } // skip 処理 private static <T> Stream<T> skip(Stream<T> st, int num) { AtomicInteger counter = new AtomicInteger(); return st.filter(s -> counter.incrementAndGet() > num); } }
実行結果は下記の通りです。 (build.gradle の mainClassName を変更して実行)
実行結果
> gradle -q run #sampleA-3 ----- #sampleB-3 #sampleB-4 #sampleB-5
build.gradle
・・・ //mainClassName = 'sample.Sample1' mainClassName = 'sample.Sample2'