読者です 読者をやめる 読者になる 読者になる

Scala で WebSocket - Unfiltered

前回(id:fits:20130116)は Play2 Mini で WebSocket サーバー処理を実装しましたが、今回は同様の処理を Unfiltered で実装してみました。

  • Unfiltered 0.6.5 (Scala 2.10.0)

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130119/

ScalaSTM 版サンプル

前回の Play2 Mini 2.0.3 版サンプルと同様に Unfltered でも全クライアントへの接続 *1 を管理しておく必要がありますので、今回も ScalaSTM を使ってみました。

sbt 用のビルド定義ファイルは以下のようになります。

build.sbt
scalaVersion := "2.10.0"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies ++= Seq(
    "net.databinder" %% "unfiltered-filter" % "0.6.5",
    "net.databinder" %% "unfiltered-netty-websockets" % "0.6.5",
    "org.scala-stm" %% "scala-stm" % "0.7"
)

mainClass in (Compile, run) := Some("fits.sample.SampleApp")

WebSocket サーバー処理の実装は下記のようになります。

Unfiltered では Http の handler メソッドに unfiltered.netty.websockets.Plan を渡す事で WebSocket 処理を実装できます。

unfiltered.netty.websockets.Plan は unfiltered.netty.websockets.Planify.apply() に unfiltered.netty.websocket.Intent *2 を渡す事で取得できます。

unfiltered.netty.websocket.Intent の型は "PartialFunction[ RequestBinding, PartialFunction[SocketCallback, Unit] ]" となっています。

SampleApp.scala
package fits.sample

import unfiltered.request.Path
import unfiltered.netty._
import unfiltered.netty.websockets._
import scala.concurrent.stm._

object SampleApp extends Application {
    // WebSocket のリスト
    val wsList = Ref(List[WebSocket]())

    val wsHandle = Planify {
        case Path("/connect") => {
            // 接続開始時の処理
            case Open(socket) => wsList.single.transform( _ :+ socket ) // リストへの追加

            // 切断時の処理
            case Close(socket) =>
                // リストから削除
                wsList.single.transform {
                    _.filterNot( _ == socket )
                }
                println("*** closed")

            // メッセージ受信時の処理
            case Message(socket, Text(txt)) => 
                // 全クライアントへメッセージ送信
                wsList.single.get.foreach( _.send(txt) )

            // エラー発生時の処理
            case Error(socket, err) => println(s"error : ${err}")
        }
    }

    Http.local(8080).handler(wsHandle).start()
}

なお、Planify.apply() の第2引数や onPass() へ PassHandler を渡す事で、WebSocket ハンドラーで処理されなかった場合の HTTP 処理を指定する事も可能なようです。


実行は sbt run で行います。

実行例
> sbt run
・・・
[info] Running fits.sample.SampleApp

Akka Agent 版サンプル

上記サンプルで ScalaSTM の Ref を使っていた箇所をただ単に Akka の Agent で置き換えてみました。

SampleApp2.scala
package fits.sample

import unfiltered.request.Path
import unfiltered.netty._
import unfiltered.netty.websockets._
import akka.actor.ActorSystem
import akka.agent.Agent

object SampleApp2 extends Application {

    implicit val system = ActorSystem()

    // WebSocket のリスト
    val wsList = Agent(List[WebSocket]())

    val wsHandle = Planify {
        case Path("/connect") => {
            case Open(socket) => wsList.send( _ :+ socket ) // リストへの追加

            case Close(socket) =>
                // リストから削除
                wsList.send {
                    _.filterNot( _ == socket )
                }
                println("*** closed")

            case Message(socket, Text(txt)) => 
                // リストを取得してメッセージ送信
                wsList().foreach( _.send(txt) )

            case Error(socket, err) => println(s"error : ${err}")
        }
    }

    scala.sys.ShutdownHookThread {
        // Agent の終了処理
        wsList.close()
    }

    Http.local(8080).handler(wsHandle).start()
}

sbt 用のビルド定義ファイルは以下のようになります。

build.sbt
scalaVersion := "2.10.0"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies ++= Seq(
    "net.databinder" %% "unfiltered-filter" % "0.6.5",
    "net.databinder" %% "unfiltered-netty-websockets" % "0.6.5",
    "com.typesafe.akka" %% "akka-agent" % "2.1.0"
)

mainClass in (Compile, run) := Some("fits.sample.SampleApp2")

*1:unfiltered.netty.websockets.WebSocket クラスのインスタンス

*2:Unfiltered ソースの netty-websockets/src/main/scala/package.scala にコードあり