Akka でステートマシンを処理

前回 の有限ステートマシン(FSM)の処理を Akka の JavaAPI を使って実装してみます。

ソースは http://github.com/fits/try_samples/tree/master/blog/20171016/

ステートマシンの実装

まずは、以下のステートマシンを実装します。

  • 初期状態は Idle 状態
  • Idle 状態で On イベントが発生すると Active 状態へ遷移
  • Active 状態で Off イベントが発生すると Idle 状態へ遷移
現在の状態 Off On
Idle Active
Active Idle

AbstractFSM<状態の型, データの型> を extends する事でステートマシンを実装します。

状態以外にも状態のデータを指定できるので、Active 状態へ変化した回数をカウントするようにしてみました。

指定の状態で特定のイベントを処理するには when(現在の状態, FSMStateFunctionBuilder) を使い、イベントに対する処理は matchEventEquals() 等を使います。

goTo(遷移先の状態) で状態を遷移させ、その際に using(データ) で状態遷移後のデータを指定できます。

処理対象外のイベントを受け取った場合の処理は whenUnhandled(FSMStateFunctionBuilder) で指定でき、状態遷移の状況確認に onTransition が使えます。

sample.groovy
@Grab('com.typesafe.akka:akka-actor_2.12:2.5.6')
import akka.actor.AbstractFSM
import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.actor.Props

enum States { Idle, Active }
enum Events { On, Off }

class SampleStateMachine extends AbstractFSM<States, Integer> {
    {
        // 初期状態の設定
        startWith(States.Idle, 0)

        // On イベントで Idle から Active 状態へ遷移
        when(States.Idle, matchEventEquals(Events.On) { event, data ->
            // Active へ遷移
            goTo(States.Active).using(data + 1)
        })

        // Off イベントで Active から Idle 状態へ遷移
        when(States.Active, matchEventEquals(Events.Off) { event, data ->
            // Idle へ遷移
            goTo(States.Idle)
        })

        // 処理されないイベント発生時
        whenUnhandled(
            matchAnyEvent { event, data ->
                println "*** Unhandled event=${event}, data=${data}"
                stay()
            }
        )

        // 状態遷移の発生時
        onTransition { from, to -> 
            println "*** stateChanged: ${from} -> ${to}, data=${stateData()}, next data=${nextStateData()}"
        }
    }
}

def system = ActorSystem.create()

def actor = system.actorOf(Props.create(SampleStateMachine))

actor.tell(Events.On, ActorRef.noSender())
actor.tell(Events.Off, ActorRef.noSender())

actor.tell(Events.Off, ActorRef.noSender())

sleep 2000

system.terminate()

実行結果は以下の通りです。

実行結果
> groovy sample.groovy

*** stateChanged: Idle -> Active, data=0, next data=1
*** stateChanged: Active -> Idle, data=1, next data=1
*** Unhandled event=Off, data=1

タイムアウト付きステートマシンの実装1

次は、タイムアウト時の遷移を追加してみます。

現在の状態 Off On Timeout (2秒)
Idle Active
Active Idle Idle

when で scala.concurrent.duration.FiniteDuration を指定すると状態のタイムアウト ※ を指定できます。

 ※ ただし、このタイムアウトは
    その状態で何もメッセージを受信しなかった時間に対する
    タイムアウトのようです

    状態の遷移が発生しなくても
    メッセージを受信する度にタイムアウトはリセットされます

タイムアウト発生時のイベント(メッセージ)は StateTimeout() の戻り値と等しくなります。

timeout_sample1.groovy
・・・
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit

enum States { Idle, Active }
enum Events { On, Off }

class SampleStateMachine extends AbstractFSM<States, Integer> {
    {
        startWith(States.Idle, 0)

        when(States.Idle, matchEventEquals(Events.On) { event, data ->
            goTo(States.Active).using(data + 1)
        })

        when(States.Active, Duration.create(2, TimeUnit.SECONDS), 
            matchEventEquals(Events.Off) { event, data ->
                goTo(States.Idle)
            }.eventEquals(StateTimeout()) { event, data ->
                // タイムアウト時の処理
                println "*** timeout: event=${event}, data=${data}"

                goTo(States.Idle)
            /*  以下でも可
                self().tell(Events.Off, self())
                stay()
            */
            }
        )
        ・・・
    }
}

def system = ActorSystem.create()

def actor = system.actorOf(Props.create(SampleStateMachine))

actor.tell(Events.On, ActorRef.noSender())
actor.tell(Events.Off, ActorRef.noSender())

actor.tell(Events.Off, ActorRef.noSender())

actor.tell(Events.On, ActorRef.noSender())

sleep 2500

system.terminate()

実行結果は以下の通りです。

実行結果
> groovy timeout_sample1.groovy

*** stateChanged: Idle -> Active, data=0, next data=1
*** stateChanged: Active -> Idle, data=1, next data=1
*** Unhandled event=Off, data=1
*** stateChanged: Idle -> Active, data=1, next data=2
*** timeout: event=StateTimeout, data=2
*** stateChanged: Active -> Idle, data=2, next data=2

when で指定したタイムアウトはイベント(メッセージ)を受信するとリセットされる事を確認するため、以下のようにタイムアウト発生前に invalid-message という文字列を 2回 tell するようにしてみます。

timeout_sample1b.groovy (タイムアウトの検証)
・・・
def system = ActorSystem.create()

def actor = system.actorOf(Props.create(SampleStateMachine))

・・・

actor.tell(Events.On, ActorRef.noSender())

sleep 1500

// 1回目
actor.tell('invalid-message', ActorRef.noSender())

sleep 1500

// 2回目
actor.tell('invalid-message', ActorRef.noSender())

sleep 2500

system.terminate()

実行結果は、以下のように invalid-message という文字列を 2回受信した後にタイムアウトしました。

つまり、メッセージの受信でタイムアウトがリセットされていると考えられます。

実行結果(タイムアウトの検証)
> groovy timeout_sample1b.groovy

・・・
*** stateChanged: Idle -> Active, data=1, next data=2
*** Unhandled event=invalid-message, data=2
*** Unhandled event=invalid-message, data=2
*** timeout: event=StateTimeout, data=2
*** stateChanged: Active -> Idle, data=2, next data=2

タイムアウト付きステートマシンの実装2 (状態のタイムアウト

メッセージの受信有無に左右されないタイムアウトを実現するには、when のタイムアウトを使わずに setTimer() を使う事で実装できそうです。

timeout_sample2.groovy
・・・
class SampleStateMachine extends AbstractFSM<States, Integer> {
    {
        ・・・
        when(States.Active, 
            matchEventEquals(Events.Off) { event, data ->
                goTo(States.Idle)
            }.eventEquals(StateTimeout()) { event, data ->
                println "*** timeout: event=${event}, data=${data}"

                goTo(States.Idle)
            /* 以下でも可
                self().tell(Events.Off, self())
                stay()
            */
            }
        )
        ・・・
        onTransition { from, to -> 
            println "*** stateChanged: ${from} -> ${to}, data=${stateData()}, next data=${nextStateData()}"

            if (to == States.Active) {
                // Active 状態のタイムアウト設定
                setTimer(
                    'active-timeout', 
                    StateTimeout(), 
                    Duration.create(2, TimeUnit.SECONDS)
                )
            }
            else {
                // タイムアウトのキャンセル
                cancelTimer('active-timeout')
            }
        }
    }
}

def system = ActorSystem.create()

def actor = system.actorOf(Props.create(SampleStateMachine))

・・・

actor.tell(Events.On, ActorRef.noSender())

sleep 1500

actor.tell("invalid-message", ActorRef.noSender())

sleep 1500

actor.tell("invalid-message", ActorRef.noSender())

sleep 2500

system.terminate()

実行してみると、2回目の invalid-message を受信する前にタイムアウトが発生しており意図通りの動作となりました。

実行結果
> groovy timeout_sample2.groovy

・・・
*** stateChanged: Idle -> Active, data=1, next data=2
*** Unhandled event=invalid-message, data=2
*** timeout: event=StateTimeout, data=2
*** stateChanged: Active -> Idle, data=2, next data=2
*** Unhandled event=invalid-message, data=2