Custom Suppression in Kafka Streams
If you work heavily with stateful transformations in Kafka Streams, you have most likely hit a case where you wanted to slow or suppress the emitted events and forward only the last pertinent aggregated event, for example, to detect the end of a batched load.
In Kafka 2.1, the suppress function was introduced to the KTable DSL. While this function works fine for streams that receive a lot of updates, it does not work well for topics that receive updates in batches or are fairly stale. While it might seem like an oversight, this behavior is by design and matches how the session window works in the DSL: a session window only closes when a new event arrives, opening a new window and closing the old one. KIP-424 was put forward to extend the suppress function to also allow suppression based on wall clock time, but it seems nobody has picked it up as of yet.
Thankfully, implementing your own suppression is fairly straightforward with the transformer API, which also gives you much more flexibility in customizing its behavior.
Prerequisites
To suppress events we need a way to park them and forward them only when a certain condition is met. For that we need two main things:
- A State Store
- A Windowing Mechanism
For this suppression transformer I decided to use a session window. A session window represents a period of activity separated by a defined gap of inactivity. Since the transformer is based on wall clock time, the session window closes once the wall clock time has passed the defined period of inactivity. For example, if our session window is sized at 10s and a new event arrives 10s after the last event, it will fall into a new session window.
There are several ways to keep track of the metadata required to know when a session window was opened; my favorite is the ValueAndTimestamp class. I usually choose this class because it does not wrap the key in a Windowed[K] abstraction, which would make finding keys directly in the state store difficult. Furthermore, a dedicated TimestampedKeyValueStore already exists that does all the work for you. Creating such a store is a simple process using Stores.timestampedKeyValueStoreBuilder(), as sketched below.
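A minimal sketch of building such a store; the store name and the String serdes are assumptions for illustration:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.Stores

// A persistent, timestamped key-value store for parking suppressed events.
// The name "suppression-store" and the String serdes are assumptions.
val storeBuilder = Stores.timestampedKeyValueStoreBuilder(
  Stores.persistentTimestampedKeyValueStore("suppression-store"),
  Serdes.String(), // key serde
  Serdes.String()  // value serde
)

The resulting builder can then be registered on the topology with StreamsBuilder.addStateStore().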
Partitioning
Since a suppression function should not change the key, the transformer implements the ValueTransformerWithKey interface. This interface provides the following transformation method, which requires only the value to be returned instead of a KeyValue pair:
override def transform(readOnlyKey: K, value: V): V = ???
Punctuation
We are only interested in the wall clock time, and hence we want only the ProcessorContext to forward our messages. The trick to make this happen is to have the transform method return a null value. The full transform method looks like the following:
override def transform(readOnlyKey: K, value: V): V = {
  val timestamp =
    if (useMessageTime) context.get.timestamp()
    else Instant.now.toEpochMilli

  // Open a new session window for this key, or extend an existing one.
  store.get.put(readOnlyKey, ValueAndTimestamp.make[V](value, timestamp + windowSize))

  // Returning null suppresses the event; the punctuator forwards it later.
  null.asInstanceOf[V]
}
The magic happens in the store.put method, which puts a key into the state store. If the same key already exists, it is overwritten with a new session window, therefore extending the session.
A timestamp override is also provided that allows one to choose the wall clock time over the event timestamp. Choosing to overwrite the event timestamp with the wall clock time effectively changes the session window into a normal tumbling time window.
We now need to set up the Punctuator to traverse our entire store at regular scheduled intervals and forward all the windows that have closed:
cxt.schedule(
  Duration.ofMillis(schedule),
  PunctuationType.WALL_CLOCK_TIME,
  msgTime => {
    // Scan every parked event and forward those whose window has closed.
    autoClose(store.get.all()) { iterator =>
      while (iterator.hasNext) {
        val window = iterator.next()
        if (msgTime > window.value.timestamp()) {
          cxt.forward(window.key, window.value.value())
          store.get.delete(window.key)
        }
      }
    }
  }
)
Note: It is important to ensure that the state store iterator is closed, or you will leak memory.
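The autoClose helper used above is not part of the standard library; a minimal sketch of what it could look like (an assumption, since the original helper is not shown) simply guarantees the iterator is closed even if the body throws:

// Hypothetical helper: run body with the resource, always closing it afterwards.
def autoClose[A <: AutoCloseable, B](resource: A)(body: A => B): B =
  try body(resource)
  finally resource.close()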
Event Time vs Processing Time
Since we are working with the wall clock, we need to work with the processing time and not the event time. There are two main reasons for this:
- Since the event timestamp in a Kafka topic is written by the client (by default), we could receive out-of-order events, i.e. the timestamps could be in random order.
- Our session window would close immediately if the events are old, for example during a replay or a restore from a backup topic.
To ensure that we only work with the processing time, we need to tell the streams app to override the provided event time with the wall clock time at which the event was ingested. This is done by setting the TimestampExtractor to the WallclockTimestampExtractor.
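A minimal sketch of that configuration, assuming the usual Properties-based setup:

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.processor.WallclockTimestampExtractor

// Replace every record's event time with the wall clock time at ingestion.
val props = new Properties()
props.put(
  StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  classOf[WallclockTimestampExtractor].getName
)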
The Code
Putting the pieces together, a sketch of the entire transformer could look like the following (the class name and constructor parameters are illustrative assumptions):
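import java.time.{Duration, Instant}
import org.apache.kafka.streams.kstream.ValueTransformerWithKey
import org.apache.kafka.streams.processor.{ProcessorContext, PunctuationType}
import org.apache.kafka.streams.state.{TimestampedKeyValueStore, ValueAndTimestamp}

// A sketch assembling the fragments above; the name SuppressionTransformer
// and its constructor parameters are assumptions.
class SuppressionTransformer[K, V](
    storeName: String,       // name of the store registered on the topology
    windowSize: Long,        // session gap in milliseconds
    schedule: Long,          // punctuation interval in milliseconds
    useMessageTime: Boolean  // true: event timestamp, false: wall clock time
) extends ValueTransformerWithKey[K, V, V] {

  private var context: Option[ProcessorContext] = None
  private var store: Option[TimestampedKeyValueStore[K, V]] = None

  override def init(cxt: ProcessorContext): Unit = {
    context = Some(cxt)
    store = Some(cxt.getStateStore(storeName).asInstanceOf[TimestampedKeyValueStore[K, V]])

    // Traverse the store at regular intervals and forward closed windows.
    cxt.schedule(
      Duration.ofMillis(schedule),
      PunctuationType.WALL_CLOCK_TIME,
      msgTime => {
        autoClose(store.get.all()) { iterator =>
          while (iterator.hasNext) {
            val window = iterator.next()
            if (msgTime > window.value.timestamp()) {
              cxt.forward(window.key, window.value.value())
              store.get.delete(window.key)
            }
          }
        }
      }
    )
  }

  override def transform(readOnlyKey: K, value: V): V = {
    val timestamp =
      if (useMessageTime) context.get.timestamp()
      else Instant.now.toEpochMilli

    // Open or extend the session window for this key.
    store.get.put(readOnlyKey, ValueAndTimestamp.make[V](value, timestamp + windowSize))
    null.asInstanceOf[V] // suppress the event until its window closes
  }

  override def close(): Unit = ()

  private def autoClose[A <: AutoCloseable, B](resource: A)(body: A => B): B =
    try body(resource)
    finally resource.close()
}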
Testing it is straightforward with the Kafka Streams test utilities.
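A sketch of such a test, assuming TopologyTestDriver from kafka-streams-test-utils (2.4+) and hypothetical topic and store names:

import java.time.Duration
import java.util.Properties
import org.apache.kafka.common.serialization.{Serdes, StringDeserializer, StringSerializer}
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, TopologyTestDriver}
import org.apache.kafka.streams.kstream.{Consumed, Produced, ValueTransformerWithKeySupplier}
import org.apache.kafka.streams.state.Stores

// Build a topology around the transformer; "input", "output" and
// "suppression-store" are assumed names.
val builder = new StreamsBuilder()
builder.addStateStore(
  Stores.timestampedKeyValueStoreBuilder(
    Stores.inMemoryKeyValueStore("suppression-store"),
    Serdes.String(), Serdes.String()
  )
)

val supplier: ValueTransformerWithKeySupplier[String, String, String] =
  () => new SuppressionTransformer[String, String](
    storeName = "suppression-store",
    windowSize = 10000L,
    schedule = 1000L,
    useMessageTime = false
  )

builder
  .stream("input", Consumed.`with`(Serdes.String(), Serdes.String()))
  .transformValues(supplier, "suppression-store")
  .to("output", Produced.`with`(Serdes.String(), Serdes.String()))

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppression-test")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")

val driver = new TopologyTestDriver(builder.build(), props)
val input  = driver.createInputTopic("input", new StringSerializer, new StringSerializer)
val output = driver.createOutputTopic("output", new StringDeserializer, new StringDeserializer)

// Nothing is forwarded while the session window is open...
input.pipeInput("key", "v1")
input.pipeInput("key", "v2")
assert(output.isEmpty)

// ...but advancing the wall clock past the window triggers the punctuator.
driver.advanceWallClockTime(Duration.ofSeconds(11))
assert(output.readValue() == "v2")

driver.close()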
Conclusion
The transformer API is a powerful way to implement custom functionality on top of the Kafka Streams DSL. A custom suppression transformer is one way to extend the framework to cover cases that Kafka Streams does not handle out of the box.