FunctionalJava の DB モナド?

FunctionalJava における fj.control.db.DB クラスの使い方を調べてみました。

サンプルソースhttp://github.com/fits/try_samples/tree/master/blog/20151013/

はじめに

処理内容を見る限り fj.control.db.DB は Reader モナドをベースにしていますが、以下の点が異なります。

  • 適用する状態が java.sql.Connection に固定されている

また、実行処理の run メソッドが SQLException を throws するようになっています。

使い方は、概ね以下のようになると思います。

fj.control.db.DB 使用例
// 検索の場合は DbState.reader メソッドを使用
DbState dbWriter = DbState.writer("<DB接続URL>");

// DB 処理を構築
DB<?> q = DB.db(con -> ・・・)・・・;

// 実行
dbWriter.run(q);

DbState の run メソッド内で java.sql.Connection を取得し、fj.control.db.DB の run メソッドを実行します。

ただし、以下のような注意点があり、実際のところ既存のメソッドだけで処理を組み立てるのは難しいように思います。

  • (a) DB.db(F<java.sql.Connection,A> f) メソッドでは SQLException を throw する処理から DB オブジェクトを作成できない
  • (b) DbStaterun メソッドは SQLException を throw した場合のみロールバックする (他の例外ではロールバックしない)
  • (c) リソース(PreparedStatement や ResultSet 等)の解放 close は基本的に自前で実施する必要あり ※
※ DbState の run メソッドを使えば、Connection の close はしてくれます

例えば、prepareStatement メソッドは SQLException を throw するため、DB.db(con -> con.prepareStatement("select * from ・・・")) のようにするとコンパイルエラーになります。

サンプル1

(a) と (c) に対処するためのヘルパーメソッド(以下)を用意して、サンプルコードを書いてみました。

  • DB<A> db(Try1<Connection, A, SQLException> func) メソッドを定義
  • 更新・検索処理を実施する DB オブジェクトの生成メソッド commandquery をそれぞれ定義 (try-with-resources 文で PreparedStatement 等を close)

動作確認のため、以下のような更新処理を行う DB オブジェクトを組み立てました。

  • (1) product テーブルへ insert
  • (2) (1) で自動採番された id を取得 (OptionalInt へ設定)
  • (3) OptionalInt から id の値(数値)を取り出し
  • (4) (3) の値を使って product_variation テーブルへ 2レコードを insert

なお、(b) によって SQLException 以外ではロールバックしないため、仮に (3) で NoSuchElementException が throw された場合にロールバックされません。

src/main/java/sample/DbSample1.java
package sample;

import fj.Unit;
import fj.control.db.DB;
import fj.control.db.DbState;
import fj.function.Try1;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.OptionalInt;

public class DbSample1 {
    public static void main(String... args) throws Exception {
        DbState dbWriter = DbState.writer(args[0]);

        final String insertVariationSql = "insert into product_variation (product_id, color, size) " +
                "values (?, ?, ?)";
        // 更新処理の組み立て
        DB<Integer> q1 = command("insert into product (name, price) values (?, ?)", "sample1", 1500) // (1)
                .bind(v -> query("select last_insert_id()", DbSample1::scalarValue)) // (2)
                .map(OptionalInt::getAsInt) // (3)
                .bind(id ->
                        command(insertVariationSql, id, "Black", "L")
                                .bind(_v -> command(insertVariationSql, id, "White", "S")));

        // 更新処理の実行
        System.out.println(dbWriter.run(q1));

        DbState dbReader = DbState.reader(args[0]);

        final String selectSql = "select name, color, size from product p " +
                "join product_variation v on v.product_id = p.id";
        // 検索処理の組み立て
        DB<Unit> q2 = query(selectSql, rs -> {
            while (rs.next()) {
                System.out.println(rs.getString("name") + ", " + 
                        rs.getString("color") + ", " + rs.getString("size"));
            }
            return Unit.unit();
        });

        // 検索処理の実行
        dbReader.run(q2);
    }

    private static OptionalInt scalarValue(ResultSet rs) throws SQLException {
        return rs.next() ? OptionalInt.of(rs.getInt(1)) : OptionalInt.empty();
    }

    // DB.db の代用メソッド
    private static <A> DB<A> db(Try1<Connection, A, SQLException> func) {
        return new DB<A>() {
            public A run(Connection con) throws SQLException {
                return func.f(con);
            }
        };
    }

    // 更新用
    private static DB<Integer> command(String sql, Object... params) {
        return db(con -> {
            try (PreparedStatement ps = createStatement(con, sql, params)) {
                return ps.executeUpdate();
            }
        });
    }

    // 検索用
    private static <T> DB<T> query(String sql, Try1<ResultSet, T, SQLException> handler, Object... params) {
        return db(con -> {
            try (
                PreparedStatement ps = createStatement(con, sql, params);
                ResultSet rs = ps.executeQuery()
            ) {
                return handler.f(rs);
            }
        });
    }

    private static PreparedStatement createStatement(Connection con, String sql, Object... params)
            throws SQLException {

        PreparedStatement ps = con.prepareStatement(sql);

        for (int i = 0; i < params.length; i++) {
            ps.setObject(i + 1, params[i]);
        }

        return ps;
    }
}

実行

Gradle で実行しました。

build.gradle
apply plugin: 'application'

mainClassName = 'sample.DbSample1'

repositories {
    jcenter()
}

dependencies {
    compile 'org.functionaljava:functionaljava:4.4'
    runtime 'mysql:mysql-connector-java:5.1.36'
}

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}
実行結果
> gradle run -Pargs="jdbc:mysql://localhost:3306/sample1?user=root"

・・・
1
sample1, Black, L
sample1, White, S

サンプル2

(b) への対策として、SQLException 以外を throw してもロールバックする run メソッドを用意しました。

src/main/java/sample/DbSample2.java
package sample;

・・・
import fj.function.TryEffect1;
・・・

public class DbSample2 {
    public static void main(String... args) throws Exception {
        Connector connector = DbState.driverManager(args[0]);

        final String insertVariationSql = "insert into product_variation (product_id, color, size) " +
                "values (?, ?, ?)";

        DB<Integer> q1 = command("insert into product (name, price) values (?, ?)", "sample2", 2000)
                .bind(v -> query("select last_insert_id()", DbSample2::scalarValue))
                .map(OptionalInt::getAsInt)
                .bind(id ->
                        command(insertVariationSql, id, "Green", "L")
                                .bind(_v -> command(insertVariationSql, id, "Blue", "S")));

        // 更新処理の実行
        System.out.println(run(connector, q1));

        final String selectSql = "select name, color, size from product p " +
                "join product_variation v on v.product_id = p.id";

        DB<Unit> q2 = query(selectSql, rs -> {
            while (rs.next()) {
                System.out.println(rs.getString("name") + ", " + 
                        rs.getString("color") + ", " + rs.getString("size"));
            }
            return Unit.unit();
        });

        // 検索処理の実行
        runReadOnly(connector, q2);
    }

    // 検索用の実行処理(常にロールバック)
    private static <A> A runReadOnly(Connector connector, DB<A> dba) throws SQLException {
        return run(connector, dba, Connection::rollback);
    }

    // 更新用の実行処理
    private static <A> A run(Connector connector, DB<A> dba) throws SQLException {
        return run(connector, dba, Connection::commit);
    }

    private static <A> A run(Connector connector, DB<A> dba, TryEffect1<Connection, SQLException> trans)
            throws SQLException {
        try (Connection con = connector.connect()) {
            con.setAutoCommit(false);

            try {
                A result = dba.run(con);

                trans.f(con);

                return result;

            } catch (Throwable e) {
                con.rollback();
                throw e;
            }
        }
    }
    ・・・
}

実行

build.gradle
・・・
mainClassName = 'sample.DbSample2'
・・・
実行結果
> gradle run -Pargs="jdbc:mysql://localhost:3306/sample2?user=root"

・・・
1
sample2, Green, L
sample2, Blue, S