Richard Searle

home

Simple Akka Stream and Camel integration

11 Jan 2015

A trivial integration of Akka Streams with Camel, intermediated by Akka Camel.

The code reads strings from stdin and writes the uppercased string to stdout.

This could be contrasted with a direct (and complete) implementation.

package sample.stream

import akka.stream.FlowMaterializer
import akka.stream.scaladsl.PublisherSource
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Sink
import akka.stream.actor.ActorSubscriber
import akka.stream.actor.ActorSubscriberMessage
import akka.stream.actor.OneByOneRequestStrategy
import akka.actor.{ ActorSystem, Props }

object CamelDriver extends App {
  import akka.camel.{ CamelMessage, Consumer, Producer }
  import akka.stream.actor.{ ActorPublisher }

  class CamelConsumer extends Consumer with ActorPublisher[String] {
    def endpointUri = "stream:in"

    import akka.stream.actor.ActorPublisherMessage._

    def receive = {
      case msg: CamelMessage =>
        msg.bodyAs[String] match {
          case "stop" => onComplete()
          case string if (totalDemand > 0) =>
            onNext(string)
        }
      case Request(_) => //ignored
      case Cancel =>
        context.stop(self)
    }
  }

  class CamelProducer extends Producer {
    def endpointUri = "stream:out"
  }

  class CamelSubscriber extends ActorSubscriber {
    import ActorSubscriberMessage._

    override val requestStrategy = OneByOneRequestStrategy

    val endPoint = context.actorOf(Props[CamelProducer])

    def receive = {
      case OnComplete             => system.shutdown()
      case OnNext(string: String) => endPoint ! string
    }
  }

  implicit val system = ActorSystem("some-system")
  implicit val materializer = FlowMaterializer()

  val source = Source[String](Props[CamelConsumer])
  val sink = Sink[String](Props[CamelSubscriber])

  source.map(_.toUpperCase).
    to(sink).
    run()
}