How to purge Kafka Streams DSL state stores

Tom Kaszuba
3 min read · May 2, 2024

Introduction

Sometimes I encounter questions about how to remove records from state stores created by the Kafka Streams DSL, so that, for example, an aggregation can restart from scratch.

There are two primary ways of doing this, and both are fairly simple: one uses tombstones, the other the Processor API.

Problem

Say you have the following stream, which keeps a running count per key.

public static void createStream(final StreamsBuilder builder) {
    builder.stream(Incoming, Consumed.with(stringSerde, integerSerde))
        .groupByKey()
        .aggregate(
            () -> 0,
            (k, v, agg) -> agg + v,
            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(AggStoreName)
                .withCachingDisabled())
        .toStream()
        .to(Outgoing);
}

Since the changelog topic backing the store is compacted, and hence has infinite retention, this counter will keep counting forever. How can you reset the counter back to the initializer from outside of the stream, using an event pushed through the incoming topic?

Tombstones

The easiest way to delete or purge a key in a state store is to send a null value, also known as a tombstone. If you aren’t familiar with the semantics of tombstones in Kafka Streams, I encourage you to read my previous article on hard vs soft deletes.
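A tombstone is nothing more than a record with a non-null key and a null value. With a plain producer it might look like this (a minimal sketch; the producer configuration is assumed):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (KafkaProducer<String, Integer> producer =
             new KafkaProducer<>(props, new StringSerializer(), new IntegerSerializer())) {
        // A tombstone: non-null key, null value
        producer.send(new ProducerRecord<>("input-topic", "Key1", null));
    }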

But sending a tombstone directly into the incoming topic will produce the following message:

[WARN] Skipping record due to null key or value. topic=[input-topic] partition=[0] offset=[4]

This is by design, as explained on the Kafka mailing list.

We therefore need to pass some sort of event into the aggregation that we can act upon and issue the tombstone from within the aggregation itself. For the sake of the example I will use a “DeleteEvent” (Integer.MIN_VALUE), but normally you would do this with a dedicated field in Avro or Protobuf.
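For illustration, such a dedicated field might look something like this (a hypothetical sketch, not the schema used in this article; the rest of the post sticks with the Integer.MIN_VALUE sentinel to keep the serdes simple):

    // Hypothetical value type with an explicit delete flag; in practice this would
    // typically be a field in an Avro or Protobuf generated class.
    public record CounterEvent(int amount, boolean delete) {}

    // The aggregator would then branch on the flag instead of a sentinel value:
    // (k, v, agg) -> v.delete() ? null : agg + v.amount()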

The stream now becomes:

public final class SimpleAggregateDelete {

    public static final String Incoming = "input-topic";
    public static final String Outgoing = "output-topic";
    public static final String AggStoreName = "agg-store";
    public static final int DeleteEvent = Integer.MIN_VALUE;
    private static final Serde<String> stringSerde = Serdes.String();
    private static final Serde<Integer> integerSerde = Serdes.Integer();

    public static void createStream(final StreamsBuilder builder) {
        builder.stream(Incoming, Consumed.with(stringSerde, integerSerde))
            .groupByKey()
            .aggregate(
                () -> 0,
                (k, v, agg) -> (v == DeleteEvent) ? null : agg + v,
                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(AggStoreName)
                    .withCachingDisabled())
            .toStream()
            .filter((k, v) -> v != null)
            .to(Outgoing);
    }
}

If we encounter a “DeleteEvent”, we issue a tombstone. Since the tombstone is passed downstream, we also need to filter it out after the aggregation, in this case after the .toStream() call.

Testing the stream shows that the aggregation is properly reset after the “DeleteEvent” is issued.

@Test
public void testDelete() {
    String key = "Key1";

    inputTopic.pipeInput(key, 1);
    inputTopic.pipeInput(key, 1);
    inputTopic.pipeInput(key, 1);
    inputTopic.pipeInput(key, 1);

    // A raw tombstone is skipped by Kafka Streams; the DeleteEvent resets the aggregation
    inputTopic.pipeInput(key, null);
    inputTopic.pipeInput(key, SimpleAggregateDelete.DeleteEvent);

    inputTopic.pipeInput(key, 1);
    inputTopic.pipeInput(key, 1);
    inputTopic.pipeInput(key, 1);
    inputTopic.pipeInput(key, 1);

    // The first four records count up to 4...
    assertEquals(1, outputTopic.readRecord().value());
    assertEquals(2, outputTopic.readRecord().value());
    assertEquals(3, outputTopic.readRecord().value());
    assertEquals(4, outputTopic.readRecord().value());

    // ...and after the DeleteEvent the count restarts from the initializer
    assertEquals(1, outputTopic.readRecord().value());
    assertEquals(2, outputTopic.readRecord().value());
    assertEquals(3, outputTopic.readRecord().value());
    assertEquals(4, outputTopic.readRecord().value());
}
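For completeness, the test above assumes a TopologyTestDriver setup roughly along these lines (a sketch, not shown in the article; the exact setup is in the repository linked at the end):

    StreamsBuilder builder = new StreamsBuilder();
    SimpleAggregateDelete.createStream(builder);

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-aggregate-delete-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
    TestInputTopic<String, Integer> inputTopic =
        driver.createInputTopic(SimpleAggregateDelete.Incoming,
            Serdes.String().serializer(), Serdes.Integer().serializer());
    TestOutputTopic<String, Integer> outputTopic =
        driver.createOutputTopic(SimpleAggregateDelete.Outgoing,
            Serdes.String().deserializer(), Serdes.Integer().deserializer());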

Processor API

The second way of deleting data from a DSL-created state store is to use the Processor API. This way is more flexible but a bit messier, since the processor has to be placed downstream of the DSL operator: the DSL is what creates the state store in the first place, and we can’t delete anything from a store that doesn’t exist yet. Unfortunately the DSL doesn’t allow you to create the state store yourself and inject it, which would circumvent this problem.

The stream now becomes:

public static void createStream(final StreamsBuilder builder) {
    builder.stream(Incoming, Consumed.with(stringSerde, integerSerde))
        .groupByKey()
        .aggregate(
            () -> 0,
            (k, v, agg) -> (v == DeleteEvent) ? DeleteEvent : agg + v,
            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(AggStoreName)
                .withCachingDisabled())
        .toStream()
        .processValues(StorePurgeProcessor::new, AggStoreName)
        .to(Outgoing);
}

As you can see, the “DeleteEvent” now has to be passed through the aggregation down into the processor to trigger the delete. If the processor could be placed before the aggregation, this would not be required.

The processor is straightforward and looks like the following:

private static class StorePurgeProcessor
    extends ContextualFixedKeyProcessor<String, Integer, Integer> {

    private TimestampedKeyValueStore<String, Integer> dslStore;

    @Override
    public void init(FixedKeyProcessorContext<String, Integer> context) {
        super.init(context);
        this.dslStore = context.getStateStore(AggStoreName);
    }

    @Override
    public void process(FixedKeyRecord<String, Integer> record) {
        if (record.value() == DeleteEvent) dslStore.delete(record.key());
        else context().forward(record);
    }
}

If we receive the “DeleteEvent”, we call store.delete, which removes the value (and issues a tombstone to the changelog topic in the background). Otherwise, the processor functions as a normal pass-through.

Running the same tests produces the same correct result.
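If you want to assert the purge directly against the DSL store rather than the output topic, the TopologyTestDriver exposes the store as well (a sketch, assuming the driver setup shown earlier and the same constant names):

    KeyValueStore<String, ValueAndTimestamp<Integer>> store =
        driver.getTimestampedKeyValueStore(SimpleAggregateDelete.AggStoreName);

    inputTopic.pipeInput("Key1", 1);
    inputTopic.pipeInput("Key1", SimpleAggregateDelete.DeleteEvent);

    // After the DeleteEvent the key is gone from the store
    assertNull(store.get("Key1"));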

You can find the code for both examples on my GitHub.
