Play2 の Iteratee を使った行単位のファイル処理1 - Enumeratee.grouped() の利用
id:fits:20130116 で使った Play2 の Iteratee を単体利用して行単位のファイル処理を実装してみました。
- play-iteratees 2.1.0
Iteratee は Enumerator や Enumeratee と組み合わせて使用し、それぞれ以下のような役割を担います。
- Enumerator : データを生産
- Enumeratee : Enumerator から受け取ったデータを Iteratee へ渡す
- Iteratee : データを消費
ちなみに、ファイルの内容を "改行" で分割するような仕組みは今のところ Play2 の API に用意されていないようなので、
1行分のデータを取り出す Enumeratee や Enumerator を自前で用意する事になります。
そこで今回は Enumeratee.grouped() を使って行毎にデータをグルーピングする Enumeratee を作成してみました。
次回は Enumerator.generateM() を使って同様の処理を実装してみるつもりです。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130212/
はじめに
今回使用した sbt のビルド定義ファイルは以下の通りです。
build.sbt
scalaVersion := "2.10.0" resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" libraryDependencies += "play" %% "play-iteratees" % "2.1.0" mainClass in (Compile, run) := Some("fits.sample.EnumerateeLineSample")
sample.txt
Play2のIterateeを使ったファイル処理の サンプル 1行毎に処理するサンプルを 実装してみました。
Enumeratee.grouped による行分割
Enumeratee.grouped を使った行単位の処理は以下のようになりました。
- (a) Enumerator.fromFile() で chunkSize のサイズ分だけ読み込んだファイルの内容を Array[Byte] として生成する Enumerator 作成
- (e) Enumerator・Enumeratee・Iteratee を結合
- Enumeratee.mapConcat で Enumerator と Enumeratee.grouped を結び付ける Enumeratee 作成
- Enumeratee.grouped で行毎にグルーピングする Enumeratee 作成
- Iteratee.foreach で各行の先頭に # を付けて出力する Iteratee 作成
さらに、Enumeratee.grouped には以下のようにして組み立てた takeLine *1 を渡しています。
- (c) takeWhile で改行が来るまで入力を溜め、Iteratee.getChunks で溜めた分の入力 List[Byte] を line へ設定
- (d) 改行文字を take(1) で取得し Iteratee.ignore で捨てる
- new String() で line (改行までの部分)を文字列化
なお、Iteratee はモナドなので、takeLine は for 式を使って Iteratee を組み立てています。
EnumerateeLineSample.scala
package fits.sample import play.api.libs.iteratee._ import scala.concurrent.Await import scala.concurrent.duration.Duration import java.io.File import java.nio.charset.StandardCharsets._ object EnumerateeLineSample extends App { import scala.concurrent.ExecutionContext.Implicits.global // (a) ファイルの内容を取得する Enumerator val enumerator = Enumerator.fromFile(new File(args(0))) // (b) 1行分の String を生成する Iteratee val takeLine = for { line <- Enumeratee.takeWhile[Byte](_ != '\n'.toByte) &>> Iteratee.getChunks // (c) 改行文字までの入力を line へ設定 _ <- Enumeratee.take(1) &>> Iteratee.ignore[Byte] // (d) 改行文字を捨てる } yield new String(line.toArray, UTF_8) // (e) 行毎に先頭へ # を付けて出力する処理の組み立て val future = enumerator &> Enumeratee.mapConcat( _.toSeq ) &> Enumeratee.grouped(takeLine) |>>> Iteratee.foreach { s => println(s"#${s}") } // (f) future の処理完了待ち Await.ready(future, Duration.Inf) }
それでは sbt run で実行してみます。
"run <引数> ・・・" のように run と引数をダブルクォーテーションで囲めば、sbt run で実行時引数を指定する事が可能です。
実行結果
> sbt "run sample.txt" ・・・ [info] Running fits.sample.EnumerateeLineSample sample.txt #Play2のIterateeを使ったファイル処理の #サンプル # #1行毎に処理するサンプルを #実装してみました。 #
実はこの sbt run は処理が完了してもプロセスが終了しないので Ctrl + C する必要があります。
プロセスを終了するには play.api.libs.iteratee.internal.defaultExecutionContext 等で生成されている ExecutorService とかを shutdown する必要があると思うのですが、今回は上手く終了する方法を見つけられませんでした。
とりあえず、以下のようにスレッドを interrupt しまくれば強引にプロセスを終了させる事は可能です。
・・・ object EnumerateeLineSample extends App { ・・・ // 強引にプロセスを終了する import scala.collection.convert.WrapAsScala._ Thread.getAllStackTraces().keySet().foreach(_.interrupt) }
最後に、Enumeratee.grouped(takeLine) と Iteratee.foreach の間に Enumeratee.drop(1) と Enumeratee.take(2) を追加すれば、2行目と 3行目の先頭へ # を付けて出力するような処理に変わります。
EnumerateeLineSample2.scala
・・・ object EnumerateeLineSample2 extends App { ・・・ // 1行目を捨てて 2行目から 2行取り出し各行の先頭へ # を付けて出力する処理の組み立て val future = enumerator &> Enumeratee.mapConcat( _.toSeq ) &> Enumeratee.grouped(takeLine) &> Enumeratee.drop(1) &> Enumeratee.take(2) |>>> Iteratee.foreach { s => println(s"#${s}") } Await.ready(future, Duration.Inf) }
実行結果
・・・ [info] Running fits.sample.EnumerateeLineSample2 sample.txt #サンプル #
*1:型は Iteratee[Byte, String]