The Akka CircuitBreaker can be combined with Reactive Streams as follows.
The core of the process is
.mapAsync(6) { x =>
lookup(x) recover {
case e: CircuitBreakerOpenException => "open"
case _@ x => x.toString
}
}
which:
- requests a result from the rejectMoreThanTwo function.
- Replaces the exception generated by open CB with the value “open”.
- Converts an other result to its string representation.
The output is
AA
B
java.lang.IllegalArgumentException: xxxxxxxxxxxxxxxx
open
open
open
The runnable application code
package futures
import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import akka.actor.ActorSystem
import akka.pattern.CircuitBreaker
import akka.pattern.CircuitBreakerOpenException
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Source
import akka.util.Timeout
object AkkaCB extends App {
implicit val system = ActorSystem("Sys")
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
val breaker =
new CircuitBreaker(system.scheduler,
maxFailures = 1,
callTimeout = 10.seconds,
resetTimeout = 1.minute)
def rejectMoreThanTwo(s: String): String =
if (s.length > 2)
throw new IllegalArgumentException(s);
else
s
def lookup(s: String) = breaker.withCircuitBreaker(Future(rejectMoreThanTwo(s)))
implicit val timeout = Timeout(3.seconds)
val r = Source.apply(Seq("AA", "B", "xxxxxxxxxxxxxxxx", "Ced", "Dxxx", "E"))
.mapAsync(6) { x =>
lookup(x) recover {
case e: CircuitBreakerOpenException => "open"
case _@ x => x.toString
}
}
.runForeach(println)
r.onComplete {
case x @ _ => println(x); system.shutdown
}
}