The following code is liberally borrowed from
import akka.actor._
import com.typesafe.config.ConfigFactory
case object GetState
case class GetCount(ID: Int)
case class State(value: String)
case class Count(ID: Int, value: Int)
class Manager extends Actor {
def receive = {
case GetState => sender() ! State("state value")
case GetCount(id) => sender() ! Count(id, 42)
}
}
class StdInDriver(manager: ActorRef) extends Actor {
val lines = io.Source.stdin.getLines
def receive = {
case State(value) =>
println(s"State:${value}"); prompt()
case Count(id, value) =>
println(s"Count id:${id} value:${value}"); prompt()
case line: String => line.split(' ').toList match {
case "state" :: Nil => manager ! GetState
case "count" :: id :: Nil => manager ! GetCount(id.toInt)
case Nil => prompt()
case "" :: Nil => prompt()
case na :: nas => println(s"unknown command: ${na}"); prompt()
}
}
def prompt(): Unit = {
if (lines.hasNext) lines.next() match {
case "exit" => context.system.shutdown()
case line => self ! line
}
}
override def preStart(): Unit =
prompt()
}
object StdInDriver extends App {
val cc = ConfigFactory.load("common.conf")
val system = ActorSystem("example", cc)
val managerActor = system.actorOf(Props[Manager], "manager")
val driverActor = system.actorOf(Props(new StdInDriver(managerActor)).
withDispatcher("cli-dispatcher"), "cli")
}
where common.conf is
cli-dispatcher {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
The PinnedDispatcher is required since StdInDriver will be blocking on the stdin read and thus requires a dedicated thread.
The above code is implicitly switching between read command and await response modes.
We can make that explicit by using become
def receive = awaitCommand
def awaitResponse: PartialFunction[Any, Unit] = {
case State(value) =>
println(s"State:${value}"); prompt()
case Count(id, value) =>
println(s"Count id:${id} value:${value}"); prompt()
}
def awaitCommand: PartialFunction[Any, Unit] = {
case line: String => line.split(' ').toList match {
case "state" :: Nil =>
manager ! GetState; context.become(awaitResponse)
case "count" :: id :: Nil =>
manager ! GetCount(id.toInt); context.become(awaitResponse)
case Nil => prompt()
case "" :: Nil => prompt()
case na :: nas => println(s"unknown command: ${na}"); prompt()
}
}
The above code contains repeated references to prompt(), context.become(awaitResponse). Those are both ugly and error prone. The three cases can be extracted as follows, eliminating the redundancy.
private def awaitResponse: PartialFunction[Any, Unit] = respond {
case State(value) =>
println(s"State:${value}")
case Count(id, value) =>
println(s"Count id:${id} value:${value}")
}
private def respond[T](cmd: PartialFunction[T, Unit]) =
(cmd andThen { _ => context.become(awaitCommand); prompt() })
private def awaitCommand: PartialFunction[Any, Unit] = {
case line: String => {
command[List[String]]({
case "state" :: Nil =>
manager ! GetState
case "count" :: id :: Nil =>
manager ! GetCount(id.toInt)
}, {
case Nil =>
case "" :: Nil =>
case na :: nas => println(s"unknown command: ${na}")
}).apply(line.split(' ').toList)
}
}
private def command[T](valid: PartialFunction[T, Unit],
invalid: PartialFunction[T, Unit]) = {
(valid andThen { _ => context.become(awaitResponse) })
orElse (invalid andThen { _ => prompt() })
}
private def prompt(): Unit = {
if (lines.hasNext) lines.next() match {
case "exit" => context.system.shutdown()
case line => self ! line
}
}
Finally, the above code terminates with an exception if the user does not enter a integer parameter for the count command. We can resolve this by defining an extractor
object IntExtractor {
def unapply(s: String): Option[Int] = Try {
s.toInt
}.toOption
}
and changing the case to
case "count" :: IntExtractor(id) :: Nil =>
The complete code is then
import akka.actor._
import com.typesafe.config.ConfigFactory
import scala.util.Try
case object GetState
case class GetCount(ID: Int)
case class State(value: String)
case class Count(ID: Int, value: Int)
class Manager extends Actor {
def receive = {
case GetState => sender() ! State("state value")
case GetCount(id) => sender() ! Count(id, 42)
}
}
class StdInDriver(manager: ActorRef) extends Actor {
val lines = io.Source.stdin.getLines
def receive = awaitCommand
private def awaitResponse: PartialFunction[Any, Unit] = respond {
case State(value) =>
println(s"State:${value}")
case Count(id, value) =>
println(s"Count id:${id} value:${value}")
}
private def respond[T](cmd: PartialFunction[T, Unit]) =
(cmd andThen { _ => context.become(awaitCommand); prompt() })
private def awaitCommand: PartialFunction[Any, Unit] = {
case line: String => {
command[List[String]]({
case "state" :: Nil =>
manager ! GetState
case "count" :: IntExtractor(id) :: Nil =>
manager ! GetCount(id)
}, {
case Nil =>
case "" :: Nil =>
case na :: nas => println(s"unknown command: ${na}")
}).apply(line.split(' ').toList)
}
}
private def command[T](valid: PartialFunction[T, Unit],
invalid: PartialFunction[T, Unit]) = {
(valid andThen { _ => context.become(awaitResponse) }) orElse
(invalid andThen { _ => prompt() })
}
private def prompt(): Unit = {
if (lines.hasNext) lines.next() match {
case "exit" => context.system.shutdown()
case line => self ! line
}
}
override def preStart(): Unit =
prompt()
}
object IntExtractor {
def unapply(s: String): Option[Int] = Try {
s.toInt
}.toOption
}
object StdInDriver extends App {
val cc = ConfigFactory.load("common.conf")
val system = ActorSystem("example", cc)
val managerActor = system.actorOf(Props[Manager], "manager")
val driverActor = system.actorOf(Props(new StdInDriver(managerActor)).
withDispatcher("cli-dispatcher"), "cli")
}