Message gets produced twice after restart #215

Closed
lukasswm opened this issue Dec 2, 2022 · 6 comments

@lukasswm
Contributor

lukasswm commented Dec 2, 2022

Description

Referencing the same setup as in #212.

Starting the application when there are already messages on a topic produces those messages twice during the restart process.
The issue does not occur every time; it feels like a race condition. But when it does occur, every message is produced twice. I tested it with 3 messages, and all of them were duplicated.

How to reproduce

  • Produce a message to the meta topic.
  • Produce another message with the same key and the same payload to the meta topic.
  • Produce a message to the value topic.
  • Start the application.
  • After the restart, the value is joined with the meta record, but it gets produced twice to the target topic (a rough topology sketch follows below).
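
For context, a minimal sketch of what a topology along these lines could look like in Streamiz, using string serdes as placeholders and the topic/store names that appear in the log further down (the real setup from #212 presumably uses Avro serdes and its own model types, so treat every name and joiner here as an assumption rather than the actual code):

using System.Collections.Generic;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.Table;

public static class TopologySketch
{
    // Hypothetical topology: global table from the meta topic, joined with the
    // value-topic stream, filtered, re-mapped and produced to the target topic.
    public static Topology Build()
    {
        var builder = new StreamBuilder();

        var meta = builder.GlobalTable<string, string>(
            "meta-topic",
            InMemory<string, string>.As("table-store"));

        builder.Stream<string, string>("value-topic")
            .Join(meta,
                (key, value) => key,                          // key selector into the global table
                (value, metaValue) => $"{value}|{metaValue}") // placeholder value joiner
            .Filter((key, joined) => joined != null)
            .Map((key, joined) => KeyValuePair.Create(key, joined)) // placeholder re-keying/mapping
            .To("domain.private.data");

        return builder.Build();
    }
}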
@LGouellec
Owner

How do you close your application? Properly, with stream.Dispose() or stream.Start(token), or not?

@lukasswm
Contributor Author

lukasswm commented Dec 2, 2022

I do both.
The stream is hosted in a BackgroundService. await _kafkaStream.StartAsync(stoppingToken); gets the cancellation token of ExecuteAsync(CancellationToken stoppingToken), and I override Dispose() of the background service to dispose the KafkaStream properly.

using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Streamiz.Kafka.Net;

namespace Test.Streams;

public class Streamer : BackgroundService
{
    private KafkaStream? _kafkaStream;
    private readonly TopologyBuilder _TopologyBuilder;
    private readonly StreamConfigBuilder _StreamConfigBuilder;

    public Streamer(TopologyBuilder TopologyBuilder, StreamConfigBuilder StreamConfigBuilder)
    {
        _TopologyBuilder = TopologyBuilder;
        _StreamConfigBuilder = StreamConfigBuilder;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var topology = _TopologyBuilder.Build();
        var config = _StreamConfigBuilder.Build();
        _kafkaStream = new KafkaStream(topology, config);

        // Runs until the host cancels stoppingToken, which stops the stream.
        await _kafkaStream.StartAsync(stoppingToken);
    }

    public override void Dispose()
    {
        // Dispose the stream explicitly when the service itself is disposed.
        _kafkaStream?.Dispose();
        base.Dispose();
    }
}
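
For reference, a minimal sketch of how such a BackgroundService is typically wired into the .NET generic host; the registrations for TopologyBuilder and StreamConfigBuilder are assumptions, since those are application-specific helper classes not shown here:

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Test.Streams;

// Hypothetical Program.cs: the host calls ExecuteAsync on startup and
// disposes the Streamer (and therefore the KafkaStream) on shutdown.
var host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddSingleton<TopologyBuilder>();     // assumed registration
        services.AddSingleton<StreamConfigBuilder>(); // assumed registration
        services.AddHostedService<Streamer>();
    })
    .Build();

await host.RunAsync();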

@LGouellec
Owner

@lukasswm
Can you enable debug logs and attach the debug log of your application when you reproduce the bug, please?

@lukasswm
Contributor Author

lukasswm commented Dec 2, 2022

Here is the log. I removed the message payloads and some other information, but I hope it is still fine.

info: Streamiz.Kafka.Net.KafkaStream[0]
      stream-application[domain.data.service]  Start creation of the stream application with this configuration:
        Stream property:
                client.id:
                num.stream.threads:     1
                default.key.serdes:     Streamiz.Kafka.Net.SerDes.StringSerDes
                default.value.serdes:   Streamiz.Kafka.Net.SerDes.StringSerDes
                default.timestamp.extractor:    Streamiz.Kafka.Net.Processors.Internal.FailOnInvalidTimestamp
                commit.interval.ms:     1000
                processing.guarantee:   AT_LEAST_ONCE
                transaction.timeout:    00:00:10
                poll.ms:        100
                max.poll.records:       500
                max.poll.restoring.records:     1000
                max.task.idle.ms:       0
                buffered.records.per.partition:         1000
                inner.exception.handler:        System.Func`2[System.Exception,Streamiz.Kafka.Net.ExceptionHandlerResponse]
                production.exception.handler:   System.Func`2[Confluent.Kafka.DeliveryReport`2[System.Byte[],System.Byte[]],Streamiz.Kafka.Net.ExceptionHandlerResponse]
                deserialization.exception.handler:      System.Func`4[Streamiz.Kafka.Net.ProcessorContext,Confluent.Kafka.ConsumeResult`2[System.Byte[],System.Byte[]],System.Exception,Streamiz.Kafka.Net.ExceptionHandlerResponse]
                rocksdb.config.setter:  System.Action`2[System.String,Streamiz.Kafka.Net.State.RocksDb.RocksDbOptions]
                follow.metadata:        False
                state.dir:      C:\Users\lukas.schwendemann\AppData\Local\Temp\streamiz-kafka-net
                replication.factor:     1
                windowstore.changelog.additional.retention.ms:  86400000
                offset.checkpoint.manager:
                metrics.interval.ms:    30000
                metrics.recording.level:        INFO
                metrics.reporter:       System.Action`1[System.Collections.Generic.IEnumerable`1[Streamiz.Kafka.Net.Metrics.Sensor]]
                expose.librdkafka.stats:        False
                start.task.delay.ms:    5000
                parallel.processing:    False
                max.degree.of.parallelism:      8
                application.id:         domain.data.service
                schema.registry.url:    localhost:8081
                avro.serializer.auto.register.schemas:  True
                protobuf.serializer.auto.register.schemas:      True
        Client property:
                bootstrap.servers:      localhost:9095
                metadata.max.age.ms:    500
        Consumer property:
                max.poll.interval.ms:   300000
                enable.auto.commit:     False
                enable.auto.offset.store:       False
                partition.assignment.strategy:  cooperative-sticky
                heartbeat.interval.ms:  200
                auto.offset.reset:      earliest
        Producer property:
                queue.buffering.max.kbytes:     10240
        Admin client property:
                None

info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] Creating shared producer client
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] Creating consumer client
info: Streamiz.Kafka.Net.KafkaStream[0]
      stream-application[domain.data.service] State transition from CREATED to REBALANCING
info: Streamiz.Kafka.Net.KafkaStream[0]
      stream-application[domain.data.service] Starting Streams client with this topology : Topologies:
         Sub-topology: 0 for global store (will not generate tasks)
          Source: KSTREAM-SOURCE-0000000000 (topics: [meta-topic])
            --> KTABLE-SOURCE-0000000001
          Processor: KTABLE-SOURCE-0000000001 (stores: [table-store])
            --> none
            <-- KSTREAM-SOURCE-0000000000
         Sub-topology: 1
          Source: KSTREAM-SOURCE-0000000002 (topics: [value-topic])
            --> KSTREAM-JOIN-0000000003
          Processor: KSTREAM-JOIN-0000000003 (stores: [])
            --> KSTREAM-FILTER-0000000004
            <-- KSTREAM-SOURCE-0000000002
          Processor: KSTREAM-FILTER-0000000004 (stores: [])
            --> KSTREAM-MAP-0000000005
            <-- KSTREAM-JOIN-0000000003
          Processor: KSTREAM-MAP-0000000005 (stores: [])
            --> KSTREAM-SINK-0000000006
            <-- KSTREAM-FILTER-0000000004
          Sink: KSTREAM-SINK-0000000006 (topic: domain.private.data)
            <-- KSTREAM-MAP-0000000005

dbug: Streamiz.Kafka.Net.Processors.DefaultTopicManager[0]
      Starting to apply internal topics for topology 0 in topic manager (try: 1, max retry : 10).
dbug: Streamiz.Kafka.Net.Processors.DefaultTopicManager[0]
      Complete to apply internal topics in topic manager
dbug: Streamiz.Kafka.Net.Processors.DefaultTopicManager[0]
      Starting to apply internal topics for topology 1 in topic manager (try: 1, max retry : 10).
dbug: Streamiz.Kafka.Net.Processors.DefaultTopicManager[0]
      Complete to apply internal topics in topic manager
info: Streamiz.Kafka.Net.Processors.GlobalStreamThread[0]
      global-stream-thread domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-GlobalStreamThread Starting
warn: Streamiz.Kafka.Net.State.OffsetCheckpointFile[0]
      Read checkpoint offsets from recovery file : C:\Users\lukas.schwendemann\AppData\Local\Temp\streamiz-kafka-net\domain.data.service\global\.checkpoint.rec
info: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Restoring state for global store table-store
dbug: Streamiz.Kafka.Net.Kafka.Internal.KafkaLoggerAdapter[0]
      Log consumer domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-GlobalStreamThread#consumer-1 - [thrd:main]: meta-topic [0]: offset reset (at offset 2, broker 1001) to BEGINNING: fetch failed due to requested offset not available on the broker: Broker: Offset out of range
info: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Global store table-store is completely restored
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateUpdateTask[0]
      Initializing topology with processor source : Streamiz.Kafka.Net.Processors.SourceProcessor`2[System.String,Test.domain.Data.A.Models.MetaData]
dbug: Streamiz.Kafka.Net.Processors.SourceProcessor[0]
      Initializing process context
dbug: Streamiz.Kafka.Net.Processors.KTableSourceProcessor[0]
      Initializing process context
dbug: Streamiz.Kafka.Net.Processors.KTableSourceProcessor[0]
      Process context initialized
dbug: Streamiz.Kafka.Net.Processors.SourceProcessor[0]
      Process context initialized
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateUpdateTask[0]
      Initializing topology with processor source : Streamiz.Kafka.Net.Processors.KTableSourceProcessor`2[System.String,Test.domain.Data.A.Models.MetaData]
dbug: Streamiz.Kafka.Net.Processors.KTableSourceProcessor[0]
      Initializing process context
dbug: Streamiz.Kafka.Net.Processors.KTableSourceProcessor[0]
      Process context initialized
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] Starting
info: Streamiz.Kafka.Net.Processors.GlobalStreamThread[0]
      global-stream-thread domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-GlobalStreamThread State transition from CREATED to RUNNING
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] State transition from CREATED to STARTING
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks  since 2884.9211ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks  since 1085.2583ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks  since 1090.3973ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.TaskCreator[0]
      Created task 1-0 with assigned partition value-topic [[0]]
dbug: Streamiz.Kafka.Net.Processors.Internal.TaskCreator[0]
      Created task 1-0 with assigned partition value-topic [[0]]
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Initializing state stores.
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Initializing state stores
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Initializing topology with theses source processors : KSTREAM-SOURCE-0000000002.
dbug: Streamiz.Kafka.Net.Processors.SourceProcessor[0]
      stream-task[1|0]|processor[KSTREAM-SOURCE-0000000002]- Initializing process context
dbug: Streamiz.Kafka.Net.Processors.KStreamKTableJoinProcessor[0]
      stream-task[1|0]|processor[KSTREAM-JOIN-0000000003]- Initializing process context
dbug: Streamiz.Kafka.Net.Processors.KStreamFilterProcessor[0]
      stream-task[1|0]|processor[KSTREAM-FILTER-0000000004]- Initializing process context
dbug: Streamiz.Kafka.Net.Processors.KStreamMapProcessor[0]
      stream-task[1|0]|processor[KSTREAM-MAP-0000000005]- Initializing process context
dbug: Streamiz.Kafka.Net.Processors.SinkProcessor[0]
      stream-task[1|0]|processor[KSTREAM-SINK-0000000006]- Initializing process context
dbug: Streamiz.Kafka.Net.Processors.SinkProcessor[0]
      stream-task[1|0]|processor[KSTREAM-SINK-0000000006]- Process context initialized
dbug: Streamiz.Kafka.Net.Processors.KStreamMapProcessor[0]
      stream-task[1|0]|processor[KSTREAM-MAP-0000000005]- Process context initialized
dbug: Streamiz.Kafka.Net.Processors.KStreamFilterProcessor[0]
      stream-task[1|0]|processor[KSTREAM-FILTER-0000000004]- Process context initialized
dbug: Streamiz.Kafka.Net.Processors.KStreamKTableJoinProcessor[0]
      stream-task[1|0]|processor[KSTREAM-JOIN-0000000003]- Process context initialized
dbug: Streamiz.Kafka.Net.Processors.SourceProcessor[0]
      stream-task[1|0]|processor[KSTREAM-SOURCE-0000000002]- Process context initialized
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] State transition from STARTING to PARTITIONS_ASSIGNED
info: Streamiz.Kafka.Net.Kafka.Internal.StreamsRebalanceListener[0]
      Partition assignment took 00:00:00.0337474 ms.
        Currently assigned active tasks: 1-0
        Revoked assigned active tasks:

dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Adding new record in queue
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Record added in queue. New size : 1
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Added record into the buffered queue of partition value-topic [[0]], new queue size is 1
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Add 1 records in tasks in 00:00:00.2112423
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks 1-0 since 1016.2206ms has elapsed (commit interval is 1000ms)
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] State transition from PARTITIONS_ASSIGNED to RUNNING
info: Streamiz.Kafka.Net.KafkaStream[0]
      stream-application[domain.data.service] State transition from REBALANCING to RUNNING
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0]  State is RUNNING, initializing and restoring tasks if necessary
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Task 1-0 state transition from CREATED to RUNNING
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Restoration took 365ms for all tasks 1-0
dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      Finished restoring all changelogs
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Polling record in queue
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Record polled. (Record info [Topic:value-topic|Partition:[0]|Offset:68])
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Start processing one record [Topic:value-topic|Partition:0|Offset:68|Timestamp:1669989569475]
dbug: Streamiz.Kafka.Net.Processors.SourceProcessor[0]
      stream-task[1|0]|processor[KSTREAM-SOURCE-0000000002]- Forward<String,RawData> message with key 89818 and value com.Test.timeseriesanalytics.avro.RawData to each next processor
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.KStreamKTableJoinProcessor[0]
      stream-task[1|0]|processor[KSTREAM-JOIN-0000000003]- Forward<String,TagInfo> message with key 89818 and value Test.domain.Data.A.Models.TagInfo to each next processor
dbug: Streamiz.Kafka.Net.Processors.KStreamFilterProcessor[0]
      stream-task[1|0]|processor[KSTREAM-FILTER-0000000004]- Process<String,TagInfo> message with key 89818 and Test.domain.Data.A.Models.TagInfo with record metadata [topic:value-topic|partition:0|offset:68]
dbug: Streamiz.Kafka.Net.Processors.KStreamFilterProcessor[0]
      stream-task[1|0]|processor[KSTREAM-FILTER-0000000004]- Forward<String,TagInfo> message with key 89818 and value Test.domain.Data.A.Models.TagInfo to each next processor
dbug: Streamiz.Kafka.Net.Processors.KStreamMapProcessor[0]
      stream-task[1|0]|processor[KSTREAM-MAP-0000000005]- Process<String,TagInfo> message with key 89818 and Test.domain.Data.A.Models.TagInfo with record metadata [topic:value-topic|partition:0|offset:68]
dbug: Streamiz.Kafka.Net.Processors.KStreamMapProcessor[0]
      stream-task[1|0]|processor[KSTREAM-MAP-0000000005]- Forward<String,String> message with key 123456789 and value {...} to each next processor
dbug: Streamiz.Kafka.Net.Processors.SinkProcessor[0]
      stream-task[1|0]|processor[KSTREAM-SINK-0000000006]- Process<String,String> message with key 123456789 and {...} with record metadata [topic:value-topic|partition:0|offset:68]
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Completed processing one record [Topic:value-topic|Partition:0|Offset:68|Timestamp:1669989569475]
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks 1-0 since 1012.8492ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Comitting
dbug: Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager[0]
      stream-task[1|0] Flushing all stores registered in the state manager
dbug: Streamiz.Kafka.Net.Kafka.Internal.RecordCollector[0]
      stream-task[1|0] Flushing producer
dbug: Streamiz.Kafka.Net.Kafka.Internal.RecordCollector[0]
      stream-task[1|0] Record persisted: (timestamp 1669989569475) topic=[domain.private.data] partition=[[0]] offset=[62]
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committed all active tasks 1-0 in 45.9156ms
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Processing 1 records in 00:00:00.9474470
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Adding new record in queue
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Record added in queue. New size : 1
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Added record into the buffered queue of partition value-topic [[0]], new queue size is 1
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Add 1 records in tasks in 00:00:00.0002425
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Polling record in queue
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Record polled. (Record info [Topic:value-topic|Partition:[0]|Offset:68])
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Start processing one record [Topic:value-topic|Partition:0|Offset:68|Timestamp:1669989569475]
dbug: Streamiz.Kafka.Net.Processors.SourceProcessor[0]
      stream-task[1|0]|processor[KSTREAM-SOURCE-0000000002]- Forward<String,RawData> message with key 89818 and value com.Test.timeseriesanalytics.avro.RawData to each next processor
dbug: Streamiz.Kafka.Net.Processors.KStreamKTableJoinProcessor[0]
      stream-task[1|0]|processor[KSTREAM-JOIN-0000000003]- Forward<String,TagInfo> message with key 89818 and value Test.domain.Data.A.Models.TagInfo to each next processor
dbug: Streamiz.Kafka.Net.Processors.KStreamFilterProcessor[0]
      stream-task[1|0]|processor[KSTREAM-FILTER-0000000004]- Process<String,TagInfo> message with key 89818 and Test.domain.Data.A.Models.TagInfo with record metadata [topic:value-topic|partition:0|offset:68]
dbug: Streamiz.Kafka.Net.Processors.KStreamFilterProcessor[0]
      stream-task[1|0]|processor[KSTREAM-FILTER-0000000004]- Forward<String,TagInfo> message with key 89818 and value Test.domain.Data.A.Models.TagInfo to each next processor
dbug: Streamiz.Kafka.Net.Processors.KStreamMapProcessor[0]
      stream-task[1|0]|processor[KSTREAM-MAP-0000000005]- Process<String,TagInfo> message with key 89818 and Test.domain.Data.A.Models.TagInfo with record metadata [topic:value-topic|partition:0|offset:68]
dbug: Streamiz.Kafka.Net.Processors.KStreamMapProcessor[0]
      stream-task[1|0]|processor[KSTREAM-MAP-0000000005]- Forward<String,String> message with key 123456789 and value {...} to each next processor
dbug: Streamiz.Kafka.Net.Processors.SinkProcessor[0]
      stream-task[1|0]|processor[KSTREAM-SINK-0000000006]- Process<String,String> message with key 123456789 and {...} with record metadata [topic:value-topic|partition:0|offset:68]
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Completed processing one record [Topic:value-topic|Partition:0|Offset:68|Timestamp:1669989569475]
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Processing 1 records in 00:00:00.7518565
dbug: Streamiz.Kafka.Net.Kafka.Internal.RecordCollector[0]
      stream-task[1|0] Record persisted: (timestamp 1669989569475) topic=[domain.private.data] partition=[[0]] offset=[63]
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks 1-0 since 1079.5559ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Comitting
dbug: Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager[0]
      stream-task[1|0] Flushing all stores registered in the state manager
dbug: Streamiz.Kafka.Net.Kafka.Internal.RecordCollector[0]
      stream-task[1|0] Flushing producer
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committed all active tasks 1-0 in 3.1334ms
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks 1-0 since 1071.6336ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks 1-0 since 1076.8629ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks 1-0 since 1083.3077ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks 1-0 since 1088.1938ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks 1-0 since 1088.9109ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks 1-0 since 1089.1086ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks 1-0 since 1084.8793ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
      Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Committing all active tasks 1-0 since 1084.4777ms has elapsed (commit interval is 1000ms)

@LGouellec
Owner

@lukasswm,

Indeed, it seems this record Topic:value-topic|Partition:0|Offset:68|Timestamp:1669989569475 is processed twice.

Did you delete the topic value-topic and recreate it with the same name within a short time?
It looks like the record is not only processed twice, but also polled twice directly from the consumer.

Can you consume the topic with the CLI, for example:

kafka-console-consumer --bootstrap-server localhost:9095 --topic value-topic --property print.key=true --from-beginning

and post the results?
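
If it is easier from code, a rough equivalent with the Confluent.Kafka .NET client could look like this (group id, timeout, and output format are arbitrary choices; the value is read as raw bytes because the topic presumably carries Avro):

using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9095",
    GroupId = "value-topic-inspection",          // throwaway group id, arbitrary name
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
};

using var consumer = new ConsumerBuilder<string, byte[]>(config).Build();
consumer.Subscribe("value-topic");

// Print key, partition and offset of every record, so it is visible whether the
// record itself is duplicated on the input topic or only processed twice downstream.
while (true)
{
    var result = consumer.Consume(TimeSpan.FromSeconds(5));
    if (result == null) break;                   // nothing new within the timeout
    Console.WriteLine($"key={result.Message.Key} partition={result.Partition.Value} offset={result.Offset.Value}");
}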

@LGouellec
Owner
