The Kafka Streams API maintains state in local state stores that are backed by a changelog topic. Since Kafka works at the level of bytes, the state stores require the same granularity. Translating between domain objects and the Kafka world is the responsibility of the Serde. This works well enough but imposes a performance penalty, since every read and write “always” has to go through the Serde, which is especially noticeable with heavy Serdes like Avro. I put “always” in quotes because it is possible to bypass the Serde with a custom state store implementation, but working around the default implementation takes some effort.
The performance problems with deserialization are nicely explained by Lei Chen from Bloomberg in a talk at the Kafka Summit. There are several ways of tackling this problem. The approach taken by Bloomberg is to create a custom state store, which is a very good solution and one that works in all situations; I’m actually confused why the framework doesn’t offer a state store like this by default. How to do this and the problems associated with this implementation can be found here. But there is also a much simpler way that might be enough for your use case: a caching Serde.
If the data you read from or write to the state store doesn’t change often, then caching at the level of the Serde might be enough. It is simple to implement and doesn’t require interfering with the Streams API.
I’ll focus only on deserialization, but the same concepts also apply to serialization. The caching deserializer works by maintaining an in-memory cache keyed on the serialized bytes. On a read it first checks the cache to see whether this value has already been deserialized. If it has, the cached value is returned; if it hasn’t, the inner deserializer is called and the deserialized value is stored in the cache. Once the maximum size of the cache is reached, the oldest entries are purged. Any sort of cache can be used for this purpose. The one described here uses a LinkedHashMap because it keeps its entries in insertion order, like a queue, so the oldest entries sit at the front and can be removed cheaply when the cache reaches a certain size.
The CachingDeserializer uses a byte wrapper object for the key instead of the raw byte array, as it’s easier to work with: Java arrays compare by identity, so two arrays with the same content would not match as map keys. When the maximum cache size is reached, an iterator is used to remove the first entry in the map, so no expensive traversals need to take place.
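A minimal sketch of such a caching deserializer could look like the following. It is not the exact code from any library. To keep it compilable without Kafka on the classpath, it declares a small stand-in `Deserializer` interface with the same `deserialize(String, byte[])` method as `org.apache.kafka.common.serialization.Deserializer`; in a real project you would implement the Kafka interface instead. Using `ByteBuffer` as the key wrapper and the `cachedEntries()` helper are my own assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for org.apache.kafka.common.serialization.Deserializer (assumption),
// reduced to the single method this sketch needs.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

class CachingDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> inner;
    private final int maxSize;
    // LinkedHashMap iterates in insertion order, so the first entry is the oldest.
    private final Map<ByteBuffer, T> cache = new LinkedHashMap<>();

    CachingDeserializer(Deserializer<T> inner, int maxSize) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be at least 1");
        }
        this.inner = inner;
        this.maxSize = maxSize;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return inner.deserialize(topic, null); // nothing to key the cache on
        }
        // byte[] uses identity equals/hashCode; ByteBuffer compares by content.
        ByteBuffer key = ByteBuffer.wrap(data);
        T cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit: the inner deserializer is skipped
        }
        T value = inner.deserialize(topic, data);
        if (value == null) {
            return null; // do not cache null results
        }
        if (cache.size() >= maxSize) {
            // Remove only the first (oldest) entry; no traversal of the map.
            Iterator<ByteBuffer> oldest = cache.keySet().iterator();
            oldest.next();
            oldest.remove();
        }
        // Copy the bytes so later changes to the caller's array cannot corrupt the key.
        cache.put(ByteBuffer.wrap(data.clone()), value);
        return value;
    }

    // Hypothetical helper, only here to make the cache size observable.
    int cachedEntries() {
        return cache.size();
    }
}
```

Two things to keep in mind with this sketch: it is not thread-safe, so each thread should get its own instance, and the topic is not part of the cache key, which assumes the same bytes always deserialize to the same value regardless of topic.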
The deserializer is used as follows:
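Here is a rough usage sketch: wrap the expensive deserializer once and read through the wrapper. So that the snippet compiles on its own, it repeats a condensed caching deserializer (this time relying on `LinkedHashMap.removeEldestEntry` for eviction) and the same stand-in `Deserializer` interface instead of the real Kafka one. `UsageDemo`, `SLOW`, the topic name, and the cache size of 1,000 are all made up for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for org.apache.kafka.common.serialization.Deserializer (assumption).
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Condensed caching deserializer, included only so this snippet compiles alone.
class CachingDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> inner;
    private final Map<ByteBuffer, T> cache;

    CachingDeserializer(Deserializer<T> inner, int maxSize) {
        this.inner = inner;
        this.cache = new LinkedHashMap<ByteBuffer, T>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<ByteBuffer, T> eldest) {
                return size() > maxSize; // drop the oldest entry once the limit is exceeded
            }
        };
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return inner.deserialize(topic, null);
        }
        T value = cache.get(ByteBuffer.wrap(data));
        if (value == null) {
            value = inner.deserialize(topic, data);
            if (value != null) {
                cache.put(ByteBuffer.wrap(data.clone()), value);
            }
        }
        return value;
    }
}

class UsageDemo {
    static int innerCalls = 0;

    // Hypothetical "expensive" deserializer; in practice this might be an Avro deserializer.
    static final Deserializer<String> SLOW = (topic, data) -> {
        innerCalls++;
        return new String(data, StandardCharsets.UTF_8);
    };

    static String readTwice() {
        Deserializer<String> caching = new CachingDeserializer<>(SLOW, 1_000);
        byte[] stored = "order-42".getBytes(StandardCharsets.UTF_8);
        caching.deserialize("my-topic", stored);        // miss: calls SLOW
        return caching.deserialize("my-topic", stored); // hit: served from the cache
    }
}
```

In an actual Streams application, a deserializer that implements the real Kafka interface can be paired with a serializer through `Serdes.serdeFrom(serializer, deserializer)`, and the resulting Serde is then passed wherever the state store expects a value Serde.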
Note: When storing complex mutable objects (such as Avro records) in the cache, you should clone the returned object before changing it; otherwise you will modify the original object in the cache. When reading from the state store directly you do not need to worry about this, since every read deserializes into a fresh object. Depending on how the Serde will be used, you might build this cloning into the Serde itself.
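One way to build the cloning into the Serde is a small decorator that copies every value before handing it out. The sketch below is only an illustration: `CopyOnReadDeserializer` and its `copier` parameter are names I made up, and the `Deserializer` interface is again a stand-in for the Kafka one. For Avro-generated classes the copier could be something like `v -> SpecificData.get().deepCopy(v.getSchema(), v)`.

```java
import java.util.function.UnaryOperator;

// Stand-in for org.apache.kafka.common.serialization.Deserializer (assumption).
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Hypothetical decorator: returns a copy so callers never touch the cached instance.
class CopyOnReadDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> inner;   // typically the caching deserializer
    private final UnaryOperator<T> copier; // knows how to deep-copy a T

    CopyOnReadDeserializer(Deserializer<T> inner, UnaryOperator<T> copier) {
        this.inner = inner;
        this.copier = copier;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        T shared = inner.deserialize(topic, data);
        return shared == null ? null : copier.apply(shared);
    }
}
```

The copy costs something on every read, so this only pays off when copying is cheaper than a full deserialization.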
So the natural question that arises is: why not skip all of this altogether and keep the cache directly in memory, without any state stores in the background? I guess it all depends on your use case. State stores provide fault tolerance and deal with multiple partitions very well, but they do introduce a lot of complexity that is sometimes difficult to deal with. Hopefully the API will grow in new releases and state stores will become easier to work with.