RxJava で行単位のファイル処理 - Groovy, Java Lambda
前回 (id:fits:20130224) のファイル処理と同様の処理を RxJava を使って実装してみました。
今回作成したソースは http://github.com/fits/try_samples/tree/master/blog/20130310/
Groovy で実装
まずは Groovy で実装してみました。
- Groovy 2.1.1
前回の C# と同様、Observable.create を使ってファイルの内容を 1行毎に PUSH する Observable を作成しています。
rxjava-groovy を使えば create や subscribe 等のメソッドに Groovy のクロージャを渡す事ができるようになります。 *1
また、ログ出力を無効化するために slf4j-nop を @Grab で指定しています。
readline_file.groovy
@Grab('com.netflix.rxjava:rxjava-groovy:0.5.4') @Grab('org.slf4j:slf4j-nop:1.7.2') import rx.* def fromFile = { file -> Observable.create { observer -> try { new File(file).eachLine { // 1行分のデータを PUSH observer.onNext(it) } // 完了時 observer.onCompleted() } catch (e) { // エラー発生時 observer.onError(e) } } } // 1行目をスキップして 2・3 行目の先頭に # をつけて出力 fromFile(args[0]).skip(1).take(2).map { "#${it}" } subscribe { println it }
実行結果
> groovy readline_file.groovy sample.txt #サンプル #
Java 7 で実装
次は Java で同様の処理を実装してみました。
- Java SE 7
クロージャを使えないので Groovy と比べると煩雑になりました。
map や create には Func1 を subscribe には Action1 インターフェースの実装を渡しています。
途中キャンセル機能は用意しないので Observable.noOpSubscription() を使って NoOpObservableSubscription を返しています。
ReadLineFile.java
package fits.sample; import static java.nio.charset.StandardCharsets.*; import java.io.BufferedReader; import java.nio.file.Files; import java.nio.file.Paths; import rx.Observable; import rx.Observer; import rx.Subscription; import rx.util.functions.Action1; import rx.util.functions.Func1; public class ReadLineFile { public static void main(String... args) { // 1行目をスキップして 2・3 行目の先頭に # をつけて出力 fromFile(args[0]).skip(1).take(2).map(new Func1<String, String>() { public String call(String s) { return "#" + s; } }).subscribe(new Action1<String>() { public void call(String s) { System.out.println(s); } }); } private static Observable<String> fromFile(final String file) { return Observable.create(new Func1<Observer<String>, Subscription>() { public Subscription call(Observer<String> observer) { try (BufferedReader reader = Files.newBufferedReader(Paths.get(file), UTF_8)) { String line = null; while ((line = reader.readLine()) != null) { // 1行分のデータを PUSH observer.onNext(line); } } catch (Exception ex) { // エラー発生時 observer.onError(ex); } // 完了時 observer.onCompleted(); return Observable.noOpSubscription(); } }); } }
ビルドと実行には gradle を使いました。
なお、今のところ gradle run の際に実行時引数を直接指定する事はできないようなので、gradle ビルド定義ファイルでプロジェクトプロパティとして指定された args の値を実行時引数として渡すようにしました。
build.gradle
apply plugin: 'application' repositories { mavenCentral() } dependencies { compile "com.netflix.rxjava:rxjava-core:0.5.4" } mainClassName = "fits.sample.ReadLineFile" run { // プロジェクトプロパティ args を実行時引数として渡すための処理 if (project.hasProperty('args')) { args project.args } }
実行結果は以下の通り。
プロジェクトプロパティは "-P<プロパティ名>=<値>" で指定する事ができます。
実行結果
> gradle -q run -Pargs=sample.txt #サンプル #
Java 8 Lambda で実装
最後に、Java 8 Lambda Support のラムダを使って実装すると多少シンプルになりました。
ReadLineFileLambda.java
package fits.sample; import static java.nio.charset.StandardCharsets.*; import java.io.BufferedReader; import java.nio.file.Files; import java.nio.file.Paths; import rx.Observable; import rx.Observer; import rx.Subscription; public class ReadLineFileLambda { public static void main(String... args) { // 1行目をスキップして 2・3 行目の先頭に # をつけて出力 fromFile(args[0]).skip(1).take(2).map( (s) -> "#" + s ).subscribe( (s) -> System.out.println(s) ); } private static Observable<String> fromFile(final String file) { return Observable.create( (observer) -> { try (BufferedReader reader = Files.newBufferedReader(Paths.get(file), UTF_8)) { String line = null; while ((line = reader.readLine()) != null) { // 1行分のデータを PUSH observer.onNext(line); } } catch (Exception ex) { // エラー発生時 observer.onError(ex); } // 完了時 observer.onCompleted(); return Observable.noOpSubscription(); }); } }
実行結果
> gradle -q run -Pargs=sample.txt #サンプル #
*1:Functions.from(Object) メソッド内で、クロージャ等を処理するため各言語毎に用意されている FunctionLanguageAdaptor 実装クラス (Groovy の場合は GroovyAdaptor) へ処理を委譲するようになっている