Richard Searle

home

Akka system IO client

10 Mar 2012

Network clients can also be implemented using Akka System IO. The following provides a simple client that connects to the LengthCountedServer, simplistically modified to echo back the received text to all attached clients. Note that all the operations are asynchronous, including the connection creation on line 24. The API returns a SocketHandle even if the connection cannot be made. For clarity of system state, we wait for the Connected operation before registering a handle.
// Excerpt of the server's Read handler, modified so that received data is echoed.
case Read(socket, bytes) =>
 // Pass the received bytes, wrapped in a Chunk, to the entry that `state` holds for this socket.
 // NOTE(review): `state` is defined outside this excerpt -- presumably a per-socket parser; confirm.
 state(socket)(Chunk(bytes))
 // Write the same bytes to the first element (key) of every entry in `state`,
 // i.e. echo to all registered sockets rather than only the sender.
 state.foreach{p => p._1.asWritable.write(bytes)}
import akka.actor._
import akka.util.{ ByteString, ByteStringBuilder }
import java.net.InetSocketAddress

/**
 * Akka System IO client that sends length-prefixed text frames and forwards
 * everything it reads to a registered actor.
 *
 * Messages handled:
 *  - `Attach(r)`: remember `r` as the receiver and start an asynchronous connect.
 *  - `Detach`: close the current connection, if any.
 *  - `String`: send the text as one frame (4-digit decimal length + payload).
 *  - `Connected` / `Read` / `Closed`: notifications from the IO manager.
 *
 * @param port TCP port to connect to; the address is built with
 *             `new InetSocketAddress(port)`, i.e. no explicit host.
 */
class LengthBoundedClient(port: Int) extends Actor {
  import IO._

  // Socket of the current connection. Only set once Connected has arrived, so
  // a Some here always denotes an established connection.
  var handle: Option[SocketHandle] = None
  // Actor that receives every ByteString read from the socket.
  var respondTo: Option[ActorRef] = None

  def receive = {
    case Read(_, bytes) =>
      respondTo.foreach { _ ! bytes }

    case Closed(socket, _) =>
      // Only forget the handle if the notification is about the current
      // connection; a late Closed for a replaced socket must not clear it.
      if (handle.exists(_ == socket)) handle = None

    case Connected(h, _) => handle = Some(h)

    case s: String =>
      // The prefix must count the bytes actually sent, not the characters of
      // the String: the two differ for any non-ASCII text.
      // NOTE(review): payloads over 9999 bytes still overflow the 4-digit prefix.
      val payload = ByteString(s)
      val frame = ByteString("%04d".format(payload.length)) ++ payload
      handle.foreach { _ write frame }

    case Attach(r) =>
      respondTo = Some(r)
      // Drop any established connection first so a repeated Attach does not
      // leak the previous socket.
      disconnect()
      IOManager(context.system).connect(new InetSocketAddress(port))

    case Detach => disconnect()
  }

  /** Closes the current connection, if there is one, and forgets its handle. */
  private def disconnect(): Unit = {
    handle.foreach { _.close() }
    handle = None
  }

}

/** Asks the client to forward received data to `respondTo` and to open a connection. */
case class Attach(respondTo: ActorRef)
/** Asks the client to close its current connection, if it has one. */
case object Detach

/**
 * Actor that prints every received `ByteString` to standard output, decoded
 * as US-ASCII. Other message types are not handled.
 *
 * Declared as a plain class: a case class without a parameter list is
 * deprecated, and case-class semantics serve no purpose for an actor that is
 * only ever created through `Props`.
 */
class RespondTo extends Actor {
  def receive = {
    case bytes: ByteString => println(bytes.decodeString("US-ASCII"))
  }
}

/**
 * Console driver for [[LengthBoundedClient]].
 *
 * Reads lines from standard input: `A` attaches (connects), `D` detaches
 * (disconnects), and any other line is sent to the server as text. The port
 * comes from the `PORT` environment variable and defaults to 9999.
 * At end of input the actor system is shut down.
 */
object Client extends App {
  val port = Option(System.getenv("PORT")) map (_.toInt) getOrElse 9999
  val system = ActorSystem()
  val client = system.actorOf(Props(new LengthBoundedClient(port)))
  val respondTo = system.actorOf(Props[RespondTo])

  // readLine() yields null once stdin is exhausted; stop there instead of
  // looping forever and sending null to the actor.
  Iterator.continually(Console.readLine()).takeWhile(_ != null).foreach {
    case "A" => client ! Attach(respondTo)
    case "D" => client ! Detach
    case s => client ! s
  }

  // Stop the actor system so its threads do not keep the JVM alive.
  system.shutdown()

}