Spark MLlibのロジスティック回帰のサンプルを動かすまでの流れのメモです。
以下の流れで、動かしていきます。

  • HadoopとSparkのセットアップ
  • サンプルのデータ(sample_libsvm_data.txt)を見る
  • サンプル(logistic regression)を動かしてみる

このエントリの実行内容は、MacOS上のSpark vsersion 2.0.1で確認しています。

試すサンプル:
http://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression

HadoopとSparkのセットアップ

Hadoopは、以下のエントリの「Hadoopとhiveのセットアップ」と同じなので省略。
# hiveを入れる必要は無いですが。

Mac上でのhiveセットアップ手順(mysql上にmetastoreを作成)
http://takemikami.com/2016/04/20/Machivemysqlmetastore.html

Sparkも、以下のエントリの「apache-sparkのセットアップ」と同じなので省略。

Mac上でのspark-sqlのセットアップ手順
http://takemikami.com/2016/06/16/Macsparksql.html

サンプルのデータ(sample_libsvm_data.txt)を見る

次にサンプルデータを読み込んでSparkで中身を見てみます。

サンプルをhdfsにコピー

以下の手順で、サンプルデータ(sample_libsvm_data.txt)をhdfsにコピーします。

$ hadoop fs -mkdir -p /tmp/mllib
$ hadoop fs -cp file://$(brew --prefix apache-spark)/libexec/data/mllib/sample_libsvm_data.txt /tmp/mllib/sample_libsvm_data.txt

コピー出来たことを確認します。

$ hadoop fs -ls /tmp/mllib

サンプルデータを見てみる

spark-shellを起動し、サンプルデータをdataframeに読み込みます。

$ spark-shell
scala> val dataset = spark.read.format("libsvm").load("/tmp/mllib/sample_libsvm_data.txt")
dataset: org.apache.spark.sql.DataFrame = [label: double, features: vector]

取り込んだサンプルデータのdataframeを少し見てみます。

スキーマの表示

scala>  dataset.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

データの表示

scala> dataset.select("label", "features").show()
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
※途中省略
+-----+--------------------+
only showing top 20 rows

1件目のレコードを細かめに

scala> dataset.first()
res2: org.apache.spark.sql.Row = [0.0,(692,[127,128,129,130,131,154,155,156,157,158,159,181,182,183,184,185,186,187,188,189,207,208,209,210,211,212,213,214,215,216,217,235,236,237,238,239,240,241,242,243,244,245,262,263,264,265,266,267,268,269,270,271,272,273,289,290,291,292,293,294,295,296,297,300,301,302,316,317,318,319,320,321,328,329,330,343,344,345,346,347,348,349,356,357,358,371,372,373,374,384,385,386,399,400,401,412,413,414,426,427,428,429,440,441,442,454,455,456,457,466,467,468,469,470,482,483,484,493,494,495,496,497,510,511,512,520,521,522,523,538,539,540,547,548,549,550,566,567,568,569,570,571,572,573,574,575,576,577,578,594,595,596,597,598,599,600,601,602,603,604,622,623,624,625,626,627,628,629,630,651,652,653,654,655,656,657],[51.0,159.0,253.0,159.0,50.0,48.0,238.0,252.0,25...

scala>  dataset.first().getAs[Double]("label")
res3: Double = 0.0

scala>  dataset.first().getAs[org.apache.spark.ml.linalg.SparseVector]("features").foreachActive( (k,v) => println(k + ":" + v) )
127:51.0
128:159.0
129:253.0
130:159.0
※省略

正例負例別のレコード数

scala> dataset.groupBy("label").count().show()
+-----+-----+
|label|count|
+-----+-----+
|  0.0|   43|
|  1.0|   57|
+-----+-----+

サンプル(logistic regression)を動かしてみる

データセットを教師データと検証データに分割します。

scala> val Array(training, valid) = dataset.randomSplit(Array(0.8, 0.2))
scala>  training.count()
res6: Long = 82

scala> valid.count()
res7: Long = 18

教師データを使って学習させます。

scala> import org.apache.spark.ml.classification.LogisticRegression
scala> val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
scala> val lrModel = lr.fit(training)

検証データに対して予測を行います。

scala> val predicted = lrModel.transform(valid)

検証データによる予測の精度を確認します。

scala> predicted.filter($"label" === $"prediction").count().toDouble / predicted.count()
res8: Double = 0.8888888888888888

以上で、チュートリアルに従った、サンプルの実行ができました。

このエントリは手始めということで、
次はhiveのクエリで取得したデータで、学習させる流れをエントリに書きたいと思います。