Historized SCD lookups with Versioned Stores and IQv2 in Kafka Streams

Tom Kaszuba
Feb 21, 2024

Introduction

In my previous article I posed the problem of how to enable lookups on SCD data that is historized, using Range Queries. KIP-889, introduced with Kafka Streams version 3.5, seems to also tackle the historization problem with RocksDB segments. Is it then possible to replicate the same functionality using Versioned Stores instead of Range Queries? Let’s find out.

Setup

The first difference between Versioned Stores and normal Key/Value Stores is the lack of iterators. There is currently no support for range queries, or even the normal all iterator. It’s also not possible to find out how many elements are currently in the store. I’m sure this functionality will show up in the future, but as of version 3.7 this isn’t the case.
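For orientation, this is roughly the surface that VersionedKeyValueStore exposes as of 3.5 (abridged, and the comments are mine): single-key puts, deletes and gets only, with no iterators or counts.

public interface VersionedKeyValueStore<K, V> extends StateStore {
    // insert a new record version with an explicit timestamp
    long put(K key, V value, long timestamp);
    // tombstone the key as of the given timestamp
    VersionedRecord<V> delete(K key, long timestamp);
    // latest value for the key
    VersionedRecord<V> get(K key);
    // value that was valid at the given timestamp
    VersionedRecord<V> get(K key, long asOfTimestamp);
}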

The second surprise is that no in-memory store option is provided. All Versioned Stores are therefore RocksDB stores, which makes sense, since the implementation relies heavily on the segmenting feature of RocksDB.

Either way, this isn’t a hindrance for our use case, and we can proceed with the investigation.

Since we won’t be supporting any range queries, we don’t need to worry about the sort order, so a big chunk of the boilerplate code from the previous example is now gone. We also don’t need to mess around with implementing the Comparable interface, making working with any sort of key, such as Avro or Protobuf, a breeze. This is a huge plus for the Versioned Store solution.

To initialize the store we will still use a global store, as previously. The setup of the lookup store is done using a custom processor:

builder.addGlobalStore(
    new VersionedKeyValueStoreBuilder<>(
        Stores.persistentVersionedKeyValueStore(
            VersionedLookupStoreName, Duration.ofDays(5 * 365)),
        stringSerde,
        stringSerde,
        Time.SYSTEM),
    LookupTopic,
    Consumed.with(stringSerde, keyValueSerde),
    VersionedStoreSetupProcessor::new);

I’m using a KeyValue class to represent our SCD data; it is not a very good semantic representation, but for this example it is good enough. The Versioned Store needs to be passed a history retention; I’ve chosen 5 years. This is a big difference from a normal Key/Value store, which has infinite retention by default. The segment interval can also be defined, which can affect query performance.
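For illustration, there is an overload that also pins the segment interval; the 30 days here is an assumption, not a recommendation. A larger interval means fewer segments to consult when querying old timestamps, at the cost of holding on to expired versions longer, since segments are only dropped wholesale.

Stores.persistentVersionedKeyValueStore(
    VersionedLookupStoreName,
    Duration.ofDays(5 * 365), // history retention
    Duration.ofDays(30));     // segment interval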

There is not much magic in the processor: we take the timestamp from the event and insert it into the Versioned Store along with the value.

private static class VersionedStoreSetupProcessor
    extends ContextualProcessor<String, KeyValue<String, Long>, Void, Void> {

    private VersionedKeyValueStore<String, String> lookupStore;

    @Override
    public void init(ProcessorContext<Void, Void> context) {
        super.init(context);
        this.lookupStore = context.getStateStore(VersionedLookupStoreName);
    }

    @Override
    public void process(Record<String, KeyValue<String, Long>> record) {
        // put(key, value, timestamp): the value's Long component is the version timestamp
        lookupStore.put(record.key(), record.value().key, record.value().value);
    }
}

Querying

Querying the SCD data is dead simple: call the get method on the Versioned Store with a provided timestamp. If no timestamp is passed, the latest value is returned.

VersionedRecord<V> get(K key, long asOfTimestamp); 
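For example, a point-in-time read against the store we set up above; the key and the instant are purely illustrative:

// value of "some-key" as it was valid on 2024-01-01
VersionedRecord<String> asOf =
    lookupStore.get("some-key", Instant.parse("2024-01-01T00:00:00Z").toEpochMilli());
Optional<String> value = Optional.ofNullable(asOf).map(VersionedRecord::value);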

The processor then becomes:

private static class VersionedStoreLookupProcessor
    extends ContextualProcessor<String, KeyValue<String, Long>, String, Optional<String>> {

    private VersionedKeyValueStore<String, String> lookupStore;

    @Override
    public void init(ProcessorContext<String, Optional<String>> context) {
        super.init(context);
        this.lookupStore = context.getStateStore(VersionedLookupStoreName);
    }

    @Override
    public void process(Record<String, KeyValue<String, Long>> record) {
        Optional<String> lookup =
            Optional.ofNullable(lookupStore.get(record.value().key, record.value().value))
                .map(VersionedRecord::value);
        context().forward(new Record<>(record.key(), lookup, context().currentStreamTimeMs()));
    }
}

Testing

The topology test driver fully supports Versioned Stores, so testing follows the same pattern as in the previous example. Running the previous tests now gives:

HistorizedLookupsWithVersionedStoresTest > testLookupOrdering() PASSED
HistorizedLookupsWithVersionedStoresTest > testLatestMatch() PASSED
HistorizedLookupsWithVersionedStoresTest > testBetweenMatch() PASSED
HistorizedLookupsWithVersionedStoresTest > testEarliestMatch() FAILED
org.opentest4j.AssertionFailedError at HistorizedLookupsWithVersionedStoresTest.java:145
HistorizedLookupsWithVersionedStoresTest > testExactMatch() PASSED

As you can see, the “earliest match” test fails. This is expected, since the Versioned Store has no sense of order associated with it, and hence we can’t round up as in the previous example. As stated previously, the round-up feature could be viewed as controversial: how can an event be matched to an SCD version that never existed? In the case of Versioned Stores this question becomes even murkier, since the SCD might have existed but has expired in the meantime. The old case of soft vs. hard deletes again.

One important factor to take into consideration when querying a Versioned Store is performance. The get method is optimized for returning the latest value; that will always be O(1). For earlier records, the Versioned Store has to scan through its segments and will therefore be a lot slower. If there are a lot of segments, this will impact performance.
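To make that concrete, here are both variants side by side; the key and the 400-day offset are illustrative:

// latest value: served from a dedicated latest-value store, effectively O(1)
VersionedRecord<String> latest = lookupStore.get("some-key");

// historical value: may have to walk the RocksDB segments covering older
// timestamps, so the cost grows with the number of segments in the window
long oldTimestampMs = Instant.now().minus(Duration.ofDays(400)).toEpochMilli();
VersionedRecord<String> old = lookupStore.get("some-key", oldTimestampMs);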

Interactive Queries version 2

Kafka version 3.7 introduces KIP-968, which allows Versioned Stores to be queried using the second iteration of interactive queries, dubbed IQv2.

Since these interactive queries return an iterator, it is possible to replicate the “round up” feature we had with the range queries.

Our beforeValue function now becomes:

private Optional<VersionedRecord<String>> beforeValue(String key) {
    QueryResult<VersionedRecordIterator<String>> result =
        lookupStore.query(
            MultiVersionedKeyQuery.<String, String>withKey(key)
                .fromTime(Instant.ofEpochMilli(0L))
                .withAscendingTimestamps(),
            PositionBound.unbounded(),
            new QueryConfig(false));
    try (VersionedRecordIterator<String> iterator = result.getResult()) {
        return iterator.hasNext() ? Optional.ofNullable(iterator.next()) : Optional.empty();
    }
}

To construct the query request we need to pass the key, the from/to timestamp bounds, and the sort order of the timestamps. Since we only care about the lower bound, we set it to the epoch and leave the upper bound at its default. We also order the timestamps ascending, so that the first record returned is the earliest one, as with the range queries.
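For completeness, KIP-968 also lets you bound both ends of the window and reverse the order; a hypothetical newest-first variant over a fixed window would look like:

MultiVersionedKeyQuery.<String, String>withKey(key)
    .fromTime(Instant.parse("2023-01-01T00:00:00Z"))
    .toTime(Instant.parse("2024-01-01T00:00:00Z"))
    .withDescendingTimestamps();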

We extend the process method to include this query:

@Override
public void process(Record<String, KeyValue<String, Long>> record) {
    Optional<String> lookup =
        Optional.ofNullable(lookupStore.get(record.value().key, record.value().value))
            .or(() -> beforeValue(record.value().key))
            .map(VersionedRecord::value);
    context().forward(new Record<>(record.key(), lookup, context().currentStreamTimeMs()));
}

Testing the topology now gives an all green result:

HistorizedLookupsWithVersionedStoresTest > testLookupOrdering() PASSED
HistorizedLookupsWithVersionedStoresTest > testLatestMatch() PASSED
HistorizedLookupsWithVersionedStoresTest > testBetweenMatch() PASSED
HistorizedLookupsWithVersionedStoresTest > testEarliestMatch() PASSED
HistorizedLookupsWithVersionedStoresTest > testExactMatch() PASSED

Conclusion

Using Versioned Stores it is possible to replicate all of the functionality that we had defined in the previous example using Range Queries. The major advantage of Versioned Stores over Range Queries is simplicity: the amount of boilerplate is minimal, plus Avro and Protobuf can be used as-is for keys. The disadvantage is that querying Versioned Stores is slower, which might be an issue with a large set of SCD values or segments. To top it off, certain state store features are not yet implemented, such as an in-memory store or iterators.

All in all, Versioned Stores are a great addition to the Kafka Streams framework and I’m eager to see how they evolve.

Fully working code can be found on my GitHub.

