読者です 読者をやめる 読者になる 読者になる

Scala の Actor - receive/react と Thread

Scala にはアクターモデルを実装した API が標準で用意されており、使い方も比較的分かり易くなっている。
ただし、! メソッドでメッセージ送信する等、初めて見ると何をやっているのか理解に苦しむ点が難点かも。(Erlang を知っていれば入り易いんだろうけど)

同様の事を自前で実現するには Active Object パターンとかを実装する事になるだろうから、実用度は高そうに思う。リモートコール用の仕組みも用意されているみたいだし。
ちなみに、Active Object パターンは並列処理関係のパターンで、非同期メッセージを処理する能動的なオブジェクトを実装するためのもの。詳しくは 増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編プログラムデザインのためのパターン言語―Pattern Languages of Program Design選集 を参照。

Actor の使い方

scala.actors.Actor オブジェクトの各種メソッドを使えば、Actor の作成やメッセージ受信後の処理を指定できるようになっている。

おおまかな内容は以下の通り。

  • actor メソッドでアクターの処理内容を設定すると Actor がインスタンス化され、start が実行される
  • ! メソッドで非同期メッセージ送信、!? メソッドでメッセージ送信後に結果の取得を待機(同期)
  • receive や react メソッドにメッセージ受信後の処理内容を設定
  • reply メソッドでメッセージの送信元に返信
  • loop メソッドで処理内容を繰り返す
  • self で実行中のカレント Actor 取得
actorSample.scala
import scala.actors.Actor._

case class Data(n: Int)

//Actor の作成
val a = actor {
    //複数のメッセージを受信するためにループを指定
    loop {
        //メッセージ受信時の処理を設定
        receive {
            case s: String => println("receive string: " + s)
            case d: Data => println("receive data: " + d.n)
            case 0 => {
                println("receive zero")
                //メッセージを返信
                reply("complete")
            }
        }
    }
}

//Actor に文字列を送信
a ! "test"
//Actor に Data を送信
a ! Data(5)
//Actor に 0 を送信し返信を受信
val retval = a !? 0
println("return " + retval)
actorSample.scala 実行例
>scala actorSample.scala
receive string: test
receive data: 5
receive zero
return complete

Actor と Thread の関係

先ほどのサンプルを改造し、実行中の Thread を出力してみると、Actor を使うことで複数の Thread が追加されている事が分かる。

actorSample2.scala
import scala.actors.Actor
import scala.actors.Actor._
//implicit conversion で Java のコレクションクラスを
//Scala のコレクションに変換させるための設定
import scala.collection.jcl.Conversions._

def printThread(actor: Actor, msg: String) = {
    println(Thread.currentThread() + " " + msg + " - " + actor)
}

case class Data(n: Int)

//Thread のリストを取得
val initThreadList = Thread.getAllStackTraces.keySet

//Actor の作成
val a = actor {
    //複数のメッセージを受信するためにループを指定
    loop {
        //メッセージ受信時の処理を設定
        receive {
            case s: String => printThread(self, "receive string: " + s)
            case d: Data => printThread(self, "receive data: " + d.n)
            case 0 => {
                printThread(self, "receive zero")
                //メッセージを返信
                reply("complete")
            }
        }
    }
}
・・・
//0 を送信し返信を受信
val retval = a !? 0
printThread(self, "return " + retval)

println("-------- thread list -----------")
//Actor の作成後に増えた Thread のみ出力
//
// implicit conversion により Java のコレクションクラスから
// Scala のコレクションに自動変換されるため
// -- や foreach メソッドが使用できる
(Thread.getAllStackTraces.keySet -- initThreadList) foreach {
    thread => println("- " + thread)
}
println("--------------------------------")
actorSample2.scala 実行例
>scala actorSample2.scala
Thread[Thread-3,5,main] receive string: test - scala.actors.Actor$$anon$1@291aff
Thread[Thread-3,5,main] receive data: 5 - scala.actors.Actor$$anon$1@291aff
Thread[Thread-3,5,main] receive zero - scala.actors.Actor$$anon$1@291aff
Thread[main,5,main] return complete - scala.actors.ActorProxy@56a499
-------- thread list -----------
- Thread[Thread-5,5,main]
- Thread[Thread-6,2,main]
- Thread[Timer-0,5,main]
- Thread[Thread-3,5,main]
- Thread[Thread-2,5,main]
- Thread[Thread-4,5,main]
--------------------------------

receive/react と Thread の関係

次に、receive と react をそれぞれ使った場合に Thread がどのように使用されるのかを調べてみる。

1個の Actor に対して 10個の Actor からメッセージを送信するようなサンプルの receive 版と react 版を作って Thread の状況を出力してみたところ、以下のような結果になった。

  • Actor と Thread は 1対1 に対応しない(状況に応じて動的に割り当てられる模様)
  • receive は個別の Thread で処理する
  • react は Thread を共有して処理する

なお、以下のサンプルではメインスレッドで受信待ちしないため、Application オブジェクトとして定義し、全てのメッセージを受信した後に exit するようにしている。

まずは receive を使ったサンプルと実行例。
基本的に、実行中の Actor 毎に Thread が割り当てられている。

receiveTest.scala(receive を使った場合)
import scala.actors.Actor
import scala.actors.Actor._
import scala.actors.Exit
import scala.collection.jcl.Conversions._

object receiveTest extends Application {
    def printThread(actor: Actor, msg: Any) = {
        println(Thread.currentThread() + " " + msg + " - " + actor)
    }

    //実行 Thread の差分(Actor 生成前後)を出力
    def printThreadList() {
        (Thread.getAllStackTraces.keySet -- initThreadList) foreach {
            thread => println("- " + thread)
        }
    }

    val initThreadList = Thread.getAllStackTraces.keySet

    //10個の Actor からのメッセージを処理する Actor 作成
    val consumer = actor {
        loop {
            receive {
                case num: int => {
                    printThread(self, num)
                    reply("received=" + num)
                }
                case Exit => {
                    printThread(self, 10)
                    reply("received exit")
                    printThreadList()
                    exit
                }
            }
        }
    }

    //10個の Actor を作成しメッセージを送信
    for (val i <- 1 to 10) actor {
        //メッセージの作成
        val msg = i match {
            case 10 => Exit
            case n => n
        }
        //メッセージを送信し返信を処理
        consumer !? msg match {
            case r: String => printThread(self, r)
        }
        /* 上記は以下と同じ処理内容
        consumer ! msg
        receive {
            case r: String => printThread(self, r)
        }
        */
    }
}
receiveTest.scala 実行例(receive を使った場合)
>scalac receiveTest.scala
>scala receiveTest
Thread[Thread-2,5,main] 1 - scala.actors.Actor$$anon$1@7c6768
Thread[Thread-3,5,main] received=1 - scala.actors.Actor$$anon$1@13c5982
Thread[Thread-11,5,main] 2 - scala.actors.Actor$$anon$1@7c6768
Thread[Thread-11,5,main] 3 - scala.actors.Actor$$anon$1@7c6768
Thread[Thread-11,5,main] 4 - scala.actors.Actor$$anon$1@7c6768
Thread[Thread-11,5,main] 5 - scala.actors.Actor$$anon$1@7c6768
Thread[Thread-11,5,main] 6 - scala.actors.Actor$$anon$1@7c6768
Thread[Thread-11,5,main] 7 - scala.actors.Actor$$anon$1@7c6768
Thread[Thread-11,5,main] 8 - scala.actors.Actor$$anon$1@7c6768
Thread[Thread-11,5,main] 9 - scala.actors.Actor$$anon$1@7c6768
Thread[Thread-11,5,main] 10 - scala.actors.Actor$$anon$1@7c6768
- Thread[Thread-3,5,main]
- Thread[Thread-4,5,main]
- Thread[Thread-10,5,main]
- Thread[Thread-9,5,main]
- Thread[Thread-1,5,main]
- Thread[Thread-2,5,main]
- Thread[Thread-7,5,main]
- Thread[DestroyJavaVM,5,main]
- Thread[Thread-5,5,main]
- Thread[Timer-0,5,main]
- Thread[Thread-6,5,main]
- Thread[Thread-8,5,main]
- Thread[Thread-11,5,main]
Thread[Thread-2,5,main] received=2 - scala.actors.Actor$$anon$1@422ede
Thread[Thread-10,5,main] received exit - scala.actors.Actor$$anon$1@dd20f6
Thread[Thread-9,5,main] received=9 - scala.actors.Actor$$anon$1@a83b8a
Thread[Thread-8,5,main] received=8 - scala.actors.Actor$$anon$1@110b053
Thread[Thread-7,5,main] received=7 - scala.actors.Actor$$anon$1@111a3ac
Thread[Thread-6,5,main] received=6 - scala.actors.Actor$$anon$1@b89838
Thread[Thread-5,5,main] received=5 - scala.actors.Actor$$anon$1@93dcd
Thread[Thread-4,5,main] received=4 - scala.actors.Actor$$anon$1@1d9dc39
Thread[Thread-3,5,main] received=3 - scala.actors.Actor$$anon$1@112f614


次に react を使ったサンプルと実行例。
基本的に、Actor が同じ Thread 上で処理されている。

reactTest.scala(react を使った場合)
import scala.actors.Actor
import scala.actors.Actor._
import scala.actors.Exit
import scala.collection.jcl.Conversions._

//receiveTest の receive メソッド呼び出し箇所を react に変更しただけ
object reactTest extends Application {
    ・・・
    val consumer = actor {
        loop {
            react {
                ・・・
            }
        }
    }

    for (val i <- 1 to 10) actor {
        ・・・
        consumer ! msg
        react {
            case r: String => {
                printThread(self, r)
            }
        }
    }
}
reactTest.scala 実行例(react を使った場合)
>scalac reactTest.scala
>scala reactTest
Thread[Thread-3,5,main] 1 - scala.actors.Actor$$anon$1@1f33675
Thread[Thread-3,5,main] react=1 - scala.actors.Actor$$anon$1@19ee1ac
Thread[Thread-3,5,main] 2 - scala.actors.Actor$$anon$1@1f33675
Thread[Thread-3,5,main] react=2 - scala.actors.Actor$$anon$1@1f1fba0
Thread[Thread-3,5,main] 3 - scala.actors.Actor$$anon$1@1f33675
Thread[Thread-3,5,main] react=3 - scala.actors.Actor$$anon$1@1befab0
Thread[Thread-3,5,main] 4 - scala.actors.Actor$$anon$1@1f33675
Thread[Thread-3,5,main] react=4 - scala.actors.Actor$$anon$1@13c5982
Thread[Thread-3,5,main] 5 - scala.actors.Actor$$anon$1@1f33675
Thread[Thread-3,5,main] react=5 - scala.actors.Actor$$anon$1@1186fab
Thread[Thread-3,5,main] 6 - scala.actors.Actor$$anon$1@1f33675
Thread[Thread-3,5,main] react=6 - scala.actors.Actor$$anon$1@14b7453
Thread[Thread-3,5,main] 7 - scala.actors.Actor$$anon$1@1f33675
Thread[Thread-3,5,main] react=7 - scala.actors.Actor$$anon$1@c21495
Thread[Thread-3,5,main] 8 - scala.actors.Actor$$anon$1@1f33675
Thread[Thread-3,5,main] react=8 - scala.actors.Actor$$anon$1@1d5550d
Thread[Thread-3,5,main] 9 - scala.actors.Actor$$anon$1@1f33675
Thread[Thread-3,5,main] react=9 - scala.actors.Actor$$anon$1@a0dcd9
Thread[Thread-3,5,main] 10 - scala.actors.Actor$$anon$1@1f33675
- Thread[Thread-3,5,main]
- Thread[Thread-2,2,main]
- Thread[Thread-4,2,main]
- Thread[Thread-5,5,main]
- Thread[Timer-0,5,main]
- Thread[Thread-1,5,main]
Thread[Thread-5,5,main] react exit - scala.actors.Actor$$anon$1@723d7c