Issue/319 #351

Merged · 4 commits · Jul 30, 2024
19 changes: 19 additions & 0 deletions core/Crosscutting/DictionaryExtensions.cs
@@ -32,6 +32,25 @@ public static bool AddOrUpdate<K, V>(this IDictionary<K, V> map, K key, V value)
return true;
}

/// <summary>
/// Adds the element if the key doesn't exist
/// </summary>
/// <typeparam name="K">Key type</typeparam>
/// <typeparam name="V">Value type</typeparam>
/// <param name="map">Source dictionary</param>
/// <param name="key">New key</param>
/// <param name="value">Value</param>
/// <returns>Returns true if the key/value pair was added, false otherwise</returns>
public static bool AddIfNotExist<K, V>(this IDictionary<K, V> map, K key, V value)
{
if (!map.ContainsKey(key))
{
map.Add(key, value);
return true;
}
return false;
}
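
A minimal usage sketch of the new extension (hypothetical keys and values; assumes System.Collections.Generic and the Streamiz.Kafka.Net.Crosscutting namespace): unlike IDictionary.Add, a second call with the same key is a harmless no-op instead of an exception.

var cache = new Dictionary<string, int>();
bool added = cache.AddIfNotExist("answer", 42);   // true: key inserted
bool again = cache.AddIfNotExist("answer", 100);  // false: existing value kept
// cache["answer"] is still 42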

/// <summary>
/// Convert enumerable of <see cref="KeyValuePair{K, V}"/> to <see cref="IDictionary{K, V}"/>
/// </summary>
12 changes: 8 additions & 4 deletions core/KafkaStream.cs
@@ -430,7 +430,8 @@ public async Task StartAsync(CancellationToken? token = null)
Dispose();
});
}
await Task.Factory.StartNew(async () =>

await Task.Factory.StartNew(() =>
{
if (SetState(State.REBALANCING))
{
@@ -448,7 +449,8 @@ await Task.Factory.StartNew(async () =>
SetState(State.PENDING_SHUTDOWN);
SetState(State.ERROR);
}
return;

return Task.CompletedTask;
}

RunMiddleware(true, true);
@@ -463,6 +465,8 @@ await Task.Factory.StartNew(async () =>

RunMiddleware(false, true);
}

return Task.CompletedTask;
}, token ?? _cancelSource.Token);
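
For context, a standalone sketch (not from this PR; usings System and System.Threading.Tasks, inside an async method) of the pitfall the synchronous lambda avoids: Task.Factory.StartNew with an async delegate returns a Task<Task>, and awaiting it only waits until the delegate reaches its first await.

var outer = Task.Factory.StartNew(async () => await Task.Delay(1000));
await outer;           // completes almost immediately; the inner delay is still running
await outer.Unwrap();  // unwrapping the inner Task is what actually waits for it
// A synchronous lambda returning Task.CompletedTask sidesteps the wrapper entirely.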


@@ -624,8 +628,8 @@ private async Task InitializeInternalTopicManagerAsync()
{
// Create internal topics (changelogs & repartition) if need
var adminClientInternalTopicManager = kafkaSupplier.GetAdmin(configuration.ToAdminConfig(StreamThread.GetSharedAdminClientId($"{configuration.ApplicationId.ToLower()}-admin-internal-topic-manager")));
using(var internalTopicManager = new DefaultTopicManager(configuration, adminClientInternalTopicManager))
await InternalTopicManagerUtils.New().CreateInternalTopicsAsync(internalTopicManager, topology.Builder);
using var internalTopicManager = new DefaultTopicManager(configuration, adminClientInternalTopicManager);
await InternalTopicManagerUtils.New().CreateInternalTopicsAsync(internalTopicManager, topology.Builder);
}
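
A self-contained sketch of the C# 8 using-declaration semantics relied on above (hypothetical Resource type; usings System and System.Threading.Tasks): disposal is deferred to the end of the enclosing scope, so it cannot race the awaited call.

class Resource : IDisposable
{
    public void Dispose() => Console.WriteLine("disposed");
}

static async Task RunAsync()
{
    using var resource = new Resource();  // using declaration, no braces needed
    await Task.Delay(10);                 // resource is still alive across the await
}                                         // Dispose() runs only when the scope exits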

private void RunMiddleware(bool before, bool start)
9 changes: 5 additions & 4 deletions core/Mock/Kafka/MockCluster.cs
@@ -760,7 +760,8 @@ internal ConsumeResult<byte[], byte[]> Consume(MockConsumer mockConsumer, TimeSp
Key = record.Key,
Value = record.Value,
Timestamp = new Timestamp(
record.Timestamp ?? DateTime.Now, TimestampType.NotAvailable)
record.Timestamp ?? DateTime.Now, TimestampType.NotAvailable),
Headers = record.Headers
}
};
++offset.OffsetConsumed;
@@ -800,7 +801,7 @@ internal DeliveryReport<byte[], byte[]> Produce(string topic, Message<byte[], by
else
partition = Math.Abs(MurMurHash3.Hash(new MemoryStream(message.Key))) % topics[topic].PartitionNumber;

topics[topic].AddMessage(message.Key, message.Value, partition, message.Timestamp.UnixTimestampMs);
topics[topic].AddMessage(message.Key, message.Value, partition, message.Timestamp.UnixTimestampMs, message.Headers);

r.Message = message;
r.Partition = partition;
@@ -822,13 +823,13 @@ internal DeliveryReport<byte[], byte[]> Produce(TopicPartition topicPartition, M
CreateTopic(topicPartition.Topic);
if (topics[topicPartition.Topic].PartitionNumber > topicPartition.Partition)
{
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs, message.Headers);
r.Status = PersistenceStatus.Persisted;
}
else
{
topics[topicPartition.Topic].CreateNewPartitions(topicPartition.Partition);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs, message.Headers);
r.Status = PersistenceStatus.Persisted;
}

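With headers now carried through AddMessage, a produce-side sketch (Confluent.Kafka types; hypothetical header name; usings Confluent.Kafka, System, and System.Text) of what the mock preserves end to end:

var message = new Message<byte[], byte[]>
{
    Key = new byte[] { 1 },
    Value = new byte[] { 2 },
    Timestamp = new Timestamp(DateTime.Now),
    Headers = new Headers { { "trace-id", Encoding.UTF8.GetBytes("abc-123") } }
};
// After Produce(...), Consume(...) on the mock cluster now returns these Headers
// instead of silently dropping them.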
9 changes: 5 additions & 4 deletions core/Mock/Kafka/MockPartition.cs
@@ -7,7 +7,7 @@ namespace Streamiz.Kafka.Net.Mock.Kafka
{
internal class MockPartition
{
private readonly List<(byte[], byte[], long)> log = new();
private readonly List<(byte[], byte[], long, Headers)> log = new();
private readonly Dictionary<long, long> mappingOffsets = new();

public MockPartition(int indice)
@@ -21,10 +21,10 @@ public MockPartition(int indice)
public long LowOffset { get; private set; } = Offset.Unset;
public long HighOffset { get; private set; } = Offset.Unset;

internal void AddMessageInLog(byte[] key, byte[] value, long timestamp)
internal void AddMessageInLog(byte[] key, byte[] value, long timestamp, Headers headers)
{
mappingOffsets.Add(Size, log.Count);
log.Add((key, value, timestamp));
log.Add((key, value, timestamp, headers));
++Size;
UpdateOffset();
}
@@ -47,7 +47,8 @@ internal TestRecord<byte[], byte[]> GetMessage(long offset)
{
Key = record.Item1,
Value = record.Item2,
Timestamp = record.Item3.FromMilliseconds()
Timestamp = record.Item3.FromMilliseconds(),
Headers = record.Item4
};
}

4 changes: 2 additions & 2 deletions core/Mock/Kafka/MockTopic.cs
@@ -23,9 +23,9 @@ public MockTopic(string topic, int part)
public int PartitionNumber { get; private set; }
public IEnumerable<MockPartition> Partitions => partitions.AsReadOnly();

public void AddMessage(byte[] key, byte[] value, int partition, long timestamp = 0)
public void AddMessage(byte[] key, byte[] value, int partition, long timestamp = 0, Headers headers = null)
{
partitions[partition].AddMessageInLog(key, value, timestamp);
partitions[partition].AddMessageInLog(key, value, timestamp, headers);
}

public TestRecord<byte[], byte[]> GetMessage(int partition, long consumerOffset)
14 changes: 8 additions & 6 deletions core/Mock/Sync/SyncPipeBuilder.cs
@@ -24,21 +24,23 @@ public StreamTaskPublisher(StreamTask task)
private int offset = 0;

public void PublishRecord(string topic, byte[] key, byte[] value, DateTime timestamp, Headers headers)
=> task.AddRecord(new ConsumeResult<byte[], byte[]>
{
Topic = topic,
TopicPartitionOffset = new TopicPartitionOffset(new TopicPartition(topic, task.Id.Partition), offset++),
Message = new Message<byte[], byte[]> { Key = key, Value = value, Timestamp = new Timestamp(timestamp), Headers = headers }
});
=> task.AddRecord(new ConsumeResult<byte[], byte[]>
{
Topic = topic,
TopicPartitionOffset = new TopicPartitionOffset(new TopicPartition(topic, task.Id.Partition), offset++),
Message = new Message<byte[], byte[]> { Key = key, Value = value, Timestamp = new Timestamp(timestamp), Headers = headers }
});

public void Flush()
{
long now = DateTime.Now.GetMilliseconds();
TaskManager.CurrentTask = task;
while (task.CanProcess(now))
task.Process();

task.PunctuateStreamTime();
task.PunctuateSystemTime();
TaskManager.CurrentTask = null;
}

public void Close()
63 changes: 32 additions & 31 deletions core/Processors/Internal/InternalTopicManagerUtils.cs
@@ -25,75 +25,74 @@ static InternalTopicManagerUtils()
};
}

internal static InternalTopicManagerUtils New() => new InternalTopicManagerUtils();
internal static InternalTopicManagerUtils New() => new();

internal async Task CreateInternalTopicsAsync(
ITopicManager topicManager,
InternalTopologyBuilder builder)
{
var clusterMetadata = topicManager.AdminClient.GetMetadata(timeout);
//var clusterMetadata = topicManager.AdminClient.GetMetadata(timeout);
var internalTopicsGroups = builder.MakeInternalTopicGroups();

var resultsConf = new List<DescribeConfigsResult>();
if (internalTopicsGroups.Any(internalTopic =>
!clusterMetadata.Topics.Exists(t => t.Topic.Equals(internalTopic.Value.SourceTopics.First()))))
{
brokerConfigResource.Name = clusterMetadata.Brokers[0].BrokerId.ToString();
resultsConf = await topicManager.AdminClient.DescribeConfigsAsync(new List<ConfigResource> { brokerConfigResource });
}


foreach (var entry in internalTopicsGroups)
{
ComputeRepartitionTopicConfig(entry.Value, internalTopicsGroups, clusterMetadata);
ComputeChangelogTopicConfig(entry.Value, clusterMetadata, resultsConf);
ComputeRepartitionTopicConfig(entry.Value, internalTopicsGroups, topicManager);
ComputeChangelogTopicConfig(entry.Value, topicManager);

var internalTopics = entry.Value.ChangelogTopics.Union(entry.Value.RepartitionTopics).ToDictionary();

await topicManager.ApplyAsync(entry.Key, internalTopics);
// refresh metadata
clusterMetadata = topicManager.AdminClient.GetMetadata(timeout);
}
}
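
The loop above now relies on targeted lookups instead of a single cluster-wide GetMetadata call; a standalone sketch of the per-topic form (Confluent.Kafka admin client; hypothetical topic name and broker address):

using System;
using Confluent.Kafka;

using var admin = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build();

var metadata = admin.GetMetadata("orders", TimeSpan.FromSeconds(10));  // one topic only
var topic = metadata.Topics.Find(t => t.Topic == "orders");
Console.WriteLine($"orders partitions: {topic?.Partitions.Count}");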

private static void ComputeChangelogTopicConfig(InternalTopologyBuilder.TopologyTopicsInfo topicsInfo,
Metadata clusterMetadata, List<DescribeConfigsResult> resultsConf)
private static void ComputeChangelogTopicConfig(
InternalTopologyBuilder.TopologyTopicsInfo topicsInfo,
ITopicManager topicManager)
{
var topic = clusterMetadata.Topics.FirstOrDefault(t => t.Topic.Equals(topicsInfo.SourceTopics.First()));
if (topic != null)
var metadata =
topicManager.AdminClient.GetMetadata(topicsInfo.SourceTopics.First(), TimeSpan.FromSeconds(10));
var topicMetadata = metadata.Topics.FirstOrDefault(t => t.Topic.Equals(topicsInfo.SourceTopics.First()));
if (topicMetadata != null)
{
topicsInfo
.ChangelogTopics
.Values
.ForEach(c => c.NumberPartitions = topic.Partitions.Count);
.ForEach(c => c.NumberPartitions = topicMetadata.Partitions.Count);
}
else
{
topicsInfo
.ChangelogTopics
.Values
.ForEach(c => c.NumberPartitions = DefaultPartitionNumber(resultsConf));
.ForEach(c => c.NumberPartitions = -1);
}
}

private static void ComputeRepartitionTopicConfig(
InternalTopologyBuilder.TopologyTopicsInfo topicsInfo,
IDictionary<int, InternalTopologyBuilder.TopologyTopicsInfo> topologyTopicInfos,
Metadata clusterMetadata)
ITopicManager topicManager)
{
if (topicsInfo.RepartitionTopics.Any())
{
CheckIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
SetRepartitionSourceTopicPartitionCount(topicsInfo.RepartitionTopics, topologyTopicInfos,
clusterMetadata);
CheckIfExternalSourceTopicsExist(topicsInfo, topicManager);
SetRepartitionSourceTopicPartitionCount(topicsInfo.RepartitionTopics, topologyTopicInfos, topicManager);
}
}

private static void CheckIfExternalSourceTopicsExist(InternalTopologyBuilder.TopologyTopicsInfo topicsInfo, Metadata clusterMetadata)
private static void CheckIfExternalSourceTopicsExist(
InternalTopologyBuilder.TopologyTopicsInfo topicsInfo,
ITopicManager topicManager)
{
List<string> sourcesTopics = new List<string>(topicsInfo.SourceTopics);
sourcesTopics.RemoveAll(topicsInfo.RepartitionTopics.Keys);
sourcesTopics.RemoveAll(clusterMetadata.Topics.Select(t => t.Topic));
if (sourcesTopics.Any())
// Collect the external source topics that are missing/unknown in the cluster
// (repartition topics are internal and created later by the topic manager).
List<string> sourcesTopics = new List<string>();
foreach (var s in topicsInfo.SourceTopics.Except(topicsInfo.RepartitionTopics.Keys))
{
var metadata = topicManager.AdminClient.GetMetadata(s, TimeSpan.FromSeconds(10));
if (!metadata.PartitionCountForTopic(s).HasValue)
sourcesTopics.Add(s);
}

if (sourcesTopics.Any())
{
log.LogError($"Topology uses one (or multiple) repartition topic(s)." +
$" The following source topics ({string.Join(",", sourcesTopics)}) are missing/unknown." +
@@ -105,7 +104,7 @@ private static void CheckIfExternalSourceTopicsExist(InternalTopologyBuilder.Top
private static void SetRepartitionSourceTopicPartitionCount(
IDictionary<string, InternalTopicConfig> repartitionTopics,
IDictionary<int, InternalTopologyBuilder.TopologyTopicsInfo> topologyTopicInfos,
Metadata clusterMetadata)
ITopicManager topicManager)
{
#region Compute topic partition count

@@ -126,6 +125,8 @@ private static void SetRepartitionSourceTopicPartitionCount(
}
else
{
var clusterMetadata =
topicManager.AdminClient.GetMetadata(upstreamSourceTopic, TimeSpan.FromSeconds(10));
var count = clusterMetadata.PartitionCountForTopic(upstreamSourceTopic);
if (count == null)
count = ComputePartitionCount(upstreamSourceTopic);
8 changes: 6 additions & 2 deletions core/Processors/Internal/InternalTopologyBuilder.cs
@@ -585,11 +585,11 @@ string AddRepartitionTopic(string topic)
int? internalTopicPartition = internalTopics[topic];

if (internalTopicPartition.HasValue)
repartitionTopics.Add(internalTopic,
repartitionTopics.AddIfNotExist(internalTopic,
new RepartitionTopicConfig()
{Name = internalTopic, NumberPartitions = internalTopicPartition.Value});
else
repartitionTopics.Add(internalTopic,
repartitionTopics.AddIfNotExist(internalTopic,
new RepartitionTopicConfig() {Name = internalTopic});
return internalTopic;
}
@@ -818,6 +818,10 @@ internal void GetLinkTopics(string topic, IList<string> linkTopics)
var sinkTopic = internalTopics.ContainsKey(sinkNode.Topic)
? DecorateTopic(sinkNode.Topic)
: sinkNode.Topic;

if (linkTopics.Contains(sinkTopic))
return;

linkTopics.Add(sinkTopic);
GetLinkTopics(sinkTopic, linkTopics);
}
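
The new Contains check is a plain visited-set guard; a generic sketch of the pattern (hypothetical edge map, not library code; using System.Collections.Generic) showing why the recursion now terminates on cyclic topologies:

static void Collect(string node, IDictionary<string, string> edges, IList<string> visited)
{
    if (!edges.TryGetValue(node, out var next))
        return;                    // no outgoing link
    if (visited.Contains(next))
        return;                    // already seen: breaks A -> B -> A cycles
    visited.Add(next);
    Collect(next, edges, visited);
}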
12 changes: 11 additions & 1 deletion core/StreamConfig.cs
@@ -2003,7 +2003,16 @@ public IsolationLevel? IsolationLevel
/// low
/// </summary>
[StreamConfigProperty("allow.auto.create.topics")]
public bool? AllowAutoCreateTopics { get { return _consumerConfig.AllowAutoCreateTopics; } set { _consumerConfig.AllowAutoCreateTopics = value; } }
public bool? AllowAutoCreateTopics
{
get { return _consumerConfig.AllowAutoCreateTopics; }
set
{
_consumerConfig.AllowAutoCreateTopics = value;
_adminClientConfig.AllowAutoCreateTopics = value;
_producerConfig.AllowAutoCreateTopics = value;
}
}
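
A usage sketch (hypothetical application id and bootstrap servers; usings Streamiz.Kafka.Net and Streamiz.Kafka.Net.SerDes): a single assignment now reaches the consumer, producer, and admin client configurations, keeping all three clients consistent.

var config = new StreamConfig<StringSerDes, StringSerDes>
{
    ApplicationId = "my-app",
    BootstrapServers = "localhost:9092",
    AllowAutoCreateTopics = false  // propagated to consumer, producer and admin client
};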

#endregion

@@ -2297,6 +2306,7 @@ public StreamConfig(IDictionary<string, dynamic> properties)
MaxPollIntervalMs = 300000;
EnableAutoCommit = false;
EnableAutoOffsetStore = false;
AllowAutoCreateTopics = false;
PartitionAssignmentStrategy = Confluent.Kafka.PartitionAssignmentStrategy.CooperativeSticky;
Partitioner = Confluent.Kafka.Partitioner.Murmur2Random;

2 changes: 2 additions & 0 deletions environment/docker-compose.yml
@@ -28,6 +28,7 @@ services:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-1:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" # broker-side auto.create.topics.enable; compose env values must be strings
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 2
@@ -59,6 +60,7 @@ services:
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" # broker-side auto.create.topics.enable; compose env values must be strings
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 2
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 2
4 changes: 1 addition & 3 deletions samples/sample-stream/Program.cs
@@ -2,19 +2,17 @@
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Prometheus;
using Streamiz.Kafka.Net.Stream;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Table;

namespace sample_stream
{
public static class Program
{
public static async Task Main2(string[] args)
public static async Task Main(string[] args)
{
var config = new StreamConfig<StringSerDes, StringSerDes>{
ApplicationId = $"test-app",