Groovy で Elasticsearch を組み込み実行

Groovy で Elasticsearch を組み込み実行してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170203/

(a) クライアント接続しない場合

まずは、クライアント接続が不可な Elasticsearch を起動して、ドキュメント登録や検索を行ってみます。

ポート番号 クライアント接続
9200 (HTTP) ×
9300 (Transport) ×

Elasticsearch の組み込み実行は、適切な設定を行った Settings(もしくは Environment)を使って Node を作成し start を実行するだけです。

path.home の設定は必須で、指定したパスの data ディレクトリを使用します。(無ければ自動的に作成されます)

transport.typelocal へ、http.enabledfalse へ設定すればクライアントの接続を受け付けない状態になります。※

 ※ クライアント接続を受け付けるためのプラグインを適用していない場合、
    このように設定しておかないと実行時にエラーとなります

この場合、Node の client メソッドで取得した Client を使ってインデックス等を操作します。

els_local.groovy
@Grab('org.elasticsearch:elasticsearch:5.2.0')
// log4j のモジュールが必要(無い場合は NoClassDefFoundError が発生)
@Grab('org.apache.logging.log4j:log4j-api:2.8')
@Grab('org.apache.logging.log4j:log4j-core:2.8')
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.node.Node

def index = args[0] // インデックス
def type = args[1]  // タイプ

// 設定
def setting = Settings.builder()
    .put('path.home', '.') // data ディレクトリの配置先を指定
    .put('transport.type', 'local')
    .put('http.enabled', 'false')
    .build()

new Node(setting).withCloseable { node ->
    // Elasticsearch の実行
    node.start()

    node.client().withCloseable { client ->
        // インデックスへのドキュメント登録
        def r1 = client.prepareIndex(index, type)
                    .setSource('time', System.currentTimeMillis())
                    .execute()
                    .get()

        println r1

        // 検索結果へ即時反映されなかったので適度に待機
        sleep(1000)

        println '-----'
        // 検索
        def r2 = client.prepareSearch(index)
                    .setTypes(type)
                    .execute()
                    .get()

        println r2
    }
}

動作確認

実行結果は以下の通りです。

log4j2 の設定ファイルが見つからない旨のエラーログが出力されていますが、 Elasticsearch の組み込み実行は成功しているようです。

実行結果
> groovy els_local.groovy a1 item

ERROR StatusLogger No log4j2 configuration file found. ・・・
IndexResponse[index=a1,type=item,id=AVn6bOp-0Vu_EXj66Fj9,version=1,result=created,shards={"_shards":{"total":2,"successful":1,"failed":0}}]
-----
{"took":140,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"a1","_type":"item","_id":"AVn6bOp-0Vu_EXj66Fj9","_score":1.0,"_source":{"time":1485965157491}}]}}

(b) クライアント接続する場合

次に、クライアント接続が可能な Elasticsearch を組み込み実行します。

ポート番号 クライアント接続
9200 (HTTP)
9300 (Transport)

これらのポート番号でクライアント接続を受けるにはプラグインが必要なので、 今回は Netty4Plugin (transport-netty4-client) を使用しました。

Node の public コンストラクタはプラグインクラスを直接指定できないため、今回は protected コンストラクタ ※ を直接呼び出して Netty4Plugin を適用しています。

 ※ 第 2引数で Plugin クラスを指定できるようになっている
    protected Node(Environment environment, Collection<Class<? extends Plugin>> plugins)

なお、close メソッドの実行有無に関係なく Node は終了してしまうので、System.in.read() を使って終了を止めています。

els_netty.groovy
@Grab('org.elasticsearch:elasticsearch:5.2.0')
@Grab('org.elasticsearch.plugin:transport-netty4-client:5.2.0')
@Grab('org.apache.logging.log4j:log4j-api:2.8')
@Grab('org.apache.logging.log4j:log4j-core:2.8')
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.env.Environment
import org.elasticsearch.node.Node
import org.elasticsearch.transport.Netty4Plugin

def setting = Settings.builder()
    .put('path.home', '.')
    .build()

def env = new Environment(setting)

// Netty4Plugin を指定して Node をインスタンス化
new Node(env, [Netty4Plugin]).withCloseable { node ->

    node.start()

    println 'started server ...'

    // 終了するのを止めるための措置
    System.in.read()
}

動作確認

Elasticsearch を実行します。

Elasticsearch 組み込み実行
> groovy els_netty.groovy

ERROR StatusLogger No log4j2 configuration file found. ・・・
started server ...

(1) HTTP 接続(9200 ポート)

curl で 9200 ポートへ接続した結果は以下の通りです。

実行結果(HTTP)
$ curl -s http://localhost:9200/b1/item -d "{\"time\": `date +%s%3N`}"

{"_index":"b1","_type":"item","_id":"AVn6rxw5O4vwdZ1ibdCo","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"created":true}


$ curl -s http://localhost:9200/b1/item/_search

{"took":78,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"b1","_type":"item","_id":"AVn6rxw5O4vwdZ1ibdCo","_score":1.0,"_source":{"time": 1485969495792}}]}}

(2) Transport 接続(9300 ポート)

9300 ポートへ接続して (a) と同等のクライアント処理を実施するスクリプトは以下のようになります。

9300 ポートへ接続するために TransportClient を使用しています。

els_client.groovy
@Grab('org.elasticsearch.client:transport:5.2.0')
@Grab('org.apache.logging.log4j:log4j-api:2.8')
@Grab('org.apache.logging.log4j:log4j-core:2.8')
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.transport.client.PreBuiltTransportClient
import org.elasticsearch.common.transport.InetSocketTransportAddress

def index = args[0]
def type = args[1]

def addr = new InetSocketTransportAddress(
                    InetAddress.getLoopbackAddress(), 9300)

// TransportClient の生成
def transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                            .addTransportAddress(addr)

transportClient.withCloseable { client ->
    // インデックスへのドキュメント登録
    def r1 = client.prepareIndex(index, type)
                .setSource('time', System.currentTimeMillis())
                .execute()
                .get()

    println r1

    sleep(1000)

    println '-----'
    // 検索
    def r2 = client.prepareSearch(index)
                .setTypes(type)
                .execute()
                .get()

    println r2
}

実行結果は以下の通りです。

実行結果(Transport)
> groovy els_client.groovy b2 item

・・・
IndexResponse[index=b2,type=item,id=AVn6tvd3WlzxY0bEAQ4r,version=1,result=created,shards={"_shards":{"total":2,"successful":1,"failed":0}}]
-----
{"took":78,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"b2","_type":"item","_id":"AVn6tvd3WlzxY0bEAQ4r","_score":1.0,"_source":{"time":1485970011680}}]}}

備考. HTTP 接続の無効化

9200 ポートの接続だけを無効化するには http.enabledfalse にします。

ポート番号 クライアント接続
9200 (HTTP) ×
9300 (Transport)
els_netty_nohttp.groovy
・・・

def setting = Settings.builder()
    .put('path.home', '.')
    .put('http.enabled', 'false') // HTTP 接続(9200 ポート)の無効化
    .build()

・・・

new Node(env, [Netty4Plugin]).withCloseable { node ->
    ・・・
}