Netty 4 で WebSocket

以前、Jetty, Grizzly, Netty(3.2.4), EM-WebSocket を使って WebSocket の簡単なサンプルを実装しましたが、今回は Netty 4.0.7 を使って同様の処理を実装してみました。

  • Netty 4.0.7 Final

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130901_2/

WebSocket クライアント

WebSocket のクライアントは下記のように以前と基本的に同じです。 以前は Chrome でしか動作確認できませんでしたが、今では Firefox でも問題なく動作します。

index.html (WebSocket クライアント)
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8" />
    <script>
        var ws = new WebSocket("ws://localhost:8080/sample");

        ws.onopen = function(event) {
            console.log("websocket open");
            stateChange("opened")
        };

        ws.onmessage = function(event) {
            document.getElementById("log").innerHTML += "<li>" + event.data + "</li>";
        };

        ws.onclose = function(event) {
            console.log("websocket close");
            stateChange("closed")
        };

        ws.onerror = function(event) {
            console.log("error");
            stateChange("error")
        };

        function sendMessage() {
            var msg = document.getElementById("message").value;
            ws.send(msg);
        }

        function stateChange(state) {
            document.getElementById("state").innerHTML = state;
        }
    </script>
</head>
<body>
    <input id="message" type="text" />
    <input type="button" value="send" onclick="sendMessage()" />
    <span id="state">closed</span>
    <ul id="log"></ul>
</body>
</html>

Netty 4 による WebSocket サーバー1

以前はハンドシェイク処理を自前で実装する必要がありましたが、Netty 4 では下記のようなパイプラインを構成すれば、割と簡単に WebSocket を処理できるようになっています。

  • (1) HttpServerCodec (HttpRequestDecoder + HttpResponseEncoder でも代替可能)
  • (2) HttpObjectAggregator
  • (3) WebSocketServerProtocolHandler
  • (4) WebSocket を処理する自前のハンドラー
echo_server.groovy
@Grab('io.netty:netty-all:4.0.7.Final')
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInitializer
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler

// (4) の実装
class SampleHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    // WebSocket 処理
    @Override
    void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        println "read: ${frame}"
        // flush が必要なので writeAndFlush を使用
        ctx.channel().writeAndFlush(new TextWebSocketFrame("echo : ${frame.text()}"))
    }
}

class SampleChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(
            // (1)
            new HttpServerCodec(),
            // (2)
            new HttpObjectAggregator(65536),
            // (3)
            new WebSocketServerProtocolHandler("/sample"),
            // (4)
            new SampleHandler()
        )
    }
}

def b = new ServerBootstrap()

def bgroup = new NioEventLoopGroup()
def wgroup = new NioEventLoopGroup()

try {
    b.group(bgroup, wgroup)
        .channel(NioServerSocketChannel)
        .childHandler(new SampleChannelInitializer())

    def ch = b.bind(8080).sync().channel()

    println "start ..."

    ch.closeFuture().sync()

} finally {
    bgroup.shutdownGracefully()
    wgroup.shutdownGracefully()
}
実行例
> groovy echo_server.groovy
start ...

Netty 4 による WebSocket サーバー2

WebSocketServerProtocolHandler の代わりに自前のハンドラーで WebSocketServerHandshakerFactory・WebSocketServerHandshaker を使えば、下記のような細かい制御も可能です。

echo_server2.groovy
@Grab('io.netty:netty-all:4.0.7.Final')
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import io.netty.handler.codec.http.websocketx.WebSocketFrame
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory

class SampleHandler extends ChannelInboundHandlerAdapter {
    private WebSocketServerHandshaker ws

    @Override
    void channelRead(ChannelHandlerContext ctx, Object msg) {
        println "read: ${msg}"

        if (msg instanceof FullHttpRequest) {
            handleHttp(ctx, msg)
        }
        else if (msg instanceof WebSocketFrame) {
            handleWebSocket(ctx, msg)
        }
    }

    @Override
    void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush()
    }

    private void handleHttp(ChannelHandlerContext ctx, FullHttpRequest req) {
        def factory = new WebSocketServerHandshakerFactory('ws://localhost:8080/sample', null, false)

        ws = factory.newHandshaker(req)

        if (ws == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel())
        }
        else {
            // ハンドシェイク処理
            ws.handshake(ctx.channel(), req)
        }
    }

    // WebSocket 処理
    private void handleWebSocket(ChannelHandlerContext ctx, WebSocketFrame frame) {
        if (frame instanceof CloseWebSocketFrame) {
            ws.close(ctx.channel(), frame.retain())
        }
        else if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("not supported : ${frame.getClass()}")
        }
        else {
            ctx.channel().write(new TextWebSocketFrame("echo : ${frame.text()}"))
        }
    }
}

class SampleChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(
            new HttpServerCodec(),
            new HttpObjectAggregator(65536),
            new SampleHandler()
        )
    }
}

def b = new ServerBootstrap()
・・・
実行例
> groovy echo_server2.groovy
start ...