Akka Streams で skip・take 処理

前回の 「Reactor で skip・take 処理」 と同様の処理を Akka Streams を使用し Java 8 で実装してみました。

  • Akka Streams 1.0 M2

ソースは http://github.com/fits/try_samples/tree/master/blog/20150112/

はじめに

Gradle を使ってビルド・実行するため、下記のような build.gradle を用意しました。

build.gradle
apply plugin: 'application'

repositories {
    jcenter()
}

dependencies {
    compile 'com.typesafe.akka:akka-stream-experimental_2.11:1.0-M2'
}

mainClassName = 'sample.Sample1'
//mainClassName = 'sample.Sample2'
//mainClassName = 'sample.PublisherSample1'
//mainClassName = 'sample.PublisherSample2'

Iterable を使った処理1

Iterable を使った固定的なデータストリームに対して skip・take 処理を実装してみます。

まず、下記 (2) のように ActorSystemFlowMaterializer オブジェクトを作成し、処理完了時に ActorSystem を shutdown する処理 (3) を用意しておきます。

今回は sample1 ~ sample6 のデータを Java 8 の Stream を使って作りましたが、Stream は Iterable インターフェースを持っておらず、Akka Streams には今のところ Source.from(Stream) のようなメソッドも用意されていないので、List へ変換するなどして Iterable 化する必要があります。 (4)

(5) では、(4) を使って Source を生成し、skip・take 処理を実施しています。 なお、Akka Streams では skip 処理を drop メソッドとして用意しています。

最後に、(1) は Akka の不要なログ出力 (info レベル) を抑制するための措置です。 (設定ファイルで設定する方法もあります)

akka.logleveloff にしてログ出力を無効化する事もできますが、そうするとエラーログすら出力されなくなるのでご注意下さい。

Sample1.java
package sample;

import akka.actor.ActorSystem;
import akka.dispatch.OnComplete;
import akka.stream.FlowMaterializer;
import akka.stream.javadsl.Source;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import scala.runtime.BoxedUnit;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Sample1 {
    public static void main(String... args) {
        // (1) Akka の不要なログ出力を抑制するための設定
        final Config config = ConfigFactory.load()
            .withValue("akka.loglevel", ConfigValueFactory.fromAnyRef("error"));

        // (2) ActorSystem・FlowMaterializer の作成
        final ActorSystem system = ActorSystem.create("sample", config);
        final FlowMaterializer materializer = FlowMaterializer.create(system);

        // (3) 終了時の処理
        final OnComplete<BoxedUnit> complete = new OnComplete<BoxedUnit>() {
            @Override
            public void onComplete(Throwable failure, BoxedUnit success) {
                system.shutdown();
            }
        };

        // (4) sample1 ~ sample6
        List<String> data = IntStream.range(1, 7).mapToObj(i -> "sample" + i).collect(Collectors.toList());
        // 以下でも可
        //Iterable<String> data = () -> IntStream.range(1, 7).mapToObj(i -> "sample" + i).iterator();

        // (5) 処理
        Source.from(data)
            .drop(3)
            .take(2)
            .map(s -> "#" + s)
            .foreach(System.out::println, materializer)
            .onComplete(complete, system.dispatcher());
    }
}

また、(4) (5) の箇所は下記のように実装する事もできます。

Stream<String> stream = IntStream.range(1, 7).mapToObj(i -> "sample" + i);

Source.from((Iterable<String>)stream::iterator)
    .drop(3)
    ・・・
実行結果
> gradle -q run

#sample4
#sample5

Iterable を使った処理2

Sample1.javaFlow を使って書き換えると下記のようになります。
Flow によってストリームの加工処理部分を分離できます。

Sample2.java
・・・
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
・・・

public class Sample2 {
    public static void main(String... args) {
        ・・・

        Flow<String, String> flow = Flow.<String>create()
            .drop(3)
            .take(2)
            .map(s -> "#" + s);

        List<String> data = IntStream.range(1, 7).mapToObj(i -> "sample" + i).collect(Collectors.toList());

        flow.runWith(
            Source.from(data), 
            Sink.foreach(System.out::println), 
            materializer
        ).onComplete(complete, system.dispatcher());
    }
}
実行結果
> gradle -q run

#sample4
#sample5
build.gradle
・・・
//mainClassName = 'sample.Sample1'
mainClassName = 'sample.Sample2'

Publisher を使った処理1

これまで固定的なデータを使って処理を行いましたが、前回Broadcaster のような処理 (ストリームへ任意の要素を送信) を Akka Streams で実現するには org.reactivestreams.Publisher を利用できます。

RxJava 等と組み合わせるのが簡単だと思いますが、今回は Publisher の実装クラスを自作してみました。(SamplePublisher クラス)

まずは sample1 ~ sample6 をそのまま出力する処理を実装してみましたが、2点ほど注意点がありました。

  • (1) akka.stream.materializer.initial-input-buffer-size のデフォルト値である 4 を超えたデータ数がバッファに格納されるとエラー (Input buffer overrun) が発生する
  • (2) Subscriber に対して onSubscribe しないと onNext が機能しない

(1) を回避するために buffer メソッドを使って十分なバッファ数を確保するように変更しました。

OverflowStrategy に関しては用途に応じて設定する事になると思いますが、dropXXX や error を使用すると、バッファが溢れた際にデータが欠ける事になるので注意が必要です。

ちなみに、(1) のデフォルト設定は akka-stream-experimental_2.11-1.0-M2.jar 内の reference.conf ファイルで設定されています。

また、(2) の対策のため、Subscriber (実体は ActorProcessor) へ何も処理を行わない Subscription を onSubscribe しています。

PublisherSample1.java
・・・
import akka.stream.OverflowStrategy;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
・・・

public class PublisherSample1 {
    public static void main(String... args) {
        ・・・

        try (SamplePublisher<String> publisher = new SamplePublisher<>()) {

            Source.from(publisher)
                // (1) Input buffer overrun の防止策
                .buffer(10, OverflowStrategy.backpressure())
                .map(s -> "#" + s)
                .foreach(System.out::println, materializer)
                .onComplete(complete, system.dispatcher());

            IntStream.range(1, 7)
                .mapToObj(i -> "sample" + i)
                .forEach(d -> publisher.send(d));
        }
    }

    private static class SamplePublisher<T> implements Publisher<T>, AutoCloseable {
        private List<Subscriber<? super T>> list = new ArrayList<>();

        @Override
        public void subscribe(Subscriber<? super T> s) {
            list.add(s);
            // (2) 下記を実施しないと onNext が機能しない
            s.onSubscribe(new Subscription() {
                @Override public void request(long n) {}
                @Override public void cancel() {}
            });
        }

        @Override
        public void close() {
            list.forEach(Subscriber::onComplete);
        }

        public void send(T msg) {
            // Subscriber へデータ送信
            list.forEach(s -> s.onNext(msg));
        }
    }
}

SamplePublisher クラス内で Subscriber を List にて管理していますが、上記処理では subscribe は 1度しか実行されないので List で管理する必要は特にありません。

実行結果
> gradle -q run

#sample1
#sample2
#sample3
#sample4
#sample5
#sample6
build.gradle
・・・
//mainClassName = 'sample.Sample2'
mainClassName = 'sample.PublisherSample1'

Publisher を使った処理2

最後に skip(drop)・take の処理を追加してみます。

PublisherSample2.java
・・・

public class PublisherSample2 {
    public static void main(String... args) {
        ・・・

        try (SamplePublisher<String> publisher = new SamplePublisher<>()) {

            Source.from(publisher)
                // Input buffer overrun の防止策
                .buffer(10, OverflowStrategy.backpressure())
                .drop(3)
                .take(2)
                .map(s -> "#" + s)
                .foreach(System.out::println, materializer)
                .onComplete(complete, system.dispatcher());

            IntStream.range(1, 7)
                .mapToObj(i -> "sample" + i)
                .forEach(d -> publisher.send(d));
        }
    }

    private static class SamplePublisher<T> implements Publisher<T>, AutoCloseable {
        ・・・
    }
}
実行結果
> gradle -q run

#sample4
#sample5
build.gradle
・・・
//mainClassName = 'sample.PublisherSample1'
mainClassName = 'sample.PublisherSample2'