多腕バンディット問題とトンプソンサンプリング

多腕バンディット問題に対してベイズ的な手法をとるトンプソンサンプリングに興味を惹かれたので、「テストの実行 - C# を使用した Thompson Sampling」 を参考に Python で実装してみました。

ソースは http://github.com/fits/try_samples/tree/master/blog/20190603/

実装

ここで実装するのは、当たった場合の報酬を 1、外れた場合の報酬を 0 とするベルヌーイ試行によるバンディット問題です。

この場合、ベータ分布を 当たり数 + 1外れ数 + 1 のパラメータで ※ サンプリングした結果からアクションを選ぶだけのようです。

 ※ +1 するのは 0 にしないための措置だと思う

引数 arms でアクションの選択肢とその当たる(報酬が得られる)確率を渡すようにしています。

import numpy as np

def thompson_sampling(arms, n = 1000):
    states = [(0, 0) for _ in arms]

    # "当たり数 + 1" と "外れ数 + 1" でベータ分布からサンプリングし
    # アクションを決定する処理
    action = lambda: np.argmax([np.random.beta(s[0] + 1, s[1] + 1) for s in states])

    for _ in range(n):
        # アクションの決定
        a = action()
        # 当たり・外れの判定(報酬の算出)
        r = 1 if np.random.rand() < arms[a] else 0

        # 当たり・外れ数の更新
        states[a] = (states[a][0] + r, states[a][1] + 1 - r)

    return states

上記の結果を出力する処理は以下です。

def summary(states):
    for s in states:
        print(f'win={s[0]}, lose={s[1]}, p={s[0] / sum(s)}')

実行

アクションと報酬の得られる確率が以下のような構成に対して、試行回数 1,000 で実行してみます。

  • (a) 0.2
  • (b) 0.5
  • (c) 0.7
実行例1
summary( thompson_sampling([0.2, 0.5, 0.7]) )
win=2, lose=7, p=0.2222222222222222
win=7, lose=12, p=0.3684210526315789
win=673, lose=299, p=0.6923868312757202
実行例2
summary( thompson_sampling([0.2, 0.5, 0.7]) )
win=1, lose=7, p=0.125
win=11, lose=13, p=0.4583333333333333
win=690, lose=278, p=0.7128099173553719

実行の度に結果は異なりますが、アクションが選択された回数(win と lose の合計)に注目してみると、報酬の得られる確率が最も高い 3番目 (c) に集中している事が分かります。

つまり、最も報酬の得られる(確率の高い)アクションを選んでいる事になります。

実装2

報酬の得られる確率を thompson_sampling 関数の引数として与える上記の実装は微妙な気がするので、少し改良してみます。

確率を直接与える代わりに、アクションの選択肢と報酬を算出する関数を引数として与えるようにしてみました。

import numpy as np

def thompson_sampling(acts, reward_func, n = 1000):
    states = {a: (0, 0) for a in acts}
    
    def action():
        bs = {a: np.random.beta(s[0] + 1, s[1] + 1) for a, s in states.items()}
        return max(bs, key = bs.get)
    
    for _ in range(n):
        a = action()
        r = reward_func(a)
        
        states[a] = (states[a][0] + r, states[a][1] + 1 - r)
    
    return states

def probability_reward_func(probs):
    return lambda a: 1 if np.random.rand() < probs[a] else 0

def summary(states):
    for a, s in states.items():
        print(f'{a}: win={s[0]}, lose={s[1]}, p={s[0] / sum(s)}')
実行例1
probs1 = { 'a': 0.2, 'b': 0.5, 'c': 0.7 }

summary( thompson_sampling(probs1.keys(), probability_reward_func(probs1)) )
a: win=1, lose=7, p=0.125
b: win=17, lose=19, p=0.4722222222222222
c: win=674, lose=282, p=0.7050209205020921
実行例2
probs2 = { 'a': 0.2, 'b': 0.5, 'c': 0.7, 'd': 0.1, 'e': 0.8 }

summary( thompson_sampling(probs2.keys(), probability_reward_func(probs2)) )
a: win=4, lose=7, p=0.36363636363636365
b: win=10, lose=10, p=0.5
c: win=60, lose=25, p=0.7058823529411765
d: win=0, lose=4, p=0.0
e: win=701, lose=179, p=0.7965909090909091