Scala の Actor - receive/react と Thread
Scala にはアクターモデルを実装した API が標準で用意されており、使い方も比較的分かり易くなっている。
ただし、! メソッドでメッセージ送信する等、初めて見ると何をやっているのか理解に苦しむ点が難点かも。(Erlang を知っていれば入り易いんだろうけど)
同様の事を自前で実現するには Active Object パターンとかを実装する事になるだろうから、実用度は高そうに思う。リモートコール用の仕組みも用意されているみたいだし。
ちなみに、Active Object パターンは並列処理関係のパターンで、非同期メッセージを処理する能動的なオブジェクトを実装するためのもの。詳しくは 増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編 や プログラムデザインのためのパターン言語―Pattern Languages of Program Design選集 を参照。
Actor の使い方
scala.actors.Actor オブジェクトの各種メソッドを使えば、Actor の作成やメッセージ受信後の処理を指定できるようになっている。
おおまかな内容は以下の通り。
- actor メソッドでアクターの処理内容を設定すると Actor がインスタンス化され、start が実行される
- ! メソッドで非同期メッセージ送信、!? メソッドでメッセージ送信後に結果の取得を待機(同期)
- receive や react メソッドにメッセージ受信後の処理内容を設定
- reply メソッドでメッセージの送信元に返信
- loop メソッドで処理内容を繰り返す
- self で実行中のカレント Actor 取得
actorSample.scala
import scala.actors.Actor._ case class Data(n: Int) //Actor の作成 val a = actor { //複数のメッセージを受信するためにループを指定 loop { //メッセージ受信時の処理を設定 receive { case s: String => println("receive string: " + s) case d: Data => println("receive data: " + d.n) case 0 => { println("receive zero") //メッセージを返信 reply("complete") } } } } //Actor に文字列を送信 a ! "test" //Actor に Data を送信 a ! Data(5) //Actor に 0 を送信し返信を受信 val retval = a !? 0 println("return " + retval)
actorSample.scala 実行例
>scala actorSample.scala receive string: test receive data: 5 receive zero return complete
Actor と Thread の関係
先ほどのサンプルを改造し、実行中の Thread を出力してみると、Actor を使うことで複数の Thread が追加されている事が分かる。
actorSample2.scala
import scala.actors.Actor import scala.actors.Actor._ //implicit conversion で Java のコレクションクラスを //Scala のコレクションに変換させるための設定 import scala.collection.jcl.Conversions._ def printThread(actor: Actor, msg: String) = { println(Thread.currentThread() + " " + msg + " - " + actor) } case class Data(n: Int) //Thread のリストを取得 val initThreadList = Thread.getAllStackTraces.keySet //Actor の作成 val a = actor { //複数のメッセージを受信するためにループを指定 loop { //メッセージ受信時の処理を設定 receive { case s: String => printThread(self, "receive string: " + s) case d: Data => printThread(self, "receive data: " + d.n) case 0 => { printThread(self, "receive zero") //メッセージを返信 reply("complete") } } } } ・・・ //0 を送信し返信を受信 val retval = a !? 0 printThread(self, "return " + retval) println("-------- thread list -----------") //Actor の作成後に増えた Thread のみ出力 // // implicit conversion により Java のコレクションクラスから // Scala のコレクションに自動変換されるため // -- や foreach メソッドが使用できる (Thread.getAllStackTraces.keySet -- initThreadList) foreach { thread => println("- " + thread) } println("--------------------------------")
actorSample2.scala 実行例
>scala actorSample2.scala Thread[Thread-3,5,main] receive string: test - scala.actors.Actor$$anon$1@291aff Thread[Thread-3,5,main] receive data: 5 - scala.actors.Actor$$anon$1@291aff Thread[Thread-3,5,main] receive zero - scala.actors.Actor$$anon$1@291aff Thread[main,5,main] return complete - scala.actors.ActorProxy@56a499 -------- thread list ----------- - Thread[Thread-5,5,main] - Thread[Thread-6,2,main] - Thread[Timer-0,5,main] - Thread[Thread-3,5,main] - Thread[Thread-2,5,main] - Thread[Thread-4,5,main] --------------------------------
receive/react と Thread の関係
次に、receive と react をそれぞれ使った場合に Thread がどのように使用されるのかを調べてみる。
1個の Actor に対して 10個の Actor からメッセージを送信するようなサンプルの receive 版と react 版を作って Thread の状況を出力してみたところ、以下のような結果になった。
- Actor と Thread は 1対1 に対応しない(状況に応じて動的に割り当てられる模様)
- receive は個別の Thread で処理する
- react は Thread を共有して処理する
なお、以下のサンプルではメインスレッドで受信待ちしないため、Application オブジェクトとして定義し、全てのメッセージを受信した後に exit するようにしている。
まずは receive を使ったサンプルと実行例。
基本的に、実行中の Actor 毎に Thread が割り当てられている。
receiveTest.scala(receive を使った場合)
import scala.actors.Actor import scala.actors.Actor._ import scala.actors.Exit import scala.collection.jcl.Conversions._ object receiveTest extends Application { def printThread(actor: Actor, msg: Any) = { println(Thread.currentThread() + " " + msg + " - " + actor) } //実行 Thread の差分(Actor 生成前後)を出力 def printThreadList() { (Thread.getAllStackTraces.keySet -- initThreadList) foreach { thread => println("- " + thread) } } val initThreadList = Thread.getAllStackTraces.keySet //10個の Actor からのメッセージを処理する Actor 作成 val consumer = actor { loop { receive { case num: int => { printThread(self, num) reply("received=" + num) } case Exit => { printThread(self, 10) reply("received exit") printThreadList() exit } } } } //10個の Actor を作成しメッセージを送信 for (val i <- 1 to 10) actor { //メッセージの作成 val msg = i match { case 10 => Exit case n => n } //メッセージを送信し返信を処理 consumer !? msg match { case r: String => printThread(self, r) } /* 上記は以下と同じ処理内容 consumer ! msg receive { case r: String => printThread(self, r) } */ } }
receiveTest.scala 実行例(receive を使った場合)
>scalac receiveTest.scala >scala receiveTest Thread[Thread-2,5,main] 1 - scala.actors.Actor$$anon$1@7c6768 Thread[Thread-3,5,main] received=1 - scala.actors.Actor$$anon$1@13c5982 Thread[Thread-11,5,main] 2 - scala.actors.Actor$$anon$1@7c6768 Thread[Thread-11,5,main] 3 - scala.actors.Actor$$anon$1@7c6768 Thread[Thread-11,5,main] 4 - scala.actors.Actor$$anon$1@7c6768 Thread[Thread-11,5,main] 5 - scala.actors.Actor$$anon$1@7c6768 Thread[Thread-11,5,main] 6 - scala.actors.Actor$$anon$1@7c6768 Thread[Thread-11,5,main] 7 - scala.actors.Actor$$anon$1@7c6768 Thread[Thread-11,5,main] 8 - scala.actors.Actor$$anon$1@7c6768 Thread[Thread-11,5,main] 9 - scala.actors.Actor$$anon$1@7c6768 Thread[Thread-11,5,main] 10 - scala.actors.Actor$$anon$1@7c6768 - Thread[Thread-3,5,main] - Thread[Thread-4,5,main] - Thread[Thread-10,5,main] - Thread[Thread-9,5,main] - Thread[Thread-1,5,main] - Thread[Thread-2,5,main] - Thread[Thread-7,5,main] - Thread[DestroyJavaVM,5,main] - Thread[Thread-5,5,main] - Thread[Timer-0,5,main] - Thread[Thread-6,5,main] - Thread[Thread-8,5,main] - Thread[Thread-11,5,main] Thread[Thread-2,5,main] received=2 - scala.actors.Actor$$anon$1@422ede Thread[Thread-10,5,main] received exit - scala.actors.Actor$$anon$1@dd20f6 Thread[Thread-9,5,main] received=9 - scala.actors.Actor$$anon$1@a83b8a Thread[Thread-8,5,main] received=8 - scala.actors.Actor$$anon$1@110b053 Thread[Thread-7,5,main] received=7 - scala.actors.Actor$$anon$1@111a3ac Thread[Thread-6,5,main] received=6 - scala.actors.Actor$$anon$1@b89838 Thread[Thread-5,5,main] received=5 - scala.actors.Actor$$anon$1@93dcd Thread[Thread-4,5,main] received=4 - scala.actors.Actor$$anon$1@1d9dc39 Thread[Thread-3,5,main] received=3 - scala.actors.Actor$$anon$1@112f614
次に react を使ったサンプルと実行例。
基本的に、Actor が同じ Thread 上で処理されている。
reactTest.scala(react を使った場合)
import scala.actors.Actor import scala.actors.Actor._ import scala.actors.Exit import scala.collection.jcl.Conversions._ //receiveTest の receive メソッド呼び出し箇所を react に変更しただけ object reactTest extends Application { ・・・ val consumer = actor { loop { react { ・・・ } } } for (val i <- 1 to 10) actor { ・・・ consumer ! msg react { case r: String => { printThread(self, r) } } } }
reactTest.scala 実行例(react を使った場合)
>scalac reactTest.scala >scala reactTest Thread[Thread-3,5,main] 1 - scala.actors.Actor$$anon$1@1f33675 Thread[Thread-3,5,main] react=1 - scala.actors.Actor$$anon$1@19ee1ac Thread[Thread-3,5,main] 2 - scala.actors.Actor$$anon$1@1f33675 Thread[Thread-3,5,main] react=2 - scala.actors.Actor$$anon$1@1f1fba0 Thread[Thread-3,5,main] 3 - scala.actors.Actor$$anon$1@1f33675 Thread[Thread-3,5,main] react=3 - scala.actors.Actor$$anon$1@1befab0 Thread[Thread-3,5,main] 4 - scala.actors.Actor$$anon$1@1f33675 Thread[Thread-3,5,main] react=4 - scala.actors.Actor$$anon$1@13c5982 Thread[Thread-3,5,main] 5 - scala.actors.Actor$$anon$1@1f33675 Thread[Thread-3,5,main] react=5 - scala.actors.Actor$$anon$1@1186fab Thread[Thread-3,5,main] 6 - scala.actors.Actor$$anon$1@1f33675 Thread[Thread-3,5,main] react=6 - scala.actors.Actor$$anon$1@14b7453 Thread[Thread-3,5,main] 7 - scala.actors.Actor$$anon$1@1f33675 Thread[Thread-3,5,main] react=7 - scala.actors.Actor$$anon$1@c21495 Thread[Thread-3,5,main] 8 - scala.actors.Actor$$anon$1@1f33675 Thread[Thread-3,5,main] react=8 - scala.actors.Actor$$anon$1@1d5550d Thread[Thread-3,5,main] 9 - scala.actors.Actor$$anon$1@1f33675 Thread[Thread-3,5,main] react=9 - scala.actors.Actor$$anon$1@a0dcd9 Thread[Thread-3,5,main] 10 - scala.actors.Actor$$anon$1@1f33675 - Thread[Thread-3,5,main] - Thread[Thread-2,2,main] - Thread[Thread-4,2,main] - Thread[Thread-5,5,main] - Thread[Timer-0,5,main] - Thread[Thread-1,5,main] Thread[Thread-5,5,main] react exit - scala.actors.Actor$$anon$1@723d7c