The curious case of missing headers in Kafka Streams

Tom Kaszuba
6 min readDec 25, 2022

KIP-82 Introduced headers to Kafka and they’ve been a hit ever since. Headers allow you to pass metadata information such as logging context, error information or data lineage of a message without having to dirty your payload. At least in theory. Once you try to use Kafka headers in Kafka Streams you quickly find out that there is no support for them when doing any sort of stateful transformations even though the extension points in the API are there.

Why do we need headers in Kafka Streams?

The examples pointed out earlier of course hold also with Kafka Streams but there is one extra use case that most people do not think about where headers would greatly aid in performance: batching.

For me batching is when you suppress a real time stream until some sort of condition is met, it could be for example a buffer before an insert into an external system or perhaps a transpose of a certain number of messages into one message. I’ve used suppression to make DSL joins work the same as an SQL join for example. In advanced Kafka streams applications some sort of batching/suppression is routinely used to control data that is pushed out to downstream services. So how can headers help in this regard?

These are the good old signatures of the Kafka Serializer and Deserializer:

public interface Serializer<T> extends Closeable {

byte[] serialize(String topic, T data);

default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}
}

public interface Deserializer<T> extends Closeable {

T deserialize(String topic, byte[] data);

default T deserialize(String topic, Headers headers, byte[] data) {
return deserialize(topic, data);
}
}

As you will notice it contains two separate serialize/deserialize methods, one with headers and one without headers. Observing the default with headers implementations we see that it drops the headers which points out that you will need to override these methods if you want to pass headers further along. What is a more interesting fact though is that the serialization and the deserialization of headers are treated “separately” from the payload. This fact is super important since it lets us treat headers as first class citizens. We can now decide what to do with the payload by just looking at the metadata without having to deserialize the entire payload! We could then include indexing information related to the payload without having to store it in a separate state store.

Batching use case

I’ll take the case of transposing a certain number of messages into one message as this was a case I have had with a former client. We have 600 individual Avro events that we would like to send downstream as one Avro event with an array of 600 items.

With the DSL generally we would need to do the following to get this done:

  1. Use the aggregate method to transform the 600 Avro events into 1 Avro event.
  2. Call the toStream method to output the changelog events. If you have transactions on this call will result in 600 update events since caching is turned off.
  3. Use the filter method as our suppression mechanism to drop the changelog events that don’t satisfy a business condition.

This will get the job done but it will be VERY slow.

The reason why this is slow is because that aggregation method has to call the Avro deserializer 180.300 + 600 times.

But we have only 600 items how can that result in 181k calls? Any time you have an embedded Avro, which an array item is, the Avro deserializer does a recursive call and treats the embedded Avro like a separate deserialize call. Therefore as the Avro object grows we get the following sequence plus 600 for the parent Avro.

1 + 2 + 3 + … + n:

This is a lot of overhead for ultimately ONE transposed event.

So is there a better way to do this? YES and headers can really help us out here.

Processor API to the rescue

Since the DSL is not up to the task we need to go to the bare bones and create ourselves a transformer that will do what the DSL does for us. The general solution is this. We serialize each Avro event into a state store as a List and use the headers as an index. When a new event pops in we add the serialized event to the serialized collection and update the index. When we are ready to forward the event downstream we will only deserialize then which will result in precisely 601 deserialization calls.

List Serde

Instead of using the embedded Array of an Avro object we need to create our own Serde that allows us to serialize a List:

public class ListSerde<T> implements Serde<List<T>>

I won’t go into the details of how to create such a Serde but suffice it to say that the serialization saves an object in Kafka as follows:

| number of items in the collection | size of first serialized event | first serialized event | size of second serialized event | second serialized event| … | size of n serialized event | n serialized event |

Therefore if we want to add a new object to the serialized collection all we need to do is to update the number of items in the collection at the head of the collection (this is a fixed number of bytes) and attach the serialized event to the tail of the serialized collection. In this way we don’t need to deserialize anything and work directly on the level of bytes.

The Index

We don’t specifically need to use Headers to create the index, we could just as well use a second state store for keeping the metadata, BUT since the Kafka API has given us all the extension points why not take advantage of this mechanism.

In order to serialize and deserialize the headers we need the following two new Serdes.

public class HeadersSerde implements Serde<List<Tupple<String, Byte[]>>>
public class HeaderSerde implements Serde<Tupple<String, Byte[]>

It’s a shame that Kafka doesn’t provide a Serde for the Headers, which I hope they rectify in a future release, but for the time being you can grow your own by leveraging the List Serde created previously and store the headers as a tupple of a string and a byte[].

public interface Headers extends Iterable<Header>
public interface Header {
String key();
byte[] value();
}

We now need one final Serde to link the Headers with the Value.

public class HeadersWithValueSerde<T> implements Serde<T> {
private final HeadersSerde headers;
protecte final Serde<T> values;
public HeadersWithValueSerde(Serde<T> serde) {
values = values;
}
}

In this Serde we will internally use the HeadersSerde to first Serialize the headers, put them at the beginning and then put the serialized value at the tail. For our use case we don’t want our values to be serialized so we will use the Byte Array Serde which does nothing:

public class IndexHeadersSerde extends HeadersWithValueSerde<Byte[]>
public IndexHeadersSerde() {
super(Serdes.ByteArraySerde)
}
}

For the sake of brevity I’m skipping the steps on serializing the events and adding them to the List that I described earlier which are the input to the IndexHeadersSerde.

Using the IndexHeadersSerde will now allow us to get the Headers apart from the values. As an index we need to put in any information that will allow us to work with the serialized value collection and trigger the stop of the suppression (deserialization) event:

  • num of items in the value collection
  • a business rule if applicable

In my particular use case it was the sum of probabilities, once they summed up to 1 we can consider the aggregation finished and can forward the event.

Conclusion

Of course this index approach will not work if the particular suppression trigger is not easily modeled. In such a case it is beneficial to break the event model into smaller parts so that they can easily be parallelized and then brought together after.

There’s a lot of use cases for headers, using them as an index in the Processor API is just one of them. Hopefully in the future the Kafka Streams API will be extended to fill in the missing gaps so that they can be leveraged to their fullest.

--

--

Tom Kaszuba

Java, Scala and .Net Consultant with over 20 years experience in the Financial Industry, specializing in Big Data and Integration... and especially Kafka.