diff --git a/cwm_worker_operator/kafka_streamer.py b/cwm_worker_operator/kafka_streamer.py index e7906ef..bcaa7e8 100644 --- a/cwm_worker_operator/kafka_streamer.py +++ b/cwm_worker_operator/kafka_streamer.py @@ -95,6 +95,7 @@ def commit(topic, consumer, domains_config, agg_data, no_kafka_commit=False): raise NotImplementedError(f"topic {topic} is not supported") if not no_kafka_commit: consumer.commit() + agg_data.clear() def delete_records(topic, latest_partition_offset):