Spark SQL で CSV ファイルを処理2 - GeoLite2

前回の 「Spark SQL で CSV ファイルを処理 - GeoLite Legacy」 に続き、今回は Spark SQL を使って GeoLite2 City CSV ファイルを処理してみます。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20141112/

はじめに

GeoLite2 City の CSV は下記のような 2種類のファイルで構成しています。

  • GeoLite2-City-Blocks.csv (IP と都市情報とのマッピング
  • GeoLite2-City-Locations.csv (国・都市情報)

GeoLite2-City-Blocks.csv で IP アドレスから geoname_id を割り出し、GeoLite2-City-Locations.csv で geoname_id から国・都市を特定します。

ファイルの内容は下記のようになっており、IP は IPv6 の形式で記載されています。

GeoLite2-City-Blocks.csv の例
network_start_ip,network_prefix_length,geoname_id,registered_country_geoname_id,represented_country_geoname_id,postal_code,latitude,longitude,is_anonymous_proxy,is_satellite_provider
・・・
::ffff:1.0.64.0,114,1862415,1861060,,,・・・
・・・
2602:30a:2c1d::,48,5368361,,,・・・
・・・
GeoLite2-City-Locations.csv の例
geoname_id,continent_code,continent_name,country_iso_code,country_name,subdivision_iso_code,subdivision_name,city_name,metro_code,time_zone
1862415,AS,Asia,JP,Japan,34,Hiroshima,・・・
・・・

Spark SQL を使って IP アドレスから都市判定

GeoLite Legacy の Country CSV を処理した前回との違いは、下記 2点です。

  • (1) GeoLite2-City-Blocks.csv と GeoLite2-City-Locations.csv の 2つの CSV を geoname_id で join する
  • (2) network_start_ip と network_prefix_length を使って IP アドレスの数値の範囲を算出する

(1) は前回と同様に CSV を処理して SQL で join するだけです。 (2) は下記のようにして求める事ができます。

  • (a) IP アドレスの開始値は network_start_ip を数値化
  • (b) IP アドレスの終了値は (a) の値の下位 128 - network_prefix_length ビットを全て 1 とした値

今回は IPv4 のみを対象とするため、GeoLite2-City-Blocks.csv::ffff: で始まる行だけを使って (::ffff: 以降がそのまま IPv4 に該当)、上記 (a) と (b) の処理を実装してみました。

注意点として、GeoLite2-City-Locations.csv には subdivision_iso_code 以降が全て空欄のデータも含まれていました。 (例えば 2077456,OC,Oceania,AU,Australia,,,,,split(",") すると Array(2077456, OC, Oceania, AU, Australia) となってしまいます)

GetCity.scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import java.net.InetAddress

// GeoLite2-City-Blocks.csv 用のスキーマ定義
case class IpMapping(startIpNum: Long, endIpNum: Long, geonameId: String)
// GeoLite2-City-Locations.csv 用のスキーマ定義
case class City(geonameId: String, country: String, city: String)

object GetCity extends App {
    if (args.length < 1) {
        println("<ip address>")
        System.exit(0)
    }

    // IPv4 の数値変換
    val toIpNum = (ip: String) => Integer.toUnsignedLong(InetAddress.getByName(ip).hashCode())

    val locationFile = "GeoLite2-City-Locations.csv"
    val blockFile = "GeoLite2-City-Blocks.csv"

    val sc = new SparkContext("local", "GetCity")

    val sqlContext = new SQLContext(sc)

    import sqlContext.createSchemaRDD

    val locations = sc.textFile(locationFile).map(_.split(",")).map { r =>
        // City 情報の無いデータ(subdivision_iso_code 以降が空欄)への対処
        val city = if (r.length > 7) r(7) else ""
        City(r(0), r(4), city)
    }

    locations.registerTempTable("locations")

    // IPv4 のみ (::ffff: で始まるもの) を対象
    val blocks = sc.textFile(blockFile).filter(_.startsWith("::ffff:")).map(_.split(",")).map { r =>
        val mask = -1 << (128 - r(1).toInt)
        // (a)
        val startIpNum = toIpNum(r(0).replaceAll("::ffff:", ""))
        // (b)
        val endIpNum = startIpNum | ~mask

        IpMapping(startIpNum, endIpNum, r(2))
    }

    blocks.registerTempTable("blocks")

    val ipNum = toIpNum(args(0))

    val rows = sqlContext.sql(s"""
        select
            city,
            country
        from
            locations lo
            join blocks bl on
                bl.geonameId = lo.geonameId
        where
            startIpNum <= ${ipNum} and
            endIpNum >= ${ipNum}
    """)

    rows.foreach( r => println(s"${r(0)}, ${r(1)}") )
}

上記では、IP の終了値 (b) を算出するために、上位ビットを 1、下位ビットを 0 にした mask を作成し、これをビット反転して開始値 (a) と論理和をとっています。

例えば、network_start_ip が ::ffff:1.0.64.0 で network_prefix_length が 114 のデータの場合、(a) の値は 1.0.64.0 を数値化して 16793600、mask 変数の値は 2進数で ・・・111100000000000000、(b) の値は mask 変数の値をビット反転した 011111111111111 と (a) の値との論理和16809983 となり、16793600 ~ 16809983 の範囲内にある IP アドレスが該当する事になります。

実行 (Gradle 利用)

  • Gradle 2.1

前回と同様に Gradle で実行します。
slf4j-nop を使って Spark の標準的なログ出力を抑制している点も同じです。

build.gradle
apply plugin: 'application'
apply plugin: 'scala'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.scala-lang:scala-library:2.10.4'
    compile('org.apache.spark:spark-sql_2.10:1.1.0') {
        exclude module: 'slf4j-log4j12'
    }
    runtime 'org.slf4j:slf4j-nop:1.7.7'
}

mainClassName = 'GetCity'

run {
    if (project.hasProperty('args')) {
        args project.args.split(' ')
    }
}
実行結果1
> gradle run -q -Pargs=1.21.127.254

Tokyo, Japan
実行結果2
> gradle run -q -Pargs=223.255.254.1

, Singapore

Spark SQL で CSV ファイルを処理 - GeoLite Legacy

以前、H2 を使って CSV ファイルを SQL で処理しましたが、今回は Spark SQL を使ってみました。

IPアドレスから地域を特定する2 - GeoLite Legacy Country CSV」 で使った GeoLite Legacy Country CSV を使って同様の処理を Spark SQL で実装します。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20141103-2/

Spark SQL を使って IP アドレスから国判定

Spark SQL で扱うテーブルのスキーマを定義する方法はいくつか用意されているようですが、今回はケースクラスをスキーマとして登録する方法で実装しました。

処理の手順は下記のようになります。

  • (1) スキーマ用のクラス定義
  • (2) CSV ファイルを処理して RDD 作成
  • (3) テーブル登録
  • (4) SQL の実行

(2) の処理で (1) のケースクラスを格納した RDD を作成し、(3) の処理で (2) で処理したオブジェクトをテーブルとして登録します。

(2) の処理までは通常の Spark の API を使った処理ですが、import sqlContext.createSchemaRDD によって (3) で registerTempTable メソッドを呼び出す際に RDD から Spark SQLSchemaRDD へ暗黙変換が実施されます。

registerTempTable の引数としてテーブル名を渡す事で、SQL 内でこのテーブル名を使用できるようになります。

そのあとは SQL を実行して結果を出力するだけです。

foreach の要素となる org.apache.spark.sql.Row の実体は org.apache.spark.sql.catalyst.expressions.Row トレイトで、このトレイトが Seq トレイトを extends しているため head などの Seq の API も使えます。

GetCountry.scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import java.net.InetAddress

// (1) スキーマ用のクラス定義
case class IpCountry(startIpNum: Long, endIpNum: Long, countryName: String)

object GetCountry extends App {
    if (args.length < 1) {
        println("<ip address>")
        System.exit(0)
    }

    val countryFile = "GeoIPCountryWhois.csv"

    val sc = new SparkContext("local", "GetCountry")

    val sqlContext = new SQLContext(sc)

    // RDD を SchemaRDD へ暗黙変換するための定義
    import sqlContext.createSchemaRDD

    // (2) CSV ファイルを処理して RDD 作成
    val countries = sc.textFile(countryFile).map(_.replaceAll("\"", "").split(",")).map { d =>
        IpCountry(d(2).toLong, d(3).toLong, d(5))
    }
    // (3) テーブル登録
    countries.registerTempTable("countries")

    val ipNum = Integer.toUnsignedLong( InetAddress.getByName(args(0)).hashCode )
    // (4) SQL 実行
    val rows = sqlContext.sql(s"""
        select
            countryName
        from
            countries
        where
            startIpNum <= ${ipNum} and
            endIpNum >= ${ipNum}
    """)

    rows.foreach( r => println(r.head) )
}

実行 (Gradle 利用)

  • Gradle 2.1

今回は Gradle で実行するため、下記のようなビルド定義ファイルを用意しました。

現時点では、Maven のセントラルリポジトリScala 2.11 用の Spark SQL の JAR ファイルは用意されていないようなので、Scala 2.10.4 を使います。

今回の用途では Spark の標準的なログ出力が邪魔だったので slf4j-log4j12 の代わりに slf4j-nop を使うようにしてログ出力を抑制しました。

build.gradle
apply plugin: 'application'
apply plugin: 'scala'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.scala-lang:scala-library:2.10.4'
    compile('org.apache.spark:spark-sql_2.10:1.1.0') {
        // Spark のログ出力を抑制
        exclude module: 'slf4j-log4j12'
    }
    runtime 'org.slf4j:slf4j-nop:1.7.7'
}

mainClassName = 'GetCountry'

run {
    if (project.hasProperty('args')) {
        // コマンドライン引数の設定
        args project.args.split(' ')
    }
}

更に、Gradle のログ出力 (タスクの実行経過) も抑制したいので、-q オプションを使って実行しました。

実行結果1
> gradle run -q -Pargs=1.21.127.254

Japan
実行結果2
> gradle run -q -Pargs=223.255.254.1

Singapore

Arrow (Kleisli) で List モナド - Haskell, Frege, Scalaz

Scalaz でリストモナド - Kleisli による関数合成 」等で試してきた List モナドを使ったチェスのナイト移動の処理を Arrow (Kleisli) を使って実装し直してみました。

Arrow は計算のための汎用的なインターフェースで、モナドを扱うための Arrow として Kleisli があります。

ソースは http://github.com/fits/try_samples/tree/master/blog/20140810/

Haskell の場合

Haskell の Arrow は >>><<< で合成できるようになっています。

Kleisli はモナドを扱うための Arrow なので、下記では List モナドを返す関数 moveKnight を Kleisli へ包んで合成しています。

Kleisli から包んだ関数を取り出すには runKleisli を使います。

3手版

まずは 3手版です。

以前の List モナド版との違いは in3canReachIn3 関数を Arrow で実装し直した点です。

Kleisli を使えば、モナド値が無くてもモナドを返す関数 (通常の値を取ってモナドを返す関数) を簡単に合成できるので in3 はポイントフリースタイルで定義しました。 (このため canReachIn3 関数の引数の順序が 以前のもの と異なっています)

また、通常の関数は Arrow のインスタンスなので、canReachIn3 関数の部分は単純に canReachIn3 end = runKleisli in3 >>> elem end とする事も可能です。

move_knight.hs
import Control.Arrow

type KnightPos = (Int, Int)

moveKnight :: KnightPos -> [KnightPos]
moveKnight (c, r) = filter onBoard
    [
        (c + 2, r - 1), (c + 2, r + 1),
        (c - 2, r - 1), (c - 2, r + 1),
        (c + 1, r - 2), (c + 1, r + 2),
        (c - 1, r - 2), (c - 1, r + 2)
    ]
    where onBoard (c', r') = c' `elem` [1..8] && r' `elem` [1..8]

-- 3手先の移動位置を列挙
in3 :: Kleisli [] KnightPos KnightPos
in3 = Kleisli moveKnight >>> Kleisli moveKnight >>> Kleisli moveKnight

-- 指定位置に3手で到達できるか否かを判定
canReachIn3 :: Arrow a => KnightPos -> a KnightPos Bool
canReachIn3 end = arr (runKleisli in3) >>> arr (elem end)
-- 以下でも可
-- canReachIn3 :: KnightPos -> KnightPos -> Bool
-- canReachIn3 end = runKleisli in3 >>> elem end

main = do
    print $ runKleisli in3 $ (6, 2)

    print $ canReachIn3 (6, 1) $ (6, 2)
    print $ canReachIn3 (7, 3) $ (6, 2)
実行結果
> runghc move_knight.hs

[(8,1),(8,3),・・・
・・・
,(3,4),(3,8)]
True
False

N手版

3手版と同様に inManycanReachInMany 関数を Arrow で実装し直してみました。

move_knight_many.hs
・・・
-- N手先の移動位置を列挙
inMany :: Int -> Kleisli [] KnightPos KnightPos
inMany x = foldr (>>>) returnA (replicate x (Kleisli moveKnight))

-- 指定位置にN手で到達できるか否かを判定
canReachInMany :: Arrow a => Int -> KnightPos -> a KnightPos Bool
canReachInMany x end = arr (runKleisli (inMany x)) >>> arr (elem end)
-- 以下でも可
-- canReachInMany :: Int -> KnightPos -> KnightPos -> Bool
-- canReachInMany x end = runKleisli (inMany x) >>> elem end

main = do
    print $ runKleisli (inMany 3) $ (6, 2)

    print $ canReachInMany 3 (6, 1) $ (6, 2)
    print $ canReachInMany 3 (7, 3) $ (6, 2)
実行結果
> runghc move_knight_many.hs

[(8,1),(8,3),・・・
・・・
,(3,4),(3,8)]
True
False

Frege の場合

Frege は Haskell とほとんど同じ実装になりますが、下記の点が異なります。

  • >>> の代わりに . で Arrow を合成
  • runKleisli の代わりに run を使用

なお、.>>> と合成の向きが異なります。

3手版

3手版です。

move_knight.fr
package sample.MoveKnight where

import frege.control.Arrow
import frege.control.arrow.Kleisli

type KnightPos = (Int, Int)

moveKnight :: KnightPos -> [KnightPos]
moveKnight (c, r) = filter onBoard
    [
        (c + 2, r - 1), (c + 2, r + 1),
        (c - 2, r - 1), (c - 2, r + 1),
        (c + 1, r - 2), (c + 1, r + 2),
        (c - 1, r - 2), (c - 1, r + 2)
    ]
    where onBoard (c', r') = c' `elem` [1..8] && r' `elem` [1..8]

-- 3手先の移動位置を列挙
in3 :: Kleisli [] KnightPos KnightPos
in3 = Kleisli moveKnight . Kleisli moveKnight . Kleisli moveKnight

-- 指定位置に3手で到達できるか否かを判定
canReachIn3 :: Arrow a => KnightPos -> a KnightPos Bool
canReachIn3 end = arr (elem end) . arr in3.run
-- 以下でも可
-- canReachIn3 :: KnightPos -> KnightPos -> Bool
-- canReachIn3 end = elem end . in3.run

main args = do
    println $ in3.run $ (6, 2)

    println $ canReachIn3 (6, 1) $ (6, 2)
    println $ canReachIn3 (7, 3) $ (6, 2)
実行結果
> java -jar frege3.21.586-g026e8d7.jar move_knight.fr
・・・
> java -cp .;frege3.21.586-g026e8d7.jar sample.MoveKnight

[(8, 1), (8, 3), ・・・
・・・
 (3, 4), (3, 8)]
true
false

N手版

N手版です。

move_knight_many.fr
package sample.MoveKnightMany where

・・・
-- N手先の移動位置を列挙
inMany :: Int -> Kleisli [] KnightPos KnightPos
inMany x = foldr (.) id (replicate x (Kleisli moveKnight))

-- 指定位置にN手で到達できるか否かを判定
canReachInMany :: Arrow a => Int -> KnightPos -> a KnightPos Bool
canReachInMany x end = arr (elem end) . arr (inMany x).run
-- 以下でも可
-- canReachInMany :: Int -> KnightPos -> KnightPos -> Bool
-- canReachInMany x end = elem end . (inMany x).run

main args = do
    println $ (inMany 3).run $ (6, 2)

    println $ canReachInMany 3 (6, 1) $ (6, 2)
    println $ canReachInMany 3 (7, 3) $ (6, 2)
実行結果
> java -jar frege3.21.586-g026e8d7.jar move_knight_many.fr
・・・
> java -cp .;frege3.21.586-g026e8d7.jar sample.MoveKnightMany

[(8, 1), (8, 3), ・・・
・・・
 (3, 4), (3, 8)]
true
false

Scalaz の場合

最後に Scalaz を使った Scala による実装です。
Haskell と同様に >>> で Arrow を合成できるようになっています。

3手版

3手版です。

MoveKnight.scala
package sample

import scalaz._
import Scalaz._

object MoveKnight extends App {

    type KnightPos = Tuple2[Int, Int]

    val inRange = (p: Int) => 1 to 8 contains p

    val moveKnight = (p: KnightPos) => List(
        (p._1 + 2, p._2 - 1), (p._1 + 2, p._2 + 1),
        (p._1 - 2, p._2 - 1), (p._1 - 2, p._2 + 1),
        (p._1 + 1, p._2 - 2), (p._1 + 1, p._2 + 2),
        (p._1 - 1, p._2 - 2), (p._1 - 1, p._2 + 2)
    ).filter { case (x, y) => inRange(x) && inRange(y) }

    // 3手先の移動位置を列挙
    val in3 = Kleisli(moveKnight) >>> Kleisli(moveKnight) >>> Kleisli(moveKnight)
    // 以下でも可
    // val in3 = Kleisli(moveKnight) >==> moveKnight >==> moveKnight

    // 指定位置に3手で到達できるか否かを判定
    val canReachIn3 = (end: KnightPos) => in3.run >>> { xs => xs.contains(end) }

    in3 (6, 2) |> println

    (6, 2) |> canReachIn3 (6, 1) |> println
    (6, 2) |> canReachIn3 (7, 3) |> println
}
実行結果
> gradle run

MoveKnight
:compileJava UP-TO-DATE
:compileScala UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:run
List((8,1), (8,3), ・・・
・・・
・・・, (3,4), (3,8))
true
false

N手版

N手版です。

MoveKnightMany.scala
package sample

import scalaz._
import Scalaz._

object MoveKnightMany extends App {
    ・・・
    // N手先の移動位置を列挙
    val inMany = (x: Int) => List.fill(x) { Kleisli(moveKnight) }.reduce { (a, b) => a >>> b }
    // 以下でも可
    // val inMany = (x: Int) => List.fill(x) { Kleisli(moveKnight) }.reduce { (a, b) => a >=> b }

    // 指定位置にN手で到達できるか否かを判定
    val canReachInMany = (x: Int) => (end: KnightPos) => inMany(x).run >>> { xs => xs.contains(end) }

    (6, 2) |> inMany(3) |> println

    (6, 2) |> canReachInMany(3)(6, 1) |> println
    (6, 2) |> canReachInMany(3)(7, 3) |> println
}
実行結果
> gradle run -Pmany

MoveKnightMany
:compileJava UP-TO-DATE
:compileScala UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:run
List((8,1), (8,3), ・・・
・・・
・・・, (3,4), (3,8))
true
false

なお、ビルドと実行には下記のような Gradle ビルド定義ファイルを使用しました。

build.gradle
apply plugin: 'application'
apply plugin: 'scala'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.scala-lang:scala-library:2.11.2'
    compile 'org.scalaz:scalaz-core_2.11:7.1.0'
}

if (!hasProperty('many')) {
    println 'MoveKnight'
    mainClassName = 'sample.MoveKnight'
}
else {
    println 'MoveKnightMany'
    mainClassName = 'sample.MoveKnightMany'
}

ジニ不純度の算出 - Groovy, Scala , Java 8, Frege

書籍 「集合知プログラミング」 の 「7章 決定木によるモデリング」 にあったジニ不純度(ジニ係数)の計算を下記の JVM 言語で関数言語的に実装してみました。

今回のソースは http://github.com/fits/try_samples/tree/master/blog/20140601/

はじめに

ジニ不純度の算出は、下記の (1) と (2) で異なるアイテムを取り出す確率を求める事になります。

  • (1) ある集合から 1つアイテムを取り出す
  • (2) 取り出したアイテムを戻して再度 1つアイテムを取り出す

例えば、["A", "B", "B", "C", "B", "A"] のような集合の場合に A、B、C を取り出す確率はそれぞれ以下のようになります。

A = 2/6 = 1/3
B = 3/6 = 1/2
C = 1/6 

ここで、ジニ不純度は以下のような 2通りの方法で算出できます。 (下記の XY は (1) で X が出て (2) で Y が出る確率を表している)

(a) ジニ不純度 = 1 - (AA + BB + CC) = 1 - (1/3 × 1/3 + 1/2 × 1/2 + 1/6 × 1/6) = 11/18 = 0.61
(b) ジニ不純度 = AB + AC + BA + BC + CA + CB = 1/3 × 1/2 + 1/3 × 1/6 + ・・・ = 11/18 = 0.61

(a) の方がシンプルな実装になると思います。

Groovy で実装

それではそれぞれの言語で実装してみます。

Groovy では countBy メソッドで要素毎のカウント値を簡単に取得できます。

異なる要素同士の組み合わせは、今回 nCopiescombinationsfindAll を使って取得しました。

下記で list.countBy {it} の結果は [A:2, B:3, C:1]
nCopies(2, list.countBy { it }) の結果は [[A:2, B:3, C:1], [A:2, B:3, C:1]]
nCopies(2, list.countBy { it }).combinations() の結果は [[A=2, A=2], [B=3, A=2], ・・・, [B=3, C=1], [C=1, C=1]]
となります。

gini.groovy
import static java.util.Collections.nCopies

// (a) 1 - (AA + BB + CC)
def giniA = { xs ->
    1 - xs.countBy { it }*.value.sum { (it / xs.size()) ** 2 }
}

// (b) AB + AC + BA + BC + CA + CB
def giniB = { xs ->
    nCopies(2, xs.countBy { it }).combinations().findAll {
        // 同じ要素同士の組み合わせを除外
        it.first().key != it.last().key
    }.sum {
        (it.first().value / xs.size()) * (it.last().value / xs.size())
    }
}

def list = ['A', 'B', 'B', 'C', 'B', 'A']

println giniA(list)
println giniB(list)
実行結果
> groovy gini.groovy

0.61111111112222222222
0.61111111112222222222

Scala で実装

Scala では Groovy の countBy に該当するメソッドが無さそうだったので groupBy を使いました。

List で combinations(2) とすればリスト内要素の 2要素の組み合わせ (下記では AB、AC、BC の 3種類の組み合わせ) を取得できます。

下記で list.groupBy(identity) の結果は Map(A -> List(A, A), C -> List(C), B -> List(B, B, B)) となります。

gini.scala
import scala.math.pow

// (a) 1 - (AA + BB + CC)
val giniA = (xs: List[_]) => 1 - xs.groupBy(identity).mapValues( v =>
    pow(v.size.toDouble / xs.size, 2)
).values.sum

// (b) AC × 2 + AB × 2 + CB × 2
val giniB = (xs: List[_]) => xs.groupBy(identity).mapValues( v =>
    v.size.toDouble / xs.size
).toList.combinations(2).map( x =>
    x.head._2 * x.last._2 * 2
).sum

val list = List("A", "B", "B", "C", "B", "A")

println( giniA(list) )
println( giniB(list) )
実行結果
> scala gini.scala

0.6111111111111112
0.611111111111111

Java 8 で実装

Java 8 の Stream API では groupingBycounting メソッドを組み合わせて collect すると要素毎のカウントを取得できます。

要素の組み合わせを取得するようなメソッドは無さそうだったので自前で実装しました。

下記で countBy(list) の結果は {A=2, B=3, C=1}
combination(countBy(list)) の結果は [[A=2, B=3], [A=2, C=1], [B=3, A=2], [B=3, C=1], [C=1, A=2], [C=1, B=3]]
のようになります。

Gini.java
import static java.util.stream.Collectors.*;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;

class Gini {
    public static void main(String... args) {
        List<String> list = Arrays.asList("A", "B", "B", "C", "B", "A");

        System.out.println( giniA(list) );
        System.out.println( giniB(list) );
    }

    // (a) 1 - (AA + BB + CC)
    private static double giniA(List<String> xs) {
        return 1 - countBy(xs).values().stream().mapToDouble( x -> Math.pow(x.doubleValue() / xs.size(), 2) ).sum();
    }

    // (b) AB + AC + BA + BC + CA + CB
    private static double giniB(List<String> xs) {
        return combination(countBy(xs)).stream().mapToDouble( s ->
            s.stream().mapToDouble( t -> 
                t.getValue().doubleValue() / xs.size()
            ).reduce(1.0, (a, b) -> a * b ) 
        ).sum();
    }

    private static <T> Map<T, Long> countBy(Collection<T> xs) {
        return xs.stream().collect(groupingBy(Function.identity(), counting()));
    }

    private static <T, S> Collection<? extends List<Map.Entry<T, S>>> combination(Map<T, S> data) {
        return data.entrySet().stream().flatMap( x ->
            data.entrySet().stream().flatMap ( y ->
                (x.getKey().equals(y.getKey()))? Stream.empty(): Stream.of(Arrays.asList(x, y))
            )
        ).collect(toList());
    }
}
実行結果
> java Gini

0.6111111111111112
0.611111111111111

Frege で実装

Frege の group 関数では連続した同じ要素をグルーピングしますので sort してから使う必要があります。

下記で、group . sort $ list の結果は [["A", "A"], ["B", "B", "B"], ["C"]] となります。

組み合わせ (AB, AC 等) の確率計算にはリスト内包表記を使ってみました。

gini.fr
package sample.Gini where

import frege.prelude.Math (**)
import Data.List

size = fromIntegral . length

-- (a) 1 - (AA + BB + CC)
giniA xs = (1 - ) . sum . map calc . group . sort $ xs
    where
        listSize = size xs
        calc x = (size x / listSize) ** 2

-- (b) AB + AC + BA + BC + CA + CB
giniB xs = fold (+) 0 . calcProb . map prob . group . sort $ xs
    where
        listSize = size xs
        prob ys = (head ys, size ys / listSize)
        calcProb zs = [ snd x * snd y | x <- zs, y <- zs, fst x /= fst y]

main args = do
    let list = ["A", "B", "B", "C", "B", "A"]

    println $ giniA list
    println $ giniB list
実行結果
> java -cp .;frege3.21.586-g026e8d7.jar sample.Gini

0.6111111111111112
0.611111111111111
runtime ・・・

備考

giniB 関数の fold (+) 0 の部分は sum でも問題ないように思うのですが 、sum を使うと下記のようなエラーが発生しました。

giniB 関数で sum を使った場合のエラー内容
E sample.fr:14: inferred type is more constrained than expected type
    inferred:  (Real t17561,Show t17561) => [String] -> IO ()
    expected:  [String] -> IO ()

ちなみに、ほぼ同じコードが Haskell で動作するのですが、Haskell の場合は sum を使っても問題ありませんでした。 (gini.hs 参照)

Scala 2.10 で Apache Spark を使用

Apache Spark をソースからビルドして Scala 2.10 上で実行してみました。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131121/

Spark のソースをビルド

まずは、Spark ソースの scala-2.10 ブランチを git clone します。

ソースの取得
$ git clone -b scala-2.10 git://github.com/apache/incubator-spark.git

次に、sbt ディレクトリにある sbt コマンドで assembly を実行します。

なお、現時点のデフォルトでは Hadoop 1.0.4 用のモジュールが生成されるようですが、SPARK_HADOOP_VERSIONSPARK_YARN 環境変数Hadoop のバージョンを変更できるようです。

ビルド
$ cd incubator-spark
$ sbt/sbt assembly

ビルドが正常に完了すると、必要なモジュールを一通り組み込んだ assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar が生成されます。 (examples の方にも JAR が作成されます)

Scala 2.10 で実行

それでは、前回とほぼ同じ内容の下記ソースを実行してみます。

src/main/scala/money_count.scala
package fits.sample

import org.apache.spark._
import SparkContext._

object MoneyCount {
    def main(args: Array[String]) {

        val spark = new SparkContext("local", "MoneyCount")

        val file = spark.textFile(args(0))

        val res = file.map {
            (_, 1)
        }.reduceByKey(_ + _)

        res.foreach { t => println(s"${t._1} = ${t._2}") }
    }
}

実行は今回も Gradle で行います。

Spark のソースをビルドしてできた spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar を lib ディレクトリへ配置しました。

build.gradle
apply plugin: 'scala'
apply plugin: 'application'

def scalaVersion = '2.10.3'

repositories {
    mavenCentral()
}

dependencies {
    compile "org.scala-lang:scala-library:${scalaVersion}"
    compile files('lib/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar')
}

mainClassName = "fits.sample.MoneyCount"

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

ディレクトリ構成は以下のようにしました。

ディレクトリ構成
- lib
   |_ spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar

- src
   |_ main
       |_ scala
       |   |_ money_count.scala
       |
       |_ resources
           |_ log4j.properties

- build.gradle
- input_sample.txt

なお、ログ出力には slf4j-log4j12 が使われるので、とりあえず log4j の設定ファイルを用意しておきました。

src/main/resources/log4j.properties
log4j.rootLogger=WARN, CONSOLE

log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%5p %m%n

gradle run で実行してみると下記のような結果となり、Scala 2.10 上での動作が確認できました。

実行結果

> gradle run -Pargs=input_sample.txt

:compileJava UP-TO-DATE
:compileScala UP-TO-DATE
:processResources
:classes
:run
 WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 WARN Snappy native library not loaded
1 = 2
10 = 2
10000 = 2
5 = 3
100 = 2
1000 = 3
50 = 1
500 = 1
2000 = 1

BUILD SUCCESSFUL

Java で Apache Spark を使用

以前、sbt を使って Scala で Hadoop MapReduce 実装Groovy で Storm を使う で実施したお金の数え上げ処理を Spark 0.8 を使って Java で実装してみました。

Spark は以前、Spark を使って単純なレコメンドを実施 で 0.4 を試しましたが、0.8 でも API に大きな変化はないようです。(パッケージ名は org.apache.spark へ変わってますが)

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131116/

はじめに

実装する処理内容は、下記のようなファイルを読み込んで数値毎にカウントするだけの単純なものです。

input_sample.txt
100
1
5
50
500
1000
10000
1000
1
10
5
5
10
100
1000
10000
2000

Java で実装

実装した処理内容は下記の通りです。

今回は単一のワーカースレッドでローカル実行させるだけなので JavaSparkContext の第1引数に local と指定しています。

複数のワーカースレッドでローカル実行するには local[スレッド数] のように指定します。(例えば 4スレッドの場合は local[4]

src/main/java/fits/sample/MoneyCount.java
package fits.sample;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

class MoneyCount {
    public static void main(String... args) {
        JavaSparkContext spark = new JavaSparkContext("local", "MoneyCount");

        JavaRDD<String> file = spark.textFile(args[0]);

        JavaPairRDD<String, Integer> res = file.map(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer t1, Integer t2) {
                return t1 + t2;
            }
        });

        res.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> t) {
                System.out.println(t._1 + " = " + t._2);
            }
        });
    }
}

gradle でビルド・実行してみます。

Spark 0.8 は古めの Akka ライブラリに依存しているため http://repo.typesafe.com/typesafe/releases/ リポジトリを追加する必要があるようです。

build.gradle
apply plugin: 'java'
apply plugin: 'application'

repositories {
    mavenCentral()

    maven {
        url 'http://repo.typesafe.com/typesafe/releases/'
    }
}

dependencies {
    compile 'org.apache.spark:spark-core_2.9.3:0.8.0-incubating'
    compile 'org.slf4j:slf4j-nop:1.7.5'
}

mainClassName = "fits.sample.MoneyCount"

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果は下記のようになります。

実行結果

> gradle run -Pargs=input_sample.txt

:compileJava
:processResources UP-TO-DATE
:classes
:run
13/11/16 02:12:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/11/16 02:12:37 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/16 02:12:37 INFO mapred.FileInputFormat: Total input paths to process : 1
1 = 2
10 = 2
10000 = 2
5 = 3
100 = 2
1000 = 3
50 = 1
500 = 1
2000 = 1

BUILD SUCCESSFUL

なお、Java 8 や Groovy で同様の処理を試そうとしてみましたが、実行時に ASM 関係でエラーが発生し、正常に実行させる事はできませんでした。

Scala で実装

同じ処理を Scala で実装すると下記のようにもっとシンプルになります。

src/main/scala/money_count.scala
package fits.sample

import org.apache.spark._
import SparkContext._

object MoneyCount {
    def main(args: Array[String]) {
        val spark = new SparkContext("local", "MoneyCount")

        val file = spark.textFile(args(0))

        val res = file.map {
            (_, 1)
        }.reduceByKey(_ + _)

        res.foreach { t => println(t._1 + " = " + t._2) }
    }
}

Maven セントラルリポジトリには、Scala 2.10 用の Spark モジュールが見当たらないため、基本的に Scala 2.9 で実行する事になります。

ちなみに、Spark(バージョンは 0.9)をソースからビルドすれば Scala 2.10 で実行できました。(git://github.com/apache/incubator-spark.git の scala-2.10 ブランチ使用)

build.gradle
apply plugin: 'scala'
apply plugin: 'application'

def scalaVersion = '2.9.3'

repositories {
    mavenCentral()

    maven {
        url 'http://repo.typesafe.com/typesafe/releases/'
    }
}

dependencies {
    compile "org.scala-lang:scala-library:${scalaVersion}"
    compile "org.apache.spark:spark-core_${scalaVersion}:0.8.0-incubating"
    compile 'org.slf4j:slf4j-nop:1.7.5'
}

mainClassName = "fits.sample.MoneyCount"

run {
    if (project.hasProperty('args')) {
        args project.args
    }
}

実行結果

> gradle run -Pargs=input_sample.txt

:compileJava UP-TO-DATE
:compileScala UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:run
13/11/16 01:38:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/11/16 01:38:39 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/16 01:38:39 INFO mapred.FileInputFormat: Total input paths to process : 1
1 = 2
10 = 2
10000 = 2
5 = 3
100 = 2
1000 = 3
50 = 1
500 = 1
2000 = 1

BUILD SUCCESSFUL

Gradle の Scala プラグインで -Xprint オプションを使用

Gradle の Scala プラグインで -Xprint オプションを試してみました。

  • Gradle 1.7

-Xprint はコンパイル途中のコードを出力する Scala コンパイラのオプションで、
-Xprint:<フェーズ> のようにコンパイルフェーズを指定して使用します。

例えば -Xprint:typer と指定する事で implicit による暗黙変換などを処理した後のコードが出力されます。

-Xprint オプション指定方法

Gradle の Scala プラグインで -Xprint のようなオプションを指定するには下記のように compileScala.scalaCompileOptions.additionalParameters を使います。

build.gradle 設定例 (-Xprint:typer 指定)
apply plugin: 'scala'
・・・
compileScala {
    scalaCompileOptions.additionalParameters = ['-Xprint:typer', ・・・]
}

ここで compileScala.scalaCompileOptions.useAnt の値によって、使用される Scala コンパイラクラスや -Xprint の出力に差が生じる点に注意が必要です。

useAnt の値 Scala コンパイラクラス -Xprint の出力
true AntScalaCompiler gradle コマンドで -i オプションを指定すると出力される
false ZincScalaCompiler 特に何もしなくても出力される

なお、useAnt は true がデフォルト値です。

-Xprint:typer を指定したビルド例

それでは、下記のような build.gradle とサンプルソースを使って -Xprint:typer を試してみます。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130928/

ビルド定義 build.gradle
apply plugin: 'scala'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.scala-lang:scala-library:2.10.2'
    compile 'org.scalaz:scalaz-core_2.10:7.1.0-M3'
}

compileScala {
    scalaCompileOptions.additionalParameters = ['-Xprint:typer', '-feature']
    // (1)
    scalaCompileOptions.useAnt = false
    // (2)
    // scalaCompileOptions.useAnt = true
}
サンプルソース Sample.scala
package fits.sample

import scala.language.postfixOps

import scalaz._
import Scalaz._

object Sample extends App {

    val plus3: Int => Int = 3 +
    val times: Int => Int = 2 *

    // 2 * (3 + 4) = 14
    println( 4 |> plus3 >>> times )

    // (3 + 4, 2 * 5) = (7, 10)
    println( (4, 5) |> plus3 *** times )

    // (3 + 5, 2 * 5) = (8, 10)
    println( 5 |> plus3 &&& times )
}

(1) useAnt = false の場合

useAnt を false に変更した場合の gradle build 結果は下記の通りです。
(-Xprint:typer の内容が出力されます)

実行結果
> gralde build

:compileJava UP-TO-DATE
:compileScala
[[syntax trees at end of                     typer]] // Sample.scala
package fits.sample {
  import scala.language.postfixOps;
  import scalaz._;
  import scalaz.Scalaz._;
  object Sample extends AnyRef with App {
    def <init>(): fits.sample.Sample.type = {
      Sample.super.<init>();
      ()
    };
    private[this] val plus3: Int => Int = {
      ((x: Int) => 3.+(x))
    };
    <stable> <accessor> def plus3: Int => Int = Sample.this.plus3;
    private[this] val times: Int => Int = {
      ((x: Int) => 2.*(x))
    };
    <stable> <accessor> def times: Int => Int = Sample.this.times;
    scala.this.Predef.println(scalaz.Scalaz.ToIdOps[Int](4).|>[Int](scalaz.Scalaz.ToComposeOps[Function1, Int, Int](Sample.this.plus3)(scalaz.Scalaz.function1Instance).>>>[Int](Sample.this.times)));
    scala.this.Predef.println(scalaz.Scalaz.ToIdOps[(Int, Int)](scala.Tuple2.apply[Int, Int](4, 5)).|>[(Int, Int)](scalaz.Scalaz.ToArrowOps[Function1, Int, Int](Sample.this.plus3)(scalaz.Scalaz.function1Instance).***[Int, Int](Sample.this.times)));
    scala.this.Predef.println(scalaz.Scalaz.ToIdOps[Int](5).|>[(Int, Int)](scalaz.Scalaz.ToArrowOps[Function1, Int, Int](Sample.this.plus3)(scalaz.Scalaz.function1Instance).&&&[Int](Sample.this.times)))
  }
}

:processResources UP-TO-DATE
:classes
:jar
:assemble
:compileTestJava UP-TO-DATE
:compileTestScala UP-TO-DATE
:processTestResources UP-TO-DATE
:testClasses UP-TO-DATE
:test UP-TO-DATE
:check UP-TO-DATE
:build

BUILD SUCCESSFUL

(2) useAnt = true の場合

useAnt が true (デフォルト値) の場合の gradle build 結果は下記の通りです。
(-Xprint:typer の内容は出力されません)

実行結果1
> gralde build

:compileJava UP-TO-DATE
:compileScala
:processResources UP-TO-DATE
:classes
:jar
:assemble
:compileTestJava UP-TO-DATE
:compileTestScala UP-TO-DATE
:processTestResources UP-TO-DATE
:testClasses UP-TO-DATE
:test UP-TO-DATE
:check UP-TO-DATE
:build

BUILD SUCCESSFUL

ここで gradle build -i と実行すれば -Xprint:typer の内容が出力されます。

実行結果2
> gralde build -i

・・・
[ant:scalac] [[syntax trees at end of                     typer]] // Sample.scala
[ant:scalac] package fits.sample {

[ant:scalac]   import scala.language.postfixOps;

[ant:scalac]   import scalaz._;

[ant:scalac]   import scalaz.Scalaz._;

[ant:scalac]   object Sample extends AnyRef with App {

[ant:scalac]     def <init>(): fits.sample.Sample.type = {

[ant:scalac]       Sample.super.<init>();

[ant:scalac]       ()

[ant:scalac]     };

[ant:scalac]     private[this] val plus3: Int => Int = {

[ant:scalac]       ((x: Int) => 3.+(x))

[ant:scalac]     };

[ant:scalac]     <stable> <accessor> def plus3: Int => Int = Sample.this.plus3;

[ant:scalac]     private[this] val times: Int => Int = {

[ant:scalac]       ((x: Int) => 2.*(x))

[ant:scalac]     };

[ant:scalac]     <stable> <accessor> def times: Int => Int = Sample.this.times;

[ant:scalac]     scala.this.Predef.println(scalaz.Scalaz.ToIdOps[Int](4).|>[Int](scalaz.Scalaz.ToComposeOps[Function1, Int, Int](Sample.this.plus3)(scalaz.Scalaz.function1Instance).>>>[Int](Sample.this.times)));

[ant:scalac]     scala.this.Predef.println(scalaz.Scalaz.ToIdOps[(Int, Int)](scala.Tuple2.apply[Int, Int](4, 5)).|>[(Int, Int)](scalaz.Scalaz.ToArrowOps[Function1, Int, Int](Sample.this.plus3)(scalaz.Scalaz.function1Instance).***[Int, Int](Sample.this.times)));

[ant:scalac]     scala.this.Predef.println(scalaz.Scalaz.ToIdOps[Int](5).|>[(Int, Int)](scalaz.Scalaz.ToArrowOps[Function1, Int, Int](Sample.this.plus3)(scalaz.Scalaz.function1Instance).&&&[Int](Sample.this.times)))

[ant:scalac]   }

[ant:scalac] }
[ant:scalac] 
:compileScala (Thread[main,5,main]) - complete
・・・