The following code is liberally borrowed from
import akka.actor._
import com.typesafe.config.ConfigFactory
case object GetState
case class GetCount ( ID : Int )
case class State ( value : String )
case class Count ( ID : Int , value : Int )
class Manager extends Actor {
def receive = {
case GetState => sender () ! State ( "state value" )
case GetCount ( id ) => sender () ! Count ( id , 42 )
}
}
class StdInDriver ( manager : ActorRef ) extends Actor {
val lines = io . Source . stdin . getLines
def receive = {
case State ( value ) =>
println ( s "State:${value}" ); prompt ()
case Count ( id , value ) =>
println ( s "Count id:${id} value:${value}" ); prompt ()
case line : String => line . split ( ' ' ). toList match {
case "state" :: Nil => manager ! GetState
case "count" :: id :: Nil => manager ! GetCount ( id . toInt )
case Nil => prompt ()
case "" :: Nil => prompt ()
case na :: nas => println ( s "unknown command: ${na}" ); prompt ()
}
}
def prompt () : Unit = {
if ( lines . hasNext ) lines . next () match {
case "exit" => context . system . shutdown ()
case line => self ! line
}
}
override def preStart () : Unit =
prompt ()
}
object StdInDriver extends App {
val cc = ConfigFactory . load ( "common.conf" )
val system = ActorSystem ( "example" , cc )
val managerActor = system . actorOf ( Props [ Manager ], "manager" )
val driverActor = system . actorOf ( Props ( new StdInDriver ( managerActor )).
withDispatcher ( "cli-dispatcher" ), "cli" )
}
where common.conf is
cli-dispatcher {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
The PinnedDispatcher is required since StdInDriver will be blocking on the stdin read and thus requires a dedicated thread.
The above code is implicitly switching between read command and await response modes.
We can make that explicit by using become
def receive = awaitCommand
def awaitResponse : PartialFunction [ Any , Unit ] = {
case State ( value ) =>
println ( s "State:${value}" ); prompt ()
case Count ( id , value ) =>
println ( s "Count id:${id} value:${value}" ); prompt ()
}
def awaitCommand : PartialFunction [ Any , Unit ] = {
case line : String => line . split ( ' ' ). toList match {
case "state" :: Nil =>
manager ! GetState ; context . become ( awaitResponse )
case "count" :: id :: Nil =>
manager ! GetCount ( id . toInt ); context . become ( awaitResponse )
case Nil => prompt ()
case "" :: Nil => prompt ()
case na :: nas => println ( s "unknown command: ${na}" ); prompt ()
}
}
The above code contains repeated references to prompt(), context.become(awaitResponse).
Those are both ugly and error prone. The three cases can be extracted as follows, eliminating
the redundancy.
private def awaitResponse : PartialFunction [ Any , Unit ] = respond {
case State ( value ) =>
println ( s "State:${value}" )
case Count ( id , value ) =>
println ( s "Count id:${id} value:${value}" )
}
private def respond [ T ]( cmd : PartialFunction [ T , Unit ]) =
( cmd andThen { _ => context . become ( awaitCommand ); prompt () })
private def awaitCommand : PartialFunction [ Any , Unit ] = {
case line : String => {
command [ List [ String ]]({
case "state" :: Nil =>
manager ! GetState
case "count" :: id :: Nil =>
manager ! GetCount ( id . toInt )
}, {
case Nil =>
case "" :: Nil =>
case na :: nas => println ( s "unknown command: ${na}" )
}). apply ( line . split ( ' ' ). toList )
}
}
private def command [ T ]( valid : PartialFunction [ T , Unit ],
invalid : PartialFunction [ T , Unit ]) = {
( valid andThen { _ => context . become ( awaitResponse ) })
orElse ( invalid andThen { _ => prompt () })
}
private def prompt () : Unit = {
if ( lines . hasNext ) lines . next () match {
case "exit" => context . system . shutdown ()
case line => self ! line
}
}
Finally, the above code terminates with an exception if the user does not enter a integer parameter for the count command.
We can resolve this by defining an extractor
object IntExtractor {
def unapply ( s : String ) : Option [ Int ] = Try {
s . toInt
}. toOption
}
and changing the case to
case "count" :: IntExtractor ( id ) :: Nil =>
The complete code is then
import akka.actor._
import com.typesafe.config.ConfigFactory
import scala.util.Try
case object GetState
case class GetCount ( ID : Int )
case class State ( value : String )
case class Count ( ID : Int , value : Int )
class Manager extends Actor {
def receive = {
case GetState => sender () ! State ( "state value" )
case GetCount ( id ) => sender () ! Count ( id , 42 )
}
}
class StdInDriver ( manager : ActorRef ) extends Actor {
val lines = io . Source . stdin . getLines
def receive = awaitCommand
private def awaitResponse : PartialFunction [ Any , Unit ] = respond {
case State ( value ) =>
println ( s "State:${value}" )
case Count ( id , value ) =>
println ( s "Count id:${id} value:${value}" )
}
private def respond [ T ]( cmd : PartialFunction [ T , Unit ]) =
( cmd andThen { _ => context . become ( awaitCommand ); prompt () })
private def awaitCommand : PartialFunction [ Any , Unit ] = {
case line : String => {
command [ List [ String ]]({
case "state" :: Nil =>
manager ! GetState
case "count" :: IntExtractor ( id ) :: Nil =>
manager ! GetCount ( id )
}, {
case Nil =>
case "" :: Nil =>
case na :: nas => println ( s "unknown command: ${na}" )
}). apply ( line . split ( ' ' ). toList )
}
}
private def command [ T ]( valid : PartialFunction [ T , Unit ],
invalid : PartialFunction [ T , Unit ]) = {
( valid andThen { _ => context . become ( awaitResponse ) }) orElse
( invalid andThen { _ => prompt () })
}
private def prompt () : Unit = {
if ( lines . hasNext ) lines . next () match {
case "exit" => context . system . shutdown ()
case line => self ! line
}
}
override def preStart () : Unit =
prompt ()
}
object IntExtractor {
def unapply ( s : String ) : Option [ Int ] = Try {
s . toInt
}. toOption
}
object StdInDriver extends App {
val cc = ConfigFactory . load ( "common.conf" )
val system = ActorSystem ( "example" , cc )
val managerActor = system . actorOf ( Props [ Manager ], "manager" )
val driverActor = system . actorOf ( Props ( new StdInDriver ( managerActor )).
withDispatcher ( "cli-dispatcher" ), "cli" )
}