21 Jan 2015
The following code creates a Receiver that reads from stdin via the camel stream component.
The input is gathered for 5 seconds and then reduced to a single string, written to stdout.
import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.storage._ import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver._ import akka.actor.{ Actor, Props } import akka.camel.CamelMessage import akka.camel.Consumer class Storer extends Actor with ActorHelper with Consumer { def endpointUri = "stream:in" def receive = { case cm: CamelMessage => store(cm.body) } } object CamelApp { def main(args: Array[String]) { val conf = new SparkConf(false) .setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(5)) val actorStream = ssc.actorStream[String](Props[Storer], "storer") actorStream.reduce(_ + " " + _).foreachRDD { (rdd, time) => rdd.foreach { s: String => println(s) } } ssc.start() ssc.awaitTermination() } }