twemproxy + Redis 環境を Docker で構築

twemproxy(別名 nutcracker) + Redis の環境を Docker で構築してみます。 (twemproxy は memcached・Redis 用の軽量なプロキシです)

使用した設定ファイル等は http://github.com/fits/try_samples/tree/master/blog/20151124/

Redis の Docker イメージを取得

Redis の Docker イメージは docker pull で取得する事にします。

$ docker pull redis

twemproxy の Docker イメージを構築

twemproxy の Docker イメージは、下記 Dockerfile を使って centos のイメージをベースに twemproxy のソースをビルドして作成する事にします。

Dockerfile
FROM centos

RUN yum -y update
RUN yum -y install make automake libtool git

RUN git clone https://github.com/twitter/twemproxy.git
RUN cd twemproxy && autoreconf -fvi && ./configure && make && make install
RUN rm -fr twemproxy

RUN yum clean all

上記 Dockerfile で docker build を実行すれば Docker イメージを作成できます。

Docker イメージの作成
$ docker build --no-cache -t sample/twemproxy:0.1 .

・・・
Successfully built ・・・

twemproxy のビルドで warning は出ましたが、Docker イメージの作成に成功しました。

twemproxy + Redis × 2 の実行

twemproxy と Redis 2台を個別の Docker コンテナで実行してみます。

twemproxy の設定ファイルで接続する Redis サーバーを <ホスト名 or IPアドレス>:<ポート番号>:<重み> で指定する必要があり、これが課題となります。

twemproxy 設定ファイル例
sample:
  ・・・
  redis: true
  servers:
   - 127.0.0.1:6380:1
   - 127.0.0.1:6381:1
   ・・・

今回は、起動スクリプトを使って twemproxy と Redis の Docker コンテナをまとめて起動するようにしてみました。

該当コンテナを docker start (コンテナを再起動) してみて、失敗した場合は docker run を行うようにしています。 (docker start し易いように --name=<コンテナ名> で名前を付けています)

また、コンテナ間の接続を容易にするため、docker run 時に -h <ホスト名> でホスト名も付けました。 (コンテナ名をそのままホスト名に使っています)

/vagrant/tmp/start_twemproxy (twemproxy + Redis コンテナの起動スクリプト
#!/bin/sh

INDENT="    "
CONF_DIR=/vagrant/tmp/conf
CONTAINER_CONF_DIR=/conf
TWEMPROXY_IMAGE=sample/twemproxy:0.1

CONTAINER_REDIS_LIST="redis1 redis2"
CONTAINER_TWEMPROXY=twemproxy1

REDIS_SERVERS=""

# Redis のコンテナを個々に起動
for s in $CONTAINER_REDIS_LIST
do
  if test -z `docker start $s`; then
    docker run --name=$s -h $s -d redis
  else
    echo start $s
  fi

  REDIS_SERVERS="$REDIS_SERVERS$INDENT- $s:6379:1\n"
done

# twemproxy の設定ファイルを生成
sed -e "s/#{SERVERS}/$REDIS_SERVERS/g" `dirname $0`/nutcracker.yml.tpl > $CONF_DIR/nutcracker.yml

# twemproxy のコンテナ起動
if test -z `docker start $CONTAINER_TWEMPROXY`; then
  docker run --name=$CONTAINER_TWEMPROXY -h $CONTAINER_TWEMPROXY -d -p 6379:6379 -v $CONF_DIR:$CONTAINER_CONF_DIR $TWEMPROXY_IMAGE nutcracker -c $CONTAINER_CONF_DIR/nutcracker.yml
else
  echo start $CONTAINER_TWEMPROXY
fi

twemproxy は nutcracker -c <設定ファイル> で起動しています。

また、twemproxy 設定ファイル(nutcracker.yml)は Redis サーバーの構成に合わせて下記テンプレートの #{SERVERS}sed で置き換えて生成するようにしています。

/vagrant/tmp/nutcracker.yml.tpl (twemproxy 設定ファイルのテンプレート)
sample:
  listen: 0.0.0.0:6379
  hash: fnv1a_64
  distribution: ketama
  timeout: 500
  redis: true
  servers:
#{SERVERS}

実行

それでは実行してみます。
初回起動時は docker start に失敗し docker run を実施する事になります。

twemproxy1, redis1, redis2 コンテナの実行例
$ /vagrant/tmp/start_twemproxy

Error response from daemon: no such id: redis1
Error: failed to start containers: [redis1]
bdc12db5・・・
Error response from daemon: no such id: redis2
Error: failed to start containers: [redis2]
fd4546fc・・・
Error response from daemon: no such id: twemproxy1
Error: failed to start containers: [twemproxy1]
87bb567e・・・

redis-cli で twemproxy1 へ接続する Docker コンテナを起動し (Redis の Docker イメージを使っています) 、動作確認してみます。

動作確認
$ docker run --rm -it redis redis-cli -h twemproxy1

twemproxy1:6379> set a 123
OK
twemproxy1:6379> get a
"123"

twemproxy は正常に動作しているようです。

redis1・2 へ直接接続するコンテナをそれぞれ実行し確認すると、値は redis2 の方に設定されていました。

$ docker run --rm -it redis redis-cli -h redis1
redis1:6379> keys *
(empty list or set)
redis1:6379> exit

$ docker run --rm -it redis redis-cli -h redis2
redis2:6379> keys *
1) "a"
redis2:6379> exit

備考 - Vagrant 利用時

Vagrant で起動したゲスト OS 上で Docker を実行している場合、Vagrantfile へ以下のような設定を追加すれば provision で各コンテナを起動しホスト OS から twemproxy へ接続できるようになります。

Vagrantfile
・・・
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
    ・・・
    # twemproxy 用のポートフォワード設定
    config.vm.network "forwarded_port", guest: 6379, host: 6379

    # twemproxy + redis の起動スクリプト実行
    config.vm.provision :shell, :inline => "/vagrant/tmp/start_twemproxy"
end
provision 実施例
> vagrant up --provision

・・・
==> default: Running provisioner: shell...
    default: Running: inline script
==> default: start redis1
==> default: start redis2
==> default: start twemproxy1
ホスト OS からの接続例
> redis-cli
127.0.0.1:6379> get a
"123"

リストをN個に分割 - Groovy, Java, Python

リストをなるべく均等に N 分割する処理を Groovy・JavaPython で実装してみました。

今回は、[0, 1, 2, 3, 4, 5, 6, 7] を 3分割した結果が [[0, 1, 2], [3, 4, 5], [6, 7]] となるような処理を想定しています。 (余り分を先頭から順に1つずつ分配)

ソースは http://github.com/fits/try_samples/tree/master/blog/20151109/

Groovy の場合

  • Groovy 2.4.5

畳み込みを使って実装してみました。

  • (1) サブリストとして取り出す範囲を算出
  • (2) 取り出したサブリストを結果へ追加
divide_list.groovy
def divideList = { xs, n ->
    int q = xs.size() / n
    int m = xs.size() % n

    (0..<n).inject([]) { acc, i ->
        // (1) サブリスト化する範囲の開始・終了位置を算出
        def fr = acc*.size().sum(0)
        def to = fr + q + ((i < m)? 1: 0)

        // (2) サブリストを取り出し、結果へ追加
        acc << xs[fr..<to]
    }
}

println divideList(0..<8, 3)
println divideList(0..<7, 3)
println divideList(0..<6, 3)
println divideList(0..<6, 6)
実行結果
> groovy divide_list.groovy

[[0, 1, 2], [3, 4, 5], [6, 7]]
[[0, 1, 2], [3, 4], [5, 6]]
[[0, 1], [2, 3], [4, 5]]
[[0], [1], [2], [3], [4], [5]]

なお、上記の実装内容では divideList(0..<3, 5) のように要素数よりも分割数を多くすると [[0], [1], [2], [], []] のように不足分が空リストとなります。

場合によっては、以下のように小さい数に合わせた方が実用的かもしれません。

def divideList = { xs, n ->
    ・・・
    nn = Math.min(n, xs.size())

    (0..<nn).inject([]) { acc, i ->
        ・・・
    }
}

Java 8 の場合

Groovy 版と同様に実装しました。 Streamcollect メソッドを使っています。

DivideList.java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DivideList {
    public static void main(String... args) {
        System.out.println(divideList(range(0, 8), 3));
        System.out.println(divideList(range(0, 7), 3));
        System.out.println(divideList(range(0, 6), 3));
        System.out.println(divideList(range(0, 6), 6));
    }

    private static <T> List<List<T>> divideList(List<T> xs, int n) {
        int q = xs.size() / n;
        int m = xs.size() % n;

        return IntStream.range(0, n).collect(
                ArrayList::new,
                (acc, i) -> {
                    int fr = acc.stream().mapToInt(List::size).sum();
                    int to = fr + q + ((i < m)? 1: 0);

                    acc.add(xs.subList(fr, to));
                },
                ArrayList::addAll
        );
    }

    private static List<Integer> range(int start, int end) {
        return IntStream.range(start, end).boxed().collect(Collectors.toList());
    }
}
実行結果
> java DivideList

[[0, 1, 2], [3, 4, 5], [6, 7]]
[[0, 1, 2], [3, 4], [5, 6]]
[[0, 1], [2, 3], [4, 5]]
[[0], [1], [2], [3], [4], [5]]

Python の場合

Python も同様に実装しました。

変数 fr に値を保持させるため、ラムダを入れ子にして引数のデフォルト値を利用しています。

また、以下のように / で割ると結果が実数になって都合が悪いため、// を使っています。

結果
5 / 2 2.5
5 // 2 2
divide_list.py
from functools import reduce

def divide_list(xs, n):
    q = len(xs) // n
    m = len(xs) % n

    return reduce(
        lambda acc, i:
            (lambda fr = sum([ len(x) for x in acc ]):
                acc + [ xs[fr:(fr + q + (1 if i < m else 0))] ]
            )()
        ,
        range(n),
        []
    )

range_list = lambda n: list(range(n))

print(divide_list(range_list(8), 3))
print(divide_list(range_list(7), 3))
print(divide_list(range_list(6), 3))
print(divide_list(range_list(6), 6))
実行結果
> python divide_list.py

[[0, 1, 2], [3, 4, 5], [6, 7]]
[[0, 1, 2], [3, 4], [5, 6]]
[[0, 1], [2, 3], [4, 5]]
[[0], [1], [2], [3], [4], [5]]

Python の機械学習環境を Docker イメージで作成

書籍「データサイエンティスト養成読本 機械学習入門編 (Software Design plus)」を参考に、numpy・scipy・matplotlib・scikit-learn パッケージをインストールした Python 3.5.0 の環境を Docker イメージとして作成してみました。

サンプルソースhttp://github.com/fits/try_samples/tree/master/blog/20151029/

python イメージをベースに作成

まずは、Docker の最新 python イメージ (この時は Python 3.5.0) をベースに Docker イメージを作成します。

なお、書籍では libblas-dev をインストールしていましたが、ここでは代わりに libatlas-base-dev をインストールしています。

Dockerfile
FROM python

RUN apt-get update && apt-get upgrade -y

RUN apt-get install -y libfreetype6-dev libatlas-base-dev liblapack-dev gfortran

RUN pip install numpy
RUN pip install scipy
RUN pip install matplotlib
RUN pip install scikit-learn

RUN apt-get clean

docker build で上記の Dockerfile からイメージを作成します。

Docker イメージの作成

$ docker build --no-cache -t sample/python-ml:0.1 .

動作確認

scikit-learn を使った単純な Python スクリプトSVM で "あやめ" の品種を予測) を実行してみます。

/vagrant/svm_sample.py
from sklearn import datasets
from sklearn import svm

iris = datasets.load_iris()

svc = svm.SVC()
svc.fit(iris.data, iris.target)

print(svc.predict([[5, 3, 4, 1], [6, 3, 5, 2]]))

先程作成した Docker イメージを docker run で実行します。 DeprecationWarning が出力されますが、動作しているようです。

実行結果
$ docker run --rm -it -v /vagrant:/work sample/python-ml:0.1 python /work/svm_sample.py
・・・
[1 2]

DeprecationWarning の内容は inspect.getargspec() is deprecated, use inspect.signature() instead です。

centos イメージをベースに作成

次に、CentOS をベースに Python 3.5.0 をソースからビルドした Docker イメージを作成してみました。

atlas を使う場合、そのままでは libcblas.so が作られず scikit-learn のビルドに失敗するようなので、libtatlas.soシンボリックリンクとして libcblas.so を作成するようにしています。

pip はデフォルトで pip3.5 としてインストールされるため、今回は pip3.5 をそのまま使って numpy 等をインストールしています。

なお、python コマンドでは Python 2.7.5 が起動するので、Python 3.5 は python3 コマンドで起動します。

Dockerfile
FROM centos

RUN yum update -y && yum install -y make automake libtool openssl-devel curl

RUN curl -O https://www.python.org/ftp/python/3.5.0/Python-3.5.0.tgz
RUN tar zxf Python-3.5.0.tgz
RUN cd Python-3.5.0 && ./configure && make && make install
RUN rm -fr Python-3.5.0 && rm -f Python-3.5.0.tgz

RUN yum install -y lapack-devel atlas-devel gcc-c++ freetype-devel libpng-devel

RUN cd /usr/lib64/atlas && ln -s libtatlas.so libcblas.so

RUN pip3.5 install numpy
RUN pip3.5 install scipy
RUN pip3.5 install matplotlib
RUN pip3.5 install scikit-learn

RUN yum clean all

Docker イメージの作成

$ docker build --no-cache -t sample/python-ml-centos:0.1 .

動作確認

docker run で実行すると、先程と同じ結果になりました。

実行結果
$ docker run --rm -it -v /vagrant:/work sample/python-ml-centos:0.1 python3 /work/svm_sample.py
・・・
[1 2]

ResultSet の Stream 化

java.sql.ResultSetjava.util.stream.Stream 化する方法はいくつか考えられますが、今回は以下の方法を試してみました。

  • Spliterator インターフェースの実装クラスを作成

サンプルソースhttp://github.com/fits/try_samples/tree/master/blog/20151026/

Spliterator インターフェースを直接実装

java.util.Spliterator インターフェースを直接実装する場合、以下の 4つのメソッドを実装します。

メソッド 内容 備考
tryAdvance 要素を個々にトラバースする 要素が存在する場合に引数の Consumer をその要素で実行し true を返す
estimateSize 残りの要素数の推定値を返す 不明な場合は Long.MAX_VALUE を返す
characteristics 構造や要素の特性を返す 複数の特性値を | で連結できる
trySplit 要素の一部を分割できる場合に分割する 分割できない場合は null を返す

今回は tryAdvance メソッドの引数 Consumer へ ResultSet を加工した値を渡すように、SQLException を伴う処理を TryFunction インターフェースとして扱っています。

characteristics では要素の順序が定義されている事を示す ORDERED のみを返すようにしていますが、(今回のように加工しないで) ResultSet をそのまま使うのなら NONNULL も追加できると思います。

なお、Stream を返す stream メソッドは、あると便利なので定義しているだけです。 (無くても問題ありません)

src/main/java/sample/ResultSetSpliterator.java
package sample;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ResultSetSpliterator<T> implements Spliterator<T> {

    private ResultSet resultSet;
    // ResultSet を加工する処理
    private TryFunction<ResultSet, T, SQLException> converter;

    public ResultSetSpliterator(ResultSet resultSet, TryFunction<ResultSet, T, SQLException> converter) {
        this.resultSet = resultSet;
        this.converter = converter;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> consumer) {
        try {
            if (resultSet.next()) {
                consumer.accept(converter.apply(resultSet));
                return true;
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public int characteristics() {
        return ORDERED;
    }

    // Stream 化して返す
    public Stream<T> stream() {
        return StreamSupport.stream(this, false);
    }
}

TryFunction の内容は次の通りです。

src/main/java/sample/TryFunction.java
package sample;

@FunctionalInterface
public interface TryFunction<T, R, E extends Exception> {
    R apply(T t) throws E;

    static <T, E extends Exception> TryFunction<T, T, E> identity() {
        return r -> r;
    }
}

動作確認

動作確認のための実行クラスを用意します。

src/main/java/sample/SampleApp.java
package sample;

import java.sql.*;

public class SampleApp {
    private static final String SQL = "select * from product";

    public static void main(String... args) throws Exception {
        try (Connection con = DriverManager.getConnection(args[0])) {
            sample1(con);

            System.out.println("---");

            sample2(con);
        }
    }

    private static void sample1(Connection con) throws SQLException {
        try (
            PreparedStatement ps = con.prepareStatement(SQL);
            ResultSet rs = ps.executeQuery()
        ) {
            new ResultSetSpliterator<>(
                rs,
                r -> r.getString("name")
            ).stream().forEach(System.out::println);
        }
    }

    private static void sample2(Connection con) throws SQLException {
        try (
            PreparedStatement ps = con.prepareStatement(SQL);
            ResultSet rs = ps.executeQuery()
        ) {
            long count = new ResultSetSpliterator<>(
                rs,
                TryFunction.identity()
            ).stream().count();

            System.out.println("count : " + count);
        }
    }
}

Gradle による実行結果は以下の通り。 一応、Stream として処理できているようです。

実行結果
> gradle run -Pargs="jdbc:mysql://localhost/sample?user=root"

・・・
sample1
sample2
sample3
---
count : 3

Gradle のビルド定義には以下を使いました。

build.gradle
apply plugin: 'application'

mainClassName = 'sample.SampleApp'

repositories {
    jcenter()
}

dependencies {
    runtime 'mysql:mysql-connector-java:5.1.36'
}

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

Spliterators.AbstractSpliterator をサブクラス化

直接 Spliterator インターフェースを実装するよりも、Spliterators.AbstractSpliterator のサブクラスとした方が、少しだけコードを減らせます。

この場合、estimateSize・characteristics の値は super コンストラクタで指定します。

src/main/java/sample/ResultSetSpliterator2.java
package sample;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ResultSetSpliterator2<T> extends Spliterators.AbstractSpliterator<T> {

    private ResultSet resultSet;
    private TryFunction<ResultSet, T, SQLException> converter;

    public ResultSetSpliterator2(ResultSet resultSet, TryFunction<ResultSet, T, SQLException> converter) {
        // estimateSize・characteristics の値を指定
        super(Long.MAX_VALUE, ORDERED);

        this.resultSet = resultSet;
        this.converter = converter;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> consumer) {
        try {
            if (resultSet.next()) {
                consumer.accept(converter.apply(resultSet));
                return true;
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
        return false;
    }

    public Stream<T> stream() {
        return StreamSupport.stream(this, false);
    }
}

FunctionalJava の DB モナド?

FunctionalJava における fj.control.db.DB クラスの使い方を調べてみました。

サンプルソースhttp://github.com/fits/try_samples/tree/master/blog/20151013/

はじめに

処理内容を見る限り fj.control.db.DB は Reader モナドをベースにしていますが、以下の点が異なります。

  • 適用する状態が java.sql.Connection に固定されている

また、実行処理の run メソッドが SQLException を throws するようになっています。

使い方は、概ね以下のようになると思います。

fj.control.db.DB 使用例
// 検索の場合は DbState.reader メソッドを使用
DbState dbWriter = DbState.writer("<DB接続URL>");

// DB 処理を構築
DB<?> q = DB.db(con -> ・・・)・・・;

// 実行
dbWriter.run(q);

DbState の run メソッド内で java.sql.Connection を取得し、fj.control.db.DB の run メソッドを実行します。

ただし、以下のような注意点があり、実際のところ既存のメソッドだけで処理を組み立てるのは難しいように思います。

  • (a) DB.db(F<java.sql.Connection,A> f) メソッドでは SQLException を throw する処理から DB オブジェクトを作成できない
  • (b) DbStaterun メソッドは SQLException を throw した場合のみロールバックする (他の例外ではロールバックしない)
  • (c) リソース(PreparedStatement や ResultSet 等)の解放 close は基本的に自前で実施する必要あり ※
※ DbState の run メソッドを使えば、Connection の close はしてくれます

例えば、prepareStatement メソッドは SQLException を throw するため、DB.db(con -> con.prepareStatement("select * from ・・・")) のようにするとコンパイルエラーになります。

サンプル1

(a) と (c) に対処するためのヘルパーメソッド(以下)を用意して、サンプルコードを書いてみました。

  • DB<A> db(Try1<Connection, A, SQLException> func) メソッドを定義
  • 更新・検索処理を実施する DB オブジェクトの生成メソッド commandquery をそれぞれ定義 (try-with-resources 文で PreparedStatement 等を close)

動作確認のため、以下のような更新処理を行う DB オブジェクトを組み立てました。

  • (1) product テーブルへ insert
  • (2) (1) で自動採番された id を取得 (OptionalInt へ設定)
  • (3) OptionalInt から id の値(数値)を取り出し
  • (4) (3) の値を使って product_variation テーブルへ 2レコードを insert

なお、(b) によって SQLException 以外ではロールバックしないため、仮に (3) で NoSuchElementException が throw された場合にロールバックされません。

src/main/java/sample/DbSample1.java
package sample;

import fj.Unit;
import fj.control.db.DB;
import fj.control.db.DbState;
import fj.function.Try1;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.OptionalInt;

public class DbSample1 {
    public static void main(String... args) throws Exception {
        DbState dbWriter = DbState.writer(args[0]);

        final String insertVariationSql = "insert into product_variation (product_id, color, size) " +
                "values (?, ?, ?)";
        // 更新処理の組み立て
        DB<Integer> q1 = command("insert into product (name, price) values (?, ?)", "sample1", 1500) // (1)
                .bind(v -> query("select last_insert_id()", DbSample1::scalarValue)) // (2)
                .map(OptionalInt::getAsInt) // (3)
                .bind(id ->
                        command(insertVariationSql, id, "Black", "L")
                                .bind(_v -> command(insertVariationSql, id, "White", "S")));

        // 更新処理の実行
        System.out.println(dbWriter.run(q1));

        DbState dbReader = DbState.reader(args[0]);

        final String selectSql = "select name, color, size from product p " +
                "join product_variation v on v.product_id = p.id";
        // 検索処理の組み立て
        DB<Unit> q2 = query(selectSql, rs -> {
            while (rs.next()) {
                System.out.println(rs.getString("name") + ", " + 
                        rs.getString("color") + ", " + rs.getString("size"));
            }
            return Unit.unit();
        });

        // 検索処理の実行
        dbReader.run(q2);
    }

    private static OptionalInt scalarValue(ResultSet rs) throws SQLException {
        return rs.next() ? OptionalInt.of(rs.getInt(1)) : OptionalInt.empty();
    }

    // DB.db の代用メソッド
    private static <A> DB<A> db(Try1<Connection, A, SQLException> func) {
        return new DB<A>() {
            public A run(Connection con) throws SQLException {
                return func.f(con);
            }
        };
    }

    // 更新用
    private static DB<Integer> command(String sql, Object... params) {
        return db(con -> {
            try (PreparedStatement ps = createStatement(con, sql, params)) {
                return ps.executeUpdate();
            }
        });
    }

    // 検索用
    private static <T> DB<T> query(String sql, Try1<ResultSet, T, SQLException> handler, Object... params) {
        return db(con -> {
            try (
                PreparedStatement ps = createStatement(con, sql, params);
                ResultSet rs = ps.executeQuery()
            ) {
                return handler.f(rs);
            }
        });
    }

    private static PreparedStatement createStatement(Connection con, String sql, Object... params)
            throws SQLException {

        PreparedStatement ps = con.prepareStatement(sql);

        for (int i = 0; i < params.length; i++) {
            ps.setObject(i + 1, params[i]);
        }

        return ps;
    }
}

実行

Gradle で実行しました。

build.gradle
apply plugin: 'application'

mainClassName = 'sample.DbSample1'

repositories {
    jcenter()
}

dependencies {
    compile 'org.functionaljava:functionaljava:4.4'
    runtime 'mysql:mysql-connector-java:5.1.36'
}

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}
実行結果
> gradle run -Pargs="jdbc:mysql://localhost:3306/sample1?user=root"

・・・
1
sample1, Black, L
sample1, White, S

サンプル2

(b) への対策として、SQLException 以外を throw してもロールバックする run メソッドを用意しました。

src/main/java/sample/DbSample2.java
package sample;

・・・
import fj.function.TryEffect1;
・・・

public class DbSample2 {
    public static void main(String... args) throws Exception {
        Connector connector = DbState.driverManager(args[0]);

        final String insertVariationSql = "insert into product_variation (product_id, color, size) " +
                "values (?, ?, ?)";

        DB<Integer> q1 = command("insert into product (name, price) values (?, ?)", "sample2", 2000)
                .bind(v -> query("select last_insert_id()", DbSample2::scalarValue))
                .map(OptionalInt::getAsInt)
                .bind(id ->
                        command(insertVariationSql, id, "Green", "L")
                                .bind(_v -> command(insertVariationSql, id, "Blue", "S")));

        // 更新処理の実行
        System.out.println(run(connector, q1));

        final String selectSql = "select name, color, size from product p " +
                "join product_variation v on v.product_id = p.id";

        DB<Unit> q2 = query(selectSql, rs -> {
            while (rs.next()) {
                System.out.println(rs.getString("name") + ", " + 
                        rs.getString("color") + ", " + rs.getString("size"));
            }
            return Unit.unit();
        });

        // 検索処理の実行
        runReadOnly(connector, q2);
    }

    // 検索用の実行処理(常にロールバック)
    private static <A> A runReadOnly(Connector connector, DB<A> dba) throws SQLException {
        return run(connector, dba, Connection::rollback);
    }

    // 更新用の実行処理
    private static <A> A run(Connector connector, DB<A> dba) throws SQLException {
        return run(connector, dba, Connection::commit);
    }

    private static <A> A run(Connector connector, DB<A> dba, TryEffect1<Connection, SQLException> trans)
            throws SQLException {
        try (Connection con = connector.connect()) {
            con.setAutoCommit(false);

            try {
                A result = dba.run(con);

                trans.f(con);

                return result;

            } catch (Throwable e) {
                con.rollback();
                throw e;
            }
        }
    }
    ・・・
}

実行

build.gradle
・・・
mainClassName = 'sample.DbSample2'
・・・
実行結果
> gradle run -Pargs="jdbc:mysql://localhost:3306/sample2?user=root"

・・・
1
sample2, Green, L
sample2, Blue, S

Spring Data Redis を Tomcat で JNDI リソース化

前回、Jedis を Tomcat 上で JNDI リソース化しましたが、今回は Spring Data Redis を JNDI リソース化してみます。

実際は org.springframework.data.redis.connection.jedis.JedisConnectionFactory を JNDI リソース化します。

サンプルソースhttp://github.com/fits/try_samples/tree/master/blog/20151005/

はじめに

今回もカスタムリソースファクトリを作成して対応します。

JedisConnectionFactory は JavaBean として扱えるため org.apache.naming.factory.BeanFactory も一応使えるのですが、その場合は JedisShardInfo をどのように設定するかが課題となります。 (JedisShardInfo が未設定だと JedisConnection 取得時にエラーが発生します)

なお、afterPropertiesSet メソッドを実行すれば、JedisShardInfo が未設定(null)の場合に JedisShardInfo をインスタンス化して設定してくれます。

カスタムリソースファクトリの作成

JedisConnectionFactory 専用のカスタムファクトリを作っても良かったのですが、今回はもう少し汎用的に org.springframework.beans.factory.InitializingBean を JNDI リソース化するカスタムファクトリとして作成しました。

基本的な処理内容は 前回 と同じですが、リソースのクラス名を Reference オブジェクトの getClassName メソッドで取得しています。

getClassName の戻り値は context.xml における Resource 要素の type 属性で指定した値です。

カスタムリソースファクトリ (src/main/java/sample/resource/InitializingBeanFactory.java
package sample.resource;

import javax.naming.Context;
import javax.naming.Name;
import javax.naming.RefAddr;
import javax.naming.Reference;
import javax.naming.spi.ObjectFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.List;

import org.apache.commons.beanutils.BeanUtils;
import org.springframework.beans.factory.InitializingBean;

public class InitializingBeanFactory implements ObjectFactory {
    private final List<String> ignoreProperties =
            Arrays.asList("factory", "auth", "scope", "singleton");

    @Override
    public Object getObjectInstance(Object obj, Name name, Context nameCtx,
                                    Hashtable<?, ?> environment) throws Exception {

        return (obj instanceof Reference)? createBean((Reference) obj): null;
    }

    private Object createBean(Reference ref) throws Exception {
        // リソースクラスのインスタンス化
        Object bean = loadClass(ref.getClassName()).newInstance();

        setProperties(bean, ref);
        // afterPropertiesSet の実行
        ((InitializingBean)bean).afterPropertiesSet();

        return bean;
    }
    // 各プロパティの設定
    private void setProperties(Object bean, Reference ref)
            throws InvocationTargetException, IllegalAccessException {

        for (Enumeration<RefAddr> em = ref.getAll(); em.hasMoreElements();) {
            RefAddr ra = em.nextElement();

            String name = ra.getType();

            if (!ignoreProperties.contains(name)) {
                BeanUtils.setProperty(bean, name, ra.getContent());
            }
        }
    }
    // リソースクラスのロード
    private Class<?> loadClass(String className) throws ClassNotFoundException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();

        return (loader == null)? Class.forName(className): loader.loadClass(className);
    }
}

動作確認

今回は Spring MVC を使って動作確認します。

Tomcat 用のリソース設定は以下の通りです。

type 属性で JNDI リソース化するクラス名 JedisConnectionFactory を指定します。 JNDI 名は redis/ConnectionFactory としました。

設定ファイル (src/main/webapp/META-INF/context.xml
<Context>
    <Resource name="redis/ConnectionFactory" auth="Container"
              type="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
              factory="sample.resource.InitializingBeanFactory"
              closeMethod="destroy"
              port="6380"
              timeout="500"
              poolConfig.maxTotal="100"
              poolConfig.maxIdle="100"
            />
</Context>

web.xml が不要な WebApplicationInitializer を使う方式にします。

WebApplicationInitializer 実装クラス (src/main/java/sample/config/WebAppInitializer.java
package sample.config;

import org.springframework.web.servlet.support.AbstractAnnotationConfigDispatcherServletInitializer;

public class WebAppInitializer extends AbstractAnnotationConfigDispatcherServletInitializer {
    @Override
    protected Class<?>[] getRootConfigClasses() {
        return null;
    }

    @Override
    protected Class<?>[] getServletConfigClasses() {
        // Web アプリケーション設定クラスの指定
        return new Class<?>[]{WebConfig.class};
    }

    @Override
    protected String[] getServletMappings() {
        return new String[]{"/"};
    }
}

Web アプリケーション用の設定クラスへ JNDI を使った JedisConnectionFactory の取得や RedisTemplate をインスタンス化する処理を実装しました。

設定クラス (src/main/java/sample/config/WebConfig.java
package sample.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.jndi.JndiObjectFactoryBean;

import javax.naming.NamingException;

@Configuration
@ComponentScan("sample.controller")
public class WebConfig {
    @Bean
    public JedisConnectionFactory jedisConnectionFactory() throws NamingException {
        JndiObjectFactoryBean factory = new JndiObjectFactoryBean();

        // 取得対象の JNDI 名を設定
        factory.setJndiName("java:comp/env/redis/ConnectionFactory");
        factory.afterPropertiesSet();

        // JedisConnectionFactory 取得
        return (JedisConnectionFactory)factory.getObject();
    }

    @Bean
    public StringRedisSerializer stringRedisSerializer() {
        return new StringRedisSerializer();
    }

    @Bean
    public RedisTemplate<String, String> redisTemplate() throws NamingException {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        // JNDI で取得した JedisConnectionFactory を設定
        template.setConnectionFactory(jedisConnectionFactory());

        template.setKeySerializer(stringRedisSerializer());
        template.setValueSerializer(stringRedisSerializer());

        return template;
    }
}
コントローラークラス (src/main/java/sample/controller/SampleController.java
package sample.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SampleController {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @RequestMapping(value = "/app/{key}", method = RequestMethod.GET)
    @ResponseBody
    public String sample(@PathVariable String key) {
        // Redis から値を取得
        return redisTemplate.boundValueOps(key).get();
    }
}

ビルド

Gradle のビルド定義は以下の通りです。

ビルド定義 (build.gradle)
apply plugin: 'war'

war.baseName = 'sample'

repositories {
    jcenter()
}

dependencies {
    compile 'redis.clients:jedis:2.7.3'
    compile 'commons-beanutils:commons-beanutils:1.9.2'
    compile 'org.springframework.data:spring-data-redis:1.6.0.RELEASE'
    compile 'org.springframework:spring-web:4.2.1.RELEASE'
    compile 'org.springframework:spring-webmvc:4.2.1.RELEASE'

    providedCompile 'javax.servlet:javax.servlet-api:3.1.0'
}
ビルドの実行
> gradle build

ビルド結果の build/libs/sample.war を Tomcat へ配置し、Tomcat と Redis (ポート番号を 6380 へ変更) を実行しておきます。

Redis へデータ設定

Redis へ確認用のデータを設定します。

> redis-cli -p 6380

127.0.0.1:6380> set a1 sample-data1
OK
実行結果

curl でアクセスすると Redis からデータを取得できました。

> curl http://localhost:8080/sample/app/a1

sample-data1

Jedis を Tomcat で JNDI リソース化

Jedis ※ を Tomcat 上で JNDI リソースとして扱えるようにしてみます。

 ※ 厳密には redis.clients.jedis.JedisPool を JNDI リソース化します

サンプルソースhttp://github.com/fits/try_samples/tree/master/blog/20150924/

はじめに

Tomcat では標準的に下記を JNDI リソース化できるようになっていますが、JedisPool クラスは JavaBean としては扱えず、そのまま使えそうなものはありません。

そこで今回は、カスタムリソースファクトリを作成して対応する事にします。

リソースの種類 使用するリソースファクトリ
JavaBean org.apache.naming.factory.BeanFactory
UserDatabase org.apache.catalina.users.MemoryUserDatabaseFactoryなど
JavaMail Session org.apache.naming.factory.ResourceFactory (org.apache.naming.factory.MailSessionFactory)
JDBC DataSource org.apache.naming.factory.ResourceFactory (org.apache.tomcat.dbcp.dbcp2.BasicDataSourceFactory)

ちなみに、他にも以下のようなリソースファクトリクラスが用意されているようです。

  • org.apache.naming.factory.DataSourceLinkFactory ※
  • org.apache.naming.factory.EjbFactory
  • org.apache.naming.factory.SendMailFactory
 ※ ResourceLink 要素で使用するリソースファクトリクラス
    グローバルに定義した DataSource を複数のコンテキストで共通利用するために使用する模様

JedisPool 用のリソースファクトリクラス1

まずは、ホスト名とポート番号を設定するだけの単純な JedisPool のリソースファクトリクラスを作成してみます。

Tomcat 用のカスタムリソースファクトリは、以下の点に注意して javax.naming.spi.ObjectFactory を実装するだけです。

  • getObjectInstance メソッドでリソースオブジェクトを作成して返す
  • getObjectInstance の第一引数は javax.naming.Referenceインスタンスとなる (実体は org.apache.naming.ResourceRef
  • 設定ファイルの Resource 要素で設定したプロパティは javax.naming.Reference から javax.naming.RefAddr として取得可能
  • リソースを破棄(close)するメソッドは設定ファイルの closeMethod 属性で指定可能

RefAddr から getType でプロパティ名、getContent でプロパティ値(基本的に String)を取得できます。

また、singleton の設定 (Resource 要素で設定可能) によって getObjectInstance の呼ばれ方が以下のように変化します。

singleton 設定値 getObjectInstance の呼び出し
true (デフォルト) 初回 lookup 時に 1回だけ
false lookup 実行毎に毎回

今回は、getObjectInstance の第一引数 obj が javax.naming.Referenceインスタンスだった場合にのみ JedisPoolインスタンス化して返すように実装しました。

カスタムリソースファクトリ (src/main/java/sample/SimpleJedisPoolFactory.java
package sample;

import javax.naming.Context;
import javax.naming.Name;
import javax.naming.RefAddr;
import javax.naming.Reference;
import javax.naming.spi.ObjectFactory;
import java.util.Enumeration;
import java.util.Hashtable;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Protocol;

public class SimpleJedisPoolFactory implements ObjectFactory {
    @Override
    public Object getObjectInstance(Object obj, Name name, Context nameCtx,
                                    Hashtable<?, ?> environment) throws Exception {

        return (obj instanceof Reference)? createPool((Reference) obj): null;
    }
    // JedisPool の作成
    private JedisPool createPool(Reference ref) throws Exception {
        String host = Protocol.DEFAULT_HOST;
        int port = Protocol.DEFAULT_PORT;

        for (Enumeration<RefAddr> em = ref.getAll(); em.hasMoreElements();) {
            RefAddr ra = em.nextElement();

            String name = ra.getType();
            String value = (String)ra.getContent();

            switch (name) {
                case "host":
                    host = value;
                    break;
                case "port":
                    port = Integer.parseInt(value);
                    break;
            }
        }

        return new JedisPool(host, port);
    }
}

動作確認

SimpleJedisPoolFactory の動作確認のために Servlet と設定ファイルを用意しました。 (JNDI 名は redis/Pool としています)

Servlet クラス (src/main/java/sample/SampleServlet.java
package sample;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@WebServlet(urlPatterns = "/app")
public class SampleServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse res)
            throws ServletException, IOException {

        // Redis から該当データを取得
        String data = getFromRedis(req.getParameter("key"));

        res.getWriter().println(data);
    }

    private String getFromRedis(String key) {
        // JedisPool から Jedis 取得
        try (Jedis jedis = getPool().getResource()) {
            // Redis からデータ取得
            return jedis.get(key);
        }
    }

    private JedisPool getPool() {
        try {
            Context ctx = new InitialContext();
            // JNDI で JedisPool を取得
            return (JedisPool) ctx.lookup("java:comp/env/redis/Pool");
        } catch (NamingException e) {
            throw new RuntimeException(e);
        }
    }
}

確認のため Redis の port 番号をデフォルト値(6379)から 6380 へ変えています。

JedisPool を適切に close するよう closeMethod 属性を使いました。

設定ファイル (src/main/webapp/META-INF/context.xml
<Context>
    <Resource name="redis/Pool" auth="Container"
              factory="sample.SimpleJedisPoolFactory"
              closeMethod="close"
              port="6380"
            />
</Context>

Gradle で war ファイルを作成します。

ビルド定義 (build.gradle)
apply plugin: 'war'

repositories {
    jcenter()
}

dependencies {
    compile 'redis.clients:jedis:2.7.3'

    providedCompile 'javax.servlet:javax.servlet-api:3.1.0'
}

ビルド

> gradle build

Tomcat 起動

ビルド結果の build/libs/sample.warTomcat へ配置して Tomcat を起動します。 一応、Tomcat 7.0.64 と 8.0.26 上で動作する事を確認しています。

> startup

Redis 起動

ポート番号を 6380 へ設定変更した redis.conf を使って Redis を起動します。

> redis-server redis.conf

                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 2.8.19 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in stand alone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6380
・・・

Redis へデータ登録 (redis-cli 使用)

Redis へデータを登録しておきます。

> redis-cli -p 6380

127.0.0.1:6380> set sample 12345
OK

実行結果

curlServlet へアクセスすると Redis から正常にデータを取得する事を確認できました。

> curl http://localhost:8080/sample/app?key=sample

12345

JedisPool 用のリソースファクトリクラス2

次に JedisPoolConfig を使ってプーリングの設定を変更できるようにしてみます。

今回は、Commons BeanUtils を使って、JedisPool 生成用の自作クラス JedisPoolBuilder へ設定ファイルの設定内容を反映するようにしました。

なお、Reference の getAll() には factory, auth, scope, singleton 等のプロパティをデフォルトで含んでいる点にご注意ください。

BeanUtils.setProperty は存在しないプロパティを指定しても無視するだけですので、今回の用途では factory 等のデフォルトプロパティを特に気にする必要は無かったのですが、以下では一応スキップするようにしました。

カスタムリソースファクトリ (src/main/java/sample/JedisPoolFactory.java
package sample;

import javax.naming.Context;
import javax.naming.Name;
import javax.naming.RefAddr;
import javax.naming.Reference;
import javax.naming.spi.ObjectFactory;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.List;

import org.apache.commons.beanutils.BeanUtils;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;

public class JedisPoolFactory implements ObjectFactory {
    // 無視するプロパティのリスト
    private final List<String> ignoreProperties =
            Arrays.asList("factory", "auth", "scope", "singleton");

    @Override
    public Object getObjectInstance(Object obj, Name name, Context nameCtx,
                                    Hashtable<?, ?> environment) throws Exception {

        return (obj instanceof Reference)? createPool((Reference) obj): null;
    }

    private JedisPool createPool(Reference ref) throws Exception {
        JedisPoolBuilder builder = new JedisPoolBuilder();

        for (Enumeration<RefAddr> em = ref.getAll(); em.hasMoreElements();) {
            RefAddr ra = em.nextElement();

            String name = ra.getType();

            if (!ignoreProperties.contains(name)) {
                // プロパティの設定
                BeanUtils.setProperty(builder, name, ra.getContent());
            }
        }

        return builder.build();
    }
    // JedisPool の生成クラス
    public class JedisPoolBuilder {
        private JedisPoolConfig poolConfig = new JedisPoolConfig();

        private String host = Protocol.DEFAULT_HOST;
        private int port = Protocol.DEFAULT_PORT;
        private int timeout = Protocol.DEFAULT_TIMEOUT;

        public void setHost(String host) {
            this.host = host;
        }

        public void setPort(int port) {
            this.port = port;
        }

        public void setTimeout(int timeout) {
            this.timeout = timeout;
        }

        public JedisPoolConfig getPoolConfig() {
            return poolConfig;
        }

        public JedisPool build() {
            return new JedisPool(poolConfig, host, port, timeout);
        }
    }
}

プーリングの設定例は以下の通りです。

設定ファイル例 (src/main/webapp/META-INF/context.xml
<Context>
    <Resource name="redis/Pool" auth="Container"
              factory="sample.JedisPoolFactory"
              closeMethod="close"
              port="6380"
              timeout="500"
              poolConfig.maxTotal="100"
              poolConfig.maxIdle="100"
            />
</Context>

ビルド定義は以下の通りです。

ビルド定義 (build.gradle)
apply plugin: 'war'

repositories {
    jcenter()
}

dependencies {
    compile 'redis.clients:jedis:2.7.3'
    compile 'commons-beanutils:commons-beanutils:1.9.2'

    providedCompile 'javax.servlet:javax.servlet-api:3.1.0'
}