Custom Akka Stream Processing

The Akka Stream API comes with a suite of versatile tools for stream processing. Besides the Graph DSL, a set of built-in stream operators is also readily available. Yet, if more customized stream processing is needed, GraphStage allows one to create streaming operators with specific stream processing logic between the input and output ports.

As illustrated in the Akka Stream documentation on custom stream processing, one can implement a transformation such as map or filter as a custom GraphStage in just a few lines of code. For example, map can be implemented as a Flow using GraphStage:

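// Implementing `map` using GraphStage
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
 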
class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
 
  val in = Inlet[A]("Map.in")
  val out = Outlet[B]("Map.out")
 
  override val shape = FlowShape.of(in, out)
 
  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          push(out, f(grab(in)))
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}

When analyzing a stream operation, it's easier to reason about the flow logic starting from the downstream and tracing upward. With that in mind, let's look at the above snippet. When the downstream signals demand for an element on the output port, the onPull callback is called, which pulls a new element into the input port. Once the upstream pushes that element, the onPush callback is triggered: it grabs the element on the input port, applies function f and pushes the result to the output port.
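A quick way to watch this pull/push cycle in action is to run the stage in a small stream: Sink.seq keeps signalling demand, so each element travels onPull, pull(in), onPush, push(out) in turn. The sketch below assumes Akka 2.6, where an implicit ActorSystem supplies the Materializer; the object MapDemo and its method doubleAll are hypothetical names for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.concurrent.Await
import scala.concurrent.duration._

// The `Map` stage from above, repeated so this snippet compiles on its own
class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
  val in = Inlet[A]("Map.in")
  val out = Outlet[B]("Map.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, f(grab(in)))
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

// Hypothetical demo object: doubles every element via the custom stage
object MapDemo {
  def doubleAll(xs: List[Int]): Seq[Int] = {
    // Akka 2.6+: the implicit ActorSystem provides the Materializer for runWith
    implicit val system: ActorSystem = ActorSystem("map-demo")
    try {
      // Sink.seq pulls every element and collects the results
      val done = Source(xs).via(new Map[Int, Int](_ * 2)).runWith(Sink.seq)
      Await.result(done, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(doubleAll(List(1, 2, 3, 4, 5)).mkString(", ")) // 2, 4, 6, 8, 10
}
```

On Akka 2.5 one would additionally declare an implicit ActorMaterializer, as the DynamicMap test later in this article does.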

What is a GraphStage?

A GraphStage represents a stream processing operator. Below is the source code of the abstract classes GraphStage and GraphStageLogic:

// Class GraphStage
abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {
  final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =
    (createLogic(inheritedAttributes), NotUsed)
 
  @throws(classOf[Exception])
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}
 
// Class GraphStageLogic
abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: Int) {
 
  import GraphInterpreter._
  import GraphStageLogic._
 
  def this(shape: Shape) = this(shape.inlets.size, shape.outlets.size)
 
  ...
 
  final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = {
    handlers(in.id) = handler
    if (_interpreter != null) _interpreter.setHandler(conn(in), handler)
  }
 
  final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = {
    handlers(out.id + inCount) = handler
    if (_interpreter != null) _interpreter.setHandler(conn(out), handler)
  }
 
  ...
}

To use (i.e. extend) a GraphStage, one needs to implement method createLogic, which returns a GraphStageLogic. A GraphStageLogic is constructed from the stage's shape and registers handlers via setHandler, which takes two arguments: an Inlet with an InHandler, or an Outlet with an OutHandler. These InHandler and OutHandler routines are where the custom processing logic for every stream element resides.
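Since InHandler and OutHandler are plain traits, the logic object can also implement both itself and register them in a single call with setHandlers, a style Akka's own built-in operators use. Below is a sketch of the same map stage written that way, assuming Akka 2.6; MapAlt and MapAltDemo are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical name: the same `map` stage, with the logic acting as its own handlers
class MapAlt[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
  val in = Inlet[A]("MapAlt.in")
  val out = Outlet[B]("MapAlt.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Upstream delivered an element: transform it and emit it downstream
      override def onPush(): Unit = push(out, f(grab(in)))
      // Downstream asked for an element: request one from upstream
      override def onPull(): Unit = pull(in)

      // Register `this` as both the InHandler for `in` and the OutHandler for `out`
      setHandlers(in, out, this)
    }
}

// Hypothetical demo: maps each word to its length
object MapAltDemo {
  def lengths(words: List[String]): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("map-alt-demo")
    try Await.result(Source(words).via(new MapAlt[String, Int](_.length)).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }
}
```

Both forms behave the same; the mixin style simply avoids allocating two separate handler objects.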

As illustrated in the map and filter implementations in the Akka doc mentioned above, defining a GraphStage minimally requires the in, out and shape (FlowShape in those examples) of the graph, as well as the stream processing logic in the InHandler and OutHandler.
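A filter differs from map only in its InHandler: when the predicate rejects an element, nothing is pushed, so the stage has to pull again on its own to honour the downstream demand it already received. A minimal sketch along the lines of the filter example in the Akka docs, assuming Akka 2.6 (FilterDemo and evens are hypothetical names):

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.concurrent.Await
import scala.concurrent.duration._

class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          // Emit matching elements; otherwise request the next one ourselves,
          // because the downstream's pending demand has not been satisfied yet
          if (p(elem)) push(out, elem) else pull(in)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

// Hypothetical demo: keeps only even numbers
object FilterDemo {
  def evens(xs: List[Int]): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("filter-demo")
    try Await.result(Source(xs).via(new Filter[Int](_ % 2 == 0)).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }
}
```

Without the pull(in) in the else branch, the stream would stall after the first rejected element: the downstream is still waiting, but nobody asks the upstream for more.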

Handling external asynchronous events

Among various customizing features, one can extend a GraphStage to handle asynchronous events (e.g. Scala Futures) that aren't part of the stream. To do that, define a callback with getAsyncCallback, which returns an AsyncCallback. The external event then triggers the callback by calling the AsyncCallback's invoke method, and Akka runs the callback body within the stage in a thread-safe way.

As an exercise in building custom stream processing operators with GraphStage, we're going to modify the above map Flow into one that dynamically switches its transformation function when triggered by an asynchronous event. Let's name the class DynamicMap; it takes a switch event of type Future[Unit] and two ‘A => B’ transformation functions (f being the original function and g the switched one).

// Dynamically switching `map` functions upon triggering by an asynchronous event
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.{Done, NotUsed}
 
import scala.concurrent.{ExecutionContext, Future}
 
class DynamicMap[A, B](switch: Future[Unit], f: A => B, g: A => B)(implicit ec: ExecutionContext)
  extends GraphStage[FlowShape[A, B]] {
 
  val in = Inlet[A]("DynamicMap.in")
  val out = Outlet[B]("DynamicMap.out")
 
  override val shape = FlowShape.of(in, out)
 
  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
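      // Flipped once by the async callback; only ever touched from within the stage
      var switched = false
 
      // Obtain the AsyncCallback in preStart (not in the constructor) and hand it
      // to the Future; invoke() schedules the callback body to run inside the stage
      override def preStart(): Unit = {
        val callback = getAsyncCallback[Unit] { _ =>
          switched = true
        }
        switch.foreach(callback.invoke)
      }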
 
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          push(out, if (switched) g(grab(in)) else f(grab(in)))
        }
      })
 
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}

In this case, the callback simply flips the mutable variable switched from its initial false to true, so that from then on the InHandler pushes g(elem) rather than f(elem) to the output port whenever an element arrives on the input port. In addition, an ExecutionContext is passed in as an implicit parameter. It is required not by invoke itself but by Future.foreach, which runs callback.invoke once the switch future completes successfully (a failed switch future therefore leaves the stage on f).

Note that, to avoid race conditions, the callback is obtained and handed to the Future in the preStart lifecycle hook rather than in the constructor of GraphStageLogic.

Let's test DynamicMap with a dummy asynchronous switch event that completes after about a millisecond, along with a couple of trivial ‘DataIn => DataOut’ transformation functions:

// Testing `DynamicMap`
case class DataIn(id: Int)
case class DataOut(content: String)
 
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
 
val source: Source[DataIn, NotUsed] = Source(1 to 2000).map(DataIn(_))
val sink: Sink[DataOut, Future[Done]] = Sink.foreach(println)
 
val switch = Future{ Thread.sleep(1) }
val f = (di: DataIn) => DataOut(s"ID-${di.id}-OLD")
val g = (di: DataIn) => DataOut(s"ID-${di.id}-NEW")
 
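// Run the stream and shut down the ActorSystem once it completes
source.via(new DynamicMap[DataIn, DataOut](switch, f, g)).runWith(sink).onComplete(_ => system.terminate())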
 
// OUTPUT (sample run; the switch point varies from run to run):
//
// DataOut(ID-1-OLD)
// DataOut(ID-2-OLD)
// ...
// DataOut(ID-982-OLD)
// DataOut(ID-983-OLD)
// DataOut(ID-984-NEW)
// DataOut(ID-985-NEW)
// DataOut(ID-986-NEW)
// ...
// ...
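Because the switch fires asynchronously, the crossover point (ID-984 in the run above) shifts from run to run, so a test should not assert on it. What the single switched flag does guarantee is that once a NEW element appears, no OLD element follows. Below is a sketch of a check for that property, assuming Akka 2.6 (the implicit ActorSystem supplies the materializer); DynamicMapCheck, labels and switchesAtMostOnce are hypothetical names, and DynamicMap is repeated with a single grab per element so the snippet compiles on its own.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// The DynamicMap stage from above, condensed
class DynamicMap[A, B](switch: Future[Unit], f: A => B, g: A => B)(implicit ec: ExecutionContext)
  extends GraphStage[FlowShape[A, B]] {
  val in = Inlet[A]("DynamicMap.in")
  val out = Outlet[B]("DynamicMap.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var switched = false

      override def preStart(): Unit = {
        val callback = getAsyncCallback[Unit] { _ => switched = true }
        switch.foreach(callback.invoke)
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          push(out, if (switched) g(elem) else f(elem))
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

// Hypothetical helpers for checking the switching behaviour
object DynamicMapCheck {
  // Runs n elements through DynamicMap and returns the OLD/NEW labels in order
  def labels(n: Int): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("dynamic-map-check")
    implicit val ec: ExecutionContext = system.dispatcher
    try {
      val switch = Future(Thread.sleep(1)) // same dummy trigger as in the test above
      val stage = new DynamicMap[Int, String](switch, _ => "OLD", _ => "NEW")
      Await.result(Source(1 to n).via(stage).runWith(Sink.seq), 10.seconds)
    } finally system.terminate()
  }

  // The crossover point is timing-dependent, but once "NEW" appears it must persist
  def switchesAtMostOnce(labels: Seq[String]): Boolean =
    labels.dropWhile(_ == "OLD").forall(_ == "NEW")
}
```

The property holds even if the stream finishes before the future completes (all OLD), which is what makes it safe to assert on.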