読者です 読者をやめる 読者になる 読者になる

Play2 の Iteratee を使った行単位のファイル処理1 - Enumeratee.grouped() の利用

id:fits:20130116 で使った Play2 の Iteratee を単体利用して行単位のファイル処理を実装してみました。

  • play-iteratees 2.1.0

Iteratee は Enumerator や Enumeratee と組み合わせて使用し、それぞれ以下のような役割を担います。

  • Enumerator : データを生産
  • Enumeratee : Enumerator から受け取ったデータを Iteratee へ渡す
  • Iteratee : データを消費

ちなみに、ファイルの内容を "改行" で分割するような仕組みは今のところ Play2 の API に用意されていないようなので、
1行分のデータを取り出す Enumeratee や Enumerator を自前で用意する事になります。

そこで今回は Enumeratee.grouped() を使って行毎にデータをグルーピングする Enumeratee を作成してみました。
次回は Enumerator.generateM() を使って同様の処理を実装してみるつもりです。


サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130212/

はじめに

今回使用した sbt のビルド定義ファイルは以下の通りです。

build.sbt
scalaVersion := "2.10.0"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies += "play" %% "play-iteratees" % "2.1.0"

mainClass in (Compile, run) := Some("fits.sample.EnumerateeLineSample")

なお、今回は下記ファイル(文字コードUTF-8)を使って動作確認する事にします。

sample.txt
Play2のIterateeを使ったファイル処理の
サンプル

1行毎に処理するサンプルを
実装してみました。

Enumeratee.grouped による行分割

Enumeratee.grouped を使った行単位の処理は以下のようになりました。

  • (a) Enumerator.fromFile() で chunkSize のサイズ分だけ読み込んだファイルの内容を Array[Byte] として生成する Enumerator 作成
  • (e) Enumerator・Enumeratee・Iteratee を結合
    • Enumeratee.mapConcat で Enumerator と Enumeratee.grouped を結び付ける Enumeratee 作成
    • Enumeratee.grouped で行毎にグルーピングする Enumeratee 作成
    • Iteratee.foreach で各行の先頭に # を付けて出力する Iteratee 作成

さらに、Enumeratee.grouped には以下のようにして組み立てた takeLine *1 を渡しています。

  • (c) takeWhile で改行が来るまで入力を溜め、Iteratee.getChunks で溜めた分の入力 List[Byte] を line へ設定
  • (d) 改行文字を take(1) で取得し Iteratee.ignore で捨てる
  • new String() で line (改行までの部分)を文字列化

なお、Iteratee はモナドなので、takeLine は for 式を使って Iteratee を組み立てています。

EnumerateeLineSample.scala
package fits.sample

import play.api.libs.iteratee._

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import java.io.File
import java.nio.charset.StandardCharsets._

object EnumerateeLineSample extends App {
    import scala.concurrent.ExecutionContext.Implicits.global

    // (a) ファイルの内容を取得する Enumerator
    val enumerator = Enumerator.fromFile(new File(args(0)))

    // (b) 1行分の String を生成する Iteratee
    val takeLine = for {
        line <- Enumeratee.takeWhile[Byte](_ != '\n'.toByte) &>> Iteratee.getChunks // (c) 改行文字までの入力を line へ設定
        _    <- Enumeratee.take(1) &>> Iteratee.ignore[Byte] // (d) 改行文字を捨てる
    } yield new String(line.toArray, UTF_8)

    // (e) 行毎に先頭へ # を付けて出力する処理の組み立て
    val future = enumerator &> Enumeratee.mapConcat( _.toSeq ) &> Enumeratee.grouped(takeLine) |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    // (f) future の処理完了待ち
    Await.ready(future, Duration.Inf)
}

それでは sbt run で実行してみます。
"run <引数> ・・・" のように run と引数をダブルクォーテーションで囲めば、sbt run で実行時引数を指定する事が可能です。

実行結果
> sbt "run sample.txt"
・・・
[info] Running fits.sample.EnumerateeLineSample sample.txt
#Play2のIterateeを使ったファイル処理の
#サンプル
#
#1行毎に処理するサンプルを
#実装してみました。
#

実はこの sbt run は処理が完了してもプロセスが終了しないので Ctrl + C する必要があります。

プロセスを終了するには play.api.libs.iteratee.internal.defaultExecutionContext 等で生成されている ExecutorService とかを shutdown する必要があると思うのですが、今回は上手く終了する方法を見つけられませんでした。

とりあえず、以下のようにスレッドを interrupt しまくれば強引にプロセスを終了させる事は可能です。

・・・
object EnumerateeLineSample extends App {
    ・・・
    // 強引にプロセスを終了する
    import scala.collection.convert.WrapAsScala._
    Thread.getAllStackTraces().keySet().foreach(_.interrupt)
}


最後に、Enumeratee.grouped(takeLine) と Iteratee.foreach の間に Enumeratee.drop(1) と Enumeratee.take(2) を追加すれば、2行目と 3行目の先頭へ # を付けて出力するような処理に変わります。

EnumerateeLineSample2.scala
・・・
object EnumerateeLineSample2 extends App {
    ・・・
    // 1行目を捨てて 2行目から 2行取り出し各行の先頭へ # を付けて出力する処理の組み立て
    val future = enumerator &> Enumeratee.mapConcat( _.toSeq ) &> Enumeratee.grouped(takeLine) &> 
          Enumeratee.drop(1) &> Enumeratee.take(2) |>>> Iteratee.foreach { s => 

        println(s"#${s}")
    }

    Await.ready(future, Duration.Inf)
}
実行結果
・・・
[info] Running fits.sample.EnumerateeLineSample2 sample.txt
#サンプル
#

*1:型は Iteratee[Byte, String]