Spark SQL で CSV ファイルを処理2 - GeoLite2
前回の 「Spark SQL で CSV ファイルを処理 - GeoLite Legacy」 に続き、今回は Spark SQL を使って GeoLite2 City CSV ファイルを処理してみます。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20141112/
はじめに
GeoLite2 City の CSV は下記のような 2種類のファイルで構成しています。
GeoLite2-City-Blocks.csv で IP アドレスから geoname_id を割り出し、GeoLite2-City-Locations.csv で geoname_id から国・都市を特定します。
ファイルの内容は下記のようになっており、IP は IPv6 の形式で記載されています。
GeoLite2-City-Blocks.csv の例
network_start_ip,network_prefix_length,geoname_id,registered_country_geoname_id,represented_country_geoname_id,postal_code,latitude,longitude,is_anonymous_proxy,is_satellite_provider ・・・ ::ffff:1.0.64.0,114,1862415,1861060,,,・・・ ・・・ 2602:30a:2c1d::,48,5368361,,,・・・ ・・・
GeoLite2-City-Locations.csv の例
geoname_id,continent_code,continent_name,country_iso_code,country_name,subdivision_iso_code,subdivision_name,city_name,metro_code,time_zone 1862415,AS,Asia,JP,Japan,34,Hiroshima,・・・ ・・・
Spark SQL を使って IP アドレスから都市判定
GeoLite Legacy の Country CSV を処理した前回との違いは、下記 2点です。
- (1) GeoLite2-City-Blocks.csv と GeoLite2-City-Locations.csv の 2つの CSV を geoname_id で join する
- (2) network_start_ip と network_prefix_length を使って IP アドレスの数値の範囲を算出する
(1) は前回と同様に CSV を処理して SQL で join するだけです。 (2) は下記のようにして求める事ができます。
- (a) IP アドレスの開始値は network_start_ip を数値化
- (b) IP アドレスの終了値は (a) の値の下位
128 - network_prefix_length
ビットを全て 1 とした値
今回は IPv4 のみを対象とするため、GeoLite2-City-Blocks.csv の ::ffff:
で始まる行だけを使って (::ffff:
以降がそのまま IPv4 に該当)、上記 (a) と (b) の処理を実装してみました。
注意点として、GeoLite2-City-Locations.csv には subdivision_iso_code 以降が全て空欄のデータも含まれていました。 (例えば 2077456,OC,Oceania,AU,Australia,,,,,
を split(",")
すると Array(2077456, OC, Oceania, AU, Australia)
となってしまいます)
GetCity.scala
import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import java.net.InetAddress // GeoLite2-City-Blocks.csv 用のスキーマ定義 case class IpMapping(startIpNum: Long, endIpNum: Long, geonameId: String) // GeoLite2-City-Locations.csv 用のスキーマ定義 case class City(geonameId: String, country: String, city: String) object GetCity extends App { if (args.length < 1) { println("<ip address>") System.exit(0) } // IPv4 の数値変換 val toIpNum = (ip: String) => Integer.toUnsignedLong(InetAddress.getByName(ip).hashCode()) val locationFile = "GeoLite2-City-Locations.csv" val blockFile = "GeoLite2-City-Blocks.csv" val sc = new SparkContext("local", "GetCity") val sqlContext = new SQLContext(sc) import sqlContext.createSchemaRDD val locations = sc.textFile(locationFile).map(_.split(",")).map { r => // City 情報の無いデータ(subdivision_iso_code 以降が空欄)への対処 val city = if (r.length > 7) r(7) else "" City(r(0), r(4), city) } locations.registerTempTable("locations") // IPv4 のみ (::ffff: で始まるもの) を対象 val blocks = sc.textFile(blockFile).filter(_.startsWith("::ffff:")).map(_.split(",")).map { r => val mask = -1 << (128 - r(1).toInt) // (a) val startIpNum = toIpNum(r(0).replaceAll("::ffff:", "")) // (b) val endIpNum = startIpNum | ~mask IpMapping(startIpNum, endIpNum, r(2)) } blocks.registerTempTable("blocks") val ipNum = toIpNum(args(0)) val rows = sqlContext.sql(s""" select city, country from locations lo join blocks bl on bl.geonameId = lo.geonameId where startIpNum <= ${ipNum} and endIpNum >= ${ipNum} """) rows.foreach( r => println(s"${r(0)}, ${r(1)}") ) }
上記では、IP の終了値 (b) を算出するために、上位ビットを 1、下位ビットを 0 にした mask を作成し、これをビット反転して開始値 (a) と論理和をとっています。
例えば、network_start_ip が ::ffff:1.0.64.0
で network_prefix_length が 114
のデータの場合、(a) の値は 1.0.64.0
を数値化して 16793600
、mask 変数の値は 2進数で ・・・111100000000000000
、(b) の値は mask 変数の値をビット反転した 011111111111111
と (a) の値との論理和で 16809983
となり、16793600 ~ 16809983 の範囲内にある IP アドレスが該当する事になります。
実行 (Gradle 利用)
- Gradle 2.1
前回と同様に Gradle で実行します。
slf4j-nop を使って Spark の標準的なログ出力を抑制している点も同じです。
build.gradle
apply plugin: 'application' apply plugin: 'scala' repositories { mavenCentral() } dependencies { compile 'org.scala-lang:scala-library:2.10.4' compile('org.apache.spark:spark-sql_2.10:1.1.0') { exclude module: 'slf4j-log4j12' } runtime 'org.slf4j:slf4j-nop:1.7.7' } mainClassName = 'GetCity' run { if (project.hasProperty('args')) { args project.args.split(' ') } }
実行結果1
> gradle run -q -Pargs=1.21.127.254 Tokyo, Japan
実行結果2
> gradle run -q -Pargs=223.255.254.1 , Singapore