Richard Searle

home

Actor CLI implementation

14 Jan 2015

The following code is liberally borrowed from

import akka.actor._
import com.typesafe.config.ConfigFactory

case object GetState
case class GetCount(ID: Int)

case class State(value: String)
case class Count(ID: Int, value: Int)

class Manager extends Actor {
  def receive = {
    case GetState     => sender() ! State("state value")
    case GetCount(id) => sender() ! Count(id, 42)
  }
}

class StdInDriver(manager: ActorRef) extends Actor {
  val lines = io.Source.stdin.getLines

  def receive = {
    case State(value) =>
      println(s"State:${value}"); prompt()
    case Count(id, value) =>
      println(s"Count id:${id} value:${value}"); prompt()
    case line: String => line.split(' ').toList match {
      case "state" :: Nil       => manager ! GetState
      case "count" :: id :: Nil => manager ! GetCount(id.toInt)
      case Nil                  => prompt()
      case "" :: Nil            => prompt()
      case na :: nas            => println(s"unknown command: ${na}"); prompt()
    }
  }

  def prompt(): Unit = {
    if (lines.hasNext) lines.next() match {
      case "exit" => context.system.shutdown()
      case line   => self ! line
    }
  }

  override def preStart(): Unit =
    prompt()
}

object StdInDriver extends App {
  val cc = ConfigFactory.load("common.conf")

  val system = ActorSystem("example", cc)
  val managerActor = system.actorOf(Props[Manager], "manager")
  val driverActor = system.actorOf(Props(new StdInDriver(managerActor)).
      withDispatcher("cli-dispatcher"), "cli")
}

where common.conf is

cli-dispatcher {
  executor = "thread-pool-executor"
  type = PinnedDispatcher
}

The PinnedDispatcher is required since StdInDriver will be blocking on the stdin read and thus requires a dedicated thread.

The above code is implicitly switching between read command and await response modes. We can make that explicit by using become

  def receive = awaitCommand

  def awaitResponse: PartialFunction[Any, Unit] = {
    case State(value) =>
      println(s"State:${value}"); prompt()
    case Count(id, value) =>
      println(s"Count id:${id} value:${value}"); prompt()
  }

  def awaitCommand: PartialFunction[Any, Unit] = {
    case line: String => line.split(' ').toList match {
      case "state" :: Nil       =>
        manager ! GetState; context.become(awaitResponse)
      case "count" :: id :: Nil =>
        manager ! GetCount(id.toInt); context.become(awaitResponse)
      case Nil                  => prompt()
      case "" :: Nil            => prompt()
      case na :: nas            => println(s"unknown command: ${na}"); prompt()
    }
  }

The above code contains repeated references to prompt(), context.become(awaitResponse). Those are both ugly and error prone. The three cases can be extracted as follows, eliminating the redundancy.

  private def awaitResponse: PartialFunction[Any, Unit] = respond {
    case State(value) =>
      println(s"State:${value}")
    case Count(id, value) =>
      println(s"Count id:${id} value:${value}")
  }

  private def respond[T](cmd: PartialFunction[T, Unit]) =
    (cmd andThen { _ =>   context.become(awaitCommand); prompt() })

  private def awaitCommand: PartialFunction[Any, Unit] = {
    case line: String => {
      command[List[String]]({
        case "state" :: Nil =>
          manager ! GetState
        case "count" :: id :: Nil =>
          manager ! GetCount(id.toInt)
      }, {
        case Nil       =>
        case "" :: Nil =>
        case na :: nas => println(s"unknown command: ${na}")
      }).apply(line.split(' ').toList)
    }
  }

  private def command[T](valid: PartialFunction[T, Unit],
                 invalid: PartialFunction[T, Unit]) = {
    (valid andThen { _ => context.become(awaitResponse) })
       orElse (invalid andThen { _ => prompt() })
  }
  
   private def prompt(): Unit = {
    if (lines.hasNext) lines.next() match {
      case "exit" => context.system.shutdown()
      case line   => self ! line
    }
  }

Finally, the above code terminates with an exception if the user does not enter a integer parameter for the count command. We can resolve this by defining an extractor

object IntExtractor {
  def unapply(s: String): Option[Int] = Try {
    s.toInt
  }.toOption
}

and changing the case to

 case "count" :: IntExtractor(id) :: Nil =>

The complete code is then

import akka.actor._
import com.typesafe.config.ConfigFactory
import scala.util.Try

case object GetState
case class GetCount(ID: Int)

case class State(value: String)
case class Count(ID: Int, value: Int)

class Manager extends Actor {
  def receive = {
    case GetState     => sender() ! State("state value")
    case GetCount(id) => sender() ! Count(id, 42)
  }
}

class StdInDriver(manager: ActorRef) extends Actor {
  val lines = io.Source.stdin.getLines

  def receive = awaitCommand

  private def awaitResponse: PartialFunction[Any, Unit] = respond {
    case State(value) =>
      println(s"State:${value}")
    case Count(id, value) =>
      println(s"Count id:${id} value:${value}")
  }

  private def respond[T](cmd: PartialFunction[T, Unit]) =
    (cmd andThen { _ => context.become(awaitCommand); prompt() })

  private def awaitCommand: PartialFunction[Any, Unit] = {
    case line: String => {
      command[List[String]]({
        case "state" :: Nil =>
          manager ! GetState
        case "count" :: IntExtractor(id) :: Nil =>
          manager ! GetCount(id)
      }, {
        case Nil       =>
        case "" :: Nil =>
        case na :: nas => println(s"unknown command: ${na}")
      }).apply(line.split(' ').toList)
    }
  }

  private def command[T](valid: PartialFunction[T, Unit],
                         invalid: PartialFunction[T, Unit]) = {
    (valid andThen { _ => context.become(awaitResponse) }) orElse 
      (invalid andThen { _ => prompt() })
  }

  private def prompt(): Unit = {
    if (lines.hasNext) lines.next() match {
      case "exit" => context.system.shutdown()
      case line   => self ! line
    }
  }

  override def preStart(): Unit =
    prompt()
}

object IntExtractor {
  def unapply(s: String): Option[Int] = Try {
    s.toInt
  }.toOption
}

object StdInDriver extends App {
  val cc = ConfigFactory.load("common.conf")

  val system = ActorSystem("example", cc)
  val managerActor = system.actorOf(Props[Manager], "manager")
  val driverActor = system.actorOf(Props(new StdInDriver(managerActor)).
      withDispatcher("cli-dispatcher"), "cli")
}