Play2 の Iteratee を使った行単位のファイル処理2 - Enumerator.generateM, Enumerator.fromCallback1

前回 id:fits:20130212 は Enumeratee.grouped() を使って行単位のファイル処理を実装しましたが、今回は Enumerator.generateM() と Enumerator.fromCallback1() をそれぞれ使って同様の処理を実装してみました。

sbt のビルド定義ファイルなどは前回と同様のものを使っています。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130216/

Enumerator.generateM による行分割

Enumerator.generateM は、引数として渡した処理 *1 を繰り返し実行する Enumerator を作成します。

繰り返しは None を返す事で終了するため、下記 (a) ではファイルを 1行読んで Some(<1行分の文字列>) を返し、ファイルの終端に達した場合に None を返すようにしています。
こうする事でファイルの内容を 1行ずつ返す Enumerator を実現できます。

(b) の処理でファイルの内容を全て消費してしまいますが、(c) で BufferedReader をリセットする事で *2、(d) では (a) で作成した enumerator をそのまま使って 1行目から処理できるようになっています。

さらに (e) でも enumerator を利用していますが、enumerator はファイルの終端に達していないため (d) からの続きの行を処理する事になります。

ファイルのクローズ処理 (f) は Iteratee の処理とは別に一番最後で実施するようにしました。

EnumeratorLineSample.scala (Enumerator.generateM を使ったサンプル)
package fits.sample

import play.api.libs.iteratee._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets._

object EnumeratorLineSample extends App {
    import scala.concurrent.ExecutionContext.Implicits.global

    val reader = new BufferedReader(new InputStreamReader(new FileInputStream(args(0)), UTF_8))
    reader.mark(512)

    // (a) ファイルの内容を 1行ずつ返す Enumerator
    val enumerator = Enumerator.generateM( Future {
        reader.readLine match {
            case line: String => Some(line)
            case _            => None // ファイルの終端に達した場合
        }
    })

    // (b) 行毎に先頭へ # を付けて出力する処理の組み立て
    val future = enumerator |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    Await.ready(future, Duration.Inf)

    println("----------")
    // (c) ファイルの内容を先頭から返すようにリセット
    reader.reset

    // (d) 1行目を捨てて 2行目から 2行取り出し各行の先頭へ # を付けて出力する処理の組み立て
    val future2 = enumerator &> Enumeratee.drop(1) &> Enumeratee.take(2) |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    Await.ready(future2, Duration.Inf)

    println("----------")

    // (e) 1行出力。(d) からの続きとなるのでファイルの 4行目が出力される
    val future3 = enumerator &> Enumeratee.take(1) |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    Await.ready(future3, Duration.Inf)

    // (f) クローズ処理
    reader.close
}

実行結果は以下の通りです。

実行結果
> sbt "run sample.txt"
・・・
[info] Running fits.sample.EnumeratorLineSample sample.txt
#Play2のIterateeを使ったファイル処理の
#サンプル
#
#1行毎に処理するサンプルを
#実装してみました。
----------
#サンプル
#
----------
#1行毎に処理するサンプルを

Enumerator.fromCallback1 による行分割

次は、Enumerator.fromCallback1 *3 を使って同様の処理を実装してみます。

generateM に渡していた処理を fromCallback1 の第一引数の処理結果として返すようにすれば同じ処理内容となります。

ただし下記では、fromCallback1 の第二引数(onComplete)に BufferedReader を close する処理 (b) を渡し、Iteratee の処理が完了する度に入力ストリーム (今回はファイル) をクローズするようにしています。

(e) では (d) で使った enumerator2 を使っていますが、(d) でファイルはクローズされており Enumerator は何も生成しないため何も出力されません。

EnumeratorLineSample2.scala (Enumerator.fromCallback1 を使ったサンプル)
・・・
object EnumeratorLineSample2 extends App {
    import scala.concurrent.ExecutionContext.Implicits.global

    // (a) 1行ずつファイルの内容を返す Enumerator を作成
    def fromStreamLine(input: BufferedReader) = {
        Enumerator.fromCallback1(_ => Future {
            input.readLine match {
                case line: String => Some(line)
                case _            => None // ファイルの終端に達した場合
            }
        }, {
            println("*** close")
            // (b) ファイルのクローズ処理
            input.close
        })
    }

    val enumerator = fromStreamLine(new BufferedReader(new InputStreamReader(new FileInputStream(args(0)), UTF_8)))

    // (c) 行毎に先頭へ # を付けて出力する処理の組み立て
    val future = enumerator |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    Await.ready(future, Duration.Inf)

    println("----------")

    val enumerator2 = fromStreamLine(new BufferedReader(new InputStreamReader(new FileInputStream(args(0)), UTF_8)))

    // (d) 1行目を捨てて 2行目から 2行取り出し各行の先頭へ # を付けて出力する処理の組み立て
    val future2 = enumerator2 &> Enumeratee.drop(1) &> Enumeratee.take(2) |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    Await.ready(future2, Duration.Inf)

    println("----------")

    // (e) ファイルはクローズ済みのため何も出力されない
    val future3 = enumerator2 &> Enumeratee.take(1) |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    Await.ready(future3, Duration.Inf)
}

実行結果は以下の通りです。
Iteratee の処理を行う度に "*** close" が出力されている事を確認できます。

実行結果
・・・
[info] Running fits.sample.EnumeratorLineSample2 sample.txt
*** close
#Play2のIterateeを使ったファイル処理の
#サンプル
#
#1行毎に処理するサンプルを
#実装してみました。
----------
*** close
#サンプル
#
----------

*1:Future を返す処理

*2:BufferedReader の mark と reset で実現

*3:Enumerator.fromCallback もあるが、こちらは deprecated となっている