Painlessly passing message context through Akka Streams

In stream processing pipelines, you may occasionally run into the following problem: you have a rich message envelope that contains business data plus some extra metadata. You need to process the data, but you also have to pass the whole message down the stream, because the metadata is needed later. Without any extra tools, you'd have to do this by hand, as in the following example, where you read a CommittableMessage from Kafka, pass it to a Flow that processes only the value, and then pass the whole original message further down so that it can be committed using the metadata:
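A minimal sketch of that direct approach could look like the following. To keep it runnable without a broker, a hypothetical Envelope case class stands in for Kafka's CommittableMessage, and the processing step (taking the length of the value) is made up purely for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object DirectEnvelopePassing {
  // Hypothetical stand-in for Kafka's CommittableMessage: business data plus
  // metadata (here just an offset) that is needed again at the end of the stream.
  final case class Envelope(value: String, offset: Long)

  // The flow only transforms `value`, yet its signature mentions Envelope
  // and it has to carry the whole message along by hand.
  val processing: Flow[Envelope, (Envelope, Int), NotUsed] =
    Flow[Envelope].map(msg => (msg, msg.value.length))

  def run(): Seq[(Long, Int)] = {
    implicit val system: ActorSystem = ActorSystem("direct-envelope")
    // Explicit materializer as required by Akka 2.5; newer versions can
    // derive one from the implicit ActorSystem.
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val done = Source(List(Envelope("a", 0L), Envelope("bcd", 1L)))
        .via(processing)
        // Stand-in for the commit step: only here is the metadata used.
        .map { case (msg, length) => (msg.offset, length) }
        .runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }
}
```

In a real Kafka stream the Envelope type would be the Kafka message class itself, which is how the infrastructure import ends up in the signature of the processing flow.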

There are a couple of problems with such a design:

  • The code of your Flow is polluted with Kafka imports, which makes the application logic directly dependent on the infrastructure layer.
  • The flow is difficult to reuse outside of Kafka-based streams, and it is also awkward to test.

This issue had been under discussion for quite a long time. Finally, since Akka 2.5.20, there is a working solution: FlowWithContext and SourceWithContext.

Detaching the context

Akka Streams now provides a special kind of Source that knows how to separate the metadata (called the context) from the data. This source then lets you use combinators that operate only on the data, while the context is carried along out of sight.

Example
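A minimal sketch of such a wrapper, assuming a recent Akka 2.5.x or 2.6.x release where SourceWithContext.fromTuples is available. The Msg and MsgContext case classes are hypothetical; they are only named to match the snippets used later in this article.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Source, SourceWithContext}

object DetachingContext {
  // Hypothetical message types, named to match the snippets in this article.
  final case class MsgContext(offset: Long)
  final case class Msg(data: String, context: MsgContext)

  val source: Source[Msg, NotUsed] =
    Source(List(Msg("a", MsgContext(0L)), Msg("b", MsgContext(1L))))

  // The "recipe": split every element into a (data, context) tuple.
  val sourceWithContext: SourceWithContext[String, MsgContext, NotUsed] =
    SourceWithContext.fromTuples(source.map(msg => (msg.data, msg.context)))
}
```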

As you can see, SourceWithContext can be used as a wrapper for any other kind of Source; you only have to provide a recipe for decomposing your stream elements into tuples of data and context.

Now your stream looks like there’s no context, just data!

def processData(data: String) = …
def processDataAsync(data: String): Future[Result] = …

sourceWithContext
  .map(processData)
  // mapAsync needs an explicit parallelism; 4 is an arbitrary choice
  .mapAsync(parallelism = 4)(processDataAsync)

Reuniting the tuple

When your processing is over, it is time to bring the context back out of hiding. This can be done with the .asSource method, which converts a SourceWithContext into a normal Source of tuples.

sourceWithContext
  .map(processData)
  .mapAsync(parallelism = 4)(processDataAsync)
  .asSource // Source[(Result, MsgContext), NotUsed]

Now you can handle the context as needed, for example commit the message, register some metrics, etc.
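To see the whole round trip in one place, here is a small runnable sketch under a few assumptions: the bodies of processData and processDataAsync, the Result and MsgContext types, and the final step that extracts the offset (a stand-in for a real commit) are all invented for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source, SourceWithContext}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ReunitingTheTuple {
  final case class MsgContext(offset: Long) // hypothetical metadata
  final case class Result(length: Int)      // hypothetical processing result

  // Hypothetical processing steps; neither of them sees the context.
  def processData(data: String): String = data.trim
  def processDataAsync(data: String): Future[Result] =
    Future.successful(Result(data.length))

  def run(): Seq[(Result, Long)] = {
    implicit val system: ActorSystem = ActorSystem("reunite")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val input = Source(List((" ab ", MsgContext(0L)), ("cde", MsgContext(1L))))
      val done = SourceWithContext
        .fromTuples(input)
        .map(processData)
        .mapAsync(parallelism = 2)(processDataAsync)
        .asSource // Source[(Result, MsgContext), NotUsed]
        // The context is visible again; a real stream would commit it here.
        .map { case (result, ctx) => (result, ctx.offset) }
        .runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }
}
```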

Restrictions!

Not all stream operations are available for SourceWithContext. In fact, your options are quite limited: map, mapAsync, collect, filter, filterNot, grouped, sliding, mapConcat, mapContext, and via. Hopefully more combinators will be available in the future; check out this part of the Akka source documentation for updates.
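It helps to know what happens to the context when an operator changes the number of elements. The sketch below, with made-up input and a plain Long as the context, shows my understanding of two of the supported operators: filter drops an element together with its context, and mapConcat gives every produced element the context of the element it came from.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source, SourceWithContext}

import scala.concurrent.Await
import scala.concurrent.duration._

object ContextAwareOperators {
  def run(): Seq[(String, Long)] = {
    implicit val system: ActorSystem = ActorSystem("operators")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Tuples of (data, context); the Long context plays the role of an offset.
      val input = Source(List(("a b", 0L), ("", 1L), ("c", 2L)))
      val done = SourceWithContext
        .fromTuples(input)
        .filter(_.nonEmpty)             // drops "" together with its context 1
        .mapConcat(_.split(" ").toList) // "a" and "b" both keep context 0
        .asSource
        .runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }
}
```

Note that a filtered-out element never reaches the end of the stream, so if the context is something you commit, it is simply skipped for that element.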

FlowWithContext

The .via combinator gives us the possibility to plug in custom flows that encapsulate more sophisticated logic. However, these have to be implementations of FlowWithContext:
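A minimal sketch of such a flow, assuming the type parameter order used by current Akka releases (data first, context second). The names myFlow and MsgContext follow the text of this article, while Result and the processing steps are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FlowWithContext, Sink, Source, SourceWithContext}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ContextAgnosticFlow {
  final case class Result(length: Int) // hypothetical processing result

  // Application logic: generic in Ctx, so it knows nothing about the
  // infrastructure that produced the messages.
  def myFlow[Ctx]: FlowWithContext[String, Ctx, Result, Ctx, NotUsed] =
    FlowWithContext[String, Ctx]
      .map(_.trim)
      .mapAsync(parallelism = 2)(data => Future.successful(Result(data.length)))

  // Infrastructure side: the concrete context type is chosen at the call site.
  final case class MsgContext(offset: Long)

  def run(): Seq[(Result, MsgContext)] = {
    implicit val system: ActorSystem = ActorSystem("flow-with-context")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val input = Source(List((" ab ", MsgContext(0L)), ("cde", MsgContext(1L))))
      val done = SourceWithContext
        .fromTuples(input)
        .via(myFlow[MsgContext])
        .asSource
        .runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }
}
```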

As you can see, the flow is now of type FlowWithContext. However, it is clean, has no dependency on MsgContext, and can be put into your application logic layer, where it belongs. The technical dependency is provided externally in the myFlow[MsgContext] call, which I like to see as a rather elegant kind of inversion of control.

Full example: streaming from Cassandra event journal

At the beginning of this article I mentioned that one of the motivating use cases for context passing is carrying Kafka offsets through the stream. That case is very well described in the alpakka-kafka documentation, so there's no need to copy it here. Instead, I will show you a different case.

In this example we have a stream of events, stored in Cassandra using Akka Persistence. Let's write a projection, which consumes these events and does some processing.

First, be aware that streaming from Cassandra is nowhere near as efficient as streaming from Kafka. The underlying consumer actually polls the database, there is no partitioning (so no horizontal scaling), and you have to manage offset commits manually. However, the following solution is good enough for many cases! Here's a full example:
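What follows is only a sketch of the overall shape, written so that it runs without a Cassandra cluster. Several things are assumptions of mine: the eventsFrom parameter stands in for the actual journal query, offsets are plain Long values rather than Akka Persistence offsets, the load and save methods of JournalOffsetDao and the in-memory implementation are hypothetical, and the backoff values are arbitrary.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{FlowWithContext, RestartSource, Source, SourceWithContext}

import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object ProjectionSketch {
  // Hypothetical DAO: remembers the last committed offset per projection.
  trait JournalOffsetDao {
    def load(projectionId: String): Future[Long]
    def save(projectionId: String, offset: Long): Future[Unit]
  }

  // In-memory implementation, for illustration only.
  final class InMemoryOffsetDao extends JournalOffsetDao {
    private val offsets = TrieMap.empty[String, Long]
    def load(projectionId: String): Future[Long] =
      Future.successful(offsets.getOrElse(projectionId, 0L))
    def save(projectionId: String, offset: Long): Future[Unit] =
      Future.successful(offsets.update(projectionId, offset))
  }

  // `eventsFrom` stands in for the journal query: given the last committed
  // offset, it emits (event, offset) tuples for everything after it.
  def createStream[Event, Out](
      projectionId: String,
      offsetDao: JournalOffsetDao,
      eventsFrom: Long => Source[(Event, Long), NotUsed],
      projectionFlow: FlowWithContext[Event, Long, Out, Long, NotUsed]
  )(implicit ec: ExecutionContext): Source[Out, NotUsed] =
    // Arbitrary backoff settings: 1s minimum, 10s maximum, 20% jitter.
    RestartSource.withBackoff(1.second, 10.seconds, 0.2) { () =>
      // Every (re)start begins by loading the last committed offset.
      val events = Source
        .fromFuture(offsetDao.load(projectionId))
        .flatMapConcat(offset => eventsFrom(offset))
      SourceWithContext
        .fromTuples(events)
        .via(projectionFlow)
        .asSource
        // Commit the offset only after the event has been processed.
        .mapAsync(parallelism = 1) { case (out, offset) =>
          offsetDao.save(projectionId, offset).map(_ => out)
        }
    }
}
```

Keep in mind that RestartSource.withBackoff restarts the wrapped source when it completes as well as when it fails, so with a finite journal query the stream keeps polling from the last committed offset. In newer Akka versions Source.fromFuture and this withBackoff overload are deprecated in favour of newer equivalents.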

  • The createStream function can be reused to build event projection streams, where for each projection you provide its unique identifier and a flow for message processing.
  • The RestartSource outer wrapper guarantees that in case of errors our consumer will try to re-connect and read from the last committed message.
  • The innermost source, starting with Source.fromFuture, creates a stream of events, which reads from the Cassandra journal and converts messages into tuples of (event, metadata).
  • The SourceWithContext.fromTuples wrapper converts this stream into a SourceWithContext, allowing via(projectionFlow).
  • The provided projectionFlow does the message processing and contains no dependencies on Cassandra metadata. It can be flexibly reused and tested!
  • After the processing, we call .asSource to bring back the context, which is finally committed.
  • The implementation of JournalOffsetDao is not relevant here; it can be some sort of simple DAO.
  • If you want to know more about reading events by tag from Cassandra, check out this section of the Akka documentation.

Conclusions

The SourceWithContext and FlowWithContext abstractions solve an important problem, so don't hesitate to use them to keep a good separation of concerns.
