Bacon.js で skip・take 処理

リアクティブプログラミング用ライブラリの Bacon.js を Node.js 上で使用し、「RxJS で行単位のファイル処理」 で試したような skip・take 処理のサンプルを実装してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20150104/

はじめに

npm でインストールするために下記のような package.json を用意します。 今回は CoffeeScript で実装するので coffee-script も追加しています。

package.json
{
  ・・・
  "dependencies": {
    "baconjs": "*",
    "coffee-script": "*"
  }
}

npm install でインストールします。

インストール
> npm install

Bus の利用

まずは Bus を使って、ストリームへ push した内容をそのままログ出力する処理を実装してみます。

Bus は後から要素を push できるストリームですが、Bacon.js には他にも色々とストリームを作る方法が用意されています。 (下記は一部)

  • 配列からストリームを作る Bacon.fromArray
  • コールバック関数を使ってストリームを作る Bacon.fromCallback, Bacon.fromNodeCallback (Node.js 用のコールバック関数へ適用)
  • 任意のストリームを作る Bacon.fromBinder
sample1.coffee
Bacon = require('baconjs').Bacon

bus = new Bacon.Bus()

bus.log()

[1..6].forEach (i) -> bus.push "sample#{i}"

bus.end()

実行結果は下記の通りです。 Bus へ push した sample1 ~ sample6 を順次出力しています。

実行結果
> coffee sample1.coffee

sample1
sample2
sample3
sample4
sample5
sample6
<end>

skip・take 処理

次に、下記のような加工処理を追加してみました。

  • (1) 2つの要素を無視 (skip)
  • (2) 3つの要素だけを取得 (take)
  • (3) 先頭に # を付ける (map)
sample2.coffee
Bacon = require('baconjs').Bacon

bus = new Bacon.Bus()

bus.skip(2).take(3).map((s) -> "##{s}").log()

[1..6].forEach (i) -> bus.push "sample#{i}"

bus.end()

実行結果は下記の通りです。

実行結果
> coffee sample2.coffee

#sample3
#sample4
#sample5
<end>

Reactor で skip・take 処理

Bacon.js で skip・take 処理」と同様の処理を Reactor を使用し Java 8 で実装してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20150104-2/

はじめに

今回は Gradle を使ってビルド・実行するため、下記のような build.gradle を用意しました。

build.gradle
apply plugin: 'application'

repositories {
    maven { url 'http://repo.spring.io/libs-snapshot' }
    jcenter()
}

dependencies {
    compile 'io.projectreactor:reactor-core:2.0.0.M2'
    runtime 'org.slf4j:slf4j-nop:1.7.9'
}

mainClassName = 'sample.Sample1'
//mainClassName = 'sample.Sample2'

単純な処理

ストリームへ送信された sample1 ~ sample6 をそのまま出力する処理を実装しました。 Broadcaster を使えばストリームへ要素を送信する事ができます。

Sample1.java
package sample;

import reactor.Environment;
import reactor.rx.Promise;
import reactor.rx.Streams;
import reactor.rx.stream.Broadcaster;

public class Sample1 {
    public static void main(String... args) throws Exception {
        Environment env = new Environment();

        Broadcaster<String> stream = Streams.broadcast(env);

        Promise<String> promise = stream.observe(System.out::println).next();

        Streams.range(1, 6).consume( i -> stream.onNext("sample" + i) );

        promise.await();

        env.shutdown();
    }
}

実行結果は下記の通りです。

実行結果
> gradle -q run

sample1
sample2
sample3
sample4
sample5
sample6

skip・take 処理

次に、skip と take 処理ですが、Reactor には skip に該当する処理が見当たらなかったので、filterAtomicInteger を使って自前で実装してみました。

なお、下記 (1) の箇所のように next メソッドを使って Promise を取得した場合は take した最初の要素しか出力されない点に注意が必要です。

take した全ての要素を出力させるには、下記 (2) の箇所のように最後の要素を取得する tap メソッド (戻り値は TapAndControls<O>) を使ったり、全要素をリスト化して取得する toList メソッド (戻り値は Promise<java.util.List<O>>) 等を使ったりする必要がありました。

Sample2.java
package sample;

import reactor.Environment;
import reactor.rx.Promise;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.stream.Broadcaster;
import reactor.rx.action.support.TapAndControls;

import java.util.concurrent.atomic.AtomicInteger;

public class Sample2 {
    public static void main(String... args) throws Exception {
        Environment env = new Environment();

        skipAndTakeSampleA(env);

        System.out.println("-----");

        skipAndTakeSampleB(env);

        env.shutdown();
    }

    private static void skipAndTakeSampleA(Environment env) throws Exception {
        Broadcaster<String> stream = Streams.broadcast(env);

        // (1) next を使うと take した最初の要素のみ出力
        Promise<String> promise = skip(stream, 2)
            .take(3)
            .map(s -> "#" + s)
            .observe(System.out::println)
            .next();

        Streams.range(1, 6).consume( i -> stream.onNext("sampleA-" + i) );

        promise.await();
    }

    private static void skipAndTakeSampleB(Environment env) throws Exception {
        Broadcaster<String> stream = Streams.broadcast(env);

        // (2) take した全要素を出力するには tap や toList 等を使う必要あり
        TapAndControls<String> tap = skip(stream, 2)
            .take(3)
            .map(s -> "#" + s)
            .observe(System.out::println)
            .tap();

        Streams.range(1, 6).consume( i -> stream.onNext("sampleB-" + i) );

        tap.get();
    }

    // skip 処理
    private static <T> Stream<T> skip(Stream<T> st, int num) {
        AtomicInteger counter = new AtomicInteger();

        return st.filter(s -> counter.incrementAndGet() > num);
    }
}

実行結果は下記の通りです。 (build.gradle の mainClassName を変更して実行)

実行結果
> gradle -q run

#sampleA-3
-----
#sampleB-3
#sampleB-4
#sampleB-5
build.gradle
・・・
//mainClassName = 'sample.Sample1'
mainClassName = 'sample.Sample2'