Scala で WebSocket - Play2 Mini

前回 id:fits:20130114 と同等の WebSocket サーバー処理を Scala 2.10.0 で Play2 Mini 2.1 RC2 を使って実装してみました。
クライアントは id:fits:20130114 で作成したもの(HTML + JavaScript)をそのまま使用する事にします。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130116/

Play2 Mini 2.1 RC2 の場合

id:fits:20130106 で使用した Play2 Mini は Play 2.0 ベースのものでしたが、今回は Scala 2.10.0 をサポートしている Play 2.1 RC2 ベースのものを使いました。

Play2 Mini 2.1 RC2 では設定ファイル(application.conf 等)が必須となっているようですので、とりあえず下記の空ファイルを作成しておきます。 *1

  • src/main/resources/application.conf

sbt 用のビルド定義ファイルは以下のようになります。

build.sbt
scalaVersion := "2.10.0"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies += "com.typesafe" %% "play-mini" % "2.1-RC2"

mainClass in (Compile, run) := Some("play.core.server.NettyServer")

まず、id:fits:20130106 と同様に Global を用意しておきます。

Global.scala
object Global extends com.typesafe.play.mini.Setup(fits.sample.SampleApp)

次に、本題の WebSocket サーバー処理を実装します。

Play 2.1 では Concurrent.broadcast が用意されていますので、これを利用すれば容易にブロードキャスト(全クライアントへのメッセージ配信)を実現できます。

使い方は、Concurrent.broadcast で取得した Enumerator と Channel のうち、Enumerator を WebSocket.using 等へ渡す処理の結果 *2 として返し、ブロードキャストするメッセージを Channel に push します。

なお、Iteratee.foreach() にはメッセージ受信の度に実行される処理 (ここでは受信した JSON を Channel へ push しています) を渡し、mapDone にクライアント切断時の処理を渡します。

SampleApp.scala
package fits.sample

import com.typesafe.play.mini._
import play.api.mvc._
import play.api.libs.json._
import play.api.libs.iteratee._

object SampleApp extends Application {

    val (enumerator, channel) = Concurrent.broadcast[JsValue]

    def route = Routes({
        case GET(Path("/connect")) => WebSocket.using[JsValue] { req =>
            val in = Iteratee.foreach[JsValue] { json =>
                // 全クライアントへ受信した JSON を送信
                channel.push(json)
            }.mapDone { _ => 
                // クライアント切断時の処理
                println("*** closed")
            }

            (in, enumerator)
        }
    })
}

Concurrent.broadcast のおかげで実装がかなりシンプルになっていると思います。

実行は sbt run で行います。 (下記例ではポート番号 8080 で起動しています)

実行例
> sbt -Dhttp.port=8080 run
・・・
[info] Running play.core.server.NettyServer
Play server process ID is 6932
...Please note, 2.1 will be the last release of play-mini
[info] play - Application started (Prod)
[info] play - Listening for HTTP on /0:0:0:0:0:0:0:0:8080

Play2 Mini 2.0.3 の場合

試しに Play2 Mini 2.0.3 (Scala 2.9.2) で上記と同様の処理を実装してみました。*3

2.0.3 では Concurrent.broadcast が無いので、PushEnumerator を自前で管理する事になります。

下記サンプルでは ScalaSTM を適用した List を使って PushEnumerator を管理してみました。 *4

SampleApp.scala (Play2 Mini 2.0.3 版)
package fits.sample

import com.typesafe.play.mini._
import play.api.mvc._
import play.api.libs.json._
import play.api.libs.iteratee._
import scala.concurrent.stm._

object SampleApp extends Application {

    val wsList = Ref(List[PushEnumerator[JsValue]]())

    def route = Routes({
        case GET(Path("/connect")) => WebSocket.using[JsValue] { req =>
            // PushEnumerator の作成
            val out = Enumerator.imperative[JsValue]()

            val in = Iteratee.foreach[JsValue] { json =>
                // 全クライアントへ受信した JSON を送信
                wsList.single.get.foreach( _.push(json) )
            }.mapDone { _ => 
                // クライアント切断時の処理
                // wsList から切断した PushEnumerator(=out)を削除
                wsList.single.transform {
                    _.filterNot( _ == out )
                }
                println("*** closed")
            }

            // wsList へ PushEnumerator(=out)追加
            wsList.single.transform( _ :+ out )

            (in, out)
        }
    })
}

案の定、2.1 RC2 版に比べるとコード量が増えてしまいました。

なお、2.0.3 では 2.1 RC2 のように application.conf ファイルを用意する必要はありませんでした。

*1:設定ファイルがクラスパス上に無いとエラーが発生し実行に失敗します

*2:using の場合は Iteratee と Enumerator のタプル

*3:ソースは http://github.com/fits/try_samples/tree/master/blog/20130116/play2-mini_2.0 参照

*4:他にも java.util.concurrent.CopyOnWriteArrayList を使う方法や Akka の Agent を使う方法等があります