Historized SCD lookups with Range Queries in Kafka Streams

Tom Kaszuba
7 min read · Feb 18, 2024

Introduction

About 5 years ago, while working for a client, I encountered an interesting problem related to lookups on slowly changing dimensions (SCDs). As the project grew, it became apparent that historical events needed to be linked to historical SCD data derived from Master Data. At the time, all lookups were done using joins in Kafka Streams, which require an exact key match to fire, so linking to historized data defined by a date range was considered “impossible”. I argued that this should still be possible in Kafka Streams with some data massaging, but alas the label of “impossible” stuck and a solution using an external database for these queries was chosen.

While there is nothing wrong with the external database approach, there are clear benefits to staying within the Kafka Streams framework, mainly:

  • transaction support
  • faster query times
  • lazily evaluated iterator instead of a db cursor
  • OOM safety
  • less operational risk

Hence here I am, 5 years later while between contracts, showing how this can be done and highlighting an underutilized and misunderstood feature of Kafka Streams: Range Queries.

The Problem

Slowly Changing Dimensions change over time and are therefore valid within a certain time range. When processing historical events, we need to link each event to the SCD version that was valid at the time of that event. We can therefore model the SCD as follows:

SCD changing over time

In the graphic above we have 4 date events for Key1 modeled in blue and 4 date events for Key2 modeled in red. They can overlap but for demonstration purposes it is easier to show them separately.

We then have the following use cases:

Exact Match

Exact match between Historic event and SCD date

This is the simplest scenario: the historic event date matches an SCD date exactly and a normal join would suffice. Since dates, and especially date/times, rarely match exactly, this scenario is rare in practice.

Between Match

Range match between Historic event and SCD date

The most common scenario is where the historic event date lands between two SCD events. In this case we want to “round down” and take the earlier date. The blue DE is then linked to the blue D2 and the red DE to the red D3.

Earliest Match

Earliest match between Historic event and SCD date

In this case a historic event has a date that is earlier than the first SCD event. What to do here is open to discussion, but usually we want to “round up” to the earliest available SCD event. The blue DE is then linked to the blue D1 and the red DE to the red D1, ignoring the blue D4.

Latest Match

Latest match between Historic event and SCD date

The last scenario is where the historic event date falls after the latest SCD event, i.e. the current SCD version is still valid. In this case we again “round down” and match to the latest version. The blue DE is then linked to the blue D4, ignoring the red D1, and the red DE is linked to the red D4.
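The four cases reduce to a single rule: take the latest SCD version on or before the event date, and fall back to the earliest version when the event predates all of them. As a plain-Java sketch of that rule (not Kafka Streams code; the ScdRule class and its methods are hypothetical), a TreeMap per key with floorEntry and ceilingEntry is enough:

```java
import java.time.LocalDate;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration of the SCD lookup rule on plain JDK collections.
final class ScdRule {
  // Latest version on or before the event date, else the earliest version, else null.
  static String lookup(NavigableMap<LocalDate, String> versions, LocalDate eventDate) {
    // "Round down": covers the exact, between and latest cases.
    Map.Entry<LocalDate, String> before = versions.floorEntry(eventDate);
    if (before != null) {
      return before.getValue();
    }
    // "Round up": the event predates every version, so take the earliest one.
    Map.Entry<LocalDate, String> after = versions.ceilingEntry(eventDate);
    return after == null ? null : after.getValue();
  }

  // Four versions labeled D1..D4, using the key1 dates from the unit test.
  static NavigableMap<LocalDate, String> sample() {
    NavigableMap<LocalDate, String> versions = new TreeMap<>();
    for (int month = 1; month <= 4; month++) {
      versions.put(LocalDate.of(2021, month, 1), "D" + month);
    }
    return versions;
  }

  public static void main(String[] args) {
    NavigableMap<LocalDate, String> key1 = sample();
    System.out.println(lookup(key1, LocalDate.of(2021, 2, 1)));  // exact: D2
    System.out.println(lookup(key1, LocalDate.of(2021, 2, 27))); // between: D2
    System.out.println(lookup(key1, LocalDate.of(2020, 2, 27))); // earliest: D1
    System.out.println(lookup(key1, LocalDate.of(2022, 2, 27))); // latest: D4
  }
}
```

The rest of the article implements exactly this floor-then-ceiling behaviour, only against a state store holding all keys at once instead of one map per key.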

Sorting

The entire trick to range queries lies in the sort order of the keys in the state stores. Key-value state stores in Kafka Streams keep their keys sorted, much like a NavigableMap; the in-memory store, for example, is backed by a TreeMap. Looking at the TreeMap Javadoc we see the following:

A Red-Black tree based NavigableMap implementation. The map is sorted according to the natural ordering of its keys, or by a Comparator provided at map creation time, depending on which constructor is used.

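One important detail: the stores sort on the serialized key bytes, compared lexicographically as unsigned bytes, not on the Java objects themselves. For simple keys, such as strings or non-negative numbers serialized with the built-in serdes, this byte order already matches the natural order, so no extra work is needed. For complex objects, such as compound keys, we need to define the ordering ourselves (for example with a Comparable key or a Comparator) and serialize the key so that its bytes sort the same way.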
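To illustrate what such an ordering looks like for a compound key, here is a JDK-only sketch that uses a TreeMap with an explicit Comparator. The DatedKey record and CompoundKeyOrdering class are hypothetical stand-ins; the real stores apply the same idea to serialized key bytes. Entries come back sorted regardless of insertion order, and all versions of one key sit next to each other, so a range bounded by (key, 0) and (key, Long.MAX_VALUE) selects exactly that key's history:

```java
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical compound key: business key plus a date stored as a long.
record DatedKey(String key, long date) {}

final class CompoundKeyOrdering {
  // Order by business key first, then by date on a tie.
  static final Comparator<DatedKey> ORDER =
      Comparator.comparing(DatedKey::key).thenComparingLong(DatedKey::date);

  static NavigableMap<DatedKey, String> build() {
    NavigableMap<DatedKey, String> map = new TreeMap<>(ORDER);
    // Inserted out of order on purpose.
    map.put(new DatedKey("key1", 3), "key1V3");
    map.put(new DatedKey("key2", 1), "key2V1");
    map.put(new DatedKey("key1", 1), "key1V1");
    map.put(new DatedKey("key2", 2), "key2V2");
    map.put(new DatedKey("key1", 2), "key1V2");
    return map;
  }

  // All versions of one key, using inclusive bounds (key, 0) and (key, Long.MAX_VALUE).
  static List<String> versionsOf(NavigableMap<DatedKey, String> map, String key) {
    return List.copyOf(map.subMap(new DatedKey(key, 0L), true,
        new DatedKey(key, Long.MAX_VALUE), true).values());
  }

  public static void main(String[] args) {
    NavigableMap<DatedKey, String> map = build();
    System.out.println(map.values());            // [key1V1, key1V2, key1V3, key2V1, key2V2]
    System.out.println(versionsOf(map, "key2")); // [key2V1, key2V2]
  }
}
```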

I’m a big fan of not using Avro/Protobuf/JSON for internal keys due to schema versioning issues, hence I will extend the existing KeyValue class from Kafka Streams and make it comparable.

import org.apache.kafka.streams.KeyValue;

// A KeyValue that sorts by key first and, on a tie, by value (here: the date).
public class ComparableKeyValue<K extends Comparable<K>, V extends Comparable<V>>
    extends KeyValue<K, V> implements Comparable<KeyValue<K, V>> {

  public ComparableKeyValue(final K key, final V value) {
    super(key, value);
  }

  public ComparableKeyValue(final KeyValue<K, V> keyValue) {
    this(keyValue.key, keyValue.value);
  }

  @Override
  public int compareTo(final KeyValue<K, V> other) {
    int keyCompare = key.compareTo(other.key);
    return (keyCompare == 0) ? value.compareTo(other.value) : keyCompare;
  }
}

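We also need a Serde to tell the state stores how to serialize this compound key, which I have provided on my GitHub. Since the stores compare keys by their serialized bytes, the Serde has to lay out the key and the date so that byte order matches compareTo.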
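The author's Serde is not reproduced here. Because Kafka Streams stores compare serialized keys as unsigned bytes, the encoding has to sort the same way as compareTo. One possible layout, sketched below as an assumption rather than the author's implementation, writes the key as UTF-8 followed by a 0x00 terminator (assuming keys never contain NUL) and then the date as a big-endian long with its sign bit flipped, so that negative dates also sort correctly. OrderedKeyCodec is a hypothetical name:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical order-preserving encoding for a (String key, long date) compound key.
final class OrderedKeyCodec {
  static byte[] encode(String key, long date) {
    byte[] k = key.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(k.length + 1 + Long.BYTES); // big-endian by default
    buf.put(k);
    buf.put((byte) 0);                  // terminator, so "a" sorts before "ab"; assumes no NUL in keys
    buf.putLong(date ^ Long.MIN_VALUE); // flip the sign bit so negatives sort before positives
    return buf.array();
  }

  static long decodeDate(byte[] bytes) {
    // The date occupies the last 8 bytes; undo the sign-bit flip.
    return ByteBuffer.wrap(bytes).getLong(bytes.length - Long.BYTES) ^ Long.MIN_VALUE;
  }

  // Unsigned lexicographic comparison, the order the stores use for serialized keys.
  static int compareBytes(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }

  public static void main(String[] args) {
    System.out.println(compareBytes(encode("key1", 2), encode("key1", 10)) < 0); // true
    System.out.println(compareBytes(encode("key1", 99), encode("key2", 0)) < 0); // true
    System.out.println(compareBytes(encode("key1", -5), encode("key1", 3)) < 0); // true
  }
}
```

A plain big-endian long, which is what Kafka's LongSerializer writes, already sorts correctly for non-negative values such as epoch days, so the sign-bit flip only matters if dates before 1970 can appear.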

We can now write a unit test with the TopologyTestDriver to check that our keys are sorted in the correct order in the state store.

Unit Test

Following the scenario outlined in the Problem section, we need 8 SCD events: four for the blue key1 and four for the red key2. We store the event dates as longs, mirroring Avro logical date types.

  private final ComparableKeyValue<String, Long> key1V1 = create(key1, LocalDate.of(2021, 1, 1));
  private final ComparableKeyValue<String, Long> key1V2 = create(key1, LocalDate.of(2021, 2, 1));
  private final ComparableKeyValue<String, Long> key1V3 = create(key1, LocalDate.of(2021, 3, 1));
  private final ComparableKeyValue<String, Long> key1V4 = create(key1, LocalDate.of(2021, 4, 1));

  private final ComparableKeyValue<String, Long> key2V1 = create(key2, LocalDate.of(2022, 1, 1));
  private final ComparableKeyValue<String, Long> key2V2 = create(key2, LocalDate.of(2022, 2, 1));
  private final ComparableKeyValue<String, Long> key2V3 = create(key2, LocalDate.of(2022, 3, 1));
  private final ComparableKeyValue<String, Long> key2V4 = create(key2, LocalDate.of(2022, 4, 1));

We then send these events out of order to show that insertion order does not determine the order in the store.

  private void pipeToLookupOutOfOrder() {
    lookupTopic.pipeInput(key1V3, "key1V3");
    lookupTopic.pipeInput(key1V1, "key1V1");
    lookupTopic.pipeInput(key2V2, "key2V2");
    lookupTopic.pipeInput(key2V1, "key2V1");
    lookupTopic.pipeInput(key1V2, "key1V2");
    lookupTopic.pipeInput(key1V4, "key1V4");
    lookupTopic.pipeInput(key2V4, "key2V4");
    lookupTopic.pipeInput(key2V3, "key2V3");
  }

Iterating over the whole store with all(), we can see that all the events are now sorted by key and then by date, as defined by ComparableKeyValue, regardless of the order in which they were inserted.

  @Test
  public void testLookupOrdering() {
    pipeToLookupOutOfOrder();

    KeyValueStore<ComparableKeyValue<String, Long>, String> store =
        testDriver.getKeyValueStore(HistorizedLookupsWithRangeQueries.VersionedLookupStoreName);

    assertEquals(8, store.approximateNumEntries());

    var list = new ArrayList<ComparableKeyValue<String, Long>>();
    try (KeyValueIterator<ComparableKeyValue<String, Long>, String> iterator = store.all()) {
      while (iterator.hasNext()) {
        list.add(iterator.next().key);
      }
    }

    assertEquals(key1V1, list.get(0));
    assertEquals(key1V2, list.get(1));
    assertEquals(key1V3, list.get(2));
    assertEquals(key1V4, list.get(3));
    assertEquals(key2V1, list.get(4));
    assertEquals(key2V2, list.get(5));
    assertEquals(key2V3, list.get(6));
    assertEquals(key2V4, list.get(7));
  }

Range Queries

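All key-value state stores in Kafka Streams support range queries through the Processor API. Both methods are defined on ReadOnlyKeyValueStore:

// Both bounds are inclusive; reverseRange is a default method on the interface
KeyValueIterator<K, V> range(K from, K to);
KeyValueIterator<K, V> reverseRange(K from, K to);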

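Since the KeyValueIterator is lazily evaluated, it lets us deal with each record incrementally, similar to how a db cursor acts. In our case we only want one value, so only the first element of the iterator is ever read: the lookup amounts to a seek to the start of the range rather than a scan over every version of the key. Like a cursor, the iterator must be closed after use, which is why each query sits in a try-with-resources block.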
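To make the semantics concrete without a running topology, the sketch below emulates range and reverseRange on a JDK NavigableMap (the RangeEmulation helpers are hypothetical, not part of Kafka Streams). Both bounds are inclusive, the reverse variant walks a descending view, and only the first element of each iterator is ever consumed:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// JDK-only emulation of range / reverseRange semantics with inclusive bounds.
final class RangeEmulation {
  static <K, V> Iterator<Map.Entry<K, V>> range(NavigableMap<K, V> map, K from, K to) {
    return map.subMap(from, true, to, true).entrySet().iterator();
  }

  static <K, V> Iterator<Map.Entry<K, V>> reverseRange(NavigableMap<K, V> map, K from, K to) {
    // subMap and descendingMap are views, so nothing is copied here.
    return map.subMap(from, true, to, true).descendingMap().entrySet().iterator();
  }

  // Read at most one element: the first entry the iterator yields, if any.
  static <K, V> Optional<V> first(Iterator<Map.Entry<K, V>> it) {
    return it.hasNext() ? Optional.ofNullable(it.next().getValue()) : Optional.empty();
  }

  public static void main(String[] args) {
    NavigableMap<Long, String> dates = new TreeMap<>();
    dates.put(10L, "V1");
    dates.put(20L, "V2");
    dates.put(30L, "V3");
    System.out.println(first(reverseRange(dates, 0L, 25L)));        // Optional[V2]
    System.out.println(first(range(dates, 25L, Long.MAX_VALUE)));   // Optional[V3]
    System.out.println(first(reverseRange(dates, 0L, 5L)));         // Optional.empty
  }
}
```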

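We need two range queries, but the second one only runs when the first returns nothing.

The beforeValue query uses 0 as the lower date bound and a reverseRange, so the first entry it returns is the latest version on or before the event date (“round down”); because both bounds are inclusive, an exact match is found here too. Only when nothing precedes the event does afterValue run a forward range up to Instant.MAX.getEpochSecond() to “round up” to the earliest available version.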

  @Override
  public void process(Record<String, ComparableKeyValue<String, Long>> record) {
    // Round down first; only if no earlier version exists, round up.
    Optional<String> lookup = beforeValue(record.value()).or(() -> afterValue(record.value()));
    // Forward the looked-up String (null if the key is unknown), not the Optional itself.
    context().forward(
        new Record<>(record.key(), lookup.orElse(null), context().currentStreamTimeMs()));
  }

  // Latest version on or before the event date: reverse range from (key, 0) to (key, eventDate).
  private Optional<String> beforeValue(ComparableKeyValue<String, Long> value) {
    ComparableKeyValue<String, Long> min = new ComparableKeyValue<>(value.key, 0L);
    try (KeyValueIterator<ComparableKeyValue<String, Long>, ValueAndTimestamp<String>> iterator =
        lookupStore.reverseRange(min, value)) {
      return iterator.hasNext()
          ? Optional.ofNullable(iterator.next().value.value())
          : Optional.empty();
    }
  }

  // Earliest version after the event date: forward range from (key, eventDate) to (key, max).
  private Optional<String> afterValue(ComparableKeyValue<String, Long> value) {
    ComparableKeyValue<String, Long> max =
        new ComparableKeyValue<>(value.key, Instant.MAX.getEpochSecond());
    try (KeyValueIterator<ComparableKeyValue<String, Long>, ValueAndTimestamp<String>> iterator =
        lookupStore.range(value, max)) {
      return iterator.hasNext()
          ? Optional.ofNullable(iterator.next().value.value())
          : Optional.empty();
    }
  }
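To check the lookup logic above without a broker or the test driver, the same beforeValue/afterValue steps can be simulated with a TreeMap<byte[], String> ordered by Arrays::compareUnsigned, which mimics how the stores compare serialized keys. Everything here is an assumption for illustration: the ByteOrderedLookup class, the key encoding (UTF-8 key, 0x00 terminator, big-endian date with the sign bit flipped) and the use of epoch days for dates. The lower bound is Long.MIN_VALUE rather than 0 so that pre-1970 dates are covered as well:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, Kafka-free simulation of the beforeValue/afterValue lookup.
final class ByteOrderedLookup {
  // Raw key bytes compared as unsigned bytes, mimicking how the stores order keys.
  private final NavigableMap<byte[], String> store =
      new TreeMap<byte[], String>(Arrays::compareUnsigned);

  // Assumed encoding: UTF-8 key, 0x00 terminator, big-endian date with the sign bit flipped.
  static byte[] encode(String key, long date) {
    byte[] k = key.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(k.length + 1 + Long.BYTES)
        .put(k).put((byte) 0).putLong(date ^ Long.MIN_VALUE).array();
  }

  void put(String key, LocalDate date, String value) {
    store.put(encode(key, date.toEpochDay()), value);
  }

  Optional<String> lookup(String key, LocalDate date) {
    byte[] event = encode(key, date.toEpochDay());
    // Like beforeValue: the first entry of a reverse range ending at the event ("round down").
    Map.Entry<byte[], String> before =
        store.subMap(encode(key, Long.MIN_VALUE), true, event, true).lastEntry();
    if (before != null) {
      return Optional.of(before.getValue());
    }
    // Like afterValue: the first entry of a forward range starting at the event ("round up").
    Map.Entry<byte[], String> after =
        store.subMap(event, true, encode(key, Long.MAX_VALUE), true).firstEntry();
    return after == null ? Optional.empty() : Optional.of(after.getValue());
  }

  // The eight SCD versions from the unit test, inserted in reverse date order.
  static ByteOrderedLookup sample() {
    ByteOrderedLookup l = new ByteOrderedLookup();
    for (int month = 4; month >= 1; month--) {
      l.put("key2", LocalDate.of(2022, month, 1), "key2V" + month);
      l.put("key1", LocalDate.of(2021, month, 1), "key1V" + month);
    }
    return l;
  }

  public static void main(String[] args) {
    ByteOrderedLookup l = sample();
    System.out.println(l.lookup("key1", LocalDate.of(2021, 2, 27))); // Optional[key1V2]
    System.out.println(l.lookup("key2", LocalDate.of(2021, 3, 27))); // Optional[key2V1]
    System.out.println(l.lookup("key1", LocalDate.of(2022, 2, 27))); // Optional[key1V4]
    System.out.println(l.lookup("key3", LocalDate.of(2022, 2, 27))); // Optional.empty
  }
}
```

The earliest-match query for key2 shows why the key must be part of both bounds: all key1 versions sort before it in the store, but they fall outside the range starting at (key2, Long.MIN_VALUE), so they can never be returned.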

Testing

We will now issue a query command that models our historic event and check that it is linked to the correct historized SCD event, as outlined in the “Problem” section.

Exact Match

  @Test
  public void testExactMatch() {
    pipeToLookupOutOfOrder();

    queryCommandTopic.pipeInput(commandId, key1V2);
    var res = queryResponseTopic.readRecord();
    assertEquals(commandId, res.key());
    assertEquals("key1V2", res.value());

    queryCommandTopic.pipeInput(commandId, key2V3);
    var res2 = queryResponseTopic.readRecord();
    assertEquals(commandId, res2.key());
    assertEquals("key2V3", res2.value());
  }

Between Match

  @Test
  public void testBetweenMatch() {
    pipeToLookupOutOfOrder();

    queryCommandTopic.pipeInput(commandId, create(key1, LocalDate.of(2021, 2, 27)));
    var res = queryResponseTopic.readRecord();
    assertEquals(commandId, res.key());
    assertEquals("key1V2", res.value());

    queryCommandTopic.pipeInput(commandId, create(key2, LocalDate.of(2022, 3, 27)));
    var res2 = queryResponseTopic.readRecord();
    assertEquals(commandId, res2.key());
    assertEquals("key2V3", res2.value());
  }

Earliest Match

  @Test
  public void testEarliestMatch() {
    pipeToLookupOutOfOrder();

    queryCommandTopic.pipeInput(commandId, create(key1, LocalDate.of(2020, 2, 27)));
    var res = queryResponseTopic.readRecord();
    assertEquals(commandId, res.key());
    assertEquals("key1V1", res.value());

    queryCommandTopic.pipeInput(commandId, create(key2, LocalDate.of(2021, 3, 27)));
    var res2 = queryResponseTopic.readRecord();
    assertEquals(commandId, res2.key());
    assertEquals("key2V1", res2.value());
  }

Latest Match

  @Test
  public void testLatestMatch() {
    pipeToLookupOutOfOrder();

    queryCommandTopic.pipeInput(commandId, create(key1, LocalDate.of(2022, 2, 27)));
    var res = queryResponseTopic.readRecord();
    assertEquals(commandId, res.key());
    assertEquals("key1V4", res.value());

    queryCommandTopic.pipeInput(commandId, create(key2, LocalDate.of(2023, 3, 27)));
    var res2 = queryResponseTopic.readRecord();
    assertEquals(commandId, res2.key());
    assertEquals("key2V4", res2.value());
  }

Results

All tests pass with the range queries working as expected.

HistorizedLookupsWithRangeQueriesTest > testLookupOrdering() PASSED
HistorizedLookupsWithRangeQueriesTest > testLatestMatch() PASSED
HistorizedLookupsWithRangeQueriesTest > testBetweenMatch() PASSED
HistorizedLookupsWithRangeQueriesTest > testEarliestMatch() PASSED
HistorizedLookupsWithRangeQueriesTest > testExactMatch() PASSED
HistorizedLookupsWithRangeQueriesTest > testLatestRangeMatch() PASSED

Conclusion

As you can see, range lookups on historized data are not “impossible”, but they do require a bit more boilerplate to massage the keys.

P.S. I’ve skipped over some of the more boring parts, but the fully working example can be found on my GitHub.

Up Next

KIP-889 introduced versioned state stores, which seem to be a perfect fit for historized SCD lookups. Let’s see if it is possible to implement the same functionality using versioned stores: https://tkaszuba.medium.com/historized-scd-lookups-with-versioned-stores-in-kafka-streams-0c40f2ac4b67

Tom Kaszuba

Java, Scala and .Net consultant with over 20 years of experience in the financial industry, specializing in Big Data and integration, and especially Kafka.