RxJS で行単位のファイル処理

前々回(id:fits:20130224)・前回(id:fits:20130310)のファイル処理と同様の処理を RxJS を使い CoffeeScript で実装してみました。

今回作成したソースは http://github.com/fits/try_samples/tree/master/blog/20130313/

RxJS インストール

RxJS をインストールするため下記のような package.json を用意して npm install しました。

package.json
{
  ・・・
  "dependencies": {
    "coffee-script": "*",
    "rxjs": "*"
  }
}
npm でインストール
> npm install
・・・
npm http GET https://registry.npmjs.org/rxjs
npm http GET https://registry.npmjs.org/coffee-script
・・・
rxjs@1.0.10621 node_modules\rxjs

coffee-script@1.6.1 node_modules\coffee-script

readFile を使ったサンプル

ファイルの内容を一括で読み込んでコールバック関数を呼び出す fs.readFile を使って実装してみました。

1行分のデータを PUSH する Observable はこれまでと同じように Observable.create を使って作成します。

ファイルの内容がコールバック関数の data に設定されてくるので、改行で split して 1行毎に onNext を呼び出します。

今回もキャンセル機能は用意しないため、何もしない関数 "->" を返しています。*1
ここで関数を返さないと実行時に Type Error が発生する事になります。

readline_file.coffee
rx = require 'rxjs'
fs = require 'fs'

fromFile = (file) ->
    rx.Observable.create (observer) ->
        fs.readFile file, (err, data) ->
            if err
                # エラー発生時
                observer.onError err
            else
                data.toString().split(/\n|\r\n/).forEach (line) ->
                    # 1行分のデータを PUSH
                    observer.onNext line
                # 完了時
                observer.onCompleted()

        # 何もしない関数を返す
        ->

# 1行目をスキップして 2・3 行目の先頭に # を付けて出力
fromFile(process.argv[2]).skip(1).take(2).subscribe (x) -> console.log '#' + x
実行結果
> coffee readline_file.coffee sample.txt
#サンプル
#

Stream を使ったサンプル

次は Stream を使って同様の処理を実装してみました。

fs.createReadStream で Stream を取得します。
ここで encoding を指定しておくと data イベントで取得できる内容が encoding で指定した文字コードの文字列となります。

Stream の場合は、基本的にバッファサイズでデータが分割される点に注意が必要で、適切な encoding を指定しておかないとバッファサイズによっては文字化けする事になります。

readline_file2.coffee
rx = require 'rxjs'
fs = require 'fs'

fromFile = (file) ->
    rx.Observable.create (observer) ->
        stream = fs.createReadStream file, {encoding: 'utf-8'}

        # エラー発生時
        stream.on 'error', (ex) -> observer.onError ex
        # 完了時
        stream.on 'end', -> observer.onCompleted()

        stream.on 'close', -> console.log '*** close'

        buf = ''

        stream.on 'data', (data) ->
            list = (buf + data).split(/\n|\r\n/)
            # 次に持ち越すデータ
            buf = list.pop()

            # 1行分ずつデータを PUSH
            observer.onNext line for line in list

        # 何もしない関数を返す
        ->

# 1行目をスキップして 2・3 行目の先頭に # を付けて出力
fromFile(process.argv[2]).skip(1).take(2).subscribe (x) -> console.log '#' + x
実行結果
> coffee readline_file2.coffee sample.txt
#サンプル
#
*** close

*1:Java の Observable.noOpSubscription() や C# の Disposable.Empty に該当