読者です 読者をやめる 読者になる 読者になる

Spark SQL で CSV ファイルを処理 - GeoLite Legacy

以前、H2 を使って CSV ファイルを SQL で処理しましたが、今回は Spark SQL を使ってみました。

IPアドレスから地域を特定する2 - GeoLite Legacy Country CSV」 で使った GeoLite Legacy Country CSV を使って同様の処理を Spark SQL で実装します。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20141103-2/

Spark SQL を使って IP アドレスから国判定

Spark SQL で扱うテーブルのスキーマを定義する方法はいくつか用意されているようですが、今回はケースクラスをスキーマとして登録する方法で実装しました。

処理の手順は下記のようになります。

  • (1) スキーマ用のクラス定義
  • (2) CSV ファイルを処理して RDD 作成
  • (3) テーブル登録
  • (4) SQL の実行

(2) の処理で (1) のケースクラスを格納した RDD を作成し、(3) の処理で (2) で処理したオブジェクトをテーブルとして登録します。

(2) の処理までは通常の Spark の API を使った処理ですが、import sqlContext.createSchemaRDD によって (3) で registerTempTable メソッドを呼び出す際に RDD から Spark SQLSchemaRDD へ暗黙変換が実施されます。

registerTempTable の引数としてテーブル名を渡す事で、SQL 内でこのテーブル名を使用できるようになります。

そのあとは SQL を実行して結果を出力するだけです。

foreach の要素となる org.apache.spark.sql.Row の実体は org.apache.spark.sql.catalyst.expressions.Row トレイトで、このトレイトが Seq トレイトを extends しているため head などの Seq の API も使えます。

GetCountry.scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import java.net.InetAddress

// (1) スキーマ用のクラス定義
case class IpCountry(startIpNum: Long, endIpNum: Long, countryName: String)

object GetCountry extends App {
    if (args.length < 1) {
        println("<ip address>")
        System.exit(0)
    }

    val countryFile = "GeoIPCountryWhois.csv"

    val sc = new SparkContext("local", "GetCountry")

    val sqlContext = new SQLContext(sc)

    // RDD を SchemaRDD へ暗黙変換するための定義
    import sqlContext.createSchemaRDD

    // (2) CSV ファイルを処理して RDD 作成
    val countries = sc.textFile(countryFile).map(_.replaceAll("\"", "").split(",")).map { d =>
        IpCountry(d(2).toLong, d(3).toLong, d(5))
    }
    // (3) テーブル登録
    countries.registerTempTable("countries")

    val ipNum = Integer.toUnsignedLong( InetAddress.getByName(args(0)).hashCode )
    // (4) SQL 実行
    val rows = sqlContext.sql(s"""
        select
            countryName
        from
            countries
        where
            startIpNum <= ${ipNum} and
            endIpNum >= ${ipNum}
    """)

    rows.foreach( r => println(r.head) )
}

実行 (Gradle 利用)

  • Gradle 2.1

今回は Gradle で実行するため、下記のようなビルド定義ファイルを用意しました。

現時点では、Maven のセントラルリポジトリScala 2.11 用の Spark SQL の JAR ファイルは用意されていないようなので、Scala 2.10.4 を使います。

今回の用途では Spark の標準的なログ出力が邪魔だったので slf4j-log4j12 の代わりに slf4j-nop を使うようにしてログ出力を抑制しました。

build.gradle
apply plugin: 'application'
apply plugin: 'scala'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.scala-lang:scala-library:2.10.4'
    compile('org.apache.spark:spark-sql_2.10:1.1.0') {
        // Spark のログ出力を抑制
        exclude module: 'slf4j-log4j12'
    }
    runtime 'org.slf4j:slf4j-nop:1.7.7'
}

mainClassName = 'GetCountry'

run {
    if (project.hasProperty('args')) {
        // コマンドライン引数の設定
        args project.args.split(' ')
    }
}

更に、Gradle のログ出力 (タスクの実行経過) も抑制したいので、-q オプションを使って実行しました。

実行結果1
> gradle run -q -Pargs=1.21.127.254

Japan
実行結果2
> gradle run -q -Pargs=223.255.254.1

Singapore