Mono で Rx を使用する

Mono で Rx (Reactive Extensions) を使ってみました。

Rx は非同期やイベント処理を LINQ で実装できるようにする API で、id:fits:20130212 や id:fits:20130216 で試した Iteratee によく似ていると思います。

生産者 消費者
Iteratee Enumerator Iteratee
Rx IObservable IObserver

今回作成したソースは http://github.com/fits/try_samples/tree/master/blog/20130224/

Rx インストール手順(コマンドライン版 NuGet の利用)

コマンドライン版の NuGet を使って Mono で Rx をインストールするには以下のような手順を実施します。

  • (1) Mono に SSL 証明書をインストール
  • (2) コマンドライン版 NuGet 取得とアップデート
  • (3) Rx インストール
(1) Mono に SSL 証明書をインストール

NuGet がパッケージをダウンロードできるように mozroots コマンドを使って Mono に SSL 証明書をインストールしておきます。

> mozroots --import --sync
(2) コマンドライン版 NuGet 取得とアップデート

下記 NuGet ダウンロードページから NuGet.exe Command Line をダウンロードして任意のディレクトリに保存します。

次に、mono コマンドで NuGet.exe を実行し NuGet.exe 2.2.1 へのアップデートを行います。

> mono NuGet.exe

NuGet bootstrapper 1.0.0.0
Found NuGet.exe version 2.2.1.
Downloading...
Update complete.
(3) Rx インストール

NuGet.exe を使って Rx-Main をインストールすると、カレントディレクトリに Rx のパッケージがインストールされます。

> mono NuGet.exe install Rx-Main

Attempting to resolve dependency 'Rx-Interfaces (? 2.1.30214.0)'.
Attempting to resolve dependency 'Rx-Core (? 2.1.30214.0)'.
Attempting to resolve dependency 'Rx-Linq (? 2.1.30214.0)'.
Attempting to resolve dependency 'Rx-PlatformServices (? 2.1.30214.0)'.
Successfully installed 'Rx-Interfaces 2.1.30214.0'.
Successfully installed 'Rx-Core 2.1.30214.0'.
Successfully installed 'Rx-Linq 2.1.30214.0'.
Successfully installed 'Rx-PlatformServices 2.1.30214.0'.
Successfully installed 'Rx-Main 2.1.30214.0'.

単純な Rx サンプル

以下のような単純な処理を行うサンプルを作成してみます。 *1

  • (1) 1 〜 3 を出力
  • (2) 1 〜 4 の偶数だけを取り出し先頭に # を付けて出力

Rx では、Subscribe メソッドを使って IObservable へ IObserver を登録します。

ObservableExtensions *2 には Subscribe 用の拡張メソッドが色々と定義されており、下記では "Subscribe(this IObservable source, Action onNext)" 拡張メソッドを利用し Console.WriteLine を AnonymousObserver として Subscribe しています。

RxSample.cs
using System;
using System.Linq;
using System.Reactive.Linq;

class RxSample
{
    static void Main(string[] args)
    {
        // (1) 1 〜 3 を出力
        Observable.Range(1, 3).Subscribe(Console.WriteLine);

        Console.WriteLine("-----");

        // (2) 1 〜 4 の偶数だけを取り出し先頭に # を付けて出力
        Observable.Range(1, 4).Where(x => x % 2 == 0).Select(x => "#" + x).Subscribe(Console.WriteLine);

        // 以下でも可
        //(from x in Observable.Range(1, 3) where x % 2 == 0 select "#" + x).Subscribe(Console.WriteLine);
    }
}

mcs でソースをビルドし mono で実行します。

ビルド
> mcs -r:System.Reactive.Core,System.Reactive.Linq RxSample.cs
実行
> mono RxSample.exe
1
2
3
-----
#2
#4

Rx で行単位のファイル処理

id:fits:20130216 で実装したような行単位のファイル処理を Rx で実装してみました。

Observable.Create() という IObservable オブジェクトを作成するメソッドが用意されているので、今回はこれを使用する事にします。

Observable.Create() には IObserver を引数とするラムダ式を渡し、ラムダ式内で以下のような処理を実装します。

  • OnNext を実行しデータを PUSH (ファイルから 1行読んで OnNext 実行)
  • 完了時に OnCompleted 実行 (ファイルの読み込み完了時に OnCompleted 実行)
  • エラー発生時に OnError 実行

なお、今回はキャンセル機能を用意しないため Disposable.Empty を返しています。

RxFileSample.cs
using System;
using System.IO;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;

class RxFileSample
{
    static void Main(string[] args)
    {
        // 全行出力
        FromFile(args[0]).Subscribe(Console.WriteLine);

        Console.WriteLine("-----");

        // 1行目をスキップして 2・3 行目の先頭に # を付けて出力
        FromFile(args[0]).Skip(1).Take(2).Select(x => "#" + x).Subscribe(Console.WriteLine);
    }

    private static IObservable<string> FromFile(string fileName)
    {
        return Observable.Create<string>(observer => {
            try
            {
                using(var reader = File.OpenText(fileName))
                {
                    while (!reader.EndOfStream)
                    {
                        // 1行分のデータを PUSH
                        observer.OnNext(reader.ReadLine());
                    }
                }
                // 完了時
                observer.OnCompleted();
            }
            catch (Exception error) {
                // エラー発生時
                observer.OnError(error);
            }
            return Disposable.Empty;
        });
    }
}
ビルド
> mcs -r:System.Reactive.Core,System.Reactive.Linq RxFileSample.cs
実行
> mono RxFileSample.exe sample.txt
Rx を使ったファイル処理の
サンプル

1行毎に処理するサンプルを
実装してみました。
-----
#サンプル
#

Rx で行単位の非同期ファイル処理

最後に、ファイルの 1行読み込み部分を async/await を使って非同期化してみました。

Observable.Create() へ渡すラムダ式に async を付けて 1行読み込みの箇所を await reader.ReadLineAsync() とします。

RxAsyncFileSample.cs
・・・
class RxAsyncFileSample
{
    static void Main(string[] args)
    {
        // 全行出力
        FromAsyncFile(args[0]).Subscribe(Console.WriteLine);

        Console.WriteLine("-----");

        // 1行目をスキップして 2・3 行目の先頭に # を付けて出力
        FromAsyncFile(args[0]).Skip(1).Take(2).Select(x => "#" + x).Subscribe(Console.WriteLine);
    }

    private static IObservable<string> FromAsyncFile(string fileName)
    {
        // async を付ける
        return Observable.Create<string>(async observer => {
            try
            {
                using(var reader = File.OpenText(fileName))
                {
                    while (!reader.EndOfStream)
                    {
                        // await と ReadLineAsync() を使用
                        var line = await reader.ReadLineAsync();
                        observer.OnNext(line);
                    }
                }
                observer.OnCompleted();
            }
            catch (Exception error) {
                observer.OnError(error);
            }
            return Disposable.Empty;
        });
    }
}

ただし、このサンプルは Mono 3.0.3 では正常に動作しません。
Mono 3.0.3 では 2行目の読み込み時に以下のようなエラーが発生し、正常に処理が続きません。

Mono 3.0.3 で実行した場合のエラー内容
System.InvalidOperationException: Operation is not valid due to the current state of the object
・・・

なお、.NET Framework SDK 4.5 でビルドし実行した場合は動作しました。 *3

ビルド(.NET Framework 4.5)
> csc /r:System.Reactive.Core.dll;System.Reactive.Linq.dll;System.Reactive.Interfaces.dll;System.Threading.Tasks.dll;System.Runtime.dll RxAsyncFileSample.cs

Microsoft (R) Visual C# Compiler Version 4.0.30319.17929
for Microsoft (R) .NET Framework 4.5
Copyright (C) Microsoft Corporation. All rights reserved.
実行(.NET Framework 4.5)
> RxAsyncFileSample.exe sample.txt
Rx を使ったファイル処理の
サンプル

1行毎に処理するサンプルを
実装してみました。
-----
#サンプル
#

*1:NuGet を実行したディレクトリにソースを作成します

*2:ソースは Observable.Extensions.cs

*3:.NET Framework 4.5 の場合はカレントディレクトリに Rx 関連のアセンブリ(.dll)を配置してビルドと実行を行いました