読者です 読者をやめる 読者になる 読者になる

RxJava で行単位のファイル処理 - Groovy, Java Lambda

Java Groovy Rx

前回 (id:fits:20130224) のファイル処理と同様の処理を RxJava を使って実装してみました。

今回作成したソースは http://github.com/fits/try_samples/tree/master/blog/20130310/

Groovy で実装

まずは Groovy で実装してみました。

  • Groovy 2.1.1

前回の C# と同様、Observable.create を使ってファイルの内容を 1行毎に PUSH する Observable を作成しています。

rxjava-groovy を使えば create や subscribe 等のメソッドに Groovy のクロージャを渡す事ができるようになります。 *1

また、ログ出力を無効化するために slf4j-nop を @Grab で指定しています。

readline_file.groovy
@Grab('com.netflix.rxjava:rxjava-groovy:0.5.4')
@Grab('org.slf4j:slf4j-nop:1.7.2')
import rx.*

def fromFile = { file ->
    Observable.create { observer ->
        try {
            new File(file).eachLine {
                // 1行分のデータを PUSH
                observer.onNext(it)
            }
            // 完了時
            observer.onCompleted()
        } catch (e) {
            // エラー発生時
            observer.onError(e)
        }
    }
}
// 1行目をスキップして 2・3 行目の先頭に # をつけて出力
fromFile(args[0]).skip(1).take(2).map { "#${it}" } subscribe { println it }
実行結果
> groovy readline_file.groovy sample.txt
#サンプル
#

Java 7 で実装

次は Java で同様の処理を実装してみました。

クロージャを使えないので Groovy と比べると煩雑になりました。

map や create には Func1 を subscribe には Action1 インターフェースの実装を渡しています。

途中キャンセル機能は用意しないので Observable.noOpSubscription() を使って NoOpObservableSubscription を返しています。

ReadLineFile.java
package fits.sample;

import static java.nio.charset.StandardCharsets.*;
import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class ReadLineFile {
    public static void main(String... args) {
        // 1行目をスキップして 2・3 行目の先頭に # をつけて出力
        fromFile(args[0]).skip(1).take(2).map(new Func1<String, String>() {
            public String call(String s) {
                return "#" + s;
            }
        }).subscribe(new Action1<String>() {
            public void call(String s) {
                System.out.println(s);
            }
        });
    }

    private static Observable<String> fromFile(final String file) {
        return Observable.create(new Func1<Observer<String>, Subscription>() {

            public Subscription call(Observer<String> observer) {

                try (BufferedReader reader = Files.newBufferedReader(Paths.get(file), UTF_8)) {
                    String line = null;
                    while ((line = reader.readLine()) != null) {
                        // 1行分のデータを PUSH
                        observer.onNext(line);
                    }
                } catch (Exception ex) {
                    // エラー発生時
                    observer.onError(ex);
                }
                // 完了時
                observer.onCompleted();

                return Observable.noOpSubscription();
            }
        });
    }
}

ビルドと実行には gradle を使いました。

なお、今のところ gradle run の際に実行時引数を直接指定する事はできないようなので、gradle ビルド定義ファイルでプロジェクトプロパティとして指定された args の値を実行時引数として渡すようにしました。

build.gradle
apply plugin: 'application'

repositories {
    mavenCentral()
}

dependencies {
    compile "com.netflix.rxjava:rxjava-core:0.5.4"
}

mainClassName = "fits.sample.ReadLineFile"

run {
    // プロジェクトプロパティ args を実行時引数として渡すための処理
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果は以下の通り。
プロジェクトプロパティは "-P<プロパティ名>=<値>" で指定する事ができます。

実行結果
> gradle -q run -Pargs=sample.txt
#サンプル
#

Java 8 Lambda で実装

最後に、Java 8 Lambda Support のラムダを使って実装すると多少シンプルになりました。

ReadLineFileLambda.java
package fits.sample;

import static java.nio.charset.StandardCharsets.*;
import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;

import rx.Observable;
import rx.Observer;
import rx.Subscription;

public class ReadLineFileLambda {
    public static void main(String... args) {
        // 1行目をスキップして 2・3 行目の先頭に # をつけて出力
        fromFile(args[0]).skip(1).take(2).map( (s) -> "#" + s ).subscribe( (s) -> System.out.println(s) );
    }

    private static Observable<String> fromFile(final String file) {
        return Observable.create( (observer) -> {
            try (BufferedReader reader = Files.newBufferedReader(Paths.get(file), UTF_8)) {
                String line = null;
                while ((line = reader.readLine()) != null) {
                    // 1行分のデータを PUSH
                    observer.onNext(line);
                }
            } catch (Exception ex) {
                // エラー発生時
                observer.onError(ex);
            }
            // 完了時
            observer.onCompleted();

            return Observable.noOpSubscription();
        });
    }
}
実行結果
> gradle -q run -Pargs=sample.txt
#サンプル
#

*1:Functions.from(Object) メソッド内で、クロージャ等を処理するため各言語毎に用意されている FunctionLanguageAdaptor 実装クラス (Groovy の場合は GroovyAdaptor) へ処理を委譲するようになっている