Apache Spark でロジスティック回帰
以前 ※ に R や Julia で試したロジスティック回帰を Apache Spark の MLlib (Machine Learning Library) を使って実施してみました。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20150427/
※「 R でロジスティック回帰 - glm, MCMCpack 」、「 Julia でロジスティック回帰-glm 」
はじめに
R の時と同じデータを使いますが、ヘッダー行を削除しています。(「R でロジスティック回帰 - glm, MCMCpack」 参照)
データ data4a.csv
8,1,9.76,C 8,6,10.48,C 8,5,10.83,C ・・・
データ内容は以下の通り。個体 i それぞれにおいて 「 個の観察種子のうち生きていて発芽能力があるものは
個」 となっています。
項目 | 内容 |
---|---|
N | 観察種子数 |
y | 生存種子数 |
x | 植物の体サイズ |
f | 施肥処理 (C: 肥料なし, T: 肥料あり) |
体サイズ x
と肥料による施肥処理 f
が種子の生存する確率(ある個体 i から得られた種子が生存している確率)にどのように影響しているかをロジスティック回帰で解析します。
MLlib によるロジスティック回帰
今回は org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
を使用します。
LogisticRegressionWithLBFGS について
LogisticRegressionWithLBFGS
で以前と同様のロジスティック回帰を実施するには以下が必要です。
setIntercept
で true を設定
この値が false (デフォルト値) の場合、結果の intercept 値が 0 になります。
なお、今回のように二項分布を使う場合は numClasses の値を変更する必要はありませんが (デフォルト値が 2 のため)、応答変数が 3状態以上の多項分布を使う場合は setNumClasses
で状態数に応じた値を設定します。
LabeledPoint について
LogisticRegressionWithLBFGS
へ与えるデータは LabeledPoint
で用意します。
R や Julia では 応答変数 ~ 説明変数1 + 説明変数2 + ・・・
のように応答変数と説明変数を指定しましたが、LabeledPoint
では下記のようにメンバー変数で表現します。
メンバー変数 | 応答変数・説明変数 |
---|---|
label | 応答変数 |
features | 説明変数 |
値は Double
とする必要がありますので、f 項目のような文字列値は数値化します。
更に、二項分布を使う場合 (numClasses = 2) は応答変数の値が 0 か 1 でなければなりません。
LabeledPoint への変換例
例えば、以下のようなデータを応答変数 y 項目、説明変数 x と f 項目の LabeledPoint
へ変換する場合
変換前のデータ (N = 8, y = 6)
8,6,10.48,C
次のようになります。
変換後のデータイメージ
LabeledPoint(label: 1.0, features: Vector(10.48, 0.0)) LabeledPoint(label: 1.0, features: Vector(10.48, 0.0)) LabeledPoint(label: 1.0, features: Vector(10.48, 0.0)) LabeledPoint(label: 1.0, features: Vector(10.48, 0.0)) LabeledPoint(label: 1.0, features: Vector(10.48, 0.0)) LabeledPoint(label: 1.0, features: Vector(10.48, 0.0)) LabeledPoint(label: 0.0, features: Vector(10.48, 0.0)) LabeledPoint(label: 0.0, features: Vector(10.48, 0.0))
8個(N)の中で 6個(y)生存していたデータのため、
label
(応答変数) の値が 1.0 (生存) のデータ 6個と 0.0 のデータ 2個へ変換します。
ちなみに、f 項目の値が C
の場合は 0.0、T
の場合は 1.0 としています。
実装
実装してみると以下のようになります。
LogisticRegression.scala
import org.apache.spark.SparkContext import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors object LogisticRegression extends App { // f項目の値を数値へ変換 val factor = (s: String) => s match { case "C" => 0 case _ => 1 } val sc = new SparkContext("local", "LogisticRegression") // データの準備 (100行のデータ -> 800個の LabeledPoint) val rdd = sc.textFile(args(0)).map(_.split(",")).flatMap { d => val n = d(0).toInt val x = d(1).toInt // 説明変数の値 val v = Vectors.dense(d(2).toDouble, factor(d(3))) // 応答変数が 1 のデータ x 個と 0 のデータ n - x 個を作成 List.fill(x)( LabeledPoint(1, v) ) ++ List.fill(n -x)( LabeledPoint(0, v) ) } // ロジスティック回帰の実行 val res = new LogisticRegressionWithLBFGS() // .setNumClasses(2) //省略可 .setIntercept(true) .run(rdd) println(res) }
ビルド
以下のような Gradle ビルド定義ファイルを使って実行します。
build.gradle
apply plugin: 'scala' apply plugin: 'application' mainClassName = 'LogisticRegression' repositories { jcenter() } dependencies { compile 'org.scala-lang:scala-library:2.11.6' compile('org.apache.spark:spark-mllib_2.11:1.3.1') { // ログ出力の抑制 exclude module: 'slf4j-log4j12' } // ログ出力の抑制 runtime 'org.slf4j:slf4j-nop:1.7.12' } run { if (project.hasProperty('args')) { args project.args.split(' ') } }
不要な WARN ログ出力を抑制するため以下のファイルも用意しました。
src/main/resources/log4j.properties
log4j.rootLogger=off
実行
実行結果は以下の通りです。
実行結果
> gradle run -Pargs=data4a.csv :clean :compileJava UP-TO-DATE :compileScala :processResources :classes :run (weights=[1.952347703282676,2.021401680901667], intercept=-19.535421113192506) BUILD SUCCESSFUL
以前に実施した R の結果 (Estimate の値) とほとんど同じ値になっています。
R の glm 関数による結果
Coefficients: Estimate Std. Error z value Pr(>|z|) (Intercept) -19.5361 1.4138 -13.82 <2e-16 *** x 1.9524 0.1389 14.06 <2e-16 *** fT 2.0215 0.2313 8.74 <2e-16 ***