Richard Searle


Outlining the flow implementation infrastructure

04 Apr 2011

This entry expands on the implementation of the Actor within which the flow will execute. The complete implementation is fairly complex, especially once structured to permit the usage of different actor libraries. We will thus start with a simple implementation that illustrates the principles. It does not use any actors and is based on the InlineProcessor used for synchronous unit tests. Only linear flows (without any parallelism) can be evaluated. finalResult captures the value contained within Result and is the value computed by the completed workflow. pfs contains the RPF that is awaiting responses from sub-services that would permit it to execute. A serial flow (as in examples covered in early entries) would have only one outstanding RPF. A list is required to permit parallel invocation of sub-services. The core receive method expects
class SimpleExecutor{
  var finalResult:Any = _
  var pfs:RPF = _
  def receive:PartialFunction[Any,Unit] = {
         case (ci:CI,r:Any) => process(ci,r)
         case f:RPF => pfs = f
   private def process(ci:CI,in:Any){
     pfs.apply(ci)(in) match {
        case Result(r) => finalResult = r
        case r:RPF => pfs = r
The above code is passive, so we need a runtime to drive it through its pace. response records the result of the most recent sub-service invocation. Doubler illustrates how that interaction occurs. The object creates an instance of SimpleExecutor, feeding it the result of evaluating the flow (which returns the first RPF). Each response from the sub-services are then feed to the instance to drive its flow, until it computes the final result. A copy of response is required since process receive will cause response to be indirectly modified.
object InlineProcessor {

  var response:(CI,Int) = _

   def apply[A](flow:A=>RPF,initial:A) = {
      val processor = new SimpleExecutor
      processor receive Trigger(flow(initial))

      while(processor.finalResult == null){
         val copy = response
         processor receive copy

   object Doubler extends Lookup[Int,Int]{
       protected def call(arg:Int):CI = {
           val ci = CorrelationAllocator()
           response = ci->arg*2

A simple example flow that uses a Lookup to map an Int into some other Int.
def flow(lookup:Lookup[Int,Int]) = {arg:Int => lookup(arg){Result(_)}}
Use the above infrastructure to execute the flow with the Doubler lookup for the value 12.