Richard Searle

home

Akka Camel Consumer integration with Spark Streaming

21 Jan 2015

The following code creates an actor-based Receiver that reads from stdin via the Camel stream component (endpoint URI `stream:in`).

The input is gathered in 5-second batches; each batch is reduced to a single space-separated string, which is written to stdout.

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver._

import akka.actor.{ Actor, Props }
import akka.camel.CamelMessage
import akka.camel.Consumer

/**
 * Camel consumer actor that feeds a Spark Streaming actor stream.
 *
 * Messages arriving on the Camel `stream:in` endpoint (standard input) are
 * handed to Spark through `ActorHelper.store`.
 */
class Storer extends Actor with ActorHelper with Consumer {
  /** URI of the Camel endpoint this consumer reads from. */
  def endpointUri: String = "stream:in"

  def receive: Receive = {
    // The stream is declared as actorStream[String], but `cm.body` is typed
    // `Any`. Convert explicitly so a non-String body fails here, at the
    // boundary, instead of as a ClassCastException deep inside the job.
    // `bodyAs` uses the implicit CamelContext supplied by `Consumer`.
    case cm: CamelMessage => store(cm.bodyAs[String])
  }
}

/**
 * Entry point: wires the [[Storer]] actor into a local Spark Streaming job.
 *
 * Each 5-second batch is reduced to one space-separated string and printed.
 */
object CamelApp {
  /**
   * Starts the streaming context and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // loadDefaults = false means no spark.* system properties are picked up,
    // so both the master and the application name must be set explicitly;
    // SparkContext refuses to start without an application name.
    val conf = new SparkConf(false)
      .setMaster("local[*]")
      .setAppName("CamelApp")

    // Batch interval: input is collected for 5 seconds per RDD.
    val ssc = new StreamingContext(conf, Seconds(5))

    val actorStream = ssc.actorStream[String](Props[Storer], "storer")

    // Join everything received in a batch into a single string and print it.
    actorStream.reduce(_ + " " + _).foreachRDD { rdd =>
      rdd.foreach(line => println(line))
    }

    ssc.start()

    ssc.awaitTermination()
  }
}