In stream processing pipelines, every once in a while you may run into the following problem: you have some kind of rich message envelope which contains business data along with some extra metadata. You need to process the data, but at the same time pass the whole message down the stream, because the metadata is needed later. Without any extra tools, you'd have to do it by hand, as in the following example: you read a CommittableMessage from Kafka, pass it to a Flow where only the value is processed, and then pass the whole original message further downstream, so that it can be committed using the metadata.
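The original listing is not reproduced here, so the following is only a sketch of such manual context passing. It assumes alpakka-kafka's Consumer/Committer API; processData, the topic name and the connection settings are hypothetical placeholders.

```scala
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Flow
import org.apache.kafka.common.serialization.StringDeserializer

implicit val system: ActorSystem = ActorSystem("example")

// Hypothetical business logic: it should only care about the value.
def processData(data: String): String = data.toUpperCase

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("my-group")

// The whole CommittableMessage is carried alongside the processed value,
// so every stage has to know about Kafka's envelope type.
val graph = Consumer
  .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
  .via(Flow.fromFunction { msg =>
    val result = processData(msg.record.value) // only the value is processed...
    (result, msg)                              // ...but the envelope tags along
  })
  .map { case (_, msg) => msg.committableOffset } // metadata needed for committing
  .to(Committer.sink(CommitterSettings(system)))

// graph.run() would start the stream (not run here).
```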
There are a couple of problems with such a design:
This issue has been in the spotlight for quite a long time. Finally, since Akka 2.5.20 there is a working solution: FlowWithContext and SourceWithContext.
Akka Streams now provides a special kind of Source which knows how to separate the metadata (called the Context) from the data. This source then lets you use combinators that operate only on the data, hiding the rest of the information.
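As a minimal sketch of how such a source is created (the Envelope type and its fields are assumptions made up for this example), a plain Source can be wrapped by telling it how to extract the context from each element:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("example")

// A stand-in for the message envelope: business data plus metadata.
final case class Envelope(value: String, offset: Long)

val envelopes = Source(List(Envelope("a", 0L), Envelope("b", 1L)))

// Provide the recipe for splitting each element into (data, context).
val sourceWithContext = envelopes
  .asSourceWithContext(env => env.offset) // the offset becomes the hidden context
  .map(_.value)                           // from here on, we see only the data
```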
As you can see, SourceWithContext can be used as a wrapper for any other kind of Source; you only have to provide a recipe for decomposing your stream elements into tuples of data and context.
Now your stream looks like there’s no context, just data!
def processData(data: String) = …
def processDataAsync(data: String): Future[Result] = …

sourceWithContext
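A hedged sketch of how such processing might be wired up; the concrete bodies of processData and processDataAsync, the Result type, and the sample input are assumptions:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, SourceWithContext}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("example")

final case class Result(value: String) // assumed result type

def processData(data: String): String = data.trim
def processDataAsync(data: String): Future[Result] = Future.successful(Result(data))

// Elements are (data, context) tuples; here the context is a Long offset.
val sourceWithContext =
  SourceWithContext.fromTuples(Source(List(("a ", 1L), ("b ", 2L))))

// The combinators below touch only the data; the context rides along unseen.
val processed = sourceWithContext
  .map(processData)
  .mapAsync(4)(processDataAsync)
```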
When your processing is done, it is time to bring the context back out of hiding. This can be done with the .asSource method, which converts a SourceWithContext back into a normal Source of tuples.
.asSource // Source[(String, MsgContext), NotUsed]
Now you can handle the context as needed, for example commit the message, register some metrics, etc.
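A small sketch of this last step, using an in-memory source and an assumed MsgContext case class in place of real Kafka metadata (committing is only hinted at in a comment):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source, SourceWithContext}

implicit val system: ActorSystem = ActorSystem("example")

// A stand-in for the metadata carried through the stream.
final case class MsgContext(offset: Long)

val sourceWithContext = SourceWithContext.fromTuples(
  Source(List(("result-1", MsgContext(0L)), ("result-2", MsgContext(1L)))))

sourceWithContext
  .asSource // Source[(String, MsgContext), NotUsed]
  .map { case (result, ctx) =>
    // Here you could commit ctx.offset, update metrics, etc.
    println(s"$result processed at offset ${ctx.offset}")
    result
  }
  .runWith(Sink.ignore)
```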
Not all stream operations are available on SourceWithContext. In fact, your options are quite limited: map, mapAsync, collect, filter, filterNot, grouped, sliding, mapConcat, mapContext and via. Hopefully more combinators will become available in the future; check out this part of the Akka source documentation for updates.
The .via combinator gives us the possibility to plug in custom flows that encapsulate more sophisticated logic. However, these have to be implementations of FlowWithContext:
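The original flow definition isn't shown here; a minimal sketch of one might look as follows. The name myFlow comes from the text below; the String-to-Int transformation is an arbitrary assumption. Note that the context type is left as a free type parameter, which is exactly what keeps the flow decoupled from MsgContext:

```scala
import akka.NotUsed
import akka.stream.scaladsl.FlowWithContext

// Business logic that knows nothing about the concrete context type Ctx.
def myFlow[Ctx]: FlowWithContext[String, Ctx, Int, Ctx, NotUsed] =
  FlowWithContext[String, Ctx].map(_.length)

// Plugged in via .via; the concrete context type is supplied at the call site:
//   sourceWithContext.via(myFlow[MsgContext])
```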
As you can see, the flow is now of type FlowWithContext. However, it is clean, has no dependency on MsgContext, and can live in your application's logic layer where it belongs. The technical dependency is provided externally in the myFlow[MsgContext] call, which I like to see as quite an elegant kind of inversion of control.
At the beginning of this article I mentioned that one of the motivating use cases for context passing is carrying Kafka offsets through the stream. That case is very well described in the alpakka-kafka documentation, so there's no need to copy it here. Instead, I will show you a different one.
In this example we have a stream of events stored in Cassandra using Akka Persistence. Let's write a projection which consumes these events and does some processing.
First, be aware that streaming from Cassandra is not nearly as efficient as streaming from Kafka: the underlying consumer actually polls the database, there is no partitioning (so no scalability), and you have to manage offset commits manually. Still, the following solution is good enough for many use cases. Here's a full example:
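The original listing isn't reproduced here, so this is only a sketch of what such a projection might look like. It assumes akka-persistence-cassandra's read journal; handleEvent, saveOffset and the tag name are hypothetical placeholders:

```scala
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.{Offset, PersistenceQuery}
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// Hypothetical event processing and manual offset persistence.
def handleEvent(event: Any): Future[Unit] = Future.unit
def saveOffset(offset: Offset): Future[Unit] = Future.unit

def runProjection()(implicit system: ActorSystem): Unit = {
  val queries = PersistenceQuery(system)
    .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  queries
    .eventsByTag("my-tag", Offset.noOffset) // the underlying stage polls Cassandra
    .asSourceWithContext(_.offset)          // the offset becomes the hidden context
    .map(_.event)                           // the projection logic sees only events
    .mapAsync(1)(handleEvent)
    .asSource                               // bring the offset back out of hiding
    .mapAsync(1) { case (_, offset) => saveOffset(offset) } // manual offset commit
    .runWith(Sink.ignore)
}
```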
The SourceWithContext and FlowWithContext abstractions solve an important problem, so don't hesitate to use them to keep a good separation of concerns.