11 Jan 2015
A trivial integration of Akka Streams with Camel, intermediated by Akka Camel.
The code reads strings from stdin and writes the uppercased string to stdout.
This could be contrasted with a direct (and complete) implementation.
package sample.stream import akka.stream.FlowMaterializer import akka.stream.scaladsl.PublisherSource import akka.stream.scaladsl.Source import akka.stream.scaladsl.Sink import akka.stream.actor.ActorSubscriber import akka.stream.actor.ActorSubscriberMessage import akka.stream.actor.OneByOneRequestStrategy import akka.actor.{ ActorSystem, Props } object CamelDriver extends App { import akka.camel.{ CamelMessage, Consumer, Producer } import akka.stream.actor.{ ActorPublisher } class CamelConsumer extends Consumer with ActorPublisher[String] { def endpointUri = "stream:in" import akka.stream.actor.ActorPublisherMessage._ def receive = { case msg: CamelMessage => msg.bodyAs[String] match { case "stop" => onComplete() case string if (totalDemand > 0) => onNext(string) } case Request(_) => //ignored case Cancel => context.stop(self) } } class CamelProducer extends Producer { def endpointUri = "stream:out" } class CamelSubscriber extends ActorSubscriber { import ActorSubscriberMessage._ override val requestStrategy = OneByOneRequestStrategy val endPoint = context.actorOf(Props[CamelProducer]) def receive = { case OnComplete => system.shutdown() case OnNext(string: String) => endPoint ! string } } implicit val system = ActorSystem("some-system") implicit val materializer = FlowMaterializer() val source = Source[String](Props[CamelConsumer]) val sink = Sink[String](Props[CamelSubscriber]) source.map(_.toUpperCase). to(sink). run() }