Akka Streams で skip・take 処理
前回の 「Reactor で skip・take 処理」 と同様の処理を Akka Streams を使用し Java 8 で実装してみました。
- Akka Streams 1.0 M2
ソースは http://github.com/fits/try_samples/tree/master/blog/20150112/
はじめに
Gradle を使ってビルド・実行するため、下記のような build.gradle を用意しました。
build.gradle
apply plugin: 'application' repositories { jcenter() } dependencies { compile 'com.typesafe.akka:akka-stream-experimental_2.11:1.0-M2' } mainClassName = 'sample.Sample1' //mainClassName = 'sample.Sample2' //mainClassName = 'sample.PublisherSample1' //mainClassName = 'sample.PublisherSample2'
Iterable を使った処理1
Iterable を使った固定的なデータストリームに対して skip・take 処理を実装してみます。
まず、下記 (2) のように ActorSystem
と FlowMaterializer
オブジェクトを作成し、処理完了時に ActorSystem を shutdown する処理 (3) を用意しておきます。
今回は sample1 ~ sample6 のデータを Java 8 の Stream
を使って作りましたが、Stream は Iterable
インターフェースを持っておらず、Akka Streams には今のところ Source.from(Stream)
のようなメソッドも用意されていないので、List へ変換するなどして Iterable 化する必要があります。 (4)
(5) では、(4) を使って Source
を生成し、skip・take 処理を実施しています。
なお、Akka Streams では skip 処理を drop
メソッドとして用意しています。
最後に、(1) は Akka の不要なログ出力 (info レベル) を抑制するための措置です。 (設定ファイルで設定する方法もあります)
akka.loglevel
を off
にしてログ出力を無効化する事もできますが、そうするとエラーログすら出力されなくなるのでご注意下さい。
Sample1.java
package sample; import akka.actor.ActorSystem; import akka.dispatch.OnComplete; import akka.stream.FlowMaterializer; import akka.stream.javadsl.Source; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; import scala.runtime.BoxedUnit; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; public class Sample1 { public static void main(String... args) { // (1) Akka の不要なログ出力を抑制するための設定 final Config config = ConfigFactory.load() .withValue("akka.loglevel", ConfigValueFactory.fromAnyRef("error")); // (2) ActorSystem・FlowMaterializer の作成 final ActorSystem system = ActorSystem.create("sample", config); final FlowMaterializer materializer = FlowMaterializer.create(system); // (3) 終了時の処理 final OnComplete<BoxedUnit> complete = new OnComplete<BoxedUnit>() { @Override public void onComplete(Throwable failure, BoxedUnit success) { system.shutdown(); } }; // (4) sample1 ~ sample6 List<String> data = IntStream.range(1, 7).mapToObj(i -> "sample" + i).collect(Collectors.toList()); // 以下でも可 //Iterable<String> data = () -> IntStream.range(1, 7).mapToObj(i -> "sample" + i).iterator(); // (5) 処理 Source.from(data) .drop(3) .take(2) .map(s -> "#" + s) .foreach(System.out::println, materializer) .onComplete(complete, system.dispatcher()); } }
また、(4) (5) の箇所は下記のように実装する事もできます。
Stream<String> stream = IntStream.range(1, 7).mapToObj(i -> "sample" + i); Source.from((Iterable<String>)stream::iterator) .drop(3) ・・・
実行結果
> gradle -q run #sample4 #sample5
Iterable を使った処理2
Sample1.java を Flow
を使って書き換えると下記のようになります。
Flow によってストリームの加工処理部分を分離できます。
Sample2.java
・・・ import akka.stream.javadsl.Flow; import akka.stream.javadsl.Sink; ・・・ public class Sample2 { public static void main(String... args) { ・・・ Flow<String, String> flow = Flow.<String>create() .drop(3) .take(2) .map(s -> "#" + s); List<String> data = IntStream.range(1, 7).mapToObj(i -> "sample" + i).collect(Collectors.toList()); flow.runWith( Source.from(data), Sink.foreach(System.out::println), materializer ).onComplete(complete, system.dispatcher()); } }
実行結果
> gradle -q run #sample4 #sample5
build.gradle
・・・ //mainClassName = 'sample.Sample1' mainClassName = 'sample.Sample2'
Publisher を使った処理1
これまで固定的なデータを使って処理を行いましたが、前回の Broadcaster
のような処理 (ストリームへ任意の要素を送信) を Akka Streams で実現するには org.reactivestreams.Publisher
を利用できます。
RxJava 等と組み合わせるのが簡単だと思いますが、今回は Publisher
の実装クラスを自作してみました。(SamplePublisher クラス)
まずは sample1 ~ sample6 をそのまま出力する処理を実装してみましたが、2点ほど注意点がありました。
- (1) akka.stream.materializer.initial-input-buffer-size のデフォルト値である 4 を超えたデータ数がバッファに格納されるとエラー (Input buffer overrun) が発生する
- (2) Subscriber に対して onSubscribe しないと onNext が機能しない
(1) を回避するために buffer
メソッドを使って十分なバッファ数を確保するように変更しました。
OverflowStrategy
に関しては用途に応じて設定する事になると思いますが、dropXXX や error を使用すると、バッファが溢れた際にデータが欠ける事になるので注意が必要です。
ちなみに、(1) のデフォルト設定は akka-stream-experimental_2.11-1.0-M2.jar 内の reference.conf ファイルで設定されています。
また、(2) の対策のため、Subscriber
(実体は ActorProcessor
) へ何も処理を行わない Subscription
を onSubscribe しています。
PublisherSample1.java
・・・ import akka.stream.OverflowStrategy; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; ・・・ public class PublisherSample1 { public static void main(String... args) { ・・・ try (SamplePublisher<String> publisher = new SamplePublisher<>()) { Source.from(publisher) // (1) Input buffer overrun の防止策 .buffer(10, OverflowStrategy.backpressure()) .map(s -> "#" + s) .foreach(System.out::println, materializer) .onComplete(complete, system.dispatcher()); IntStream.range(1, 7) .mapToObj(i -> "sample" + i) .forEach(d -> publisher.send(d)); } } private static class SamplePublisher<T> implements Publisher<T>, AutoCloseable { private List<Subscriber<? super T>> list = new ArrayList<>(); @Override public void subscribe(Subscriber<? super T> s) { list.add(s); // (2) 下記を実施しないと onNext が機能しない s.onSubscribe(new Subscription() { @Override public void request(long n) {} @Override public void cancel() {} }); } @Override public void close() { list.forEach(Subscriber::onComplete); } public void send(T msg) { // Subscriber へデータ送信 list.forEach(s -> s.onNext(msg)); } } }
SamplePublisher クラス内で Subscriber を List にて管理していますが、上記処理では subscribe は 1度しか実行されないので List で管理する必要は特にありません。
実行結果
> gradle -q run #sample1 #sample2 #sample3 #sample4 #sample5 #sample6
build.gradle
・・・ //mainClassName = 'sample.Sample2' mainClassName = 'sample.PublisherSample1'
Publisher を使った処理2
最後に skip(drop)・take の処理を追加してみます。
PublisherSample2.java
・・・ public class PublisherSample2 { public static void main(String... args) { ・・・ try (SamplePublisher<String> publisher = new SamplePublisher<>()) { Source.from(publisher) // Input buffer overrun の防止策 .buffer(10, OverflowStrategy.backpressure()) .drop(3) .take(2) .map(s -> "#" + s) .foreach(System.out::println, materializer) .onComplete(complete, system.dispatcher()); IntStream.range(1, 7) .mapToObj(i -> "sample" + i) .forEach(d -> publisher.send(d)); } } private static class SamplePublisher<T> implements Publisher<T>, AutoCloseable { ・・・ } }
実行結果
> gradle -q run #sample4 #sample5
build.gradle
・・・ //mainClassName = 'sample.PublisherSample1' mainClassName = 'sample.PublisherSample2'
Reactor で skip・take 処理
「Bacon.js で skip・take 処理」と同様の処理を Reactor を使用し Java 8 で実装してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20150104-2/
はじめに
今回は Gradle を使ってビルド・実行するため、下記のような build.gradle を用意しました。
build.gradle
apply plugin: 'application' repositories { maven { url 'http://repo.spring.io/libs-snapshot' } jcenter() } dependencies { compile 'io.projectreactor:reactor-core:2.0.0.M2' runtime 'org.slf4j:slf4j-nop:1.7.9' } mainClassName = 'sample.Sample1' //mainClassName = 'sample.Sample2'
単純な処理
ストリームへ送信された sample1 ~ sample6 をそのまま出力する処理を実装しました。
Broadcaster
を使えばストリームへ要素を送信する事ができます。
Sample1.java
package sample; import reactor.Environment; import reactor.rx.Promise; import reactor.rx.Streams; import reactor.rx.stream.Broadcaster; public class Sample1 { public static void main(String... args) throws Exception { Environment env = new Environment(); Broadcaster<String> stream = Streams.broadcast(env); Promise<String> promise = stream.observe(System.out::println).next(); Streams.range(1, 6).consume( i -> stream.onNext("sample" + i) ); promise.await(); env.shutdown(); } }
実行結果は下記の通りです。
実行結果
> gradle -q run sample1 sample2 sample3 sample4 sample5 sample6
skip・take 処理
次に、skip と take 処理ですが、Reactor には skip に該当する処理が見当たらなかったので、filter
と AtomicInteger
を使って自前で実装してみました。
なお、下記 (1) の箇所のように next
メソッドを使って Promise
を取得した場合は take した最初の要素しか出力されない点に注意が必要です。
take した全ての要素を出力させるには、下記 (2) の箇所のように最後の要素を取得する tap
メソッド (戻り値は TapAndControls<O>
) を使ったり、全要素をリスト化して取得する toList
メソッド (戻り値は Promise<java.util.List<O>>
) 等を使ったりする必要がありました。
Sample2.java
package sample; import reactor.Environment; import reactor.rx.Promise; import reactor.rx.Stream; import reactor.rx.Streams; import reactor.rx.stream.Broadcaster; import reactor.rx.action.support.TapAndControls; import java.util.concurrent.atomic.AtomicInteger; public class Sample2 { public static void main(String... args) throws Exception { Environment env = new Environment(); skipAndTakeSampleA(env); System.out.println("-----"); skipAndTakeSampleB(env); env.shutdown(); } private static void skipAndTakeSampleA(Environment env) throws Exception { Broadcaster<String> stream = Streams.broadcast(env); // (1) next を使うと take した最初の要素のみ出力 Promise<String> promise = skip(stream, 2) .take(3) .map(s -> "#" + s) .observe(System.out::println) .next(); Streams.range(1, 6).consume( i -> stream.onNext("sampleA-" + i) ); promise.await(); } private static void skipAndTakeSampleB(Environment env) throws Exception { Broadcaster<String> stream = Streams.broadcast(env); // (2) take した全要素を出力するには tap や toList 等を使う必要あり TapAndControls<String> tap = skip(stream, 2) .take(3) .map(s -> "#" + s) .observe(System.out::println) .tap(); Streams.range(1, 6).consume( i -> stream.onNext("sampleB-" + i) ); tap.get(); } // skip 処理 private static <T> Stream<T> skip(Stream<T> st, int num) { AtomicInteger counter = new AtomicInteger(); return st.filter(s -> counter.incrementAndGet() > num); } }
実行結果は下記の通りです。 (build.gradle の mainClassName を変更して実行)
実行結果
> gradle -q run #sampleA-3 ----- #sampleB-3 #sampleB-4 #sampleB-5
build.gradle
・・・ //mainClassName = 'sample.Sample1' mainClassName = 'sample.Sample2'
Bacon.js で skip・take 処理
リアクティブプログラミング用ライブラリの Bacon.js を Node.js 上で使用し、「RxJS で行単位のファイル処理」 で試したような skip・take 処理のサンプルを実装してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20150104/
はじめに
npm でインストールするために下記のような package.json を用意します。 今回は CoffeeScript で実装するので coffee-script も追加しています。
package.json
{ ・・・ "dependencies": { "baconjs": "*", "coffee-script": "*" } }
npm install でインストールします。
インストール
> npm install
Bus の利用
まずは Bus
を使って、ストリームへ push
した内容をそのままログ出力する処理を実装してみます。
Bus は後から要素を push できるストリームですが、Bacon.js には他にも色々とストリームを作る方法が用意されています。 (下記は一部)
- 配列からストリームを作る
Bacon.fromArray
- コールバック関数を使ってストリームを作る
Bacon.fromCallback
,Bacon.fromNodeCallback
(Node.js 用のコールバック関数へ適用) - 任意のストリームを作る
Bacon.fromBinder
sample1.coffee
Bacon = require('baconjs').Bacon bus = new Bacon.Bus() bus.log() [1..6].forEach (i) -> bus.push "sample#{i}" bus.end()
実行結果は下記の通りです。 Bus へ push した sample1 ~ sample6 を順次出力しています。
実行結果
> coffee sample1.coffee sample1 sample2 sample3 sample4 sample5 sample6 <end>
skip・take 処理
次に、下記のような加工処理を追加してみました。
- (1) 2つの要素を無視 (skip)
- (2) 3つの要素だけを取得 (take)
- (3) 先頭に # を付ける (map)
sample2.coffee
Bacon = require('baconjs').Bacon bus = new Bacon.Bus() bus.skip(2).take(3).map((s) -> "##{s}").log() [1..6].forEach (i) -> bus.push "sample#{i}" bus.end()
実行結果は下記の通りです。
実行結果
> coffee sample2.coffee #sample3 #sample4 #sample5 <end>
Ratpack で Java Web アプリケーション作成
Ratpack は以前 「Ratpack + JHaml + Morphia で MongoDB を使った Web アプリ開発」 で試しましたが、3年以上経っているので改めて試してみました。
今回は単純な Java Web アプリケーションを Ratpack で作成する事にします。
ソースは http://github.com/fits/try_samples/tree/master/blog/20141229/
Web アプリケーションの作成
今回の作成手順は以下のようになります。
- (1)
HandlerFactory
インターフェースの実装クラスを作成 - (2) ratpack.properties で handlerFactory を設定
ファイル構成
今回は Gradle でビルドしましたので、ファイルは以下のような構成になっています。
ビルド定義
アプリケーションの実行やアーカイブ化を簡単に行うため、Gradle 用の ratpack-java プラグインを使います。
ratpack-java を使う場合、ratpack の依存設定 (dependencies) は要らないようですが、slf4j だけは実行に要るようです。
build.gradle
buildscript { repositories { jcenter() } dependencies { classpath 'io.ratpack:ratpack-gradle:0.9.11' } } apply plugin: 'io.ratpack.ratpack-java' repositories { jcenter() } dependencies { runtime 'org.slf4j:slf4j-simple:1.7.5' }
(1) HandlerFactory 実装クラスの作成
とりあえず /sample/xxx
へアクセスすると単に "sample - xxx" を出力するだけの処理を実装しました。 (xxx は任意の文字列)
出力には Context
オブジェクトの render()
メソッドを使用し、URL のパラメータ部分 (下記の :id
) は PathTokens
の get()
メソッドで取得します。
src/main/java/AppHandlerFactory.java
import static ratpack.handling.Handlers.*; import ratpack.handling.Context; import ratpack.handling.Handler; import ratpack.launch.HandlerFactory; import ratpack.launch.LaunchConfig; public class AppHandlerFactory implements HandlerFactory { @Override public Handler create(LaunchConfig config) throws Exception { return chain( path("sample/:id", ctx -> ctx.render("sample - " + ctx.getPathTokens().get("id"))) ); } }
(2) handlerFactory の設定
作成した AppHandlerFactory を handlerFactory へ設定します。
こうする事で、実行時に handlerFactory として AppHandlerFactory が使用されます。
src/ratpack/ratpack.properties
handlerFactory=AppHandlerFactory
テスト実行
gradle run
でテスト実行できます。
> gradle run :compileJava UP-TO-DATE :processResources UP-TO-DATE :classes UP-TO-DATE :configureRun :prepareBaseDir UP-TO-DATE :run [main] INFO ratpack.server.internal.NettyRatpackServer - Ratpack started for http://localhost:5050 ・・・
http://localhost:5050/sample/abc
へアクセスすると、sample - abc
が表示されます。
成果物の生成
gradle distZip
を実行すると、build/distributions へ zip ファイルが生成されます。
> gradle distZip :compileJava UP-TO-DATE :processResources UP-TO-DATE :classes UP-TO-DATE :jar :startScripts :distZip BUILD SUCCESSFUL
zip ファイルを解凍して、bin ディレクトリの起動スクリプト (今回のサンプルでは ratpack_sample
) を実行すれば Web アプリケーションを単体起動できます。
MyBatis / iBatis の動的 SQL を API で作成
MyBatis / iBatis の API を使って DB へ接続せずに Mapper XML の動的 SQL を作成する方法です。
ソースは http://github.com/fits/try_samples/tree/master/blog/20141221/
MyBatis の場合
動的 SQL の結果を取得する手順は下記のようになります。
- (1) Configuration をインスタンス化
- (2) (1) と Mapper XML で XMLMapperBuilder をインスタンス化
- (3) Mapper XML をパース
- (4) (1) から指定の SQL に対応した MappedStatement を取得
- (5) (4) で取得した MappedStatement へパラメータを渡して動的 SQL を構築、結果の SQL を取得
今回は Groovy で実装してみました。
動的 SQL のパラメータはコマンドライン引数で JSON 文字列として指定するようにしています。
mybatis_sql_gen.groovy
@Grab('org.mybatis:mybatis:3.2.8') import org.apache.ibatis.session.* import org.apache.ibatis.builder.xml.* import groovy.json.JsonSlurper if (args.length < 3) { println '<mybatis mapper xml> <sql id> <json params>' return } // (1) def config = new Configuration() // (2) def parser = new XMLMapperBuilder(new File(args[0]).newInputStream(), config, "", config.sqlFragments) // (3) parser.parse() // (4) def st = config.getMappedStatement(args[1]) // パラメータの作成(JSON 文字列から) def params = new JsonSlurper().parseText args[2] // (5) def sql = st.getBoundSql(params).sql println sql
実行
下記 Mapper XML を使って実行してみます。
mapper.xml
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="sample"> <select id="findData"> SELECT * FROM data WHERE title like #{title} <if test="author != null and author.name != null"> AND author_name like #{author.name} </if> <if test="types"> AND type in <foreach item="type" collection="types" open="(" separator="," close=")"> #{type} </foreach> </if> </select> </mapper>
実行例1
まずはパラメータ無しの場合。{}
> groovy mybatis_sql_gen.groovy mapper.xml findData "{}" SELECT * FROM data WHERE title like ?
実行例2
次に、author.name と types パラメータを指定した場合。{"author":{"name": 1}, "types": [1, 2, 3]}
なお、今回の動的 SQL ではパラメータの有無しか見ていませんので、値には適当な数値 (1
や [1, 2, 3]
) を使っています。
> groovy mybatis_sql_gen.groovy gen.groovy mapper.xml findData "{\"author\":{\"name\": 1}, \"types\": [1, 2, 3]}" SELECT * FROM data WHERE title like ? AND author_name like ? AND type in ( ? , ? , ? )
iBatis の場合
iBatis の場合も、使用する API は異なりますが同じような手順で処理できます。
ibatis_sql_gen.groovy
@Grab('org.apache.ibatis:ibatis-sqlmap:2.3.4.726') import com.ibatis.sqlmap.engine.builder.xml.* import com.ibatis.sqlmap.engine.scope.* import groovy.json.JsonSlurper if (args.length < 3) { println '<ibatis mapper xml> <sql id> <json params>' return } def state = new XmlParserState() def parser = new SqlMapParser(state) parser.parse(new File(args[0]).newInputStream()) // SqlMapExecutorDelegate を取得 def dlg = state.config.delegate def st = dlg.getMappedStatement(args[1]) def sql = st.sql def scope = new StatementScope(new SessionScope()) scope.statement = st // パラメータの作成(JSON 文字列から) def params = new JsonSlurper().parseText args[2] println sql.getSql(scope, params)
実行
下記 Mapper XML を使って同様に実行してみます。
mapper.xml
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE sqlMap PUBLIC "-//ibatis.apache.org//DTD SQL Map 2.0//EN" "http://ibatis.apache.org/dtd/sql-map-2.dtd"> <sqlMap namespace="sample"> <select id="findData"> SELECT * FROM data WHERE title like #title# <isNotNull property="author"> <isNotNull property="author.name"> AND author_name like #author.name# </isNotNull> </isNotNull> <isNotNull property="types"> AND type in <iterate property="types" open="(" conjunction="," close=")"> #types[]# </iterate> </isNotNull> </select> </sqlMap>
実行例1
> groovy ibatis_sql_gen.groovy mapper.xml findData "{}" SELECT * FROM data WHERE title like ?
実行例2
> groovy ibatis_sql_gen.groovy mapper.xml findData "{\"author\":{\"name\": 1}, \"types\": [1, 2, 3]}" SELECT * FROM data WHERE title like ? AND author_name like ? AND type in ( ? , ? , ? )
Sodium で関数型リアクティブプログラミング2 - skip・take 処理
前回に続き、Sodium を試してみます。
今回は 「RxJava で行単位のファイル処理 - Groovy, Java Lambda」 で実装したものと同等の処理を Sodium を使って実装してみました。
ソースは http://github.com/fits/try_samples/tree/master/blog/20141209/
skip・take 処理
Sodium には、RxJava の際に使った skip
(指定した処理数だけ無視する) や take
(指定した処理数だけ取り出す) のようなメソッドが用意されていないようなので、Event と Behavior を組み合わせて実装してみる事にします。
Event クラスには gate
メソッドがあり、このメソッドを使えば Behavior オブジェクトの値が true の場合だけイベントを発生するような Event オブジェクトを作成できます。
つまり、指定した数のイベントを受け取れば true と false が反転する Behavior オブジェクトを用意して gate
メソッドへ与えてやれば skip や take の処理を実現した Event オブジェクトを作成できそうです。
なお、skip と take の違いは、false から true への変化か true から false への変化かの違いしかありませんので、下記のサンプルでは batch
メソッドとして共通化し、判定処理を引数 Lambda1<Integer, Boolean> cond
として渡すようにしました。
batch
メソッドは以下のような処理内容となっています。
- (1) イベントの発生数をカウントアップする Behavior を用意 (実際は BehaviorSink を使用)
- (2) イベント発生時に (1) をカウントアップ
- (3) (1) のカウント値が条件に合致した場合のみイベントを発生させる Event を作成
ReadLineFile.java
import sodium.*; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.function.*; class ReadLineFile { public static void main(String... args) throws Exception { // skip 処理 Function<Integer, Function<Event<String>, Event<String>>> skip = n -> { return ev -> batch( v -> v >= n, n, ev); }; // take 処理 Function<Integer, Function<Event<String>, Event<String>>> take = n -> { return ev -> batch( v -> v < n, n, ev); }; // 1行スキップして 3行取得する Event を作成する処理を合成 (b) (c) Function<Event<String>, Event<String>> skipAndTake3 = skip.apply(1).andThen( take.apply(3) ); // (a) EventSink<String> es = new EventSink<>(); // (d) Listener esl = skipAndTake3.apply(es).map( v -> "# " + v ).listen( System.out::println ); // ファイルを行単位で処理 readFileLines(args[0], es); esl.unlisten(); } private static void readFileLines(String fileName, EventSink<String> es) throws IOException { // ファイルを行単位で EventSink へ send try (BufferedReader br = new BufferedReader(new FileReader(fileName))) { br.lines().forEach( es::send ); } } // skip・take の共通処理 private static Event<String> batch(Lambda1<Integer, Boolean> cond, int n, Event<String> ev) { // (1) イベント発生数をカウントする Behavior を用意 BehaviorSink<Integer> counter = new BehaviorSink<>(0); // (2) イベント発生時に counter をカウントアップ ev.listen( v -> counter.send(counter.sample() + 1) ); // (3) counter の値が条件に合致した場合のみイベント発生する Event を作成 return ev.gate(counter.map(cond)); } }
上記は、以下の 4つの Event が (a) -> (b) -> (c) -> (d)
の順でイベントを伝播するような構成となっており、(d) に到達した場合のみ System.out::println
を実行するようになっています。
- (a) EventSink
- (b) skip で作成した Event
- (c) take で作成した Event
- (d) 先頭に "# " を付与する Event
また、batch
メソッドの実装内容に関しては counter の無駄なカウントアップを防止するため、下記のように unlisten
するようにした方が望ましいかもしれません。
unlisten する処理を追加した batch メソッドの例
private static Event<String> batch(Lambda1<Integer, Boolean> cond, int n, Event<String> ev) { BehaviorSink<Integer> counter = new BehaviorSink<>(0); final ArrayList<Listener> list = new ArrayList<>(1); list.add(ev.listen( v -> { int newValue = counter.sample() + 1; counter.send(newValue); if (newValue >= n) { list.stream().forEach( li -> li.unlisten() ); } })); return ev.gate(counter.map(cond)); }
実行
それでは、下記ファイルを使って実行してみます。
test1.txt
1a 2b 3c 4d 5e
実行結果は以下の通りです。 1行スキップした後、3行を先頭に "# " を付与して出力しています。
実行結果
> java -cp .;sodium.jar ReadLineFile test1.txt # 2b # 3c # 4d
test1.txt を処理した際の (a) ~ (d) の Event に対するイベント伝播状況をまとめると以下のようになっていると考えられます。(○ は伝播する、× は伝播しない)
対象行 | 値 | (a) | (b) | (c) | (d) |
---|---|---|---|---|---|
1行目 | 1a | ○ | ○ | × | × |
2行目 | 2b | ○ | ○ | ○ | ○ |
3行目 | 3c | ○ | ○ | ○ | ○ |
4行目 | 4d | ○ | ○ | ○ | ○ |
5行目 | 5e | ○ | ○ | ○ | × |
(d) まで到達した 2 ~ 4行目だけを出力する結果となります。
Sodium で関数型リアクティブプログラミング
関数型リアクティブプログラミング(FRP)用のライブラリ Sodium を試してみました。
Sodium には現時点で Java・Haskell・C++・C# 用のライブラリが用意されていますが(Embedded-C や Rust 用のライブラリも実装中の模様)、今回は Java 用のモジュールを使います。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20141123/
はじめに
Sodium の Java 用モジュールは Maven のセントラルリポジトリ等で配布されていないようなので、ソースを取得してビルドする事にします。
ビルドには Java 8 と Apache Ant を使います。(build.xml の source・target が 1.8 となっています)
ビルド例
$ git clone https://github.com/SodiumFRP/sodium.git ・・・ $ cd sodium/java $ ant
ビルドに成功すると sodium/sodium.jar ファイルが生成されます。
Event と Behavior
Sodium では下記のような Event
と Behavior
を組み合わせて処理を組み立てます。
クラス | 特徴 | 現在値の取得(sample メソッド) | イベント受信(listen メソッド) |
---|---|---|---|
Event | 離散的なストリームを扱う | × | ○ |
Behavior | 連続的なストリームを扱う | ○ | × |
Event の処理内容
まずは Event を単純に listen
するだけの処理を実装してみます。
Event に何らかの値を送信する(イベントを発火させる)には Event
のサブクラスである EventSink
の send
メソッドを使います。
なお、今回のようなサンプルでは Listener
を unlisten
する必要は無いのですが、一応入れています。
EventSample.java
import sodium.*; class EventSample { public static void main(String... args) { EventSink<String> es = new EventSink<>(); Listener esl = es.listen(System.out::println); es.send("ES1"); System.out.println("---"); es.send("ES2"); esl.unlisten(); } }
実行結果は下記の通りです。
EventSink へ send した値 (ES1
と ES2
) が listen の処理 (System.out::println) へ渡されています。
ビルドと実行
> javac -cp sodium.jar EventSample.java > java -cp .;sodium.jar EventSample ES1 --- ES2
Behavior の処理内容
次は、Behavior
のカレント値の変更を listen してみます。
Behavior を直接 listen する事はできませんが、updates
や value
メソッドを使えば Behavior の値の変更に対応した Event を取得できます。
updates と value の違いは、取得した Event が listen 時にカレント値を含むかどうかの違いです。
動作としては RxJava の PublishSubject と BehaviorSubject にそれぞれ該当すると思います。
Behavior の Event 取得メソッド | listen 時のカレント値の扱い | RxJava の類似クラス |
---|---|---|
updates | 含まない | rx.subjects.PublishSubject |
value | 含む | rx.subjects.BehaviorSubject |
Event と同様に Behavior のカレント値を変更するには BehaviorSink
の send
メソッドを使います。
BehaviorSample.java
import sodium.*; class BehaviorSample { public static void main(String... args) { updatesSample(); System.out.println(""); valueSample(); } // updates メソッドのサンプル private static void updatesSample() { System.out.println("*** Behavior.updates sample ***"); BehaviorSink<String> bh = new BehaviorSink<>("BH1"); Listener bhl = bh.updates().listen( msg -> System.out.println("behavior: " + msg) ); bh.send("BH2"); System.out.println("---"); bh.send("BH3"); bhl.unlisten(); } // value メソッドのサンプル private static void valueSample() { System.out.println("*** Behavior.value sample ***"); BehaviorSink<String> bh = new BehaviorSink<>("BH1"); Listener bhl = bh.value().listen( msg -> System.out.println("behavior: " + msg) ); bh.send("BH2"); System.out.println("---"); bh.send("BH3"); bhl.unlisten(); } }
value メソッドの場合のみ、初期値として設定した値 (BH1
) を出力しています。
実行結果
> java -cp .;sodium.jar BehaviorSample *** Behavior.updates sample *** behavior: BH2 --- behavior: BH3 *** Behavior.value sample *** behavior: BH1 behavior: BH2 --- behavior: BH3
Event の各種メソッド
最後に Event クラスの map・merge・hold・snapshot メソッドを簡単に試してみます。
map
map
メソッドによって元の Event で発火した値を加工した値を発火する Event を作成できます。
import sodium.*; class EventMethodSample { public static void main(String... args) { mapSample(); ・・・ } private static void mapSample() { System.out.println("*** Event.map sample ***"); EventSink<String> es = new EventSink<>(); Listener esl = es.listen( msg -> System.out.println("event sink: " + msg) ); // 元の値に !!! を付ける Event 作成 Event<String> me = es.map( msg -> msg + "!!!" ); Listener mel = me.listen( msg -> System.out.println("mapped event: " + msg) ); es.send("ME1"); es.send("ME2"); mel.unlisten(); esl.unlisten(); } ・・・ }
ちなみに、上記では使っていませんが、Listener
は append
する事が可能です。 (append で単一の Listener へまとめれば unlisten を個々に実施しなくても済みます)
実行結果
> java -cp .;sodium.jar EventMethodSample *** Event.map sample *** event sink: ME1 mapped event: ME1!!! event sink: ME2 mapped event: ME2!!! ・・・
merge
merge
メソッドによって二つの Event をマージできます。下記ではどちらの Event が発火しても発火する Event を作成しています。
import sodium.*; class EventMethodSample { public static void main(String... args) { ・・・ mergeSample(); ・・・ } ・・・ private static void mergeSample() { System.out.println("*** Event.merge sample ***"); EventSink<String> es1 = new EventSink<>(); Listener es1l = es1.listen( msg -> System.out.println("event sink1: " + msg) ); EventSink<String> es2 = new EventSink<>(); Listener es2l = es2.listen( msg -> System.out.println("event sink2: " + msg) ); Event<String> me = es1.merge(es2); Listener mel = me.listen( msg -> System.out.println("merged event: " + msg) ); es1.send("ES1-1"); System.out.println("---"); es2.send("ES2-1"); System.out.println("---"); es1.send("ES1-2"); mel.unlisten(); es2l.unlisten(); es1l.unlisten(); } ・・・ }
実行結果
> java -cp .;sodium.jar EventMethodSample ・・・ *** Event.merge sample *** event sink1: ES1-1 merged event: ES1-1 --- event sink2: ES2-1 merged event: ES2-1 --- event sink1: ES1-2 merged event: ES1-2 ・・・
hold
hold
メソッドによって Event の発火した値でカレント値が変化する Behavior を作成できます。
import sodium.*; class EventMethodSample { public static void main(String... args) { ・・・ holdSample(); ・・・ } ・・・ private static void holdSample() { System.out.println("*** Event.hold sample ***"); EventSink<String> es = new EventSink<>(); Listener esl = es.listen( msg -> System.out.println("event sink: " + msg) ); Behavior<String> bh = es.hold("BH1"); Listener bhl = bh.value().listen( msg -> System.out.println("behavior: " + msg) ); es.send("ES1"); System.out.println("bh current value: " + bh.sample()); System.out.println("---"); es.send("ES2"); System.out.println("bh current value: " + bh.sample()); esl.unlisten(); bhl.unlisten(); } ・・・ }
bh
の初期値は BH1
ですが、send した値 (ES1
や ES2
) によって sample
メソッドの結果が変化しています。
実行結果
> java -cp .;sodium.jar EventMethodSample ・・・ *** Event.hold sample *** behavior: BH1 event sink: ES1 behavior: ES1 bh current value: ES1 --- event sink: ES2 behavior: ES2 bh current value: ES2 ・・・
snapshot
snapshot
によって Event 発火時に任意の Behavior のカレント値を発火する Event を作成できます。
import sodium.*; class EventMethodSample { public static void main(String... args) { ・・・ snapshotSample(); } ・・・ private static void snapshotSample() { System.out.println("*** Event.snapshot sample ***"); EventSink<String> es = new EventSink<>(); Listener esl = es.listen( msg -> System.out.println("event sink: " + msg) ); Behavior<Integer> bh = new Behavior<>(1); Listener bhl = bh.value().listen( msg -> System.out.println("behavior: " + msg) ); Event<Integer> se = es.snapshot(bh); Listener sel = se.listen( i -> System.out.println("snapshot event: " + i) ); es.send("ES1"); System.out.println("bh current value: " + bh.sample()); System.out.println("---"); es.send("ES2"); System.out.println("bh current value: " + bh.sample()); sel.unlisten(); esl.unlisten(); bhl.unlisten(); } }
snapshot で作成した Event (se
) は EventSink へ send した値 (ES1
と ES2
) に関わらず、bh
のカレント値 (1
) を発火しています。
実行結果
> java -cp .;sodium.jar EventMethodSample ・・・ *** Event.snapshot sample *** behavior: 1 event sink: ES1 snapshot event: 1 bh current value: 1 --- event sink: ES2 snapshot event: 1 bh current value: 1 ・・・
snapshot した Event で発火するのは Behavior のカレント値であることを確認するため、上記の Behavior を BehaviorSink へ変更し ES2
を send する前にカレント値を 2
へ変更してみました。
import sodium.*; class EventMethodSample { public static void main(String... args) { ・・・ snapshotSample2(); } ・・・ private static void snapshotSample2() { ・・・ // BehaviorSink へ変更 BehaviorSink<Integer> bh = new BehaviorSink<>(1); Listener bhl = bh.value().listen( msg -> System.out.println("behavior: " + msg) ); Event<Integer> se = es.snapshot(bh); ・・・ System.out.println("---"); // bh のカレント値を 2 へ変更 bh.send(2); es.send("ES2"); System.out.println("bh current value: " + bh.sample()); ・・・ } }
Behavior のカレント値を 2
へ変更した後、snapshot の Event は 2
の値を発火している事を確認できます。
実行結果
> java -cp .;sodium.jar EventMethodSample ・・・ *** Event.snapshot sample2 *** behavior: 1 event sink: ES1 snapshot event: 1 bh current value: 1 --- behavior: 2 event sink: ES2 snapshot event: 2 bh current value: 2