Apache Mahout でレコメンドを実践 - GitHub リポジトリをレコメンド

前回(id:fits:20111113)は適当なサンプルデータを使ったレコメンドだったので、今回は GitHub から API 経由で取得した実データを使ってレコメンドしてみます。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20111123/

GitHub からデータを取得する方法(GitHub API v3 使用)

GitHub からデータを取得するには GitHub API v3 が使えます。

取得したいデータに応じた URL へ GET するだけなので非常に簡単です。(UTF-8 の JSON として取得できる)

今回は以下のような URL を使ってデータを取得しました。

リポジトリの詳細を取得するURL
https://api.github.com/repos/<ユーザー名>/<リポジトリ名>
指定リポジトリを watch しているユーザーを取得するURL
https://api.github.com/repos/<ユーザー名>/<リポジトリ名>/watchers
指定ユーザーが watch しているリポジトリを取得するURL
https://api.github.com/users/<ユーザー名>/watched

ただし、GitHub API v3 では 1時間当たり 5000 の実行回数制限があり(残回数は HTTP ヘッダーの X-RateLimit-Remaining で確認できる)、デフォルトでは結果数を最大 30件しか取得しない点に注意が必要です。(URL に ?per_page=100 パラメータを付ければ最大 100件まで取得するようになる)


データを取得しきれなかった場合は HTTP ヘッダーの Link に次ページの URL が設定されます。

次ページがある場合の HTTP ヘッダー(Link)例
Link: <https://api.github.com/repos?page=3&per_page=100>; rel="next", <https://api.github.com/repos?page=50&per_page=100>; rel="last"

ちなみに回数制限を超えると以下のような JSON が返ってきます。(xxx.xxx.xxx.xxx の部分は IPアドレス

回数制限を超えた場合の JSON 例
{
  "message": "API Rate Limit Exceeded for xxx.xxx.xxx.xxx"
}

GitHub からデータ取得

それでは、実際に GitHub からデータを取得するためのスクリプトを作成します。

今回は以下のような 2段階の処理を行ってユーザー毎に watch しているリポジトリのデータを収集しました。

  1. 指定のリポジトリを watch しているユーザーを取得
  2. 取得したユーザーがそれぞれ watch しているリポジトリを取得

まず、指定のリポジトリを watch しているユーザーの一覧を取得するスクリプトです。

リポジトリを watch しているユーザー取得(repos_watcher.groovy)
import groovy.json.JsonSlurper

def process(String url, Closure closure) {
    def con = new URL(url).openConnection()

    new JsonSlurper().parseText(con.inputStream.getText("UTF-8")).each {
        closure(it)
    }

    //次ページの URL チェック
    def m = con.getHeaderField("Link") =~ /<([^>]*)>; rel="next"/

    if (m) {
        //次ページがあれば再帰処理
        process(m[0][1], closure)
    }
}

def user = args[0]
def repos = args[1]

process("https://api.github.com/repos/${user}/${repos}/watchers?per_page=100") {
    println "${it.id},${it.login},${it.url}"
}
実行例
 groovy repos_watcher.groovy grails grails > grails_watcher.csv
出力例(grails_watcher.csv
・・・
261649,fits,https://api.github.com/users/fits
・・・

次に、取得したユーザー毎に watch しているリポジトリを取得するスクリプトです。
今回は fork しているリポジトリを同一に扱えるよう fork している場合は fork の大本を使うようにしています。

fork している場合は "fork": true となっており、fork の大本はリポジトリの source で取得できます。(直接の fork 元は parent で取得できる)

ユーザーが watch しているリポジトリ取得(users_watch.groovy)
import groovyx.gpars.GParsExecutorsPool
import groovy.json.JsonSlurper

def process(String url, Closure closure) {
    def con = new URL(url).openConnection()

    def data = con.inputStream.getText("UTF-8")

    new JsonSlurper().parseText(data).each {
        closure(it)
    }

    def m = con.getHeaderField("Link") =~ /<([^>]*)>; rel="next"/

    if (m) {
        process(m[0][1], closure)
    }
}

GParsExecutorsPool.withPool(50) {
    System.in.readLines() eachParallel {
        def items = it.split(",")

        def userId = items[0]
        def user = items[1]

        def url = "https://api.github.com/users/${user}/watched?per_page=100"

        def printItems = []

        try {
            process(url) {json ->
                //fork している場合は fork の大本を取得
                if (json.fork) {
                    //fork の大本を取得するためリポジトリの詳細を取得
                    def data = new URL(json.url).getText("UTF-8")
                    //fork の大本を設定
                    json = new JsonSlurper().parseText(data).source
                }

                if (!printItems.contains(json.id)) {
                    println "${userId},${user},${json.id},${json.name},${json.html_url}"
                    printItems.add(json.id)
                }
            }
        } catch (e) {
            System.err.println "failed: ${it}"
        }
    }
}
実行例
 groovy users_watch.groovy < grails_watcher.csv > grails_watcher_watched.csv
出力例(grails_watcher_watched.csv
・・・
261649,fits,158886,grails,https://github.com/grails/grails
261649,fits,108110,mongo,https://github.com/mongodb/mongo
261649,fits,2404027,storm,https://github.com/nathanmarz/storm
・・・

レコメンド処理

今回は上記で出力した CSV を処理するための FileDataModel サブクラスを作成しました。

CSV の処理をカスタムするには processLine() メソッドをオーバーライドします。今回は評価値を用いないため BooleanPreference を使ってユーザーとアイテムのマッピングを実現しています。

今回使用する CSV フォーマット
<ユーザーID>,<ユーザー名>,<リポジトリID>,<リポジトリ名>,<HTMLページURL>

さらに今回は実行時引数で Recommender を切り替えられるようにしています。

github_recommend.groovy
@Grab("org.apache.mahout:mahout-core:0.5")
@Grab("org.slf4j:slf4j-jdk14:1.6.3")
//@Grab("org.slf4j:slf4j-nop:1.6.3")
import org.apache.mahout.cf.taste.impl.common.FastByIDMap
import org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender
import org.apache.mahout.cf.taste.impl.recommender.slopeone.SlopeOneRecommender
import org.apache.mahout.cf.taste.impl.recommender.svd.*
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel
import org.apache.mahout.cf.taste.impl.model.BooleanPreference
import org.apache.mahout.cf.taste.impl.similarity.*

if (args.length < 3) {
    println "${new File(System.getProperty('script.name')).name} <data file> <target userName> <recommend type>"
    return
}

//カスタムの CSV を処理するクラス
class CustomDataModel extends FileDataModel {
    Map users
    Map items

    CustomDataModel(File dataFile) {
        super(dataFile)
    }

    //users と items の初期化を実施
    @Override
    protected void reload() {
        users = [:]
        items = [:]

        super.reload()
    }

    //CSVの解析処理
    @Override
    protected void processLine(String line,
                           FastByIDMap<?> data,
                           FastByIDMap<FastByIDMap<Long>> timestamps,
                           boolean fromPriorData) {

        def cols = line.split(",")
        long userId = Long.parseLong(cols[0])
        long itemId = Long.parseLong(cols[2])

        users.put(userId, cols[1])
        items.put(itemId, cols[3])

        if (!data.containsKey(userId)) {
            data.put(userId, [])
        }

        //ユーザーとリポジトリのマッピングを登録
        data.get(userId).add(new BooleanPreference(userId, itemId))
    }
}

def selectRecommender = {t, d ->
    switch (t) {
        case "1":
            //コサイン類似度
            new GenericItemBasedRecommender(d, new UncenteredCosineSimilarity(d))
            break
        case "2":
            //ユークリッド距離
            new GenericItemBasedRecommender(d, new EuclideanDistanceSimilarity(d))
            break
        case "3":
            //マンハッタン距離
            new GenericItemBasedRecommender(d, new CityBlockSimilarity(d))
            break
        case "4":
            //Tanimoto係数
            new GenericItemBasedRecommender(d, new TanimotoCoefficientSimilarity(d))
            break
        default:
            //Slope One
            new SlopeOneRecommender(d)
    }
}

def data = new CustomDataModel(new File(args[0]))

def userId = data.users.find { it.value == args[1] }.key

def rec = selectRecommender(args[2], data)

println rec

rec.recommend(userId, 5).each {
    println "recommend : ${it.itemID}, ${data.items[it.itemID]}"
}

実行してみると 1〜4 は同じ結果で Slope One だけが異なる結果となりましたが、どっちの結果も微妙な印象です。
データをもっと集めるとか、別の Recommender を使った方が良い結果が出るかもしれません。

なお、今回は評価値を使ってないので、アイテム間の評価の差を使って評価値を推測する Slope One の利用は妥当では無いかもしれませんが試しに使ってみました。

Slope One は大量のメモリを消費する点に注意が必要です。

実行例1
> groovy github_recommend.groovy grails_watcher_watched.csv fits 1
・・・
recommend : 961114, What
recommend : 620636, webfontloader
recommend : 1762028, BlocksKit
recommend : 180212, expression-parser
recommend : 1141079, JarBrowser
実行例2(Slope One)
> set JAVA_OPTS=-Xmx1024m
> groovy github_recommend.groovy grails_watcher_watched.csv fits 5
・・・
recommend : 65510, django-flash
recommend : 1, grit
recommend : 323654, cappuccino-couchdb
recommend : 612230, nvm
recommend : 404142, go-repl