Akka Streams で skip・take 処理

前回の 「Reactor で skip・take 処理」 と同様の処理を Akka Streams を使用し Java 8 で実装してみました。

  • Akka Streams 1.0 M2

ソースは http://github.com/fits/try_samples/tree/master/blog/20150112/

はじめに

Gradle を使ってビルド・実行するため、下記のような build.gradle を用意しました。

build.gradle
apply plugin: 'application'

repositories {
    jcenter()
}

dependencies {
    compile 'com.typesafe.akka:akka-stream-experimental_2.11:1.0-M2'
}

mainClassName = 'sample.Sample1'
//mainClassName = 'sample.Sample2'
//mainClassName = 'sample.PublisherSample1'
//mainClassName = 'sample.PublisherSample2'

Iterable を使った処理1

Iterable を使った固定的なデータストリームに対して skip・take 処理を実装してみます。

まず、下記 (2) のように ActorSystemFlowMaterializer オブジェクトを作成し、処理完了時に ActorSystem を shutdown する処理 (3) を用意しておきます。

今回は sample1 ~ sample6 のデータを Java 8 の Stream を使って作りましたが、Stream は Iterable インターフェースを持っておらず、Akka Streams には今のところ Source.from(Stream) のようなメソッドも用意されていないので、List へ変換するなどして Iterable 化する必要があります。 (4)

(5) では、(4) を使って Source を生成し、skip・take 処理を実施しています。 なお、Akka Streams では skip 処理を drop メソッドとして用意しています。

最後に、(1) は Akka の不要なログ出力 (info レベル) を抑制するための措置です。 (設定ファイルで設定する方法もあります)

akka.logleveloff にしてログ出力を無効化する事もできますが、そうするとエラーログすら出力されなくなるのでご注意下さい。

Sample1.java
package sample;

import akka.actor.ActorSystem;
import akka.dispatch.OnComplete;
import akka.stream.FlowMaterializer;
import akka.stream.javadsl.Source;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import scala.runtime.BoxedUnit;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Sample1 {
    public static void main(String... args) {
        // (1) Akka の不要なログ出力を抑制するための設定
        final Config config = ConfigFactory.load()
            .withValue("akka.loglevel", ConfigValueFactory.fromAnyRef("error"));

        // (2) ActorSystem・FlowMaterializer の作成
        final ActorSystem system = ActorSystem.create("sample", config);
        final FlowMaterializer materializer = FlowMaterializer.create(system);

        // (3) 終了時の処理
        final OnComplete<BoxedUnit> complete = new OnComplete<BoxedUnit>() {
            @Override
            public void onComplete(Throwable failure, BoxedUnit success) {
                system.shutdown();
            }
        };

        // (4) sample1 ~ sample6
        List<String> data = IntStream.range(1, 7).mapToObj(i -> "sample" + i).collect(Collectors.toList());
        // 以下でも可
        //Iterable<String> data = () -> IntStream.range(1, 7).mapToObj(i -> "sample" + i).iterator();

        // (5) 処理
        Source.from(data)
            .drop(3)
            .take(2)
            .map(s -> "#" + s)
            .foreach(System.out::println, materializer)
            .onComplete(complete, system.dispatcher());
    }
}

また、(4) (5) の箇所は下記のように実装する事もできます。

Stream<String> stream = IntStream.range(1, 7).mapToObj(i -> "sample" + i);

Source.from((Iterable<String>)stream::iterator)
    .drop(3)
    ・・・
実行結果
> gradle -q run

#sample4
#sample5

Iterable を使った処理2

Sample1.javaFlow を使って書き換えると下記のようになります。
Flow によってストリームの加工処理部分を分離できます。

Sample2.java
・・・
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
・・・

public class Sample2 {
    public static void main(String... args) {
        ・・・

        Flow<String, String> flow = Flow.<String>create()
            .drop(3)
            .take(2)
            .map(s -> "#" + s);

        List<String> data = IntStream.range(1, 7).mapToObj(i -> "sample" + i).collect(Collectors.toList());

        flow.runWith(
            Source.from(data), 
            Sink.foreach(System.out::println), 
            materializer
        ).onComplete(complete, system.dispatcher());
    }
}
実行結果
> gradle -q run

#sample4
#sample5
build.gradle
・・・
//mainClassName = 'sample.Sample1'
mainClassName = 'sample.Sample2'

Publisher を使った処理1

これまで固定的なデータを使って処理を行いましたが、前回Broadcaster のような処理 (ストリームへ任意の要素を送信) を Akka Streams で実現するには org.reactivestreams.Publisher を利用できます。

RxJava 等と組み合わせるのが簡単だと思いますが、今回は Publisher の実装クラスを自作してみました。(SamplePublisher クラス)

まずは sample1 ~ sample6 をそのまま出力する処理を実装してみましたが、2点ほど注意点がありました。

  • (1) akka.stream.materializer.initial-input-buffer-size のデフォルト値である 4 を超えたデータ数がバッファに格納されるとエラー (Input buffer overrun) が発生する
  • (2) Subscriber に対して onSubscribe しないと onNext が機能しない

(1) を回避するために buffer メソッドを使って十分なバッファ数を確保するように変更しました。

OverflowStrategy に関しては用途に応じて設定する事になると思いますが、dropXXX や error を使用すると、バッファが溢れた際にデータが欠ける事になるので注意が必要です。

ちなみに、(1) のデフォルト設定は akka-stream-experimental_2.11-1.0-M2.jar 内の reference.conf ファイルで設定されています。

また、(2) の対策のため、Subscriber (実体は ActorProcessor) へ何も処理を行わない Subscription を onSubscribe しています。

PublisherSample1.java
・・・
import akka.stream.OverflowStrategy;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
・・・

public class PublisherSample1 {
    public static void main(String... args) {
        ・・・

        try (SamplePublisher<String> publisher = new SamplePublisher<>()) {

            Source.from(publisher)
                // (1) Input buffer overrun の防止策
                .buffer(10, OverflowStrategy.backpressure())
                .map(s -> "#" + s)
                .foreach(System.out::println, materializer)
                .onComplete(complete, system.dispatcher());

            IntStream.range(1, 7)
                .mapToObj(i -> "sample" + i)
                .forEach(d -> publisher.send(d));
        }
    }

    private static class SamplePublisher<T> implements Publisher<T>, AutoCloseable {
        private List<Subscriber<? super T>> list = new ArrayList<>();

        @Override
        public void subscribe(Subscriber<? super T> s) {
            list.add(s);
            // (2) 下記を実施しないと onNext が機能しない
            s.onSubscribe(new Subscription() {
                @Override public void request(long n) {}
                @Override public void cancel() {}
            });
        }

        @Override
        public void close() {
            list.forEach(Subscriber::onComplete);
        }

        public void send(T msg) {
            // Subscriber へデータ送信
            list.forEach(s -> s.onNext(msg));
        }
    }
}

SamplePublisher クラス内で Subscriber を List にて管理していますが、上記処理では subscribe は 1度しか実行されないので List で管理する必要は特にありません。

実行結果
> gradle -q run

#sample1
#sample2
#sample3
#sample4
#sample5
#sample6
build.gradle
・・・
//mainClassName = 'sample.Sample2'
mainClassName = 'sample.PublisherSample1'

Publisher を使った処理2

最後に skip(drop)・take の処理を追加してみます。

PublisherSample2.java
・・・

public class PublisherSample2 {
    public static void main(String... args) {
        ・・・

        try (SamplePublisher<String> publisher = new SamplePublisher<>()) {

            Source.from(publisher)
                // Input buffer overrun の防止策
                .buffer(10, OverflowStrategy.backpressure())
                .drop(3)
                .take(2)
                .map(s -> "#" + s)
                .foreach(System.out::println, materializer)
                .onComplete(complete, system.dispatcher());

            IntStream.range(1, 7)
                .mapToObj(i -> "sample" + i)
                .forEach(d -> publisher.send(d));
        }
    }

    private static class SamplePublisher<T> implements Publisher<T>, AutoCloseable {
        ・・・
    }
}
実行結果
> gradle -q run

#sample4
#sample5
build.gradle
・・・
//mainClassName = 'sample.PublisherSample1'
mainClassName = 'sample.PublisherSample2'

Reactor で skip・take 処理

Bacon.js で skip・take 処理」と同様の処理を Reactor を使用し Java 8 で実装してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20150104-2/

はじめに

今回は Gradle を使ってビルド・実行するため、下記のような build.gradle を用意しました。

build.gradle
apply plugin: 'application'

repositories {
    maven { url 'http://repo.spring.io/libs-snapshot' }
    jcenter()
}

dependencies {
    compile 'io.projectreactor:reactor-core:2.0.0.M2'
    runtime 'org.slf4j:slf4j-nop:1.7.9'
}

mainClassName = 'sample.Sample1'
//mainClassName = 'sample.Sample2'

単純な処理

ストリームへ送信された sample1 ~ sample6 をそのまま出力する処理を実装しました。 Broadcaster を使えばストリームへ要素を送信する事ができます。

Sample1.java
package sample;

import reactor.Environment;
import reactor.rx.Promise;
import reactor.rx.Streams;
import reactor.rx.stream.Broadcaster;

public class Sample1 {
    public static void main(String... args) throws Exception {
        Environment env = new Environment();

        Broadcaster<String> stream = Streams.broadcast(env);

        Promise<String> promise = stream.observe(System.out::println).next();

        Streams.range(1, 6).consume( i -> stream.onNext("sample" + i) );

        promise.await();

        env.shutdown();
    }
}

実行結果は下記の通りです。

実行結果
> gradle -q run

sample1
sample2
sample3
sample4
sample5
sample6

skip・take 処理

次に、skip と take 処理ですが、Reactor には skip に該当する処理が見当たらなかったので、filterAtomicInteger を使って自前で実装してみました。

なお、下記 (1) の箇所のように next メソッドを使って Promise を取得した場合は take した最初の要素しか出力されない点に注意が必要です。

take した全ての要素を出力させるには、下記 (2) の箇所のように最後の要素を取得する tap メソッド (戻り値は TapAndControls<O>) を使ったり、全要素をリスト化して取得する toList メソッド (戻り値は Promise<java.util.List<O>>) 等を使ったりする必要がありました。

Sample2.java
package sample;

import reactor.Environment;
import reactor.rx.Promise;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.stream.Broadcaster;
import reactor.rx.action.support.TapAndControls;

import java.util.concurrent.atomic.AtomicInteger;

public class Sample2 {
    public static void main(String... args) throws Exception {
        Environment env = new Environment();

        skipAndTakeSampleA(env);

        System.out.println("-----");

        skipAndTakeSampleB(env);

        env.shutdown();
    }

    private static void skipAndTakeSampleA(Environment env) throws Exception {
        Broadcaster<String> stream = Streams.broadcast(env);

        // (1) next を使うと take した最初の要素のみ出力
        Promise<String> promise = skip(stream, 2)
            .take(3)
            .map(s -> "#" + s)
            .observe(System.out::println)
            .next();

        Streams.range(1, 6).consume( i -> stream.onNext("sampleA-" + i) );

        promise.await();
    }

    private static void skipAndTakeSampleB(Environment env) throws Exception {
        Broadcaster<String> stream = Streams.broadcast(env);

        // (2) take した全要素を出力するには tap や toList 等を使う必要あり
        TapAndControls<String> tap = skip(stream, 2)
            .take(3)
            .map(s -> "#" + s)
            .observe(System.out::println)
            .tap();

        Streams.range(1, 6).consume( i -> stream.onNext("sampleB-" + i) );

        tap.get();
    }

    // skip 処理
    private static <T> Stream<T> skip(Stream<T> st, int num) {
        AtomicInteger counter = new AtomicInteger();

        return st.filter(s -> counter.incrementAndGet() > num);
    }
}

実行結果は下記の通りです。 (build.gradle の mainClassName を変更して実行)

実行結果
> gradle -q run

#sampleA-3
-----
#sampleB-3
#sampleB-4
#sampleB-5
build.gradle
・・・
//mainClassName = 'sample.Sample1'
mainClassName = 'sample.Sample2'

Bacon.js で skip・take 処理

リアクティブプログラミング用ライブラリの Bacon.js を Node.js 上で使用し、「RxJS で行単位のファイル処理」 で試したような skip・take 処理のサンプルを実装してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20150104/

はじめに

npm でインストールするために下記のような package.json を用意します。 今回は CoffeeScript で実装するので coffee-script も追加しています。

package.json
{
  ・・・
  "dependencies": {
    "baconjs": "*",
    "coffee-script": "*"
  }
}

npm install でインストールします。

インストール
> npm install

Bus の利用

まずは Bus を使って、ストリームへ push した内容をそのままログ出力する処理を実装してみます。

Bus は後から要素を push できるストリームですが、Bacon.js には他にも色々とストリームを作る方法が用意されています。 (下記は一部)

  • 配列からストリームを作る Bacon.fromArray
  • コールバック関数を使ってストリームを作る Bacon.fromCallback, Bacon.fromNodeCallback (Node.js 用のコールバック関数へ適用)
  • 任意のストリームを作る Bacon.fromBinder
sample1.coffee
Bacon = require('baconjs').Bacon

bus = new Bacon.Bus()

bus.log()

[1..6].forEach (i) -> bus.push "sample#{i}"

bus.end()

実行結果は下記の通りです。 Bus へ push した sample1 ~ sample6 を順次出力しています。

実行結果
> coffee sample1.coffee

sample1
sample2
sample3
sample4
sample5
sample6
<end>

skip・take 処理

次に、下記のような加工処理を追加してみました。

  • (1) 2つの要素を無視 (skip)
  • (2) 3つの要素だけを取得 (take)
  • (3) 先頭に # を付ける (map)
sample2.coffee
Bacon = require('baconjs').Bacon

bus = new Bacon.Bus()

bus.skip(2).take(3).map((s) -> "##{s}").log()

[1..6].forEach (i) -> bus.push "sample#{i}"

bus.end()

実行結果は下記の通りです。

実行結果
> coffee sample2.coffee

#sample3
#sample4
#sample5
<end>

Ratpack で Java Web アプリケーション作成

Ratpack は以前 「Ratpack + JHaml + Morphia で MongoDB を使った Web アプリ開発」 で試しましたが、3年以上経っているので改めて試してみました。

今回は単純な Java Web アプリケーションを Ratpack で作成する事にします。

ソースは http://github.com/fits/try_samples/tree/master/blog/20141229/

Web アプリケーションの作成

今回の作成手順は以下のようになります。

  • (1) HandlerFactory インターフェースの実装クラスを作成
  • (2) ratpack.properties で handlerFactory を設定

ファイル構成

今回は Gradle でビルドしましたので、ファイルは以下のような構成になっています。

  • build.gradle
  • src/main/java/AppHandlerFactory.java
  • src/ratpack/ratpack.properties

ビルド定義

アプリケーションの実行やアーカイブ化を簡単に行うため、Gradle 用の ratpack-java プラグインを使います。

ratpack-java を使う場合、ratpack の依存設定 (dependencies) は要らないようですが、slf4j だけは実行に要るようです。

build.gradle
buildscript {
    repositories {
        jcenter()
    }

    dependencies {
        classpath 'io.ratpack:ratpack-gradle:0.9.11'
    }
}

apply plugin: 'io.ratpack.ratpack-java'

repositories {
    jcenter()
}

dependencies {
    runtime 'org.slf4j:slf4j-simple:1.7.5'
}

(1) HandlerFactory 実装クラスの作成

とりあえず /sample/xxx へアクセスすると単に "sample - xxx" を出力するだけの処理を実装しました。 (xxx は任意の文字列)

出力には Context オブジェクトの render() メソッドを使用し、URL のパラメータ部分 (下記の :id) は PathTokensget() メソッドで取得します。

src/main/java/AppHandlerFactory.java
import static ratpack.handling.Handlers.*;

import ratpack.handling.Context;
import ratpack.handling.Handler;
import ratpack.launch.HandlerFactory;
import ratpack.launch.LaunchConfig;

public class AppHandlerFactory implements HandlerFactory {
    @Override
    public Handler create(LaunchConfig config) throws Exception {
        return chain(
            path("sample/:id", ctx -> 
                ctx.render("sample - " + ctx.getPathTokens().get("id")))
        );
    }
}

(2) handlerFactory の設定

作成した AppHandlerFactory を handlerFactory へ設定します。

こうする事で、実行時に handlerFactory として AppHandlerFactory が使用されます。

src/ratpack/ratpack.properties
handlerFactory=AppHandlerFactory

テスト実行

gradle run でテスト実行できます。

> gradle run

:compileJava UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:configureRun
:prepareBaseDir UP-TO-DATE
:run
[main] INFO ratpack.server.internal.NettyRatpackServer - Ratpack started for http://localhost:5050
・・・

http://localhost:5050/sample/abc へアクセスすると、sample - abc が表示されます。

成果物の生成

gradle distZip を実行すると、build/distributions へ zip ファイルが生成されます。

> gradle distZip

:compileJava UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:jar
:startScripts
:distZip

BUILD SUCCESSFUL

zip ファイルを解凍して、bin ディレクトリの起動スクリプト (今回のサンプルでは ratpack_sample) を実行すれば Web アプリケーションを単体起動できます。

MyBatis / iBatis の動的 SQL を API で作成

MyBatis / iBatisAPI を使って DB へ接続せずに Mapper XML の動的 SQL を作成する方法です。

ソースは http://github.com/fits/try_samples/tree/master/blog/20141221/

MyBatis の場合

動的 SQL の結果を取得する手順は下記のようになります。

  • (1) Configuration をインスタンス
  • (2) (1) と Mapper XML で XMLMapperBuilder をインスタンス
  • (3) Mapper XML をパース
  • (4) (1) から指定の SQL に対応した MappedStatement を取得
  • (5) (4) で取得した MappedStatement へパラメータを渡して動的 SQL を構築、結果の SQL を取得

今回は Groovy で実装してみました。

動的 SQL のパラメータはコマンドライン引数で JSON 文字列として指定するようにしています。

mybatis_sql_gen.groovy
@Grab('org.mybatis:mybatis:3.2.8')
import org.apache.ibatis.session.*
import org.apache.ibatis.builder.xml.*

import groovy.json.JsonSlurper

if (args.length < 3) {
    println '<mybatis mapper xml> <sql id> <json params>'
    return
}
// (1)
def config = new Configuration()
// (2)
def parser = new XMLMapperBuilder(new File(args[0]).newInputStream(), config, "", config.sqlFragments)
// (3)
parser.parse()
// (4)
def st = config.getMappedStatement(args[1])
// パラメータの作成(JSON 文字列から)
def params = new JsonSlurper().parseText args[2]
// (5)
def sql = st.getBoundSql(params).sql

println sql

実行

下記 Mapper XML を使って実行してみます。

mapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="sample">
  <select id="findData">
    SELECT * FROM data
    WHERE
      title like #{title}
    <if test="author != null and author.name != null">
      AND author_name like #{author.name}
    </if>
    <if test="types">
      AND type in 
      <foreach item="type" collection="types" open="(" separator="," close=")">
        #{type}
      </foreach>
    </if>
  </select>
</mapper>
実行例1

まずはパラメータ無しの場合。{}

> groovy mybatis_sql_gen.groovy mapper.xml findData "{}"

SELECT * FROM data
    WHERE
      title like ?
実行例2

次に、author.name と types パラメータを指定した場合。{"author":{"name": 1}, "types": [1, 2, 3]}

なお、今回の動的 SQL ではパラメータの有無しか見ていませんので、値には適当な数値 (1[1, 2, 3]) を使っています。

> groovy mybatis_sql_gen.groovy gen.groovy mapper.xml findData "{\"author\":{\"name\": 1}, \"types\": [1, 2, 3]}"

SELECT * FROM data
    WHERE
      title like ?

      AND author_name like ?


      AND type in
       (
        ?
       ,
        ?
       ,
        ?
       )

iBatis の場合

iBatis の場合も、使用する API は異なりますが同じような手順で処理できます。

ibatis_sql_gen.groovy
@Grab('org.apache.ibatis:ibatis-sqlmap:2.3.4.726')
import com.ibatis.sqlmap.engine.builder.xml.*
import com.ibatis.sqlmap.engine.scope.*

import groovy.json.JsonSlurper

if (args.length < 3) {
    println '<ibatis mapper xml> <sql id> <json params>'
    return
}

def state = new XmlParserState()

def parser = new SqlMapParser(state)
parser.parse(new File(args[0]).newInputStream())

// SqlMapExecutorDelegate を取得
def dlg = state.config.delegate

def st = dlg.getMappedStatement(args[1])
def sql = st.sql

def scope = new StatementScope(new SessionScope())
scope.statement = st

// パラメータの作成(JSON 文字列から)
def params = new JsonSlurper().parseText args[2]

println sql.getSql(scope, params)

実行

下記 Mapper XML を使って同様に実行してみます。

mapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE sqlMap PUBLIC "-//ibatis.apache.org//DTD SQL Map 2.0//EN" "http://ibatis.apache.org/dtd/sql-map-2.dtd">
<sqlMap namespace="sample">
  <select id="findData">
    SELECT * FROM data
    WHERE
      title like #title#
    <isNotNull property="author">
      <isNotNull property="author.name">
        AND author_name like #author.name#
      </isNotNull>
    </isNotNull>
    <isNotNull property="types">
      AND type in
      <iterate property="types" open="(" conjunction="," close=")">
        #types[]#
      </iterate>
    </isNotNull>
  </select>
</sqlMap>
実行例1
> groovy ibatis_sql_gen.groovy mapper.xml findData "{}"

     SELECT * FROM data     WHERE       title like ?
実行例2
> groovy ibatis_sql_gen.groovy mapper.xml findData "{\"author\":{\"name\": 1}, \"types\": [1, 2, 3]}"

     SELECT * FROM data     WHERE       title like ?                     AND author_name like ?                        AND type in       (         ?       ,     ?       ,         ?       )

Sodium で関数型リアクティブプログラミング2 - skip・take 処理

前回に続き、Sodium を試してみます。

今回は 「RxJava で行単位のファイル処理 - Groovy, Java Lambda」 で実装したものと同等の処理を Sodium を使って実装してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20141209/

skip・take 処理

Sodium には、RxJava の際に使った skip (指定した処理数だけ無視する) や take (指定した処理数だけ取り出す) のようなメソッドが用意されていないようなので、Event と Behavior を組み合わせて実装してみる事にします。

Event クラスには gate メソッドがあり、このメソッドを使えば Behavior オブジェクトの値が true の場合だけイベントを発生するような Event オブジェクトを作成できます。

つまり、指定した数のイベントを受け取れば true と false が反転する Behavior オブジェクトを用意して gate メソッドへ与えてやれば skip や take の処理を実現した Event オブジェクトを作成できそうです。

なお、skip と take の違いは、false から true への変化か true から false への変化かの違いしかありませんので、下記のサンプルでは batch メソッドとして共通化し、判定処理を引数 Lambda1<Integer, Boolean> cond として渡すようにしました。

batch メソッドは以下のような処理内容となっています。

  • (1) イベントの発生数をカウントアップする Behavior を用意 (実際は BehaviorSink を使用)
  • (2) イベント発生時に (1) をカウントアップ
  • (3) (1) のカウント値が条件に合致した場合のみイベントを発生させる Event を作成
ReadLineFile.java
import sodium.*;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.function.*;

class ReadLineFile {
    public static void main(String... args) throws Exception {
        // skip 処理
        Function<Integer, Function<Event<String>, Event<String>>> skip = n -> {
            return ev -> batch( v -> v >= n, n, ev);
        };
        // take 処理
        Function<Integer, Function<Event<String>, Event<String>>> take = n -> {
            return ev -> batch( v -> v < n, n, ev);
        };

        // 1行スキップして 3行取得する Event を作成する処理を合成 (b) (c)
        Function<Event<String>, Event<String>> skipAndTake3 = skip.apply(1).andThen( take.apply(3) );

        // (a)
        EventSink<String> es = new EventSink<>();
        // (d)
        Listener esl = skipAndTake3.apply(es).map( v -> "# " + v ).listen( System.out::println );
        // ファイルを行単位で処理
        readFileLines(args[0], es);

        esl.unlisten();
    }

    private static void readFileLines(String fileName, EventSink<String> es) throws IOException {
        // ファイルを行単位で EventSink へ send
        try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
            br.lines().forEach( es::send );
        }
    }
    // skip・take の共通処理
    private static Event<String> batch(Lambda1<Integer, Boolean> cond, int n, Event<String> ev) {
        // (1) イベント発生数をカウントする Behavior を用意
        BehaviorSink<Integer> counter = new BehaviorSink<>(0);
        // (2) イベント発生時に counter をカウントアップ
        ev.listen( v -> counter.send(counter.sample() + 1) );
        // (3) counter の値が条件に合致した場合のみイベント発生する Event を作成
        return ev.gate(counter.map(cond));
    }
}

上記は、以下の 4つの Event が (a) -> (b) -> (c) -> (d) の順でイベントを伝播するような構成となっており、(d) に到達した場合のみ System.out::println を実行するようになっています。

  • (a) EventSink
  • (b) skip で作成した Event
  • (c) take で作成した Event
  • (d) 先頭に "# " を付与する Event

また、batch メソッドの実装内容に関しては counter の無駄なカウントアップを防止するため、下記のように unlisten するようにした方が望ましいかもしれません。

unlisten する処理を追加した batch メソッドの例
    private static Event<String> batch(Lambda1<Integer, Boolean> cond, int n, Event<String> ev) {
        BehaviorSink<Integer> counter = new BehaviorSink<>(0);
        final ArrayList<Listener> list = new ArrayList<>(1);

        list.add(ev.listen( v -> {
            int newValue = counter.sample() + 1;
            counter.send(newValue);

            if (newValue >= n) {
                list.stream().forEach( li -> li.unlisten() );
            }
        }));

        return ev.gate(counter.map(cond));
    }

実行

それでは、下記ファイルを使って実行してみます。

test1.txt
1a
2b
3c
4d
5e

実行結果は以下の通りです。 1行スキップした後、3行を先頭に "# " を付与して出力しています。

実行結果
> java -cp .;sodium.jar ReadLineFile test1.txt

# 2b
# 3c
# 4d

test1.txt を処理した際の (a) ~ (d) の Event に対するイベント伝播状況をまとめると以下のようになっていると考えられます。(○ は伝播する、× は伝播しない)

対象行 (a) (b) (c) (d)
1行目 1a × ×
2行目 2b
3行目 3c
4行目 4d
5行目 5e ×

(d) まで到達した 2 ~ 4行目だけを出力する結果となります。

Sodium で関数型リアクティブプログラミング

関数型リアクティブプログラミング(FRP)用のライブラリ Sodium を試してみました。

Sodium には現時点で JavaHaskellC++C# 用のライブラリが用意されていますが(Embedded-C や Rust 用のライブラリも実装中の模様)、今回は Java 用のモジュールを使います。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20141123/

はじめに

Sodium の Java 用モジュールは Maven のセントラルリポジトリ等で配布されていないようなので、ソースを取得してビルドする事にします。

ビルドには Java 8 と Apache Ant を使います。(build.xml の source・target が 1.8 となっています)

ビルド例
$ git clone https://github.com/SodiumFRP/sodium.git
・・・
$ cd sodium/java
$ ant

ビルドに成功すると sodium/sodium.jar ファイルが生成されます。

Event と Behavior

Sodium では下記のような EventBehavior を組み合わせて処理を組み立てます。

クラス 特徴 現在値の取得(sample メソッド イベント受信(listen メソッド
Event 離散的なストリームを扱う ×
Behavior 連続的なストリームを扱う ×

Event の処理内容

まずは Event を単純に listen するだけの処理を実装してみます。

Event に何らかの値を送信する(イベントを発火させる)には Event のサブクラスである EventSinksend メソッドを使います。

なお、今回のようなサンプルでは Listenerunlisten する必要は無いのですが、一応入れています。

EventSample.java
import sodium.*;

class EventSample {
    public static void main(String... args) {

        EventSink<String> es = new EventSink<>();
        Listener esl = es.listen(System.out::println);

        es.send("ES1");

        System.out.println("---");

        es.send("ES2");

        esl.unlisten();
    }
}

実行結果は下記の通りです。

EventSink へ send した値 (ES1ES2) が listen の処理 (System.out::println) へ渡されています。

ビルドと実行
> javac -cp sodium.jar EventSample.java

> java -cp .;sodium.jar EventSample

ES1
---
ES2

Behavior の処理内容

次は、Behavior のカレント値の変更を listen してみます。

Behavior を直接 listen する事はできませんが、updatesvalue メソッドを使えば Behavior の値の変更に対応した Event を取得できます。

updates と value の違いは、取得した Event が listen 時にカレント値を含むかどうかの違いです。

動作としては RxJava の PublishSubject と BehaviorSubject にそれぞれ該当すると思います。

Behavior の Event 取得メソッド listen 時のカレント値の扱い RxJava の類似クラス
updates 含まない rx.subjects.PublishSubject
value 含む rx.subjects.BehaviorSubject

Event と同様に Behavior のカレント値を変更するには BehaviorSinksend メソッドを使います。

BehaviorSample.java
import sodium.*;

class BehaviorSample {
    public static void main(String... args) {
        updatesSample();

        System.out.println("");

        valueSample();
    }
    // updates メソッドのサンプル
    private static void updatesSample() {
        System.out.println("*** Behavior.updates sample ***");

        BehaviorSink<String> bh = new BehaviorSink<>("BH1");
        Listener bhl = bh.updates().listen( msg -> System.out.println("behavior: " + msg) );

        bh.send("BH2");

        System.out.println("---");

        bh.send("BH3");

        bhl.unlisten();
    }
    // value メソッドのサンプル
    private static void valueSample() {
        System.out.println("*** Behavior.value sample ***");

        BehaviorSink<String> bh = new BehaviorSink<>("BH1");
        Listener bhl = bh.value().listen( msg -> System.out.println("behavior: " + msg) );

        bh.send("BH2");

        System.out.println("---");

        bh.send("BH3");

        bhl.unlisten();
    }
}

value メソッドの場合のみ、初期値として設定した値 (BH1) を出力しています。

実行結果
> java -cp .;sodium.jar BehaviorSample

*** Behavior.updates sample ***
behavior: BH2
---
behavior: BH3

*** Behavior.value sample ***
behavior: BH1
behavior: BH2
---
behavior: BH3

Event の各種メソッド

最後に Event クラスの map・merge・hold・snapshot メソッドを簡単に試してみます。

map

map メソッドによって元の Event で発火した値を加工した値を発火する Event を作成できます。

import sodium.*;

class EventMethodSample {
    public static void main(String... args) {
        mapSample();
        ・・・
    }

    private static void mapSample() {
        System.out.println("*** Event.map sample ***");

        EventSink<String> es = new EventSink<>();
        Listener esl = es.listen( msg -> System.out.println("event sink: " + msg) );

        // 元の値に !!! を付ける Event 作成
        Event<String> me = es.map( msg -> msg + "!!!" );
        Listener mel = me.listen( msg -> System.out.println("mapped event: " + msg) );

        es.send("ME1");
        es.send("ME2");

        mel.unlisten();
        esl.unlisten();
    }
    ・・・
}

ちなみに、上記では使っていませんが、Listenerappend する事が可能です。 (append で単一の Listener へまとめれば unlisten を個々に実施しなくても済みます)

実行結果
> java -cp .;sodium.jar EventMethodSample

*** Event.map sample ***
event sink: ME1
mapped event: ME1!!!
event sink: ME2
mapped event: ME2!!!
・・・

merge

merge メソッドによって二つの Event をマージできます。下記ではどちらの Event が発火しても発火する Event を作成しています。

import sodium.*;

class EventMethodSample {
    public static void main(String... args) {
        ・・・
        mergeSample();
        ・・・
    }
    ・・・
    private static void mergeSample() {
        System.out.println("*** Event.merge sample ***");

        EventSink<String> es1 = new EventSink<>();
        Listener es1l = es1.listen( msg -> System.out.println("event sink1: " + msg) );

        EventSink<String> es2 = new EventSink<>();
        Listener es2l = es2.listen( msg -> System.out.println("event sink2: " + msg) );

        Event<String> me = es1.merge(es2);
        Listener mel = me.listen( msg -> System.out.println("merged event: " + msg) );

        es1.send("ES1-1");

        System.out.println("---");

        es2.send("ES2-1");

        System.out.println("---");

        es1.send("ES1-2");

        mel.unlisten();
        es2l.unlisten();
        es1l.unlisten();
    }
    ・・・
}
実行結果
> java -cp .;sodium.jar EventMethodSample
・・・

*** Event.merge sample ***
event sink1: ES1-1
merged event: ES1-1
---
event sink2: ES2-1
merged event: ES2-1
---
event sink1: ES1-2
merged event: ES1-2

・・・

hold

hold メソッドによって Event の発火した値でカレント値が変化する Behavior を作成できます。

import sodium.*;

class EventMethodSample {
    public static void main(String... args) {
        ・・・
        holdSample();
        ・・・
    }
    ・・・
    private static void holdSample() {
        System.out.println("*** Event.hold sample ***");

        EventSink<String> es = new EventSink<>();
        Listener esl = es.listen( msg -> System.out.println("event sink: " + msg) );

        Behavior<String> bh = es.hold("BH1");
        Listener bhl = bh.value().listen( msg -> System.out.println("behavior: " + msg) );

        es.send("ES1");

        System.out.println("bh current value: " + bh.sample());

        System.out.println("---");

        es.send("ES2");

        System.out.println("bh current value: " + bh.sample());

        esl.unlisten();
        bhl.unlisten();
    }
    ・・・
}

bh の初期値は BH1 ですが、send した値 (ES1ES2) によって sample メソッドの結果が変化しています。

実行結果
> java -cp .;sodium.jar EventMethodSample
・・・

*** Event.hold sample ***
behavior: BH1
event sink: ES1
behavior: ES1
bh current value: ES1
---
event sink: ES2
behavior: ES2
bh current value: ES2

・・・

snapshot

snapshot によって Event 発火時に任意の Behavior のカレント値を発火する Event を作成できます。

import sodium.*;

class EventMethodSample {
    public static void main(String... args) {
        ・・・
        snapshotSample();
    }
    ・・・
    private static void snapshotSample() {
        System.out.println("*** Event.snapshot sample ***");

        EventSink<String> es = new EventSink<>();
        Listener esl = es.listen( msg -> System.out.println("event sink: " + msg) );

        Behavior<Integer> bh = new Behavior<>(1);
        Listener bhl = bh.value().listen( msg -> System.out.println("behavior: " + msg) );

        Event<Integer> se = es.snapshot(bh);
        Listener sel = se.listen( i -> System.out.println("snapshot event: " + i) );

        es.send("ES1");

        System.out.println("bh current value: " + bh.sample());

        System.out.println("---");

        es.send("ES2");

        System.out.println("bh current value: " + bh.sample());

        sel.unlisten();
        esl.unlisten();
        bhl.unlisten();
    }
}

snapshot で作成した Event (se) は EventSink へ send した値 (ES1ES2) に関わらず、bh のカレント値 (1) を発火しています。

実行結果
> java -cp .;sodium.jar EventMethodSample
・・・

*** Event.snapshot sample ***
behavior: 1
event sink: ES1
snapshot event: 1
bh current value: 1
---
event sink: ES2
snapshot event: 1
bh current value: 1

・・・

snapshot した Event で発火するのは Behavior のカレント値であることを確認するため、上記の Behavior を BehaviorSink へ変更し ES2 を send する前にカレント値を 2 へ変更してみました。

import sodium.*;

class EventMethodSample {
    public static void main(String... args) {
        ・・・
        snapshotSample2();
    }
    ・・・
    private static void snapshotSample2() {
        ・・・
        // BehaviorSink へ変更
        BehaviorSink<Integer> bh = new BehaviorSink<>(1);
        Listener bhl = bh.value().listen( msg -> System.out.println("behavior: " + msg) );

        Event<Integer> se = es.snapshot(bh);
        ・・・
        System.out.println("---");

        // bh のカレント値を 2 へ変更
        bh.send(2);
        es.send("ES2");

        System.out.println("bh current value: " + bh.sample());
        ・・・
    }
}

Behavior のカレント値を 2 へ変更した後、snapshot の Event は 2 の値を発火している事を確認できます。

実行結果
> java -cp .;sodium.jar EventMethodSample
・・・

*** Event.snapshot sample2 ***
behavior: 1
event sink: ES1
snapshot event: 1
bh current value: 1
---
behavior: 2
event sink: ES2
snapshot event: 2
bh current value: 2