diff --git a/core/Kafka/Internal/StreamsRebalanceListener.cs b/core/Kafka/Internal/StreamsRebalanceListener.cs index 41f6a0b1..a8f25f75 100644 --- a/core/Kafka/Internal/StreamsRebalanceListener.cs +++ b/core/Kafka/Internal/StreamsRebalanceListener.cs @@ -55,16 +55,19 @@ public void PartitionsRevoked(IConsumer consumer, List(partitions.Select(p => p.TopicPartition))); - Thread.SetState(ThreadState.PARTITIONS_REVOKED); - manager.RebalanceInProgress = false; + if (Thread.IsRunning) + { + manager.RebalanceInProgress = true; + manager.RevokeTasks(new List(partitions.Select(p => p.TopicPartition))); + Thread.SetState(ThreadState.PARTITIONS_REVOKED); + manager.RebalanceInProgress = false; - StringBuilder sb = new StringBuilder(); - sb.AppendLine($"Partition revocation took {DateTime.Now - start} ms"); - sb.AppendLine( - $"\tCurrent suspended active tasks: {string.Join(",", partitions.Select(p => $"{p.Topic}-{p.Partition}"))}"); - log.LogInformation(sb.ToString()); + StringBuilder sb = new StringBuilder(); + sb.AppendLine($"Partition revocation took {DateTime.Now - start} ms"); + sb.AppendLine( + $"\tCurrent suspended active tasks: {string.Join(",", partitions.Select(p => $"{p.Topic}-{p.Partition}"))}"); + log.LogInformation(sb.ToString()); + } } } diff --git a/core/KafkaStream.cs b/core/KafkaStream.cs index 2958963a..0a850644 100644 --- a/core/KafkaStream.cs +++ b/core/KafkaStream.cs @@ -259,6 +259,7 @@ public override string ToString() private readonly StreamMetricsRegistry metricsRegistry; private readonly CancellationTokenSource _cancelSource = new(); + private readonly SequentiallyGracefullyShutdownHook shutdownHook; internal State StreamState { get; private set; } @@ -401,6 +402,13 @@ string Protect(string str) queryableStoreProvider = new QueryableStoreProvider(stateStoreProviders, globalStateStoreProvider); StreamState = State.CREATED; + + shutdownHook = new SequentiallyGracefullyShutdownHook( + threads, + globalStreamThread, + externalStreamThread, + _cancelSource + ); } /// @@ -455,7 +463,7 @@ await Task.Factory.StartNew(() => RunMiddleware(true, true); - globalStreamThread?.Start(_cancelSource.Token); + globalStreamThread?.Start(); externalStreamThread?.Start(_cancelSource.Token); foreach (var t in threads) @@ -473,8 +481,8 @@ await Task.Factory.StartNew(() => try { // Allow time for streams thread to run - await Task.Delay(TimeSpan.FromMilliseconds(configuration.StartTaskDelayMs), - token ?? _cancelSource.Token); + // await Task.Delay(TimeSpan.FromMilliseconds(configuration.StartTaskDelayMs), + // token ?? _cancelSource.Token); } catch { @@ -488,16 +496,8 @@ await Task.Delay(TimeSpan.FromMilliseconds(configuration.StartTaskDelayMs), /// public void Dispose() { - Task.Factory.StartNew(() => - { - if (!_cancelSource.IsCancellationRequested) - { - _cancelSource.Cancel(); - } - - Close(); - _cancelSource.Dispose(); - }).Wait(TimeSpan.FromSeconds(30)); + Close(); + _cancelSource.Dispose(); } /// @@ -516,13 +516,7 @@ private void Close() { RunMiddleware(true, false); - foreach (var t in threads) - { - t.Dispose(); - } - - externalStreamThread?.Dispose(); - globalStreamThread?.Dispose(); + shutdownHook.Shutdown(); RunMiddleware(false, false); metricsRegistry.RemoveClientSensors(); diff --git a/core/Mock/ClusterInMemoryTopologyDriver.cs b/core/Mock/ClusterInMemoryTopologyDriver.cs index 8b22280b..63100725 100644 --- a/core/Mock/ClusterInMemoryTopologyDriver.cs +++ b/core/Mock/ClusterInMemoryTopologyDriver.cs @@ -212,7 +212,7 @@ void stateChangedHandeler(IThread thread, ThreadStateTransitionValidator old, InitializeInternalTopicManager(); - globalStreamThread?.Start(token); + globalStreamThread?.Start(); externalStreamThread?.Start(token); threadTopology.Start(token); diff --git a/core/Processors/GlobalStreamThread.cs b/core/Processors/GlobalStreamThread.cs index 60969fe9..dc6de433 100644 --- a/core/Processors/GlobalStreamThread.cs +++ b/core/Processors/GlobalStreamThread.cs @@ -109,8 +109,7 @@ public void Close() private readonly string logPrefix; private readonly string threadClientId; private readonly IConsumer globalConsumer; - private CancellationToken token; - private readonly object stateLock = new object(); + private readonly object stateLock = new(); private readonly IStreamConfig configuration; private StateConsumer stateConsumer; private readonly IGlobalStateMaintainer globalStateMaintainer; @@ -140,7 +139,7 @@ private void Run() SetState(GlobalThreadState.RUNNING); try { - while (!token.IsCancellationRequested && State.IsRunning()) + while (State.IsRunning()) { stateConsumer.PollAndUpdate(); @@ -166,11 +165,11 @@ private void Run() // https://docs.microsoft.com/en-us/visualstudio/code-quality/ca1065 } - Dispose(false); + //Dispose(false); } } - public void Start(CancellationToken token) + public void Start() { log.LogInformation("{LogPrefix}Starting", logPrefix); @@ -184,9 +183,7 @@ public void Start(CancellationToken token) $"{logPrefix}Error happened during initialization of the global state store; this thread has shutdown : {e}"); throw; } - - this.token = token; - + thread.Start(); } @@ -273,7 +270,13 @@ protected virtual void Dispose(bool waitForThread) if (waitForThread) { - thread.Join(); + try + { + thread.Join(); + } + catch (ThreadStateException) + { + } } SetState(GlobalThreadState.DEAD); diff --git a/core/Processors/IThread.cs b/core/Processors/IThread.cs index 6647bfd2..7f55166a 100644 --- a/core/Processors/IThread.cs +++ b/core/Processors/IThread.cs @@ -14,7 +14,6 @@ interface IThread : IDisposable void Run(); void Start(CancellationToken token); IEnumerable ActiveTasks { get; } - event ThreadStateListener StateChanged; } } diff --git a/core/Processors/Internal/SequentiallyGracefullyShutdownHook.cs b/core/Processors/Internal/SequentiallyGracefullyShutdownHook.cs new file mode 100644 index 00000000..fd626179 --- /dev/null +++ b/core/Processors/Internal/SequentiallyGracefullyShutdownHook.cs @@ -0,0 +1,44 @@ +using System.Threading; +using Microsoft.Extensions.Logging; +using Streamiz.Kafka.Net.Crosscutting; + +namespace Streamiz.Kafka.Net.Processors.Internal +{ + internal class SequentiallyGracefullyShutdownHook + { + private readonly IThread[] _streamThreads; + private readonly GlobalStreamThread _globalStreamThread; + private readonly IThread _externalStreamThread; + private readonly CancellationTokenSource _tokenSource; + private readonly ILogger log = Logger.GetLogger(typeof(SequentiallyGracefullyShutdownHook)); + + public SequentiallyGracefullyShutdownHook( + IThread [] streamThreads, + GlobalStreamThread globalStreamThread, + IThread externalStreamThread, + CancellationTokenSource tokenSource + ) + { + _streamThreads = streamThreads; + _globalStreamThread = globalStreamThread; + _externalStreamThread = externalStreamThread; + _tokenSource = tokenSource; + } + + public void Shutdown() + { + log.LogInformation($"Request shutdown gracefully"); + _tokenSource.Cancel(); + + foreach (var t in _streamThreads) + { + t.Dispose(); + } + + _externalStreamThread?.Dispose(); + _globalStreamThread?.Dispose(); + + log.LogInformation($"Shutdown gracefully successful"); + } + } +} \ No newline at end of file diff --git a/core/StreamConfig.cs b/core/StreamConfig.cs index 11a124b9..b27d5479 100644 --- a/core/StreamConfig.cs +++ b/core/StreamConfig.cs @@ -2726,9 +2726,12 @@ public MetricsRecordingLevel MetricsRecording } /// - /// Time wait before completing the start task of . (default: 5000) + /// Time wait before completing the start task of . + /// Should be removed in the next release. + /// (default: 5000) /// [StreamConfigProperty("" + startTaskDelayMsCst)] + [Obsolete] public long StartTaskDelayMs { get => configProperties[startTaskDelayMsCst]; diff --git a/environment/confs/order.avsc b/environment/confs/order.avsc new file mode 100644 index 00000000..b869c2a7 --- /dev/null +++ b/environment/confs/order.avsc @@ -0,0 +1,40 @@ +{ + "namespace": "ksql", + "name": "product", + "type": "record", + "fields": [ + { + "name": "category", + "type": { + "type": "string", + "arg.properties": { + "options": ["1", "2", "3"] + } + } + }, + {"name": "name", "type": { + "type": "string", + "arg.properties": { + "iteration": { + "start": 0 + } + } + }}, + {"name": "description", "type": { + "type": "string", + "arg.properties": { + "iteration": { + "start": 0 + } + } + }}, + {"name": "price", "type": { + "type": "double", + "arg.properties": { + "iteration": { + "start": 0 + } + } + }} + ] +} \ No newline at end of file diff --git a/environment/datagen_connector.json b/environment/datagen_connector.json index 048e6214..cb0eda80 100644 --- a/environment/datagen_connector.json +++ b/environment/datagen_connector.json @@ -1,5 +1,5 @@ { - "name": "datagen-users", + "name": "datagen-products", "config": { "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", "kafka.topic": "users", @@ -9,6 +9,8 @@ "value.converter.schemas.enable": "false", "max.interval": 50, "iterations": 10000000, - "tasks.max": "1" + "tasks.max": "1", + "schema.filename": "/home/appuser/order.avsc", + "schema.keyfield": "name" } } \ No newline at end of file diff --git a/environment/docker-compose-with-connect.yml b/environment/docker-compose-with-connect.yml index 3a4489ca..6e358fe9 100644 --- a/environment/docker-compose-with-connect.yml +++ b/environment/docker-compose-with-connect.yml @@ -78,6 +78,8 @@ services: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1" CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1" CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1" + volumes: + - ./confs/order.avsc:/home/appuser/order.avsc akhq: image: tchiotludo/akhq:latest diff --git a/environment/start.sh b/environment/start.sh index 9263e49c..765f503e 100644 --- a/environment/start.sh +++ b/environment/start.sh @@ -1,29 +1,20 @@ #!/bin/bash -curl -i -X PUT http://localhost:8083/connectors/datagen_local_01/config \ +curl -i -X PUT http://localhost:8083/connectors/datagen_product/config \ -H "Content-Type: application/json" \ -d '{ - "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", - "key.converter": "org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": false, - "kafka.topic": "shoes", - "quickstart": "shoes", - "max.interval": 100, - "iterations": 10000000, - "tasks.max": "1" - }' + "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", + "kafka.topic": "product3", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "max.interval": 50, + "iterations": 10000000, + "tasks.max": "1", + "schema.filename": "/home/appuser/order.avsc", + "schema.keyfield": "name" + }' -curl -i -X PUT http://localhost:8083/connectors/datagen_local_02/config \ - -H "Content-Type: application/json" \ - -d '{ - "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", - "key.converter": "org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": false, - "kafka.topic": "orders", - "quickstart": "shoe_orders", - "max.interval": 100, - "iterations": 10000000, - "tasks.max": "1" - }' \ No newline at end of file + +# curl -i -X PUT http://localhost:8083/connectors/datagen_product/pause +# curl -i -X PUT http://localhost:8083/connectors/datagen_product/resume \ No newline at end of file diff --git a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs index 2f4db88c..f71595d2 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs +++ b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs @@ -1,10 +1,6 @@ -using System; using System.Collections.Generic; using System.Diagnostics.Metrics; -using System.Globalization; using System.Linq; -using System.Runtime.CompilerServices; -using Streamiz.Kafka.Net.Mock; namespace Streamiz.Kafka.Net.Metrics.OpenTelemetry { diff --git a/samples/sample-stream/log4net.config b/samples/sample-stream/log4net.config index 8c6b0b02..0531d3e7 100644 --- a/samples/sample-stream/log4net.config +++ b/samples/sample-stream/log4net.config @@ -5,8 +5,20 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/samples/sample-stream/sample-stream.csproj b/samples/sample-stream/sample-stream.csproj index 6f41e8e0..1c6eca58 100644 --- a/samples/sample-stream/sample-stream.csproj +++ b/samples/sample-stream/sample-stream.csproj @@ -15,12 +15,6 @@ - - - - - - diff --git a/test/Streamiz.Kafka.Net.Tests/Private/GlobalStateManagerTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/GlobalStateManagerTests.cs index 50300de6..d4fc4c09 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/GlobalStateManagerTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/GlobalStateManagerTests.cs @@ -87,6 +87,12 @@ public void SetUp() stateManager.SetGlobalProcessorContext(context); } + [TearDown] + public void Dispose() + { + + } + [Test] public void ShouldInitializeStateStores() { diff --git a/test/Streamiz.Kafka.Net.Tests/Private/GlobalStreamThreadTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/GlobalStreamThreadTests.cs index 8a42bf26..c8d6f5dc 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/GlobalStreamThreadTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/GlobalStreamThreadTests.cs @@ -14,123 +14,172 @@ namespace Streamiz.Kafka.Net.Tests.Private { public class GlobalStreamThreadTests { - private Mock globalStateMaintainerMock; - private Mock streamConfigMock; - private Mock> globalConsumerMock; - - private GlobalStreamThread globalStreamThread; - private CancellationTokenSource cancellationTokenSource; - - [SetUp] - public void SetUp() + [Test] + public void ShouldConvertExceptionsToStreamsException() { - globalConsumerMock = new Mock>(); - globalStateMaintainerMock = new Mock(); - streamConfigMock = new Mock(); + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); streamConfigMock.Setup(x => x.PollMs).Returns(1); streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); - cancellationTokenSource = new CancellationTokenSource(); - globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, globalStateMaintainerMock.Object, new StreamMetricsRegistry()); - } - - [TearDown] - public void TearDown() - { - cancellationTokenSource.Cancel(); - } - - [Test] - public void ShouldConvertExceptionsToStreamsException() - { + streamConfigMock.Setup(x => x.PollMs).Throws(new Exception("boom")); - Assert.Throws(() => globalStreamThread.Start(cancellationTokenSource.Token)); + Assert.Throws(() => globalStreamThread.Start()); + globalStreamThread.Dispose(); } [Test] public void ShouldBeRunningAfterSuccesfullStart() { - globalStreamThread.Start(cancellationTokenSource.Token); + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + + globalStreamThread.Start(); // we need to wait for thread to set running state Thread.Sleep(100); Assert.AreEqual(GlobalThreadState.RUNNING, globalStreamThread.State); + + globalStreamThread.Dispose(); } [Test] public void ShouldStopRunningWhenClosedByUser() { - var token = cancellationTokenSource.Token; + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); - globalStreamThread.Start(token); - cancellationTokenSource.Cancel(); + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + globalStreamThread.Start(); // thread should stop after some time Thread.Sleep(100); + globalStreamThread.Dispose(); Assert.AreEqual(GlobalThreadState.DEAD, globalStreamThread.State); } [Test] public void ShouldStopGlobalConsumer() { - var token = cancellationTokenSource.Token; + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); - globalStreamThread.Start(token); - cancellationTokenSource.Cancel(); + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + + globalStreamThread.Start(); // thread should stop after some time Thread.Sleep(100); + globalStreamThread.Dispose(); globalConsumerMock.Verify(x => x.Close()); } [Test] public void ShouldStopGlobalStateMaintainer() { - var token = cancellationTokenSource.Token; - - globalStreamThread.Start(token); - cancellationTokenSource.Cancel(); + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + + globalStreamThread.Start(); + // thread should stop after some time Thread.Sleep(100); + globalStreamThread.Dispose(); globalStateMaintainerMock.Verify(x => x.Close()); } [Test] public void ShouldStopGlobalStateMaintainerEvenIfStoppingConsumerThrows() { + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + globalConsumerMock.Setup(x => x.Close()).Throws(new Exception()); - var token = cancellationTokenSource.Token; - globalStreamThread.Start(token); - cancellationTokenSource.Cancel(); + globalStreamThread.Start(); // thread should stop after some time Thread.Sleep(100); + globalStreamThread.Dispose(); + globalStateMaintainerMock.Verify(x => x.Close()); } [Test] public void ShouldAssignTopicsToConsumer() { + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + var partitionOffsetDictionary = new Dictionary() { {new TopicPartition("topic", 0), Offset.Beginning} }; globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(partitionOffsetDictionary); - globalStreamThread.Start(cancellationTokenSource.Token); + globalStreamThread.Start(); var parts = partitionOffsetDictionary.Keys.Select(o => new TopicPartitionOffset(o, Offset.Beginning)); globalConsumerMock.Verify(x => x.Assign(parts)); + globalStreamThread.Dispose(); } [Test] public void ShouldConsumeRecords() { + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + var result1 = new ConsumeResult(); var result2 = new ConsumeResult(); @@ -139,33 +188,56 @@ public void ShouldConsumeRecords() .Returns(result2) .Returns((ConsumeResult) null); - globalStreamThread.Start(cancellationTokenSource.Token); + globalStreamThread.Start(); // wait some time so that thread can process data Thread.Sleep(100); globalStateMaintainerMock.Verify(x => x.Update(result1), Times.Once); globalStateMaintainerMock.Verify(x => x.Update(result2), Times.Once); + globalStreamThread.Dispose(); } [Test] public void ShouldNotFlushTooSoon() { + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(100); - globalStreamThread.Start(cancellationTokenSource.Token); + globalStreamThread.Start(); // this should be true as the thread should wait 100ms to flush globalStateMaintainerMock.Verify(x => x.FlushState(), Times.Never); + globalStreamThread.Dispose(); } [Test] public void ShouldFlush() { + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(10); - globalStreamThread.Start(cancellationTokenSource.Token); + globalStreamThread.Start(); Thread.Sleep(50); // we are waiting longer than CommitIntervalMs so thread should already flush at least once globalStateMaintainerMock.Verify(x => x.FlushState()); + globalStreamThread.Dispose(); } } } \ No newline at end of file