Spark上でmahoutを使ってitemsimilarityを計算する場合は、
以下の記事で紹介されている「spark-itemsimilarity」のバッチを使って
実行することができます。

Intro to Cooccurrence Recommenders with Spark | mahout https://mahout.apache.org/users/algorithms/intro-cooccurrence-spark.html

このエントリでは、このバッチと同様のことを、
バッチではなくspark-shellからscalaのコードで実行する流れを紹介します。

  • データの準備
  • spark-shellでのmahoutの利用
  • itemsimilarityの実行

データの準備

まず、テスト用のサンプルデータを用意します。

MovieLensのデータをhdfsに登録

動作確認用にMovieLensのデータをhdfsにします。

以下のURLサイトから、ml-latest-small.zipをダウンロードします。

MovieLens | grouplens
https://grouplens.org/datasets/movielens/

ml-latest-small.zip (size: 1 MB)
↑このように記載されているリンクからダウンロード

ダウンロードしたファイルを展開します。

$ unzip ./ml-latest-small.zip

展開したファイルからratings.csvをhdfsにアップロードします。

$ hadoop fs -mkdir -p /sampledata/ml/ratings
$ hadoop fs -put ml-latest-small/ratings.csv /sampledata/ml/ratings/

spark-shellでのmahoutの利用

次に、spark-shellからitemsimilarityの計算を実行してみます。

spark-shellの起動

spark, mahoutはセットアップ出来ていていれば、
以下のコマンドでmahoutライブラリを読み込んた状態でspark-shellを起動できます。

$ mahout spark-shell

# macosの場合であれば、
# brewでapache-spark, mahoutをinstallし、
# JAVA_HOME, SPARK_HOME, MAHOUT_HOME あたりの
# 環境変数が指定されていれば大丈夫だと思います。
#
# hadoopの環境構築は、以下の記事などを参考にしてください。
# Mac上でのhiveセットアップ手順(mysql上にmetastoreを作成)
# http://takemikami.com/2016/04/20/Machivemysqlmetastore.html

mahoutをインストールしなくても、
以下のようにオプションを指定して起動した後に、
明示的にmahoutのライブラリをimportすれば、
先ほどの方法と同じ状態にすることが出来ます。

起動コマンド

$ spark-shell \
    --packages org.apache.mahout:mahout-spark_2.10:0.13.0,org.apache.mahout:mahout-hdfs:0.13.0,org.apache.mahout:mahout-integration:0.13.0,org.apache.mahout:mahout-math:0.13.0,org.apache.mahout:mahout-math-scala_2.10:0.13.0,org.apache.mahout:mahout-mr:0.13.0,org.apache.hadoop:hadoop-client:2.8.0 \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

初期化(明示的なmahoutライブラリのインポート)

scala> import org.apache.mahout.math._
scala> import org.apache.mahout.math.scalabindings._
scala> import org.apache.mahout.math.drm._
scala> import org.apache.mahout.math.scalabindings.RLikeOps._
scala> import org.apache.mahout.math.drm.RLikeDrmOps._
scala> import org.apache.mahout.sparkbindings._
scala> implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)

mahoutの利用

前項のようにspark-shellを起動出来ていれば、
次のようなコードで行列演算を行うことが出来ます。

scala> val matrixA = dense((1, 2, 3), (3, 4, 5))
scala> val matrixB = dense((1, 3), (2, 4), (5, 6))
scala> val A = drmParallelize(matrixA)
scala> val B = drmParallelize(matrixB)
scala> val C = A %*% B
scala> val matrixC = C.collect
scala> print(matrixC)
{
 0 =>   {0:20.0,1:29.0}
 1 =>   {0:36.0,1:55.0}
}

上記のコードのmatrixA,B,Cがdense(密行列オブジェクト)で、
A,B,Cが分散環境用の行列、DRM(distributed row matrix)です。

上記で行った処理は、以下のような流れです。

  • denseで行列を定義
  • drmParallelizeで行列を、DRMに変換
  • 「C = A %*% B」で行列の内積を計算
  • collectでdenseに戻して表示

このあたりの内容は、
以下URLの線形代数の演算をやってみるチュートリアルが参考になります。

Playing with Mahout’s Spark Shell | mahout
https://mahout.apache.org/users/sparkbindings/play-with-shell.html

さらに詳しく知りたい場合は、以下にマニュアルがあります。

Mahout Scala BindingsandMahout Spark Bindingsfor Linear Algebra Subroutines
http://apache.github.io/mahout/doc/ScalaSparkBindings.html

itemsimilarityの実行

spark-shellからmahoutのライブラリを利用出来ることが確認できたので、
次はitemsimilarityを計算する処理を実行してみます。

spark-itemsimilarityのバッチ処理が以下のコードなので、
これをspark-shell上で順に実行していく流れになります。

https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala

ratingsファイルの読み込み

以下のコードのようにして、先ほどhdfsに登録したratingsデータを読み込みます。

scala> import org.apache.mahout.math.indexeddataset.indexedDatasetDFSReadElements
scala> import org.apache.mahout.common.HDFSPathSearch
scala> import org.apache.mahout.math.indexeddataset.Schema
scala> val inFiles = HDFSPathSearch("hdfs:///sampledata/ml/ratings", "^.*", true).uris
scala> val readSchema = new Schema("delim" -> ",", "filter" -> "", "rowIDColumn" -> 0, "columnIDPosition" -> 1, "filterColumn" -> -1)
scala> val datasetA = indexedDatasetDFSReadElements(inFiles, readSchema)

念のため、以下のようにして読み込んだデータのサイズを確認しておきます。
columnが映画の数、rowが評価者の人数の行列になっています。

scala> val matrixA = datasetA.matrix.collect
scala> print(matrixA.columnSize)
9067
scala> print(matrixA.rowSize)
672

itemsimilarityの計算

以下のようにして、itemsimilarityを計算します。

scala> import org.apache.mahout.math.cf.SimilarityAnalysis
scala> val datasets = Array(datasetA)
scala> val idss = SimilarityAnalysis.cooccurrencesIDSs(datasets, 0xdeadbeef, 50, 500)

idss(0).matrixにアイテムの類似度行列が入っているので、サイズを確認しておきます。
行列ともに映画の数と一致していることを確認出来ます。

scala> val matrixIdss = idss(0).matrix.collect
scala> print(matrixIdss.columnSize)
9067
scala> print(matrixIdss.rowSize)
9067

類似度行列の出力

最後に計算した類似度行列をhdfsにファイルとして出力します。

scala> val writeSchema = new Schema("rowKeyDelim" -> "\t", "columnIdStrengthDelim" ->":", "omitScore" -> false, "elementDelim" -> " ")
scala> idss(0).dfsWrite("hdfs:///sampledata/ml/itemsimilarity", writeSchema)

出力したファイルは以下のように確認出来ます。

$ hadoop fs -ls /sampledata/ml/itemsimilarity
-rw-r--r--   1 ※ユーザ※ supergroup          0 2017-08-16 16:57 /sampledata/ml/itemsimilarity/_SUCCESS
-rw-r--r--   1 ※ユーザ※ supergroup      12773 2017-08-16 16:57 /sampledata/ml/itemsimilarity/part-00000
-rw-r--r--   1 ※ユーザ※ supergroup      14303 2017-08-16 16:57 /sampledata/ml/itemsimilarity/part-00001
-rw-r--r--   1 ※ユーザ※ supergroup      13856 2017-08-16 16:57 /sampledata/ml/itemsimilarity/part-00002
※省略※
$ hadoop fs -cat /sampledata/ml/itemsimilarity/part-00000 | head
※省略※
5302
8989    4297:15.019027847114558 7087:15.019027847114558 7944:15.019027847114558 5899:15.019027847114558 ※省略※
※省略※

以上の流れで、
「spark-itemsimilarity」のバッチと同様の処理をspark-shellで実行することが出来ました。

このようにコードから処理の呼び出しが出来ると、
前処理とあわせたコードなどを作りやすいのではないかと思います。
また、hiveやmllibとの連携も考えられそうな気がします。