Synchronize two lookup topics using foreign key joins in Kafka Streams

Part 1: The theory

Tom Kaszuba
4 min read · Feb 13, 2024

Introduction

Global kTables in Kafka Streams are a great way to enrich your streaming applications with data from other topics. To use global kTables effectively, having a properly prepared data feed is important. A snapshot of the latest data using hard deletes guarantees that we only keep the latest events. This allows our global kTables to be built quickly and to use in-memory stores, which are less finicky and faster than RocksDB.
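As a rough sketch of what such a table looks like (the topic and store names below are only placeholders), a global kTable backed by an in-memory store can be declared like this:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class LookupTopology {

    public static GlobalKTable<String, String> lookupTable(final StreamsBuilder builder) {
        // "lookup-topic" and "lookup-store" are placeholder names.
        // A compacted snapshot feed lets this in-memory store rebuild quickly on startup.
        return builder.globalTable(
            "lookup-topic",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String>as(Stores.inMemoryKeyValueStore("lookup-store"))
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));
    }
}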

Lookup topic feeds for global kTables are often derived from other lookup topics, perhaps a master data feed. In such a scenario, we will need to create specialized derived lookup topics specifically keyed/partitioned for particular lookups. Using foreign key joins, it is possible to propagate state changes in the master topic to the derived topics ensuring that they stay in sync with the master.

State changes

For the sake of this article I will take a very simple example of a master data feed that contains a pair of key/value strings. From the master data feed we would like to derive a mirror of this topic, hence a pair of value/key strings. For simplicity we will assume that we have a 1-to-1 mapping between key and value. If we have a 1-to-many relationship, things get trickier.

We will then have the following use cases:

  • An insert in the master topic becomes an insert in the derived topic.
  • An update in the master topic must delete the previously mirrored entry and insert the new one.
  • A delete in the master topic must delete the mirrored entry.

The conclusion here is that both an update and a delete in the master topic need to trigger a delete in the derived topic. We also need to detect duplicates so that we do not delete an update that was just applied. Finding the events that need to be deleted will be the role of the foreign key join between the key in the master topic and the value of the derived topic.
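To make the problem concrete, here is what a naive mirror looks like when we only handle inserts and ignore the update/delete cases entirely (the topic names are made up for illustration):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class NaiveMirror {

    public static void build(final StreamsBuilder builder) {
        builder.stream("master-topic", Consumed.with(Serdes.String(), Serdes.String()))
            // Tombstones carry no value, so there is nothing to re-key them by.
            .filter((key, value) -> value != null)
            // Swap key and value: master (key -> value) becomes derived (value -> key).
            .map((key, value) -> KeyValue.pair(value, key))
            .to("derived-topic", Produced.with(Serdes.String(), Serdes.String()));
    }
}

If the master record A -> x is later updated to A -> y, this mirror happily writes y -> A but never removes the stale x -> A from the derived topic, which is exactly the gap the foreign key join has to close.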

Topology

Topics

We will need two topics: the master topic and the derived topic. They need to exist up front and be created outside of the topology.
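As a sketch of creating them up front with the admin client (the topic names, partition count and replication factor are purely illustrative):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class TopicSetup {

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Both topics are compacted so that only the latest value per key is retained.
            final Map<String, String> compaction =
                Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
            admin.createTopics(List.of(
                    new NewTopic("master-topic", 3, (short) 1).configs(compaction),
                    new NewTopic("derived-topic", 3, (short) 1).configs(compaction)))
                .all().get();
        }
    }
}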

Tables

There are three tables required to handle the delete use cases in the derived topic: the current state of the derived topic, the deletes state of the derived topic and the final update state of the derived topic.

The foreign key join will be done between the current state kTable and the deletes state kTable. It has the following signature:

<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final Function<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Since our duplicate detection mechanism will be done in the value joiner, which has no access to the keys, we will also need to pass the key in the value. For the purpose of the example I have used a tuple, but it could also be an Avro/Protobuf/JSON/KeyValue class; anything works, as long as we have access to the key in the value.
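Here is a minimal sketch of that mechanic. To keep it self-contained it fakes the tuple with a plain "key|value" string so no custom serde is needed, and it joins the current state directly against the master table; the real topology joins the current state against the deletes state as described above, but the key-in-the-value trick is the same:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class ForeignKeyJoinSketch {

    public static KTable<String, String> build(final StreamsBuilder builder) {
        // Current state of the derived topic: keyed by the master value, valued by the master key.
        final KTable<String, String> currentState =
            builder.table("derived-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // The table the foreign key points at (the master topic in this sketch).
        final KTable<String, String> master =
            builder.table("master-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // The joiner only ever sees values, so the derived key is smuggled into the value first.
        final KTable<String, String> currentWithKey = currentState.mapValues(
            (derivedKey, masterKey) -> derivedKey + "|" + masterKey,
            Materialized.with(Serdes.String(), Serdes.String()));

        // Foreign key join: the extractor pulls the master key out of the derived value,
        // and the joiner can now compare the derived key with the master's current value.
        return currentWithKey.join(
            master,
            tuple -> tuple.split("\\|", 2)[1],
            (tuple, masterValue) -> {
                final String derivedKey = tuple.split("\\|", 2)[0];
                // The master still points at this entry: it is the update that was just applied,
                // so it must not be deleted. Otherwise the entry is stale and marked for deletion.
                return derivedKey.equals(masterValue) ? "KEEP" : "DELETE";
            });
    }
}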

Final topology

Stream topology

There are two main parts to the topology: the straightforward inserts/updates and the special handling of the deletes. For the deletes handling, the derived topic feeds the current state, which gets updated through the foreign key join, which in turn updates the derived topic and then the current state again. This circular logic will of course cause an infinite loop, which has to be accounted for with proper cleanup of the state stores.
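Just to make the circular part visible, a few lines continuing the join sketch from above (again only illustrative; the actual solution adds suppression and state store cleanup, which is what part two is about):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class DeleteBranchSketch {

    // "joined" is the result of the foreign key join from the earlier sketch.
    public static void build(final KTable<String, String> joined) {
        joined.toStream()
            // Only entries whose master row no longer points at them need to go.
            .filter((derivedKey, verdict) -> "DELETE".equals(verdict))
            // A null value is a tombstone and removes the stale entry from the derived topic.
            .mapValues(verdict -> (String) null)
            .to("derived-topic", Produced.with(Serdes.String(), Serdes.String()));
        // The tombstone flows back into the current state via the derived topic, closing the circle.
    }
}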

Conclusion

That’s about it for the theory. The actual solution is a lot more nuanced, since several cases need to be accounted for:

  • Handling of tombstones
  • Event Suppression
  • Cleanup of state stores
  • Caching and Logging
  • Duplicate detection

I will discuss these in part two.

For the curious, the working code can already be found on my GitHub.


Tom Kaszuba

Java, Scala and .Net Consultant with over 20 years of experience in the Financial Industry, specializing in Big Data and Integration... and especially Kafka.