Richard Searle

home

Integrating Futures with the real world

28 Jan 2012

The previous post described how to unit test a Futures based choreography implementation.

This post will examine how such implementation might be connected to the external world and thus perform useful work.

The Camel akka module permits integration of Camel routes with Akka actors and hence with Futures. Camel provides a wide range of components, supporting pretty much transport that might be required.

Two seperate Camel implementations are required:

  1. Expose the service to external clients
  2. Provide access to the lower level services required by this service.

The latter will always use a Request-Response MEP, while latter is determined by the needs of the client.

A JMS based service implementation that reads from one queue and responds on another is quite common. This might be implemented as follows

case class OWFlow[A, R](in: String, out: String, flow: A => Future[R]) {

  val inActor = Actor.actorOf(new InActor).start
  val outActor = Actor.actorOf(new OutActor).start

  private class OutActor extends Actor with Producer with Oneway {
    val endpointUri = out
  }

  private class InActor extends Actor with Consumer {
    val endpointUri = in

    def receive = {
      case akka.camel.Message(a: A, _) => (Future(a).flatMap(flow)).onComplete { outActor ! _ }
    }
  }
}

Where in and out are the Camel uris that reference the appropriate transport end-points. flow is the service to the connected between in and out.

The OutActor simply provides an oneway endpoint to Camel over which messages are sent.

The InActor

  1. Receives message from Camel endpoint.
  2. Creates a Future whose value is that message. The Future executes asynchronously, so the actor is not blocked.
  3. The Future executes the flow
  4. Upon completion, the result is send to the outActor
  5. The outActor delivers the result to the Camel endpoint.

An example using the JMS component

 val slbOW = new OWFlow("jms:slbIn", "jms:slbOut", SingleLineBalance.apply)

The lower level service can be implemented using a simple case class, wrapping a Map containing the data of interest.

case class Responder[K, V](map: Map[K, V]) {
  def apply(a: Any) = map(a.asInstanceOf[K])
}

This case class is then wired into a Camel route using the bean component and the Scala DSL.

val context = CamelContextManager.mandatoryContext
context.addRoutes(new RouteBuilder { "seda:num".bean(Responder(Map(
      Id(123) -> List(Num("124-555-1234"), Num("333-555-1234")))) })

The service can then be tested by sending a Num via the jms queue.

val producer = CamelContextManager.mandatoryContext.createProducerTemplate
producer.sendBody("jms:slbIn", Num("124-555-1234"))

The complete test can be found on github. That code uses seda components rather than jms.