Sodium で関数型リアクティブプログラミング2 - skip・take 処理
前回に続き、Sodium を試してみます。
今回は 「RxJava で行単位のファイル処理 - Groovy, Java Lambda」 で実装したものと同等の処理を Sodium を使って実装してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20141209/
skip・take 処理
Sodium には、RxJava の際に使った skip
(指定した処理数だけ無視する) や take
(指定した処理数だけ取り出す) のようなメソッドが用意されていないようなので、Event と Behavior を組み合わせて実装してみる事にします。
Event クラスには gate
メソッドがあり、このメソッドを使えば Behavior オブジェクトの値が true の場合だけイベントを発生するような Event オブジェクトを作成できます。
つまり、指定した数のイベントを受け取れば true と false が反転する Behavior オブジェクトを用意して gate
メソッドへ与えてやれば skip や take の処理を実現した Event オブジェクトを作成できそうです。
なお、skip と take の違いは、false から true への変化か true から false への変化かの違いしかありませんので、下記のサンプルでは batch
メソッドとして共通化し、判定処理を引数 Lambda1<Integer, Boolean> cond
として渡すようにしました。
batch
メソッドは以下のような処理内容となっています。
- (1) イベントの発生数をカウントアップする Behavior を用意 (実際は BehaviorSink を使用)
- (2) イベント発生時に (1) をカウントアップ
- (3) (1) のカウント値が条件に合致した場合のみイベントを発生させる Event を作成
ReadLineFile.java
import sodium.*; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.function.*; class ReadLineFile { public static void main(String... args) throws Exception { // skip 処理 Function<Integer, Function<Event<String>, Event<String>>> skip = n -> { return ev -> batch( v -> v >= n, n, ev); }; // take 処理 Function<Integer, Function<Event<String>, Event<String>>> take = n -> { return ev -> batch( v -> v < n, n, ev); }; // 1行スキップして 3行取得する Event を作成する処理を合成 (b) (c) Function<Event<String>, Event<String>> skipAndTake3 = skip.apply(1).andThen( take.apply(3) ); // (a) EventSink<String> es = new EventSink<>(); // (d) Listener esl = skipAndTake3.apply(es).map( v -> "# " + v ).listen( System.out::println ); // ファイルを行単位で処理 readFileLines(args[0], es); esl.unlisten(); } private static void readFileLines(String fileName, EventSink<String> es) throws IOException { // ファイルを行単位で EventSink へ send try (BufferedReader br = new BufferedReader(new FileReader(fileName))) { br.lines().forEach( es::send ); } } // skip・take の共通処理 private static Event<String> batch(Lambda1<Integer, Boolean> cond, int n, Event<String> ev) { // (1) イベント発生数をカウントする Behavior を用意 BehaviorSink<Integer> counter = new BehaviorSink<>(0); // (2) イベント発生時に counter をカウントアップ ev.listen( v -> counter.send(counter.sample() + 1) ); // (3) counter の値が条件に合致した場合のみイベント発生する Event を作成 return ev.gate(counter.map(cond)); } }
上記は、以下の 4つの Event が (a) -> (b) -> (c) -> (d)
の順でイベントを伝播するような構成となっており、(d) に到達した場合のみ System.out::println
を実行するようになっています。
- (a) EventSink
- (b) skip で作成した Event
- (c) take で作成した Event
- (d) 先頭に "# " を付与する Event
また、batch
メソッドの実装内容に関しては counter の無駄なカウントアップを防止するため、下記のように unlisten
するようにした方が望ましいかもしれません。
unlisten する処理を追加した batch メソッドの例
private static Event<String> batch(Lambda1<Integer, Boolean> cond, int n, Event<String> ev) { BehaviorSink<Integer> counter = new BehaviorSink<>(0); final ArrayList<Listener> list = new ArrayList<>(1); list.add(ev.listen( v -> { int newValue = counter.sample() + 1; counter.send(newValue); if (newValue >= n) { list.stream().forEach( li -> li.unlisten() ); } })); return ev.gate(counter.map(cond)); }
実行
それでは、下記ファイルを使って実行してみます。
test1.txt
1a 2b 3c 4d 5e
実行結果は以下の通りです。 1行スキップした後、3行を先頭に "# " を付与して出力しています。
実行結果
> java -cp .;sodium.jar ReadLineFile test1.txt # 2b # 3c # 4d
test1.txt を処理した際の (a) ~ (d) の Event に対するイベント伝播状況をまとめると以下のようになっていると考えられます。(○ は伝播する、× は伝播しない)
対象行 | 値 | (a) | (b) | (c) | (d) |
---|---|---|---|---|---|
1行目 | 1a | ○ | ○ | × | × |
2行目 | 2b | ○ | ○ | ○ | ○ |
3行目 | 3c | ○ | ○ | ○ | ○ |
4行目 | 4d | ○ | ○ | ○ | ○ |
5行目 | 5e | ○ | ○ | ○ | × |
(d) まで到達した 2 ~ 4行目だけを出力する結果となります。