Richard Searle

home

Using Akka CircuitBreaker with Akka Reactive Streams

14 Jun 2015

The Akka CircuitBreaker can be combined with Reactive Streams as follows.

The core of the process is

 .mapAsync(6) { x =>
      // Turn a failed lookup into a String element.
      lookup(x) recover {
        case _: CircuitBreakerOpenException => "open"
        case failure                        => failure.toString
      }
    }

which:

  1. Requests a result from the rejectMoreThanTwo function, called through the circuit breaker by lookup.
  2. Replaces the exception generated by an open circuit breaker with the value “open”.
  3. Converts any other failure to its string representation.

The output is

AA
B
java.lang.IllegalArgumentException: xxxxxxxxxxxxxxxx
open
open
open

The runnable application code

package futures

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

import akka.actor.ActorSystem
import akka.pattern.CircuitBreaker
import akka.pattern.CircuitBreakerOpenException
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Source
import akka.util.Timeout

/**
 * Demo application: sends a fixed list of strings through an Akka stream whose
 * asynchronous stage calls [[lookup]], a function guarded by a circuit breaker,
 * and prints one line per element. When the stream's completion future
 * finishes, its result is printed and the actor system is shut down.
 */
object AkkaCB extends App {

  implicit val system: ActorSystem = ActorSystem("Sys")
  // Execution context used by Future(...), recover and onComplete below.
  import system.dispatcher
  implicit val materializer: ActorFlowMaterializer = ActorFlowMaterializer()

  /** Circuit breaker shared by every [[lookup]] call. */
  val breaker: CircuitBreaker =
    new CircuitBreaker(system.scheduler,
      maxFailures = 1,
      callTimeout = 10.seconds,
      resetTimeout = 1.minute)

  /**
   * Returns `s` unchanged when it has at most two characters.
   *
   * @param s the string to check
   * @return `s` itself
   * @throws IllegalArgumentException carrying `s` as its message when `s` is longer
   *                                  than two characters
   */
  def rejectMoreThanTwo(s: String): String =
    if (s.length > 2) throw new IllegalArgumentException(s)
    else s

  /**
   * Runs [[rejectMoreThanTwo]] inside a `Future`, routed through [[breaker]].
   *
   * @param s the string to check
   * @return a future holding `s`, or a failed future
   */
  def lookup(s: String): Future[String] =
    breaker.withCircuitBreaker(Future(rejectMoreThanTwo(s)))

  // NOTE(review): nothing visible in this object appears to consume an implicit
  // Timeout; kept so the object's members stay unchanged.
  implicit val timeout: Timeout = Timeout(3.seconds)

  val r = Source.apply(Seq("AA", "B", "xxxxxxxxxxxxxxxx", "Ced", "Dxxx", "E"))
    .mapAsync(6) { x =>
      // Turn a failed lookup into a String element. The catch-all binder is
      // named `failure` so that it no longer shadows the stream element `x`.
      lookup(x) recover {
        case _: CircuitBreakerOpenException => "open"
        case failure                        => failure.toString
      }
    }
    .runForeach(println)

  // Print the stream's final outcome (success or failure), then stop the actor system.
  r.onComplete { outcome =>
    println(outcome)
    system.shutdown()
  }

}