Mono で Rx を使用する
Mono で Rx (Reactive Extensions) を使ってみました。
Rx は非同期やイベント処理を LINQ で実装できるようにする API で、id:fits:20130212 や id:fits:20130216 で試した Iteratee によく似ていると思います。
生産者 | 消費者 | |
---|---|---|
Iteratee | Enumerator | Iteratee |
Rx | IObservable | IObserver |
今回作成したソースは http://github.com/fits/try_samples/tree/master/blog/20130224/
Rx インストール手順(コマンドライン版 NuGet の利用)
コマンドライン版の NuGet を使って Mono で Rx をインストールするには以下のような手順を実施します。
- (1) Mono に SSL 証明書をインストール
- (2) コマンドライン版 NuGet 取得とアップデート
- (3) Rx インストール
(1) Mono に SSL 証明書をインストール
NuGet がパッケージをダウンロードできるように mozroots コマンドを使って Mono に SSL 証明書をインストールしておきます。
> mozroots --import --sync
(2) コマンドライン版 NuGet 取得とアップデート
下記 NuGet ダウンロードページから NuGet.exe Command Line をダウンロードして任意のディレクトリに保存します。
次に、mono コマンドで NuGet.exe を実行し NuGet.exe 2.2.1 へのアップデートを行います。
> mono NuGet.exe NuGet bootstrapper 1.0.0.0 Found NuGet.exe version 2.2.1. Downloading... Update complete.
(3) Rx インストール
NuGet.exe を使って Rx-Main をインストールすると、カレントディレクトリに Rx のパッケージがインストールされます。
> mono NuGet.exe install Rx-Main Attempting to resolve dependency 'Rx-Interfaces (? 2.1.30214.0)'. Attempting to resolve dependency 'Rx-Core (? 2.1.30214.0)'. Attempting to resolve dependency 'Rx-Linq (? 2.1.30214.0)'. Attempting to resolve dependency 'Rx-PlatformServices (? 2.1.30214.0)'. Successfully installed 'Rx-Interfaces 2.1.30214.0'. Successfully installed 'Rx-Core 2.1.30214.0'. Successfully installed 'Rx-Linq 2.1.30214.0'. Successfully installed 'Rx-PlatformServices 2.1.30214.0'. Successfully installed 'Rx-Main 2.1.30214.0'.
単純な Rx サンプル
以下のような単純な処理を行うサンプルを作成してみます。 *1
- (1) 1 〜 3 を出力
- (2) 1 〜 4 の偶数だけを取り出し先頭に # を付けて出力
Rx では、Subscribe メソッドを使って IObservable へ IObserver を登録します。
ObservableExtensions *2 には Subscribe 用の拡張メソッドが色々と定義されており、下記では "Subscribe
RxSample.cs
using System; using System.Linq; using System.Reactive.Linq; class RxSample { static void Main(string[] args) { // (1) 1 〜 3 を出力 Observable.Range(1, 3).Subscribe(Console.WriteLine); Console.WriteLine("-----"); // (2) 1 〜 4 の偶数だけを取り出し先頭に # を付けて出力 Observable.Range(1, 4).Where(x => x % 2 == 0).Select(x => "#" + x).Subscribe(Console.WriteLine); // 以下でも可 //(from x in Observable.Range(1, 3) where x % 2 == 0 select "#" + x).Subscribe(Console.WriteLine); } }
mcs でソースをビルドし mono で実行します。
ビルド
> mcs -r:System.Reactive.Core,System.Reactive.Linq RxSample.cs
実行
> mono RxSample.exe 1 2 3 ----- #2 #4
Rx で行単位のファイル処理
id:fits:20130216 で実装したような行単位のファイル処理を Rx で実装してみました。
Observable.Create() という IObservable オブジェクトを作成するメソッドが用意されているので、今回はこれを使用する事にします。
Observable.Create() には IObserver を引数とするラムダ式を渡し、ラムダ式内で以下のような処理を実装します。
- OnNext を実行しデータを PUSH (ファイルから 1行読んで OnNext 実行)
- 完了時に OnCompleted 実行 (ファイルの読み込み完了時に OnCompleted 実行)
- エラー発生時に OnError 実行
なお、今回はキャンセル機能を用意しないため Disposable.Empty を返しています。
RxFileSample.cs
using System; using System.IO; using System.Linq; using System.Reactive.Disposables; using System.Reactive.Linq; class RxFileSample { static void Main(string[] args) { // 全行出力 FromFile(args[0]).Subscribe(Console.WriteLine); Console.WriteLine("-----"); // 1行目をスキップして 2・3 行目の先頭に # を付けて出力 FromFile(args[0]).Skip(1).Take(2).Select(x => "#" + x).Subscribe(Console.WriteLine); } private static IObservable<string> FromFile(string fileName) { return Observable.Create<string>(observer => { try { using(var reader = File.OpenText(fileName)) { while (!reader.EndOfStream) { // 1行分のデータを PUSH observer.OnNext(reader.ReadLine()); } } // 完了時 observer.OnCompleted(); } catch (Exception error) { // エラー発生時 observer.OnError(error); } return Disposable.Empty; }); } }
ビルド
> mcs -r:System.Reactive.Core,System.Reactive.Linq RxFileSample.cs
実行
> mono RxFileSample.exe sample.txt Rx を使ったファイル処理の サンプル 1行毎に処理するサンプルを 実装してみました。 ----- #サンプル #
Rx で行単位の非同期ファイル処理
最後に、ファイルの 1行読み込み部分を async/await を使って非同期化してみました。
Observable.Create() へ渡すラムダ式に async を付けて 1行読み込みの箇所を await reader.ReadLineAsync() とします。
RxAsyncFileSample.cs
・・・ class RxAsyncFileSample { static void Main(string[] args) { // 全行出力 FromAsyncFile(args[0]).Subscribe(Console.WriteLine); Console.WriteLine("-----"); // 1行目をスキップして 2・3 行目の先頭に # を付けて出力 FromAsyncFile(args[0]).Skip(1).Take(2).Select(x => "#" + x).Subscribe(Console.WriteLine); } private static IObservable<string> FromAsyncFile(string fileName) { // async を付ける return Observable.Create<string>(async observer => { try { using(var reader = File.OpenText(fileName)) { while (!reader.EndOfStream) { // await と ReadLineAsync() を使用 var line = await reader.ReadLineAsync(); observer.OnNext(line); } } observer.OnCompleted(); } catch (Exception error) { observer.OnError(error); } return Disposable.Empty; }); } }
ただし、このサンプルは Mono 3.0.3 では正常に動作しません。
Mono 3.0.3 では 2行目の読み込み時に以下のようなエラーが発生し、正常に処理が続きません。
Mono 3.0.3 で実行した場合のエラー内容
System.InvalidOperationException: Operation is not valid due to the current state of the object ・・・
なお、.NET Framework SDK 4.5 でビルドし実行した場合は動作しました。 *3
ビルド(.NET Framework 4.5)
> csc /r:System.Reactive.Core.dll;System.Reactive.Linq.dll;System.Reactive.Interfaces.dll;System.Threading.Tasks.dll;System.Runtime.dll RxAsyncFileSample.cs Microsoft (R) Visual C# Compiler Version 4.0.30319.17929 for Microsoft (R) .NET Framework 4.5 Copyright (C) Microsoft Corporation. All rights reserved.
実行(.NET Framework 4.5)
> RxAsyncFileSample.exe sample.txt Rx を使ったファイル処理の サンプル 1行毎に処理するサンプルを 実装してみました。 ----- #サンプル #
*1:NuGet を実行したディレクトリにソースを作成します
*2:ソースは Observable.Extensions.cs
*3:.NET Framework 4.5 の場合はカレントディレクトリに Rx 関連のアセンブリ(.dll)を配置してビルドと実行を行いました