Vert.x 2.0 で Scala 言語モジュールを使用

Vert.x 2.0 用の Scala 言語モジュールを使って簡単な HTTP サーバーを実装してみます。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130901_4/

Scala 言語モジュールのビルド

Vert.x 用の Scala 言語モジュールは今のところ Vert.x デフォルト設定の Maven リポジトリからは取得できないようですので、今回はソースからビルドする事にしました。

git でソースを取得し、ソースに含まれている gradlew コマンドを使ってビルドします。

> git clone https://github.com/vert-x/mod-lang-scala.git
・・・
> cd mod-lang-scala
> gradlew install
・・・
:install

BUILD SUCCESSFUL

上記のように gradlew install とする事で、ローカルの Maven リポジトリへ Scala の言語モジュールがインストールされます。(.m2/repository/io/vertx/lang-scala/2.0.0-SNAPSHOT)

Scala による単純な HTTP サーバー

今回は下記のような単純な HTTP サーバー処理を実装してみました。

SampleServer.scala
import org.vertx.scala.core.Future
import org.vertx.scala.core.http.HttpServerRequest
import org.vertx.scala.platform.Verticle

class SampleServer extends Verticle {
    override def start(future: Future[Void]): Unit = {
        start()

        vertx.newHttpServer.requestHandler { req: HttpServerRequest =>
            req.response.end("test data")
        }.listen(8080)
    }
}

今の Scala 言語モジュールは Scala ソースを直接実行できないようなので、scalac 等を使ってコンパイルしておきます。

コンパイルには下記のような JAR を CLASSPATH に指定します。

  • lang-scala-2.0.0-SNAPSHOT.jar
  • vertx-core-2.0.1-final.jar
  • vertx-platform-2.0.1-final.jar
  • netty-all-4.0.7.Final.jar

lang-scala-2.0.0-SNAPSHOT.jar は Scala 言語モジュールのビルド時に build/libs ディレクトリへ生成されており、他の JAR は Vert.x の実行環境に含まれています。

コンパイル例
> scalac -cp ・・・lang-scala-2.0.0-SNAPSHOT.jar;・・・netty-all-4.0.7.Final.jar; SampleServer.scala

実行

実行には、予め langs.properties ファイルへ Scala 言語モジュールの設定を行っておく必要があります。

Vert.x の conf/langs.properties へ追記すれば良さそうな気がしますが、現時点ではこのファイルに追記しても効果は無いようです。

ソースを見たところ CLASSPATH 上の langs.properties を参照するようですので、今回はカレントディレクトリへ下記のような langs.properties ファイルを作成する事にしました。

langs.properties
scala=io.vertx~lang-scala~2.0.0-SNAPSHOT:org.vertx.scala.platform.impl.ScalaVerticleFactory

これで vertx run scala:<クラス名> を実行すると、Maven のローカルリポジトリから Vert.x の sys-mods ディレクトリへ Scala 言語モジュールがインストールされ(初回のみ)、処理が実行されます。

実行例
> vertx run scala:SampleServer
Module io.vertx~lang-scala~2.0.0-SNAPSHOT successfully installed

ちなみに langs.properties へ .=scala と1行追記しておけば(デフォルトを scala に設定)、vertx run 時に scala: を省略できるようになります。

Play2 の Iteratee を使った行単位のファイル処理2 - Enumerator.generateM, Enumerator.fromCallback1

前回 id:fits:20130212 は Enumeratee.grouped() を使って行単位のファイル処理を実装しましたが、今回は Enumerator.generateM() と Enumerator.fromCallback1() をそれぞれ使って同様の処理を実装してみました。

sbt のビルド定義ファイルなどは前回と同様のものを使っています。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130216/

Enumerator.generateM による行分割

Enumerator.generateM は、引数として渡した処理 *1 を繰り返し実行する Enumerator を作成します。

繰り返しは None を返す事で終了するため、下記 (a) ではファイルを 1行読んで Some(<1行分の文字列>) を返し、ファイルの終端に達した場合に None を返すようにしています。
こうする事でファイルの内容を 1行ずつ返す Enumerator を実現できます。

(b) の処理でファイルの内容を全て消費してしまいますが、(c) で BufferedReader をリセットする事で *2、(d) では (a) で作成した enumerator をそのまま使って 1行目から処理できるようになっています。

さらに (e) でも enumerator を利用していますが、enumerator はファイルの終端に達していないため (d) からの続きの行を処理する事になります。

ファイルのクローズ処理 (f) は Iteratee の処理とは別に一番最後で実施するようにしました。

EnumeratorLineSample.scala (Enumerator.generateM を使ったサンプル)
package fits.sample

import play.api.libs.iteratee._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets._

object EnumeratorLineSample extends App {
    import scala.concurrent.ExecutionContext.Implicits.global

    val reader = new BufferedReader(new InputStreamReader(new FileInputStream(args(0)), UTF_8))
    reader.mark(512)

    // (a) ファイルの内容を 1行ずつ返す Enumerator
    val enumerator = Enumerator.generateM( Future {
        reader.readLine match {
            case line: String => Some(line)
            case _            => None // ファイルの終端に達した場合
        }
    })

    // (b) 行毎に先頭へ # を付けて出力する処理の組み立て
    val future = enumerator |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    Await.ready(future, Duration.Inf)

    println("----------")
    // (c) ファイルの内容を先頭から返すようにリセット
    reader.reset

    // (d) 1行目を捨てて 2行目から 2行取り出し各行の先頭へ # を付けて出力する処理の組み立て
    val future2 = enumerator &> Enumeratee.drop(1) &> Enumeratee.take(2) |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    Await.ready(future2, Duration.Inf)

    println("----------")

    // (e) 1行出力。(d) からの続きとなるのでファイルの 4行目が出力される
    val future3 = enumerator &> Enumeratee.take(1) |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    Await.ready(future3, Duration.Inf)

    // (f) クローズ処理
    reader.close
}

実行結果は以下の通りです。

実行結果
> sbt "run sample.txt"
・・・
[info] Running fits.sample.EnumeratorLineSample sample.txt
#Play2のIterateeを使ったファイル処理の
#サンプル
#
#1行毎に処理するサンプルを
#実装してみました。
----------
#サンプル
#
----------
#1行毎に処理するサンプルを

Enumerator.fromCallback1 による行分割

次は、Enumerator.fromCallback1 *3 を使って同様の処理を実装してみます。

generateM に渡していた処理を fromCallback1 の第一引数の処理結果として返すようにすれば同じ処理内容となります。

ただし下記では、fromCallback1 の第二引数(onComplete)に BufferedReader を close する処理 (b) を渡し、Iteratee の処理が完了する度に入力ストリーム (今回はファイル) をクローズするようにしています。

(e) では (d) で使った enumerator2 を使っていますが、(d) でファイルはクローズされており Enumerator は何も生成しないため何も出力されません。

EnumeratorLineSample2.scala (Enumerator.fromCallback1 を使ったサンプル)
・・・
object EnumeratorLineSample2 extends App {
    import scala.concurrent.ExecutionContext.Implicits.global

    // (a) 1行ずつファイルの内容を返す Enumerator を作成
    def fromStreamLine(input: BufferedReader) = {
        Enumerator.fromCallback1(_ => Future {
            input.readLine match {
                case line: String => Some(line)
                case _            => None // ファイルの終端に達した場合
            }
        }, {
            println("*** close")
            // (b) ファイルのクローズ処理
            input.close
        })
    }

    val enumerator = fromStreamLine(new BufferedReader(new InputStreamReader(new FileInputStream(args(0)), UTF_8)))

    // (c) 行毎に先頭へ # を付けて出力する処理の組み立て
    val future = enumerator |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    Await.ready(future, Duration.Inf)

    println("----------")

    val enumerator2 = fromStreamLine(new BufferedReader(new InputStreamReader(new FileInputStream(args(0)), UTF_8)))

    // (d) 1行目を捨てて 2行目から 2行取り出し各行の先頭へ # を付けて出力する処理の組み立て
    val future2 = enumerator2 &> Enumeratee.drop(1) &> Enumeratee.take(2) |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    Await.ready(future2, Duration.Inf)

    println("----------")

    // (e) ファイルはクローズ済みのため何も出力されない
    val future3 = enumerator2 &> Enumeratee.take(1) |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    Await.ready(future3, Duration.Inf)
}

実行結果は以下の通りです。
Iteratee の処理を行う度に "*** close" が出力されている事を確認できます。

実行結果
・・・
[info] Running fits.sample.EnumeratorLineSample2 sample.txt
*** close
#Play2のIterateeを使ったファイル処理の
#サンプル
#
#1行毎に処理するサンプルを
#実装してみました。
----------
*** close
#サンプル
#
----------

*1:Future を返す処理

*2:BufferedReader の mark と reset で実現

*3:Enumerator.fromCallback もあるが、こちらは deprecated となっている

Play2 の Iteratee を使った行単位のファイル処理1 - Enumeratee.grouped() の利用

id:fits:20130116 で使った Play2 の Iteratee を単体利用して行単位のファイル処理を実装してみました。

  • play-iteratees 2.1.0

Iteratee は Enumerator や Enumeratee と組み合わせて使用し、それぞれ以下のような役割を担います。

  • Enumerator : データを生産
  • Enumeratee : Enumerator から受け取ったデータを Iteratee へ渡す
  • Iteratee : データを消費

ちなみに、ファイルの内容を "改行" で分割するような仕組みは今のところ Play2 の API に用意されていないようなので、
1行分のデータを取り出す Enumeratee や Enumerator を自前で用意する事になります。

そこで今回は Enumeratee.grouped() を使って行毎にデータをグルーピングする Enumeratee を作成してみました。
次回は Enumerator.generateM() を使って同様の処理を実装してみるつもりです。


サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130212/

はじめに

今回使用した sbt のビルド定義ファイルは以下の通りです。

build.sbt
scalaVersion := "2.10.0"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies += "play" %% "play-iteratees" % "2.1.0"

mainClass in (Compile, run) := Some("fits.sample.EnumerateeLineSample")

なお、今回は下記ファイル(文字コードUTF-8)を使って動作確認する事にします。

sample.txt
Play2のIterateeを使ったファイル処理の
サンプル

1行毎に処理するサンプルを
実装してみました。

Enumeratee.grouped による行分割

Enumeratee.grouped を使った行単位の処理は以下のようになりました。

  • (a) Enumerator.fromFile() で chunkSize のサイズ分だけ読み込んだファイルの内容を Array[Byte] として生成する Enumerator 作成
  • (e) Enumerator・Enumeratee・Iteratee を結合
    • Enumeratee.mapConcat で Enumerator と Enumeratee.grouped を結び付ける Enumeratee 作成
    • Enumeratee.grouped で行毎にグルーピングする Enumeratee 作成
    • Iteratee.foreach で各行の先頭に # を付けて出力する Iteratee 作成

さらに、Enumeratee.grouped には以下のようにして組み立てた takeLine *1 を渡しています。

  • (c) takeWhile で改行が来るまで入力を溜め、Iteratee.getChunks で溜めた分の入力 List[Byte] を line へ設定
  • (d) 改行文字を take(1) で取得し Iteratee.ignore で捨てる
  • new String() で line (改行までの部分)を文字列化

なお、Iteratee はモナドなので、takeLine は for 式を使って Iteratee を組み立てています。

EnumerateeLineSample.scala
package fits.sample

import play.api.libs.iteratee._

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import java.io.File
import java.nio.charset.StandardCharsets._

object EnumerateeLineSample extends App {
    import scala.concurrent.ExecutionContext.Implicits.global

    // (a) ファイルの内容を取得する Enumerator
    val enumerator = Enumerator.fromFile(new File(args(0)))

    // (b) 1行分の String を生成する Iteratee
    val takeLine = for {
        line <- Enumeratee.takeWhile[Byte](_ != '\n'.toByte) &>> Iteratee.getChunks // (c) 改行文字までの入力を line へ設定
        _    <- Enumeratee.take(1) &>> Iteratee.ignore[Byte] // (d) 改行文字を捨てる
    } yield new String(line.toArray, UTF_8)

    // (e) 行毎に先頭へ # を付けて出力する処理の組み立て
    val future = enumerator &> Enumeratee.mapConcat( _.toSeq ) &> Enumeratee.grouped(takeLine) |>>> Iteratee.foreach { s => 
        println(s"#${s}")
    }

    // (f) future の処理完了待ち
    Await.ready(future, Duration.Inf)
}

それでは sbt run で実行してみます。
"run <引数> ・・・" のように run と引数をダブルクォーテーションで囲めば、sbt run で実行時引数を指定する事が可能です。

実行結果
> sbt "run sample.txt"
・・・
[info] Running fits.sample.EnumerateeLineSample sample.txt
#Play2のIterateeを使ったファイル処理の
#サンプル
#
#1行毎に処理するサンプルを
#実装してみました。
#

実はこの sbt run は処理が完了してもプロセスが終了しないので Ctrl + C する必要があります。

プロセスを終了するには play.api.libs.iteratee.internal.defaultExecutionContext 等で生成されている ExecutorService とかを shutdown する必要があると思うのですが、今回は上手く終了する方法を見つけられませんでした。

とりあえず、以下のようにスレッドを interrupt しまくれば強引にプロセスを終了させる事は可能です。

・・・
object EnumerateeLineSample extends App {
    ・・・
    // 強引にプロセスを終了する
    import scala.collection.convert.WrapAsScala._
    Thread.getAllStackTraces().keySet().foreach(_.interrupt)
}


最後に、Enumeratee.grouped(takeLine) と Iteratee.foreach の間に Enumeratee.drop(1) と Enumeratee.take(2) を追加すれば、2行目と 3行目の先頭へ # を付けて出力するような処理に変わります。

EnumerateeLineSample2.scala
・・・
object EnumerateeLineSample2 extends App {
    ・・・
    // 1行目を捨てて 2行目から 2行取り出し各行の先頭へ # を付けて出力する処理の組み立て
    val future = enumerator &> Enumeratee.mapConcat( _.toSeq ) &> Enumeratee.grouped(takeLine) &> 
          Enumeratee.drop(1) &> Enumeratee.take(2) |>>> Iteratee.foreach { s => 

        println(s"#${s}")
    }

    Await.ready(future, Duration.Inf)
}
実行結果
・・・
[info] Running fits.sample.EnumerateeLineSample2 sample.txt
#サンプル
#

*1:型は Iteratee[Byte, String]

Scala で WebSocket - Unfiltered

前回(id:fits:20130116)は Play2 Mini で WebSocket サーバー処理を実装しましたが、今回は同様の処理を Unfiltered で実装してみました。

  • Unfiltered 0.6.5 (Scala 2.10.0)

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130119/

ScalaSTM 版サンプル

前回の Play2 Mini 2.0.3 版サンプルと同様に Unfltered でも全クライアントへの接続 *1 を管理しておく必要がありますので、今回も ScalaSTM を使ってみました。

sbt 用のビルド定義ファイルは以下のようになります。

build.sbt
scalaVersion := "2.10.0"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies ++= Seq(
    "net.databinder" %% "unfiltered-filter" % "0.6.5",
    "net.databinder" %% "unfiltered-netty-websockets" % "0.6.5",
    "org.scala-stm" %% "scala-stm" % "0.7"
)

mainClass in (Compile, run) := Some("fits.sample.SampleApp")

WebSocket サーバー処理の実装は下記のようになります。

Unfiltered では Http の handler メソッドに unfiltered.netty.websockets.Plan を渡す事で WebSocket 処理を実装できます。

unfiltered.netty.websockets.Plan は unfiltered.netty.websockets.Planify.apply() に unfiltered.netty.websocket.Intent *2 を渡す事で取得できます。

unfiltered.netty.websocket.Intent の型は "PartialFunction[ RequestBinding, PartialFunction[SocketCallback, Unit] ]" となっています。

SampleApp.scala
package fits.sample

import unfiltered.request.Path
import unfiltered.netty._
import unfiltered.netty.websockets._
import scala.concurrent.stm._

object SampleApp extends Application {
    // WebSocket のリスト
    val wsList = Ref(List[WebSocket]())

    val wsHandle = Planify {
        case Path("/connect") => {
            // 接続開始時の処理
            case Open(socket) => wsList.single.transform( _ :+ socket ) // リストへの追加

            // 切断時の処理
            case Close(socket) =>
                // リストから削除
                wsList.single.transform {
                    _.filterNot( _ == socket )
                }
                println("*** closed")

            // メッセージ受信時の処理
            case Message(socket, Text(txt)) => 
                // 全クライアントへメッセージ送信
                wsList.single.get.foreach( _.send(txt) )

            // エラー発生時の処理
            case Error(socket, err) => println(s"error : ${err}")
        }
    }

    Http.local(8080).handler(wsHandle).start()
}

なお、Planify.apply() の第2引数や onPass() へ PassHandler を渡す事で、WebSocket ハンドラーで処理されなかった場合の HTTP 処理を指定する事も可能なようです。


実行は sbt run で行います。

実行例
> sbt run
・・・
[info] Running fits.sample.SampleApp

Akka Agent 版サンプル

上記サンプルで ScalaSTM の Ref を使っていた箇所をただ単に Akka の Agent で置き換えてみました。

SampleApp2.scala
package fits.sample

import unfiltered.request.Path
import unfiltered.netty._
import unfiltered.netty.websockets._
import akka.actor.ActorSystem
import akka.agent.Agent

object SampleApp2 extends Application {

    implicit val system = ActorSystem()

    // WebSocket のリスト
    val wsList = Agent(List[WebSocket]())

    val wsHandle = Planify {
        case Path("/connect") => {
            case Open(socket) => wsList.send( _ :+ socket ) // リストへの追加

            case Close(socket) =>
                // リストから削除
                wsList.send {
                    _.filterNot( _ == socket )
                }
                println("*** closed")

            case Message(socket, Text(txt)) => 
                // リストを取得してメッセージ送信
                wsList().foreach( _.send(txt) )

            case Error(socket, err) => println(s"error : ${err}")
        }
    }

    scala.sys.ShutdownHookThread {
        // Agent の終了処理
        wsList.close()
    }

    Http.local(8080).handler(wsHandle).start()
}

sbt 用のビルド定義ファイルは以下のようになります。

build.sbt
scalaVersion := "2.10.0"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies ++= Seq(
    "net.databinder" %% "unfiltered-filter" % "0.6.5",
    "net.databinder" %% "unfiltered-netty-websockets" % "0.6.5",
    "com.typesafe.akka" %% "akka-agent" % "2.1.0"
)

mainClass in (Compile, run) := Some("fits.sample.SampleApp2")

*1:unfiltered.netty.websockets.WebSocket クラスのインスタンス

*2:Unfiltered ソースの netty-websockets/src/main/scala/package.scala にコードあり

Scala で WebSocket - Play2 Mini

前回 id:fits:20130114 と同等の WebSocket サーバー処理を Scala 2.10.0 で Play2 Mini 2.1 RC2 を使って実装してみました。
クライアントは id:fits:20130114 で作成したもの(HTML + JavaScript)をそのまま使用する事にします。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130116/

Play2 Mini 2.1 RC2 の場合

id:fits:20130106 で使用した Play2 Mini は Play 2.0 ベースのものでしたが、今回は Scala 2.10.0 をサポートしている Play 2.1 RC2 ベースのものを使いました。

Play2 Mini 2.1 RC2 では設定ファイル(application.conf 等)が必須となっているようですので、とりあえず下記の空ファイルを作成しておきます。 *1

  • src/main/resources/application.conf

sbt 用のビルド定義ファイルは以下のようになります。

build.sbt
scalaVersion := "2.10.0"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies += "com.typesafe" %% "play-mini" % "2.1-RC2"

mainClass in (Compile, run) := Some("play.core.server.NettyServer")

まず、id:fits:20130106 と同様に Global を用意しておきます。

Global.scala
object Global extends com.typesafe.play.mini.Setup(fits.sample.SampleApp)

次に、本題の WebSocket サーバー処理を実装します。

Play 2.1 では Concurrent.broadcast が用意されていますので、これを利用すれば容易にブロードキャスト(全クライアントへのメッセージ配信)を実現できます。

使い方は、Concurrent.broadcast で取得した Enumerator と Channel のうち、Enumerator を WebSocket.using 等へ渡す処理の結果 *2 として返し、ブロードキャストするメッセージを Channel に push します。

なお、Iteratee.foreach() にはメッセージ受信の度に実行される処理 (ここでは受信した JSON を Channel へ push しています) を渡し、mapDone にクライアント切断時の処理を渡します。

SampleApp.scala
package fits.sample

import com.typesafe.play.mini._
import play.api.mvc._
import play.api.libs.json._
import play.api.libs.iteratee._

object SampleApp extends Application {

    val (enumerator, channel) = Concurrent.broadcast[JsValue]

    def route = Routes({
        case GET(Path("/connect")) => WebSocket.using[JsValue] { req =>
            val in = Iteratee.foreach[JsValue] { json =>
                // 全クライアントへ受信した JSON を送信
                channel.push(json)
            }.mapDone { _ => 
                // クライアント切断時の処理
                println("*** closed")
            }

            (in, enumerator)
        }
    })
}

Concurrent.broadcast のおかげで実装がかなりシンプルになっていると思います。

実行は sbt run で行います。 (下記例ではポート番号 8080 で起動しています)

実行例
> sbt -Dhttp.port=8080 run
・・・
[info] Running play.core.server.NettyServer
Play server process ID is 6932
...Please note, 2.1 will be the last release of play-mini
[info] play - Application started (Prod)
[info] play - Listening for HTTP on /0:0:0:0:0:0:0:0:8080

Play2 Mini 2.0.3 の場合

試しに Play2 Mini 2.0.3 (Scala 2.9.2) で上記と同様の処理を実装してみました。*3

2.0.3 では Concurrent.broadcast が無いので、PushEnumerator を自前で管理する事になります。

下記サンプルでは ScalaSTM を適用した List を使って PushEnumerator を管理してみました。 *4

SampleApp.scala (Play2 Mini 2.0.3 版)
package fits.sample

import com.typesafe.play.mini._
import play.api.mvc._
import play.api.libs.json._
import play.api.libs.iteratee._
import scala.concurrent.stm._

object SampleApp extends Application {

    val wsList = Ref(List[PushEnumerator[JsValue]]())

    def route = Routes({
        case GET(Path("/connect")) => WebSocket.using[JsValue] { req =>
            // PushEnumerator の作成
            val out = Enumerator.imperative[JsValue]()

            val in = Iteratee.foreach[JsValue] { json =>
                // 全クライアントへ受信した JSON を送信
                wsList.single.get.foreach( _.push(json) )
            }.mapDone { _ => 
                // クライアント切断時の処理
                // wsList から切断した PushEnumerator(=out)を削除
                wsList.single.transform {
                    _.filterNot( _ == out )
                }
                println("*** closed")
            }

            // wsList へ PushEnumerator(=out)追加
            wsList.single.transform( _ :+ out )

            (in, out)
        }
    })
}

案の定、2.1 RC2 版に比べるとコード量が増えてしまいました。

なお、2.0.3 では 2.1 RC2 のように application.conf ファイルを用意する必要はありませんでした。

*1:設定ファイルがクラスパス上に無いとエラーが発生し実行に失敗します

*2:using の場合は Iteratee と Enumerator のタプル

*3:ソースは http://github.com/fits/try_samples/tree/master/blog/20130116/play2-mini_2.0 参照

*4:他にも java.util.concurrent.CopyOnWriteArrayList を使う方法や Akka の Agent を使う方法等があります

軽量 Web フレームワークで REST API を実装 - Vert.x, Gretty, Play2 Mini, Socko, Restify

個人的に REST API の実装では JAX-RSJava*1SinatraRuby) あたりを使っていますが、今回は選択肢を増やす目的で下記のようなフレームワークを試してみました。

ちなみに、今回試した Java 系のフレームワーク(Restify 以外)は内部的に Netty を使っています。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130106/

はじめに

今回は以下のような単純な REST API を実装する事にします。

  • /user/ で JSON 文字列 を GET
  • /user で JSON 文字列 を POST

動作確認は、以下の Ruby スクリプトで行う事にします。

client.rb
#coding:utf-8
require 'net/http'
require 'json/pure'

Net::HTTP.start('localhost', 8080) { |http|
    # GET 処理
    res = http.get('/user/1')
    puts "#{res}, #{res.code}, #{res.content_type}, #{res.body}"

    data = {
        'name' => 'test',
        'note' => 'サンプル'
    }
    # POST 処理
    res = http.post('/user', JSON.generate(data), {'Content-Type' => 'application/json'})
    puts "#{res}, #{res.code}, #{res.body}"
}

Vert.x

  • Vert.x 1.3.0

id:fits:20120513 や id:fits:20120708 で扱った Vert.x です。

Vert.x は JVM 用の Node.js ライクなサーバーフレームワークで、JavaJavaScript・CoffeeScript・Groovy・RubyPython と多様な言語をサポートしています。

RouteMatcher を使えば Sinatra のように HTTP Method と URL パターンの組み合わせで処理を実装できます。

下記サンプルは Groovy で実装しました。

vert.x/server.groovy
import org.vertx.groovy.core.http.RouteMatcher
import org.vertx.java.core.json.impl.Json

def rm = new RouteMatcher()

rm.get '/user/:id', { req ->
    def res = req.response

    res.putHeader('Content-Type', 'application/json')
    res.end Json.encode([
        id: req.params['id'],
        name: 'vert.x sample'
    ])
}

rm.post '/user', { req ->
    req.bodyHandler {
        // JSON を Map へデコード
        def data = Json.decodeValue(it.toString(), Map)
        println data

        req.response.end()
    }
}

vertx.createHttpServer().requestHandler(rm.asClosure()).listen 8080

println "server started ..."
実行と動作確認結果(サーバー側)
> vertx run server.groovy
server started ...
[name:test, note:サンプル]
動作確認結果(クライアント側)
> jruby client.rb
#<Net::HTTPOK:0x889b125>, 200, application/json, {"id":"1","name":"vert.x sample"}
#<Net::HTTPOK:0x7ff843da>, 200,

Gretty

  • Gretty 0.4.302

Gretty は Netty をベースにしたサーバーフレームワークJava・Groovy・Scala をサポートしています。

下記サンプルは Groovy++ で実装していますが、Groovy++ は今のところ Groovy 2.0 をサポートしていないようで、Groovy 1.8 で実行する必要がありました。

gretty/server.groovy
/*
 *  Groovy 1.8 で実行する必要あり
 *  Groovy 2.0 では ExceptionInitilizerError が発生
 */
@GrabResolver(name = 'gretty', root = 'http://groovypp.artifactoryonline.com/groovypp/libs-releases-local/')
@Grab('org.mbte.groovypp:gretty:0.4.302')
import static java.nio.charset.StandardCharsets.*

import static org.mbte.gretty.JacksonCategory.*
import org.mbte.gretty.httpserver.GrettyServer

GrettyServer server = []

server.groovy = [
    localAddress: new InetSocketAddress('localhost', 8080),
    '/user/:id': {
        get {
            response.json = [
                id: request.parameters['id'],
                name: 'gretty sample'
            ]
        }
    },
    '/user': {
        post {
            /*
             * request.contentText を使うとプラットフォームの
             * デフォルトエンコードが使われるようなので
             * 明示的に UTF-8 で処理
             */
            def data = fromJson(Map, request.content.toString(UTF_8))
            println data

            response.json = ''
        }
    }
]

server.start()
実行と動作確認結果(サーバー側)
> groovy server.groovy
1 06, 2013 10:25:00 午前 org.mbte.gretty.AbstractServer
情報: Started server on localhost/127.0.0.1:8080
[name:test, note:サンプル]
動作確認結果(クライアント側)
> jruby client.rb
#<Net::HTTPOK:0x12da7d5f>, 200, application/json, {"id":"1","name":"gretty sample"}
#<Net::HTTPOK:0x889b125>, 200,

Play2 Mini

  • Play2 Mini 2.0.3

Play2 Mini は Play2 をベースにした簡易フレームワークで、JavaScala をサポートしています。

Scala で URL パスを正規表現マッチさせるには Through を使います。 *2
Through は引数の種類によってブロックに渡る引数の内容(下記サンプルの groups: List[String])が異なります。

Throughの引数 ブロックの引数 Throughの引数例 URL例 ブロックの引数例(List[String])
正規表現 正規表現のグループ "/user/(.*)".r /user/a/b ["a/b"]
文字列 指定の文字列以降を "/" でスプリットしたもの "/user" /user/a/b ["a", "b"]

なお、Play2 Mini では以下のようにして実装します。

  • (1) com.typesafe.play.mini.Application トレイトを extends して route を実装
  • (2) (1) を指定した com.typesafe.play.mini.Setup を extends したグローバルパッケージの Global を作成
  • (3) play.core.server.NettyServer で実行
play-mini/Server.scala (1)
package fits.sample

import com.typesafe.play.mini._
import play.api.mvc._
import play.api.mvc.Results._
import play.api.libs.json._

object Server extends Application {
    def route = Routes(
        Through("/user/([^/]*)".r) { groups: List[String] =>
            Action {
                val id :: Nil = groups

                Ok(Json.toJson {
                    Map(
                        "id" -> id,
                        "name" -> "play-mini sample"
                    )
                })
            }
        },
        {
            case POST(Path("/user")) => Action { req =>
                val data = req.body.asJson
                data.foreach(println)

                Ok("")
            }
        }
    )
}
play-mini/Global.scala (2)
object Global extends com.typesafe.play.mini.Setup(fits.sample.Server)
play-mini/build.sbt (3)
scalaVersion := "2.9.2"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies += "com.typesafe" %% "play-mini" % "2.0.3"

mainClass in (Compile, run) := Some("play.core.server.NettyServer")

デフォルトのポート番号 9000 ではなく 8080 で実行するため、sbt run 時に -Dhttp.port=8080 を指定します。

実行と動作確認結果(サーバー側)
> sbt -Dhttp.port=8080 run
・・・
[info] play - Listening for HTTP on port 8080...
{"name":"test","note":"サンプル"}
動作確認結果(クライアント側)
> jruby client.rb
#<Net::HTTPOK:0x7ff843da>, 200, application/json, {"id":"1","name":"play-mini sample"}
#<Net::HTTPOK:0x1d7ae341>, 200,

Socko

  • Socko 0.2.3

Socko は Netty と Akka をベースとした Web サーバーフレームワークです。
Akka の Actor として処理を実装するので、多少コード量が多くなります。

socko/Server.scala
package fits.sample

import scala.util.parsing.json.{JSON, JSONObject}

import org.mashupbots.socko.events.HttpRequestEvent
import org.mashupbots.socko.routes.{Routes, GET, POST}
import org.mashupbots.socko.webserver.{WebServer, WebServerConfig}

import akka.actor.{Actor, ActorSystem, Props}

object Server extends App {
    class UserGetHandler extends Actor {
        def receive = {
            case req: HttpRequestEvent =>
                val path = req.request.endPoint.path
                val id = path.replace("/user/", "").split("/").head

                req.response.write(
                    JSONObject(
                        Map(
                            "id" -> id,
                            "name" -> "socko sample"
                        )
                    ).toString(), 
                    "application/json"
                )
                context.stop(self)
        }
    }

    class UserPostHandler extends Actor {
        def receive = {
            case req: HttpRequestEvent =>
                val content = req.request.content.toString
                val data = JSON.parseFull(content)

                data.foreach(println)

                req.response.write("")
                context.stop(self)
        }
    }

    val actorSystem = ActorSystem("SampleActorSystem")

    val routes = Routes({
        case GET(req) => actorSystem.actorOf(Props[UserGetHandler]) ! req

        case POST(req) if req.endPoint.path == "/user" =>
            actorSystem.actorOf(Props[UserPostHandler]) ! req
    })

    val server = new WebServer(WebServerConfig(port = 8080), routes, actorSystem)
    server.start()

    Runtime.getRuntime.addShutdownHook(new Thread() {
        override def run { server.stop() }
    })
}
socko/build.sbt
scalaVersion := "2.9.2"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies += "org.mashupbots.socko" %% "socko-webserver" % "0.2.3"

mainClass in (Compile, run) := Some("fits.sample.Server")
実行と動作確認結果(サーバー側)
> sbt run
・・・
[info] Running fits.sample.Server
11:14:00.300 [run-main] INFO  o.m.socko.webserver.WebServer - Socko server 'WebServer' started on localhost:8080
11:14:21.032 [New I/O  worker #1] DEBUG o.m.socko.webserver.RequestHandler - HTTP EndPoint(GET,localhost:8080,/user/1) CHANNEL=-384208271
11:14:21.098 [New I/O  worker #1] DEBUG o.m.socko.webserver.RequestHandler - HTTP EndPoint(POST,localhost:8080,/user) CHANNEL=-384208271
Map(name -> test, note -> サンプル)
動作確認結果(クライアント側)
> jruby client.rb
#<Net::HTTPOK:0x7ff843da>, 200, application/json, {"id" : "1", "name" : "socko sample"}
#<Net::HTTPOK:0x6eddcf85>, 200,

Restify

  • Restify 1.4.2

Restify は Express によく似た Node.js 用のフレームワークで、REST API の実装に特化しています。

最新バージョンは Restify 2.0.4 でしたが、Windows OS 上の npm install に失敗するので、今回は古いバージョンを使っています。

app.coffee
restify = require 'restify'

server = restify.createServer()

server.use restify.bodyParser()

server.get '/user/:id', (req, res, next) ->
    res.json
        id: req.params.id
        name: 'restify sample'

    next()

server.post '/user', (req, res, next) ->
    data = JSON.parse req.body
    console.log data

    res.json null
    next()

server.listen 8080, -> console.log "server started ..."
実行と動作確認結果(サーバー側)
> coffee app.coffee
server started ...
{ name: 'test', note: 'サンプル' }
動作確認結果(クライアント側)
> jruby client.rb
#<Net::HTTPOK:0x14980563>, 200, application/json, {"id":"1","name":"restify sample"}
#<Net::HTTPOK:0x42eded9>, 200,

*1:Jersey, RESTEasy 等

*2:Java で実装する場合は JAX-RS と同様にアノテーション(@URL)で URL のパスを指定するようです

Scalaz で Codensity モナド

Scalaz の Codensity を試してみました。

Codensity モナドは継続モナドと基本的に同じですが、処理の型が以下のように異なっています。

継続モナドの場合 Codensityモナドの場合
(A => R) => R (A => F[B]) => F[B]

つまり、Codensity は何らかのコンテナ(List・Option 等)で包んだ値(上記の F[B] に該当)を返します。

処理結果をモナド化する用途等に使えそうですが、効果的なサンプルを思いつかなかったので、
とりあえずは id:fits:20121125 の継続モナドのサンプルと同等のものを実装してみました。


サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20121222/


今回使用した sbt 用のビルドファイルは以下の通りです。

build.sbt
scalaVersion := "2.10.0-RC5"

libraryDependencies += "org.scalaz" % "scalaz-core" % "7.0.0-M6" cross CrossVersion.full

Codensity の単純なサンプル

まずは、Codensity のメソッドをいくつか試してみます。

Codensity はトレイトとコンパニオンオブジェクトからなるシンプルな構成で、id:fits:20121125 で作成した継続モナドの runCont に該当するのが apply メソッドです。

また、以下のようなメソッドが Codensity コンパニオンオブジェクトに用意されています。

メソッド名 処理内容
pureCodensity 普通の値から Codensity を作成
rep 値を格納したコンテナから Codensity を作成

下記サンプルの (1) 〜 (3) では apply の結果として Function0[Unit] ・ List[Int] ・ Option[Int] をそれぞれ返すようにしてみました。 println(x)" の部分が Function0[Unit] に該当">*1

以降のサンプルでは Codensity の apply を省略せずに使うようにしています。

CodensitySample.scala
package fits.sample

import scalaz._
import Scalaz._

object CodensitySample extends App {
    // (1) Codensity[Function0, Int] で apply の結果は Function0[Unit]
    Codensity.pureCodensity(1).apply { (x) => () => println(x) }()

    // (2) Codensity[List, Int] で apply の結果は List[Int]
    Codensity.pureCodensity(2).apply { (x) => List(x) } |> println

    // (3) Codensity[Option, Int] で apply の結果は Option[Int]
    Codensity.pureCodensity(3).apply { Option(_) } |> println

    // (4) rep を使って List[Int] から Codensity[List, Int] を作成
    Codensity.rep(List(1, 2)).apply { (x) => List(x, x * 10, x * 100) } |> println

    /* (5) バインド >>= を使うには improve を使う
     *   'Codensity.pureCodensity(5) >>= ・・・' とするとコンパイルエラー
     */
    (Codensity.pureCodensity(5).improve >>= { (x) => Codensity.pureCodensity[Option, Int](x + 3) }) apply { (x) => Option(x * 10) } foreach(println)
}

なお、pureCodensity を使う際にコンテナの型 (型パラメータ F) が自明でなければ (5) の "Codensity.pureCodensity[Option, Int](x + 3)" のように型を明示します。

実行結果
> sbt console
・・・
scala> fits.sample.CodensitySample.main(null)
1
List(2)
Some(3)
List(1, 10, 100, 2, 20, 200)
80

flatMap の処理

前回ののバインド処理サンプルと同様のものを Codensity で実装してみました。
ここではバインド >>= の代わりに flatMap で処理を繋げ、apply には値を Option に格納する関数を渡しています。

CodensitySample2.scala
・・・
object CodensitySample2 extends App {

    def cont[F[+_]](a: Int) = Codensity.pureCodensity[F, Int](a)

    def calc1[F[+_]](x: Int) = cont[F](x + 3)

    def calc2[F[+_]](x: Int) = cont[F](x * 10)

    def calc3[F[+_]](x: Int) = cont[F](x + 4)

    def calcAll[F[+_]](x: Int) = cont[F](x).flatMap(calc1).flatMap(calc2).flatMap(calc3)

    // a. 2 + 3 = 5
    calc1(2).apply { Option(_) } foreach(println)

    // b. ((2 + 3) * 10) + 4 = 54
    calcAll(2).apply { Option(_) } foreach(println) 

    // c. 54 - 9 = 45
    calcAll(2).apply { (x) => Option(x - 9) } foreach(println)
}
実行結果
scala> fits.sample.CodensitySample2.main(null)
5
54
45

callCC の実装

実用性はともかく、次は callCC を Codensity で実装してみました。

継続モナドにおける callCC の肝は以下の処理ですが。

  • 入れ子になった継続モナドにおいて、自分に渡された継続(関数)を無視して親の継続を呼び出す

Scalaz の Codensity では apply メソッドを実装しなければならない関係上、本来無視するはずの関数の戻り値の型に依存してしまい工夫が必要となります。

Codensity トレイトの apply メソッド
def apply[B](f: A => F[B]): F[B]

例えば、以下のように実装すると (1) の型パラメータ C と (2) の型パラメータ C は別物なのでコンパイル時に type mismatch エラーが発生します。

コンパイルエラー(type mismatch)が発生する callCC の実装例
def callCC[F[+_], A, B](f: (A => Codensity[F, B]) => Codensity[F, A]): Codensity[F, A] = {
    new Codensity[F, A] { 
        // (1)
        def apply[C](k: A => F[C]) = {
            f { a: A =>
                new Codensity[F, B] {
                    // (2) ここでの C は (1) の C とは別の型
                    override def apply[C](f: B => F[C]) = {
                        // この結果の型は (1) の C で
                        // (2) の C とは型が異なり type mismatch となる
                        k(a)
                    }
                }
            }.apply(k)
        }
    }
}

また、(2) で override def apply(f: B => F[C]) のように型パラメータを省略する事もできません。 *2


試行錯誤してみましたが、この問題に対する良い解決策を思いつかなかったので、とりあえず下記 (1) のように asInstanceOf でキャストして回避しました。

CodensityFunc.scala (callCC の実装)
・・・
object CodensityFunc {
    def callCC[F[+_], A, B](f: (A => Codensity[F, B]) => Codensity[F, A]): Codensity[F, A] = {
        new Codensity[F, A] { 
            def apply[C](k: A => F[C]) = {
                f { a: A =>
                    new Codensity[F, B] {
                        override def apply[D](f: B => F[D]) = {
                            // (1)
                            k(a).asInstanceOf[F[D]]
                        }
                    }
                }.apply(k)
            }
        }
    }
}

なお、以前の Codensity トレイトには sealed が付いていたので、このように apply メソッドを自前で実装する事はできなかったのですが、最近のバージョンでは sealed が外れて実装できるようになっています。

callCC の処理1

それでは、callCC を使った単純なサンプルを実装してみます。

CallCCSample.scala
・・・
object CallCCSample extends App {

    def sample[F[+_]](n: Int): Codensity[F, Int] = CodensityFunc.callCC { cc1: (Int => Codensity[F, Int]) =>
        if (n % 2 == 1) {
            cc1(n) // (1)
        }
        else {
            Codensity.pureCodensity(n * 10) // (2)
        }
    }

    sample(1).apply { Option(_) } foreach(println)
    sample(2).apply { Option(_) } foreach(println)
    sample(3).apply { Option(_) } foreach(println)
    sample(4).apply { Option(_) } foreach(println)
}

sample の処理内容は以下のようになっており、apply には値を Option に格納する関数を渡しています。

  • 引数 n が奇数なら n の値を適用する Codensity を返す (1)
  • 引数 n が偶数なら n * 10 の値を適用する Codensity を返す (2)

実行結果は以下の通りです。
奇数ならそのままの値、偶数なら 10 倍した値が出力されます。

実行結果
scala> fits.sample.CallCCSample.main(null)
1
20
3
40

callCC の処理2

最後に callCC をネストさせたサンプルです。

CallCCSample2.scala
・・・
object CallCCSample2 extends App {

    def sample[F[+_]](n: Int): Codensity[F, Int] = CodensityFunc.callCC { cc1: (Int => Codensity[F, Int]) =>
        if (n % 2 == 1) {
            cc1(n) // (1)
        }
        else {
            for {
                x <- CodensityFunc.callCC { cc2: (Int => Codensity[F, Int]) =>
                    n match {
                        case x if (x < 4) => cc2(n * 1000) // (2)
                        case 4 => cc1(n * 100) // (3)
                        case _ => Codensity.pureCodensity[F, Int](n * 10) // (4)
                    }
                }
            } yield (x + 1) // (5)
        }
    }

    sample(1).apply { Option(_) } foreach(println) // (1)
    sample(2).apply { Option(_) } foreach(println) // (2) (5)
    sample(3).apply { Option(_) } foreach(println) // (1)
    sample(4).apply { Option(_) } foreach(println) // (3)
    sample(5).apply { Option(_) } foreach(println) // (1)
    sample(6).apply { Option(_) } foreach(println) // (4) (5)
}

sample の処理内容は以下のようになります。

  • 引数 n が奇数なら n の値を適用する Codensity を返す (1)
  • 引数 n が偶数の場合
    • 4 より小さいと 1000 倍した値に +1 した値を適用する Codensity を返す (2) (5)
    • 4 なら 100 倍した値を適用する Codensity を返す (3)
    • それ以外は 10 倍した値に +1 した値を適用する Codensity を返す (4) (5)

(3) のように 2つ目の callCC 内で cc1 を呼び出すと (5) は実行されず、(2) のように cc2 を呼び出した場合は (5) が適用される事になります。

実行結果
scala> fits.sample.CallCCSample2.main(null)
1
2001
3
400
5
61

*1:"() => println(x)" の部分が Function0[Unit] に該当

*2:method apply overrides nothing エラーとなる