Spark SQL で CSV ファイルを処理2 - GeoLite2
前回の 「Spark SQL で CSV ファイルを処理 - GeoLite Legacy」 に続き、今回は Spark SQL を使って GeoLite2 City CSV ファイルを処理してみます。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20141112/
はじめに
GeoLite2 City の CSV は下記のような 2種類のファイルで構成しています。
GeoLite2-City-Blocks.csv で IP アドレスから geoname_id を割り出し、GeoLite2-City-Locations.csv で geoname_id から国・都市を特定します。
ファイルの内容は下記のようになっており、IP は IPv6 の形式で記載されています。
GeoLite2-City-Blocks.csv の例
network_start_ip,network_prefix_length,geoname_id,registered_country_geoname_id,represented_country_geoname_id,postal_code,latitude,longitude,is_anonymous_proxy,is_satellite_provider ・・・ ::ffff:1.0.64.0,114,1862415,1861060,,,・・・ ・・・ 2602:30a:2c1d::,48,5368361,,,・・・ ・・・
GeoLite2-City-Locations.csv の例
geoname_id,continent_code,continent_name,country_iso_code,country_name,subdivision_iso_code,subdivision_name,city_name,metro_code,time_zone 1862415,AS,Asia,JP,Japan,34,Hiroshima,・・・ ・・・
Spark SQL を使って IP アドレスから都市判定
GeoLite Legacy の Country CSV を処理した前回との違いは、下記 2点です。
- (1) GeoLite2-City-Blocks.csv と GeoLite2-City-Locations.csv の 2つの CSV を geoname_id で join する
- (2) network_start_ip と network_prefix_length を使って IP アドレスの数値の範囲を算出する
(1) は前回と同様に CSV を処理して SQL で join するだけです。 (2) は下記のようにして求める事ができます。
- (a) IP アドレスの開始値は network_start_ip を数値化
- (b) IP アドレスの終了値は (a) の値の下位
128 - network_prefix_length
ビットを全て 1 とした値
今回は IPv4 のみを対象とするため、GeoLite2-City-Blocks.csv の ::ffff:
で始まる行だけを使って (::ffff:
以降がそのまま IPv4 に該当)、上記 (a) と (b) の処理を実装してみました。
注意点として、GeoLite2-City-Locations.csv には subdivision_iso_code 以降が全て空欄のデータも含まれていました。 (例えば 2077456,OC,Oceania,AU,Australia,,,,,
を split(",")
すると Array(2077456, OC, Oceania, AU, Australia)
となってしまいます)
GetCity.scala
import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import java.net.InetAddress // GeoLite2-City-Blocks.csv 用のスキーマ定義 case class IpMapping(startIpNum: Long, endIpNum: Long, geonameId: String) // GeoLite2-City-Locations.csv 用のスキーマ定義 case class City(geonameId: String, country: String, city: String) object GetCity extends App { if (args.length < 1) { println("<ip address>") System.exit(0) } // IPv4 の数値変換 val toIpNum = (ip: String) => Integer.toUnsignedLong(InetAddress.getByName(ip).hashCode()) val locationFile = "GeoLite2-City-Locations.csv" val blockFile = "GeoLite2-City-Blocks.csv" val sc = new SparkContext("local", "GetCity") val sqlContext = new SQLContext(sc) import sqlContext.createSchemaRDD val locations = sc.textFile(locationFile).map(_.split(",")).map { r => // City 情報の無いデータ(subdivision_iso_code 以降が空欄)への対処 val city = if (r.length > 7) r(7) else "" City(r(0), r(4), city) } locations.registerTempTable("locations") // IPv4 のみ (::ffff: で始まるもの) を対象 val blocks = sc.textFile(blockFile).filter(_.startsWith("::ffff:")).map(_.split(",")).map { r => val mask = -1 << (128 - r(1).toInt) // (a) val startIpNum = toIpNum(r(0).replaceAll("::ffff:", "")) // (b) val endIpNum = startIpNum | ~mask IpMapping(startIpNum, endIpNum, r(2)) } blocks.registerTempTable("blocks") val ipNum = toIpNum(args(0)) val rows = sqlContext.sql(s""" select city, country from locations lo join blocks bl on bl.geonameId = lo.geonameId where startIpNum <= ${ipNum} and endIpNum >= ${ipNum} """) rows.foreach( r => println(s"${r(0)}, ${r(1)}") ) }
上記では、IP の終了値 (b) を算出するために、上位ビットを 1、下位ビットを 0 にした mask を作成し、これをビット反転して開始値 (a) と論理和をとっています。
例えば、network_start_ip が ::ffff:1.0.64.0
で network_prefix_length が 114
のデータの場合、(a) の値は 1.0.64.0
を数値化して 16793600
、mask 変数の値は 2進数で ・・・111100000000000000
、(b) の値は mask 変数の値をビット反転した 011111111111111
と (a) の値との論理和で 16809983
となり、16793600 ~ 16809983 の範囲内にある IP アドレスが該当する事になります。
実行 (Gradle 利用)
- Gradle 2.1
前回と同様に Gradle で実行します。
slf4j-nop を使って Spark の標準的なログ出力を抑制している点も同じです。
build.gradle
apply plugin: 'application' apply plugin: 'scala' repositories { mavenCentral() } dependencies { compile 'org.scala-lang:scala-library:2.10.4' compile('org.apache.spark:spark-sql_2.10:1.1.0') { exclude module: 'slf4j-log4j12' } runtime 'org.slf4j:slf4j-nop:1.7.7' } mainClassName = 'GetCity' run { if (project.hasProperty('args')) { args project.args.split(' ') } }
実行結果1
> gradle run -q -Pargs=1.21.127.254 Tokyo, Japan
実行結果2
> gradle run -q -Pargs=223.255.254.1 , Singapore
Spark SQL で CSV ファイルを処理 - GeoLite Legacy
以前、H2 を使って CSV ファイルを SQL で処理しましたが、今回は Spark SQL を使ってみました。
「IPアドレスから地域を特定する2 - GeoLite Legacy Country CSV」 で使った GeoLite Legacy Country CSV を使って同様の処理を Spark SQL で実装します。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20141103-2/
Spark SQL を使って IP アドレスから国判定
Spark SQL で扱うテーブルのスキーマを定義する方法はいくつか用意されているようですが、今回はケースクラスをスキーマとして登録する方法で実装しました。
処理の手順は下記のようになります。
(2) の処理で (1) のケースクラスを格納した RDD
を作成し、(3) の処理で (2) で処理したオブジェクトをテーブルとして登録します。
(2) の処理までは通常の Spark の API を使った処理ですが、import sqlContext.createSchemaRDD
によって (3) で registerTempTable
メソッドを呼び出す際に RDD
から Spark SQL の SchemaRDD
へ暗黙変換が実施されます。
registerTempTable
の引数としてテーブル名を渡す事で、SQL 内でこのテーブル名を使用できるようになります。
そのあとは SQL を実行して結果を出力するだけです。
foreach の要素となる org.apache.spark.sql.Row
の実体は org.apache.spark.sql.catalyst.expressions.Row
トレイトで、このトレイトが Seq
トレイトを extends しているため head
などの Seq の API も使えます。
GetCountry.scala
import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import java.net.InetAddress // (1) スキーマ用のクラス定義 case class IpCountry(startIpNum: Long, endIpNum: Long, countryName: String) object GetCountry extends App { if (args.length < 1) { println("<ip address>") System.exit(0) } val countryFile = "GeoIPCountryWhois.csv" val sc = new SparkContext("local", "GetCountry") val sqlContext = new SQLContext(sc) // RDD を SchemaRDD へ暗黙変換するための定義 import sqlContext.createSchemaRDD // (2) CSV ファイルを処理して RDD 作成 val countries = sc.textFile(countryFile).map(_.replaceAll("\"", "").split(",")).map { d => IpCountry(d(2).toLong, d(3).toLong, d(5)) } // (3) テーブル登録 countries.registerTempTable("countries") val ipNum = Integer.toUnsignedLong( InetAddress.getByName(args(0)).hashCode ) // (4) SQL 実行 val rows = sqlContext.sql(s""" select countryName from countries where startIpNum <= ${ipNum} and endIpNum >= ${ipNum} """) rows.foreach( r => println(r.head) ) }
実行 (Gradle 利用)
- Gradle 2.1
今回は Gradle で実行するため、下記のようなビルド定義ファイルを用意しました。
現時点では、Maven のセントラルリポジトリに Scala 2.11 用の Spark SQL の JAR ファイルは用意されていないようなので、Scala 2.10.4 を使います。
今回の用途では Spark の標準的なログ出力が邪魔だったので slf4j-log4j12 の代わりに slf4j-nop を使うようにしてログ出力を抑制しました。
build.gradle
apply plugin: 'application' apply plugin: 'scala' repositories { mavenCentral() } dependencies { compile 'org.scala-lang:scala-library:2.10.4' compile('org.apache.spark:spark-sql_2.10:1.1.0') { // Spark のログ出力を抑制 exclude module: 'slf4j-log4j12' } runtime 'org.slf4j:slf4j-nop:1.7.7' } mainClassName = 'GetCountry' run { if (project.hasProperty('args')) { // コマンドライン引数の設定 args project.args.split(' ') } }
更に、Gradle のログ出力 (タスクの実行経過) も抑制したいので、-q
オプションを使って実行しました。
実行結果1
> gradle run -q -Pargs=1.21.127.254 Japan
実行結果2
> gradle run -q -Pargs=223.255.254.1 Singapore
Arrow (Kleisli) で List モナド - Haskell, Frege, Scalaz
「Scalaz でリストモナド - Kleisli による関数合成 」等で試してきた List モナドを使ったチェスのナイト移動の処理を Arrow (Kleisli) を使って実装し直してみました。
Arrow は計算のための汎用的なインターフェースで、モナドを扱うための Arrow として Kleisli があります。
ソースは http://github.com/fits/try_samples/tree/master/blog/20140810/
Haskell の場合
Haskell の Arrow は >>>
や <<<
で合成できるようになっています。
Kleisli はモナドを扱うための Arrow なので、下記では List モナドを返す関数 moveKnight を Kleisli へ包んで合成しています。
Kleisli から包んだ関数を取り出すには runKleisli
を使います。
3手版
まずは 3手版です。
以前の List モナド版との違いは in3
と canReachIn3
関数を Arrow で実装し直した点です。
Kleisli を使えば、モナド値が無くてもモナドを返す関数 (通常の値を取ってモナドを返す関数) を簡単に合成できるので in3
はポイントフリースタイルで定義しました。 (このため canReachIn3 関数の引数の順序が 以前のもの と異なっています)
また、通常の関数は Arrow のインスタンスなので、canReachIn3 関数の部分は単純に canReachIn3 end = runKleisli in3 >>> elem end
とする事も可能です。
move_knight.hs
import Control.Arrow type KnightPos = (Int, Int) moveKnight :: KnightPos -> [KnightPos] moveKnight (c, r) = filter onBoard [ (c + 2, r - 1), (c + 2, r + 1), (c - 2, r - 1), (c - 2, r + 1), (c + 1, r - 2), (c + 1, r + 2), (c - 1, r - 2), (c - 1, r + 2) ] where onBoard (c', r') = c' `elem` [1..8] && r' `elem` [1..8] -- 3手先の移動位置を列挙 in3 :: Kleisli [] KnightPos KnightPos in3 = Kleisli moveKnight >>> Kleisli moveKnight >>> Kleisli moveKnight -- 指定位置に3手で到達できるか否かを判定 canReachIn3 :: Arrow a => KnightPos -> a KnightPos Bool canReachIn3 end = arr (runKleisli in3) >>> arr (elem end) -- 以下でも可 -- canReachIn3 :: KnightPos -> KnightPos -> Bool -- canReachIn3 end = runKleisli in3 >>> elem end main = do print $ runKleisli in3 $ (6, 2) print $ canReachIn3 (6, 1) $ (6, 2) print $ canReachIn3 (7, 3) $ (6, 2)
実行結果
> runghc move_knight.hs [(8,1),(8,3),・・・ ・・・ ,(3,4),(3,8)] True False
N手版
3手版と同様に inMany
と canReachInMany
関数を Arrow で実装し直してみました。
move_knight_many.hs
・・・ -- N手先の移動位置を列挙 inMany :: Int -> Kleisli [] KnightPos KnightPos inMany x = foldr (>>>) returnA (replicate x (Kleisli moveKnight)) -- 指定位置にN手で到達できるか否かを判定 canReachInMany :: Arrow a => Int -> KnightPos -> a KnightPos Bool canReachInMany x end = arr (runKleisli (inMany x)) >>> arr (elem end) -- 以下でも可 -- canReachInMany :: Int -> KnightPos -> KnightPos -> Bool -- canReachInMany x end = runKleisli (inMany x) >>> elem end main = do print $ runKleisli (inMany 3) $ (6, 2) print $ canReachInMany 3 (6, 1) $ (6, 2) print $ canReachInMany 3 (7, 3) $ (6, 2)
実行結果
> runghc move_knight_many.hs [(8,1),(8,3),・・・ ・・・ ,(3,4),(3,8)] True False
Frege の場合
Frege は Haskell とほとんど同じ実装になりますが、下記の点が異なります。
>>>
の代わりに.
で Arrow を合成runKleisli
の代わりにrun
を使用
なお、.
は >>>
と合成の向きが異なります。
3手版
3手版です。
move_knight.fr
package sample.MoveKnight where import frege.control.Arrow import frege.control.arrow.Kleisli type KnightPos = (Int, Int) moveKnight :: KnightPos -> [KnightPos] moveKnight (c, r) = filter onBoard [ (c + 2, r - 1), (c + 2, r + 1), (c - 2, r - 1), (c - 2, r + 1), (c + 1, r - 2), (c + 1, r + 2), (c - 1, r - 2), (c - 1, r + 2) ] where onBoard (c', r') = c' `elem` [1..8] && r' `elem` [1..8] -- 3手先の移動位置を列挙 in3 :: Kleisli [] KnightPos KnightPos in3 = Kleisli moveKnight . Kleisli moveKnight . Kleisli moveKnight -- 指定位置に3手で到達できるか否かを判定 canReachIn3 :: Arrow a => KnightPos -> a KnightPos Bool canReachIn3 end = arr (elem end) . arr in3.run -- 以下でも可 -- canReachIn3 :: KnightPos -> KnightPos -> Bool -- canReachIn3 end = elem end . in3.run main args = do println $ in3.run $ (6, 2) println $ canReachIn3 (6, 1) $ (6, 2) println $ canReachIn3 (7, 3) $ (6, 2)
実行結果
> java -jar frege3.21.586-g026e8d7.jar move_knight.fr ・・・ > java -cp .;frege3.21.586-g026e8d7.jar sample.MoveKnight [(8, 1), (8, 3), ・・・ ・・・ (3, 4), (3, 8)] true false
N手版
N手版です。
move_knight_many.fr
package sample.MoveKnightMany where ・・・ -- N手先の移動位置を列挙 inMany :: Int -> Kleisli [] KnightPos KnightPos inMany x = foldr (.) id (replicate x (Kleisli moveKnight)) -- 指定位置にN手で到達できるか否かを判定 canReachInMany :: Arrow a => Int -> KnightPos -> a KnightPos Bool canReachInMany x end = arr (elem end) . arr (inMany x).run -- 以下でも可 -- canReachInMany :: Int -> KnightPos -> KnightPos -> Bool -- canReachInMany x end = elem end . (inMany x).run main args = do println $ (inMany 3).run $ (6, 2) println $ canReachInMany 3 (6, 1) $ (6, 2) println $ canReachInMany 3 (7, 3) $ (6, 2)
実行結果
> java -jar frege3.21.586-g026e8d7.jar move_knight_many.fr ・・・ > java -cp .;frege3.21.586-g026e8d7.jar sample.MoveKnightMany [(8, 1), (8, 3), ・・・ ・・・ (3, 4), (3, 8)] true false
Scalaz の場合
最後に Scalaz を使った Scala による実装です。
Haskell と同様に >>>
で Arrow を合成できるようになっています。
3手版
3手版です。
MoveKnight.scala
package sample import scalaz._ import Scalaz._ object MoveKnight extends App { type KnightPos = Tuple2[Int, Int] val inRange = (p: Int) => 1 to 8 contains p val moveKnight = (p: KnightPos) => List( (p._1 + 2, p._2 - 1), (p._1 + 2, p._2 + 1), (p._1 - 2, p._2 - 1), (p._1 - 2, p._2 + 1), (p._1 + 1, p._2 - 2), (p._1 + 1, p._2 + 2), (p._1 - 1, p._2 - 2), (p._1 - 1, p._2 + 2) ).filter { case (x, y) => inRange(x) && inRange(y) } // 3手先の移動位置を列挙 val in3 = Kleisli(moveKnight) >>> Kleisli(moveKnight) >>> Kleisli(moveKnight) // 以下でも可 // val in3 = Kleisli(moveKnight) >==> moveKnight >==> moveKnight // 指定位置に3手で到達できるか否かを判定 val canReachIn3 = (end: KnightPos) => in3.run >>> { xs => xs.contains(end) } in3 (6, 2) |> println (6, 2) |> canReachIn3 (6, 1) |> println (6, 2) |> canReachIn3 (7, 3) |> println }
実行結果
> gradle run MoveKnight :compileJava UP-TO-DATE :compileScala UP-TO-DATE :processResources UP-TO-DATE :classes UP-TO-DATE :run List((8,1), (8,3), ・・・ ・・・ ・・・, (3,4), (3,8)) true false
N手版
N手版です。
MoveKnightMany.scala
package sample import scalaz._ import Scalaz._ object MoveKnightMany extends App { ・・・ // N手先の移動位置を列挙 val inMany = (x: Int) => List.fill(x) { Kleisli(moveKnight) }.reduce { (a, b) => a >>> b } // 以下でも可 // val inMany = (x: Int) => List.fill(x) { Kleisli(moveKnight) }.reduce { (a, b) => a >=> b } // 指定位置にN手で到達できるか否かを判定 val canReachInMany = (x: Int) => (end: KnightPos) => inMany(x).run >>> { xs => xs.contains(end) } (6, 2) |> inMany(3) |> println (6, 2) |> canReachInMany(3)(6, 1) |> println (6, 2) |> canReachInMany(3)(7, 3) |> println }
実行結果
> gradle run -Pmany MoveKnightMany :compileJava UP-TO-DATE :compileScala UP-TO-DATE :processResources UP-TO-DATE :classes UP-TO-DATE :run List((8,1), (8,3), ・・・ ・・・ ・・・, (3,4), (3,8)) true false
なお、ビルドと実行には下記のような Gradle ビルド定義ファイルを使用しました。
build.gradle
apply plugin: 'application' apply plugin: 'scala' repositories { mavenCentral() } dependencies { compile 'org.scala-lang:scala-library:2.11.2' compile 'org.scalaz:scalaz-core_2.11:7.1.0' } if (!hasProperty('many')) { println 'MoveKnight' mainClassName = 'sample.MoveKnight' } else { println 'MoveKnightMany' mainClassName = 'sample.MoveKnightMany' }
ジニ不純度の算出 - Groovy, Scala , Java 8, Frege
書籍 「集合知プログラミング」 の 「7章 決定木によるモデリング」 にあったジニ不純度(ジニ係数)の計算を下記の JVM 言語で関数言語的に実装してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20140601/
はじめに
ジニ不純度の算出は、下記の (1) と (2) で異なるアイテムを取り出す確率を求める事になります。
- (1) ある集合から 1つアイテムを取り出す
- (2) 取り出したアイテムを戻して再度 1つアイテムを取り出す
例えば、["A", "B", "B", "C", "B", "A"]
のような集合の場合に A、B、C を取り出す確率はそれぞれ以下のようになります。
A = 2/6 = 1/3 B = 3/6 = 1/2 C = 1/6
ここで、ジニ不純度は以下のような 2通りの方法で算出できます。
(下記の XY
は (1) で X が出て (2) で Y が出る確率を表している)
(a) ジニ不純度 = 1 - (AA + BB + CC) = 1 - (1/3 × 1/3 + 1/2 × 1/2 + 1/6 × 1/6) = 11/18 = 0.61 (b) ジニ不純度 = AB + AC + BA + BC + CA + CB = 1/3 × 1/2 + 1/3 × 1/6 + ・・・ = 11/18 = 0.61
(a) の方がシンプルな実装になると思います。
Groovy で実装
それではそれぞれの言語で実装してみます。
Groovy では countBy
メソッドで要素毎のカウント値を簡単に取得できます。
異なる要素同士の組み合わせは、今回 nCopies
、combinations
、findAll
を使って取得しました。
下記で list.countBy {it}
の結果は [A:2, B:3, C:1]
、
nCopies(2, list.countBy { it })
の結果は [[A:2, B:3, C:1], [A:2, B:3, C:1]]
、
nCopies(2, list.countBy { it }).combinations()
の結果は [[A=2, A=2], [B=3, A=2], ・・・, [B=3, C=1], [C=1, C=1]]
となります。
gini.groovy
import static java.util.Collections.nCopies // (a) 1 - (AA + BB + CC) def giniA = { xs -> 1 - xs.countBy { it }*.value.sum { (it / xs.size()) ** 2 } } // (b) AB + AC + BA + BC + CA + CB def giniB = { xs -> nCopies(2, xs.countBy { it }).combinations().findAll { // 同じ要素同士の組み合わせを除外 it.first().key != it.last().key }.sum { (it.first().value / xs.size()) * (it.last().value / xs.size()) } } def list = ['A', 'B', 'B', 'C', 'B', 'A'] println giniA(list) println giniB(list)
実行結果
> groovy gini.groovy 0.61111111112222222222 0.61111111112222222222
Scala で実装
Scala では Groovy の countBy に該当するメソッドが無さそうだったので groupBy
を使いました。
List で combinations(2)
とすればリスト内要素の 2要素の組み合わせ (下記では AB、AC、BC の 3種類の組み合わせ) を取得できます。
下記で list.groupBy(identity)
の結果は Map(A -> List(A, A), C -> List(C), B -> List(B, B, B))
となります。
gini.scala
import scala.math.pow // (a) 1 - (AA + BB + CC) val giniA = (xs: List[_]) => 1 - xs.groupBy(identity).mapValues( v => pow(v.size.toDouble / xs.size, 2) ).values.sum // (b) AC × 2 + AB × 2 + CB × 2 val giniB = (xs: List[_]) => xs.groupBy(identity).mapValues( v => v.size.toDouble / xs.size ).toList.combinations(2).map( x => x.head._2 * x.last._2 * 2 ).sum val list = List("A", "B", "B", "C", "B", "A") println( giniA(list) ) println( giniB(list) )
実行結果
> scala gini.scala 0.6111111111111112 0.611111111111111
Java 8 で実装
Java 8 の Stream API では groupingBy
と counting
メソッドを組み合わせて collect
すると要素毎のカウントを取得できます。
要素の組み合わせを取得するようなメソッドは無さそうだったので自前で実装しました。
下記で countBy(list)
の結果は {A=2, B=3, C=1}
、
combination(countBy(list))
の結果は [[A=2, B=3], [A=2, C=1], [B=3, A=2], [B=3, C=1], [C=1, A=2], [C=1, B=3]]
のようになります。
Gini.java
import static java.util.stream.Collectors.*; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Stream; class Gini { public static void main(String... args) { List<String> list = Arrays.asList("A", "B", "B", "C", "B", "A"); System.out.println( giniA(list) ); System.out.println( giniB(list) ); } // (a) 1 - (AA + BB + CC) private static double giniA(List<String> xs) { return 1 - countBy(xs).values().stream().mapToDouble( x -> Math.pow(x.doubleValue() / xs.size(), 2) ).sum(); } // (b) AB + AC + BA + BC + CA + CB private static double giniB(List<String> xs) { return combination(countBy(xs)).stream().mapToDouble( s -> s.stream().mapToDouble( t -> t.getValue().doubleValue() / xs.size() ).reduce(1.0, (a, b) -> a * b ) ).sum(); } private static <T> Map<T, Long> countBy(Collection<T> xs) { return xs.stream().collect(groupingBy(Function.identity(), counting())); } private static <T, S> Collection<? extends List<Map.Entry<T, S>>> combination(Map<T, S> data) { return data.entrySet().stream().flatMap( x -> data.entrySet().stream().flatMap ( y -> (x.getKey().equals(y.getKey()))? Stream.empty(): Stream.of(Arrays.asList(x, y)) ) ).collect(toList()); } }
実行結果
> java Gini 0.6111111111111112 0.611111111111111
Frege で実装
Frege の group
関数では連続した同じ要素をグルーピングしますので sort
してから使う必要があります。
下記で、group . sort $ list
の結果は [["A", "A"], ["B", "B", "B"], ["C"]]
となります。
組み合わせ (AB, AC 等) の確率計算にはリスト内包表記を使ってみました。
gini.fr
package sample.Gini where import frege.prelude.Math (**) import Data.List size = fromIntegral . length -- (a) 1 - (AA + BB + CC) giniA xs = (1 - ) . sum . map calc . group . sort $ xs where listSize = size xs calc x = (size x / listSize) ** 2 -- (b) AB + AC + BA + BC + CA + CB giniB xs = fold (+) 0 . calcProb . map prob . group . sort $ xs where listSize = size xs prob ys = (head ys, size ys / listSize) calcProb zs = [ snd x * snd y | x <- zs, y <- zs, fst x /= fst y] main args = do let list = ["A", "B", "B", "C", "B", "A"] println $ giniA list println $ giniB list
実行結果
> java -cp .;frege3.21.586-g026e8d7.jar sample.Gini 0.6111111111111112 0.611111111111111 runtime ・・・
備考
giniB 関数の fold (+) 0
の部分は sum
でも問題ないように思うのですが 、sum を使うと下記のようなエラーが発生しました。
giniB 関数で sum を使った場合のエラー内容
E sample.fr:14: inferred type is more constrained than expected type inferred: (Real t17561,Show t17561) => [String] -> IO () expected: [String] -> IO ()
ちなみに、ほぼ同じコードが Haskell で動作するのですが、Haskell の場合は sum を使っても問題ありませんでした。 (gini.hs 参照)
Scala 2.10 で Apache Spark を使用
Apache Spark をソースからビルドして Scala 2.10 上で実行してみました。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131121/
Spark のソースをビルド
まずは、Spark ソースの scala-2.10
ブランチを git clone
します。
ソースの取得
$ git clone -b scala-2.10 git://github.com/apache/incubator-spark.git
次に、sbt ディレクトリにある sbt
コマンドで assembly
を実行します。
なお、現時点のデフォルトでは Hadoop 1.0.4 用のモジュールが生成されるようですが、SPARK_HADOOP_VERSION
と SPARK_YARN
環境変数で Hadoop のバージョンを変更できるようです。
ビルド
$ cd incubator-spark $ sbt/sbt assembly
ビルドが正常に完了すると、必要なモジュールを一通り組み込んだ assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar
が生成されます。 (examples の方にも JAR が作成されます)
Scala 2.10 で実行
それでは、前回とほぼ同じ内容の下記ソースを実行してみます。
src/main/scala/money_count.scala
package fits.sample import org.apache.spark._ import SparkContext._ object MoneyCount { def main(args: Array[String]) { val spark = new SparkContext("local", "MoneyCount") val file = spark.textFile(args(0)) val res = file.map { (_, 1) }.reduceByKey(_ + _) res.foreach { t => println(s"${t._1} = ${t._2}") } } }
実行は今回も Gradle で行います。
Spark のソースをビルドしてできた spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar
を lib ディレクトリへ配置しました。
build.gradle
apply plugin: 'scala' apply plugin: 'application' def scalaVersion = '2.10.3' repositories { mavenCentral() } dependencies { compile "org.scala-lang:scala-library:${scalaVersion}" compile files('lib/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar') } mainClassName = "fits.sample.MoneyCount" run { if (project.hasProperty('args')) { args project.args } }
ディレクトリ構成は以下のようにしました。
ディレクトリ構成
- lib |_ spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar - src |_ main |_ scala | |_ money_count.scala | |_ resources |_ log4j.properties - build.gradle - input_sample.txt
なお、ログ出力には slf4j-log4j12 が使われるので、とりあえず log4j の設定ファイルを用意しておきました。
src/main/resources/log4j.properties
log4j.rootLogger=WARN, CONSOLE log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=%5p %m%n
gradle run
で実行してみると下記のような結果となり、Scala 2.10 上での動作が確認できました。
実行結果
> gradle run -Pargs=input_sample.txt :compileJava UP-TO-DATE :compileScala UP-TO-DATE :processResources :classes :run WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN Snappy native library not loaded 1 = 2 10 = 2 10000 = 2 5 = 3 100 = 2 1000 = 3 50 = 1 500 = 1 2000 = 1 BUILD SUCCESSFUL
Java で Apache Spark を使用
以前、sbt を使って Scala で Hadoop MapReduce 実装 や Groovy で Storm を使う で実施したお金の数え上げ処理を Spark 0.8 を使って Java で実装してみました。
Spark は以前、Spark を使って単純なレコメンドを実施 で 0.4 を試しましたが、0.8 でも API に大きな変化はないようです。(パッケージ名は org.apache.spark へ変わってますが)
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20131116/
はじめに
実装する処理内容は、下記のようなファイルを読み込んで数値毎にカウントするだけの単純なものです。
input_sample.txt
100 1 5 50 500 1000 10000 1000 1 10 5 5 10 100 1000 10000 2000
Java で実装
- Java SE 7.0
実装した処理内容は下記の通りです。
今回は単一のワーカースレッドでローカル実行させるだけなので JavaSparkContext の第1引数に local と指定しています。
複数のワーカースレッドでローカル実行するには local[スレッド数]
のように指定します。(例えば 4スレッドの場合は local[4]
)
src/main/java/fits/sample/MoneyCount.java
package fits.sample; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; class MoneyCount { public static void main(String... args) { JavaSparkContext spark = new JavaSparkContext("local", "MoneyCount"); JavaRDD<String> file = spark.textFile(args[0]); JavaPairRDD<String, Integer> res = file.map(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer t1, Integer t2) { return t1 + t2; } }); res.foreach(new VoidFunction<Tuple2<String, Integer>>() { public void call(Tuple2<String, Integer> t) { System.out.println(t._1 + " = " + t._2); } }); } }
gradle でビルド・実行してみます。
Spark 0.8 は古めの Akka ライブラリに依存しているため http://repo.typesafe.com/typesafe/releases/ リポジトリを追加する必要があるようです。
build.gradle
apply plugin: 'java' apply plugin: 'application' repositories { mavenCentral() maven { url 'http://repo.typesafe.com/typesafe/releases/' } } dependencies { compile 'org.apache.spark:spark-core_2.9.3:0.8.0-incubating' compile 'org.slf4j:slf4j-nop:1.7.5' } mainClassName = "fits.sample.MoneyCount" run { if (project.hasProperty('args')) { args project.args } }
実行結果は下記のようになります。
実行結果
> gradle run -Pargs=input_sample.txt :compileJava :processResources UP-TO-DATE :classes :run 13/11/16 02:12:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 13/11/16 02:12:37 WARN snappy.LoadSnappy: Snappy native library not loaded 13/11/16 02:12:37 INFO mapred.FileInputFormat: Total input paths to process : 1 1 = 2 10 = 2 10000 = 2 5 = 3 100 = 2 1000 = 3 50 = 1 500 = 1 2000 = 1 BUILD SUCCESSFUL
なお、Java 8 や Groovy で同様の処理を試そうとしてみましたが、実行時に ASM 関係でエラーが発生し、正常に実行させる事はできませんでした。
Scala で実装
同じ処理を Scala で実装すると下記のようにもっとシンプルになります。
src/main/scala/money_count.scala
package fits.sample import org.apache.spark._ import SparkContext._ object MoneyCount { def main(args: Array[String]) { val spark = new SparkContext("local", "MoneyCount") val file = spark.textFile(args(0)) val res = file.map { (_, 1) }.reduceByKey(_ + _) res.foreach { t => println(t._1 + " = " + t._2) } } }
Maven セントラルリポジトリには、Scala 2.10 用の Spark モジュールが見当たらないため、基本的に Scala 2.9 で実行する事になります。
ちなみに、Spark(バージョンは 0.9)をソースからビルドすれば Scala 2.10 で実行できました。(git://github.com/apache/incubator-spark.git の scala-2.10 ブランチ使用)
build.gradle
apply plugin: 'scala' apply plugin: 'application' def scalaVersion = '2.9.3' repositories { mavenCentral() maven { url 'http://repo.typesafe.com/typesafe/releases/' } } dependencies { compile "org.scala-lang:scala-library:${scalaVersion}" compile "org.apache.spark:spark-core_${scalaVersion}:0.8.0-incubating" compile 'org.slf4j:slf4j-nop:1.7.5' } mainClassName = "fits.sample.MoneyCount" run { if (project.hasProperty('args')) { args project.args } }
実行結果
> gradle run -Pargs=input_sample.txt :compileJava UP-TO-DATE :compileScala UP-TO-DATE :processResources UP-TO-DATE :classes UP-TO-DATE :run 13/11/16 01:38:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 13/11/16 01:38:39 WARN snappy.LoadSnappy: Snappy native library not loaded 13/11/16 01:38:39 INFO mapred.FileInputFormat: Total input paths to process : 1 1 = 2 10 = 2 10000 = 2 5 = 3 100 = 2 1000 = 3 50 = 1 500 = 1 2000 = 1 BUILD SUCCESSFUL
Gradle の Scala プラグインで -Xprint オプションを使用
Gradle の Scala プラグインで -Xprint オプションを試してみました。
- Gradle 1.7
-Xprint はコンパイル途中のコードを出力する Scala コンパイラのオプションで、
-Xprint:<フェーズ>
のようにコンパイルフェーズを指定して使用します。
例えば -Xprint:typer
と指定する事で implicit による暗黙変換などを処理した後のコードが出力されます。
-Xprint オプション指定方法
Gradle の Scala プラグインで -Xprint のようなオプションを指定するには下記のように compileScala.scalaCompileOptions.additionalParameters
を使います。
build.gradle 設定例 (-Xprint:typer 指定)
apply plugin: 'scala' ・・・ compileScala { scalaCompileOptions.additionalParameters = ['-Xprint:typer', ・・・] }
ここで compileScala.scalaCompileOptions.useAnt
の値によって、使用される Scala コンパイラクラスや -Xprint の出力に差が生じる点に注意が必要です。
useAnt の値 | Scala コンパイラクラス | -Xprint の出力 |
---|---|---|
true | AntScalaCompiler | gradle コマンドで -i オプションを指定すると出力される |
false | ZincScalaCompiler | 特に何もしなくても出力される |
なお、useAnt は true
がデフォルト値です。
-Xprint:typer を指定したビルド例
それでは、下記のような build.gradle とサンプルソースを使って -Xprint:typer を試してみます。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20130928/
ビルド定義 build.gradle
apply plugin: 'scala' repositories { mavenCentral() } dependencies { compile 'org.scala-lang:scala-library:2.10.2' compile 'org.scalaz:scalaz-core_2.10:7.1.0-M3' } compileScala { scalaCompileOptions.additionalParameters = ['-Xprint:typer', '-feature'] // (1) scalaCompileOptions.useAnt = false // (2) // scalaCompileOptions.useAnt = true }
サンプルソース Sample.scala
package fits.sample import scala.language.postfixOps import scalaz._ import Scalaz._ object Sample extends App { val plus3: Int => Int = 3 + val times: Int => Int = 2 * // 2 * (3 + 4) = 14 println( 4 |> plus3 >>> times ) // (3 + 4, 2 * 5) = (7, 10) println( (4, 5) |> plus3 *** times ) // (3 + 5, 2 * 5) = (8, 10) println( 5 |> plus3 &&& times ) }
(1) useAnt = false の場合
useAnt を false
に変更した場合の gradle build
結果は下記の通りです。
(-Xprint:typer の内容が出力されます)
実行結果
> gralde build :compileJava UP-TO-DATE :compileScala [[syntax trees at end of typer]] // Sample.scala package fits.sample { import scala.language.postfixOps; import scalaz._; import scalaz.Scalaz._; object Sample extends AnyRef with App { def <init>(): fits.sample.Sample.type = { Sample.super.<init>(); () }; private[this] val plus3: Int => Int = { ((x: Int) => 3.+(x)) }; <stable> <accessor> def plus3: Int => Int = Sample.this.plus3; private[this] val times: Int => Int = { ((x: Int) => 2.*(x)) }; <stable> <accessor> def times: Int => Int = Sample.this.times; scala.this.Predef.println(scalaz.Scalaz.ToIdOps[Int](4).|>[Int](scalaz.Scalaz.ToComposeOps[Function1, Int, Int](Sample.this.plus3)(scalaz.Scalaz.function1Instance).>>>[Int](Sample.this.times))); scala.this.Predef.println(scalaz.Scalaz.ToIdOps[(Int, Int)](scala.Tuple2.apply[Int, Int](4, 5)).|>[(Int, Int)](scalaz.Scalaz.ToArrowOps[Function1, Int, Int](Sample.this.plus3)(scalaz.Scalaz.function1Instance).***[Int, Int](Sample.this.times))); scala.this.Predef.println(scalaz.Scalaz.ToIdOps[Int](5).|>[(Int, Int)](scalaz.Scalaz.ToArrowOps[Function1, Int, Int](Sample.this.plus3)(scalaz.Scalaz.function1Instance).&&&[Int](Sample.this.times))) } } :processResources UP-TO-DATE :classes :jar :assemble :compileTestJava UP-TO-DATE :compileTestScala UP-TO-DATE :processTestResources UP-TO-DATE :testClasses UP-TO-DATE :test UP-TO-DATE :check UP-TO-DATE :build BUILD SUCCESSFUL
(2) useAnt = true の場合
useAnt が true
(デフォルト値) の場合の gradle build
結果は下記の通りです。
(-Xprint:typer の内容は出力されません)
実行結果1
> gralde build :compileJava UP-TO-DATE :compileScala :processResources UP-TO-DATE :classes :jar :assemble :compileTestJava UP-TO-DATE :compileTestScala UP-TO-DATE :processTestResources UP-TO-DATE :testClasses UP-TO-DATE :test UP-TO-DATE :check UP-TO-DATE :build BUILD SUCCESSFUL
ここで gradle build -i
と実行すれば -Xprint:typer の内容が出力されます。
実行結果2
> gralde build -i ・・・ [ant:scalac] [[syntax trees at end of typer]] // Sample.scala [ant:scalac] package fits.sample { [ant:scalac] import scala.language.postfixOps; [ant:scalac] import scalaz._; [ant:scalac] import scalaz.Scalaz._; [ant:scalac] object Sample extends AnyRef with App { [ant:scalac] def <init>(): fits.sample.Sample.type = { [ant:scalac] Sample.super.<init>(); [ant:scalac] () [ant:scalac] }; [ant:scalac] private[this] val plus3: Int => Int = { [ant:scalac] ((x: Int) => 3.+(x)) [ant:scalac] }; [ant:scalac] <stable> <accessor> def plus3: Int => Int = Sample.this.plus3; [ant:scalac] private[this] val times: Int => Int = { [ant:scalac] ((x: Int) => 2.*(x)) [ant:scalac] }; [ant:scalac] <stable> <accessor> def times: Int => Int = Sample.this.times; [ant:scalac] scala.this.Predef.println(scalaz.Scalaz.ToIdOps[Int](4).|>[Int](scalaz.Scalaz.ToComposeOps[Function1, Int, Int](Sample.this.plus3)(scalaz.Scalaz.function1Instance).>>>[Int](Sample.this.times))); [ant:scalac] scala.this.Predef.println(scalaz.Scalaz.ToIdOps[(Int, Int)](scala.Tuple2.apply[Int, Int](4, 5)).|>[(Int, Int)](scalaz.Scalaz.ToArrowOps[Function1, Int, Int](Sample.this.plus3)(scalaz.Scalaz.function1Instance).***[Int, Int](Sample.this.times))); [ant:scalac] scala.this.Predef.println(scalaz.Scalaz.ToIdOps[Int](5).|>[(Int, Int)](scalaz.Scalaz.ToArrowOps[Function1, Int, Int](Sample.this.plus3)(scalaz.Scalaz.function1Instance).&&&[Int](Sample.this.times))) [ant:scalac] } [ant:scalac] } [ant:scalac] :compileScala (Thread[main,5,main]) - complete ・・・