This entry expands on the implementation of the Actor within which the flow will execute. The complete implementation is fairly complex, especially once structured to permit the usage of different actor libraries.
We will thus start with a simple implementation that illustrates the principles. It does not use any actors and is based on the InlineProcessor used for synchronous unit tests. Only linear flows (without any parallelism) can be evaluated.
finalResult captures the value contained within Result and is the value computed by the completed workflow.
pfs contains the RPF that is awaiting a response from a sub-service before the flow can continue. A serial flow (as in the examples covered in earlier entries) has only one outstanding RPF, so a single field suffices here. A list would be required to permit parallel invocation of sub-services.
The core receive method expects either:
- A tuple containing a CI and the response value from the sub-service that has completed its processing. The pfs RPF is expected to match the CI, and its contained function is evaluated with the response value to compute the next RPF. A Result value indicates the completion of the flow and its value is recorded for future reference (by the initiating test code). Any other value is an RPF for which a response is expected.
- The first RPF of the flow to be executed. This is simply assigned to pfs. This RPF is computed by evaluating the flow.
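The stepping rule in the first bullet can be tried in isolation. The sketch below is only an illustration: CI, Result and RPF here are hypothetical stand-ins (the series defines the real ones elsewhere), with RPF assumed to be a partial function from a CI to a continuation that takes the response value. The two-step flow is built by hand rather than by the flow DSL.

```scala
object RpfStepSketch {
  // Assumed stand-ins; the real CI, Result and RPF may be shaped differently.
  final case class CI(id: Int)                // correlation id of one sub-service call
  final case class Result(value: Any)         // terminal value of a flow
  type RPF = PartialFunction[CI, Any => Any]  // CI => continuation over the response

  // Hand-built serial flow: wait for CI(1), then for CI(2), then finish.
  val second: RPF = { case CI(2) => (r: Any) => Result(r.asInstanceOf[Int] + 1) }
  val first: RPF = { case CI(1) => (r: Any) => second }

  def main(args: Array[String]): Unit = {
    // A response of 10 for CI(1) yields the next RPF to wait on.
    val next = first(CI(1))(10)
    assert(next == second)

    // A response of 20 for CI(2) completes the flow with a Result.
    val done = second(CI(2))(20)
    assert(done == Result(21))

    // An RPF is only defined for the CI it is waiting on.
    assert(!first.isDefinedAt(CI(2)))
  }
}
```

Under these assumptions, each step either returns another RPF (keep waiting) or a Result (the flow is done), which is the same distinction the executor makes when a response arrives.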
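class SimpleExecutor {
  // Value carried by the final Result; stays null until the flow completes.
  var finalResult: Any = _
  // The RPF currently awaiting a response from a sub-service.
  var pfs: RPF = _

  def receive: PartialFunction[Any, Unit] = {
    case (ci: CI, r: Any) => process(ci, r) // response from a sub-service
    case f: RPF => pfs = f                  // first RPF of the flow
  }

  private def process(ci: CI, in: Any): Unit = {
    pfs.apply(ci)(in) match {
      case Result(r) => finalResult = r // flow completed
      case r: RPF => pfs = r            // wait for the next response
    }
  }
}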
The above code is passive, so we need a runtime to drive it through its paces.
response records the result of the most recent sub-service invocation. Doubler illustrates how that interaction occurs.
The object creates an instance of SimpleExecutor, feeding it the result of evaluating the flow (which returns the first RPF). Each response from the sub-services is then fed to the instance to drive its flow, until it computes the final result.
A copy of response is required since processor receive will cause response to be indirectly modified.
object InlineProcessor {
  // Most recent sub-service response, written by Doubler.call.
  var response: (CI, Int) = _

  def apply[A](flow: A => RPF, initial: A) = {
    val processor = new SimpleExecutor
    // Evaluating the flow yields the first RPF.
    processor receive flow(initial)
    while (processor.finalResult == null) {
      val copy = response
      processor receive copy
    }
    processor.finalResult
  }

  object Doubler extends Lookup[Int, Int] {
    protected def call(arg: Int): CI = {
      val ci = CorrelationAllocator()
      response = ci -> (arg * 2)
      ci
    }
  }
}
A simple example flow that uses a Lookup to map an Int into some other Int.
def flow(lookup:Lookup[Int,Int]) = {arg:Int => lookup(arg){Result(_)}}
Use the above infrastructure to execute the flow with the Doubler lookup for the value 12.
InlineProcessor(flow(InlineProcessor.Doubler),12)
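To see the pieces run together, here is a self-contained sketch. CI, Result, RPF, CorrelationAllocator and Lookup are hypothetical stand-ins written for this example only; the versions from the earlier entries may differ. In particular, Lookup.apply is assumed to issue the call and return an RPF that waits for exactly the CI it was given, and the executor matches on PartialFunction[_, _] because a type alias cannot be checked at runtime.

```scala
object InlineSketch {
  // --- Assumed stand-ins for definitions from earlier entries ---
  final case class CI(id: Int)
  final case class Result(value: Any)
  type RPF = PartialFunction[CI, Any => Any]

  object CorrelationAllocator {
    private var last = 0
    def apply(): CI = { last += 1; CI(last) }
  }

  abstract class Lookup[A, R] {
    protected def call(arg: A): CI
    // Issue the call, then return an RPF that waits for that CI only.
    def apply(arg: A)(fn: R => Any): RPF = {
      val ci = call(arg)
      val rpf: RPF = { case `ci` => (r: Any) => fn(r.asInstanceOf[R]) }
      rpf
    }
  }

  // --- The executor and runtime, adapted to the stand-ins above ---
  class SimpleExecutor {
    var finalResult: Any = null
    var pfs: RPF = null
    def receive: PartialFunction[Any, Unit] = {
      case (ci: CI, r) => process(ci, r)
      case f: PartialFunction[_, _] => pfs = f.asInstanceOf[RPF]
    }
    private def process(ci: CI, in: Any): Unit =
      pfs(ci)(in) match {
        case Result(r) => finalResult = r
        case next: PartialFunction[_, _] => pfs = next.asInstanceOf[RPF]
      }
  }

  object InlineProcessor {
    var response: (CI, Int) = null
    def apply[A](flow: A => RPF, initial: A): Any = {
      val processor = new SimpleExecutor
      processor.receive(flow(initial))
      while (processor.finalResult == null) {
        val copy = response
        processor.receive(copy)
      }
      processor.finalResult
    }
    object Doubler extends Lookup[Int, Int] {
      protected def call(arg: Int): CI = {
        val ci = CorrelationAllocator()
        response = ci -> (arg * 2)
        ci
      }
    }
  }

  // Single lookup, as in the entry.
  def flow(lookup: Lookup[Int, Int]): Int => RPF =
    (arg: Int) => lookup(arg) { Result(_) }

  // Two lookups in series: the second call is issued from the first continuation.
  def chained(lookup: Lookup[Int, Int]): Int => RPF =
    (arg: Int) => lookup(arg) { a => lookup(a) { Result(_) } }

  def main(args: Array[String]): Unit = {
    println(InlineProcessor(flow(InlineProcessor.Doubler), 12))    // prints 24
    println(InlineProcessor(chained(InlineProcessor.Doubler), 12)) // prints 48
  }
}
```

In the chained case the loop runs twice: the first response (24) produces a second RPF, whose evaluation calls Doubler again and overwrites response before the executor returns.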