Skip to content

Commit

Permalink
Add the source record partition
Browse files Browse the repository at this point in the history
  • Loading branch information
LGouellec committed Jul 3, 2024
1 parent 3630cc4 commit c601bd5
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 44 deletions.
2 changes: 1 addition & 1 deletion core/Processors/IStreamPartitioner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ namespace Streamiz.Kafka.Net.Processors
{
public interface IStreamPartitioner<K, V>
{
Partition Partition(string topic, K key, V value, int numPartitions);
Partition Partition(string topic, K key, V value, Partition sourcePartition, int numPartitions);
}
}
9 changes: 7 additions & 2 deletions core/Processors/Internal/DefaultStreamPartitioner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

namespace Streamiz.Kafka.Net.Processors.Internal
{
/// <summary>
/// Forward the source partition as the sink partition of the record
/// </summary>
/// <typeparam name="K">Key record type</typeparam>
/// <typeparam name="V">Value record type</typeparam>
internal class DefaultStreamPartitioner<K, V> : IStreamPartitioner<K, V>
{
public Partition Partition(string topic, K key, V value, int numPartitions)
=> Confluent.Kafka.Partition.Any;
public Partition Partition(string topic, K key, V value, Partition sourcePartition, int numPartitions)
=> sourcePartition;
}
}
8 changes: 4 additions & 4 deletions core/Processors/Internal/WrapperStreamPartitioner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ namespace Streamiz.Kafka.Net.Processors.Internal
{
internal class WrapperStreamPartitioner<K, V> : IStreamPartitioner<K, V>
{
private readonly Func<string, K, V, int, Partition> _partitioner;
private readonly Func<string, K, V, Partition, int, Partition> _partitioner;

public WrapperStreamPartitioner(Func<string, K, V, int, Partition> partitioner)
public WrapperStreamPartitioner(Func<string, K, V, Partition, int, Partition> partitioner)
{
_partitioner = partitioner;
}

public Partition Partition(string topic, K key, V value, int numPartitions)
=> _partitioner(topic, key, value, numPartitions);
public Partition Partition(string topic, K key, V value, Partition sourcePartition, int numPartitions)
=> _partitioner(topic, key, value, sourcePartition, numPartitions);
}
}
1 change: 1 addition & 0 deletions core/Processors/SinkProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public override void Process(K key, V value)
Partition partition = partitioner.Partition(
topicName,
key, value,
Context.Partition,
Context.RecordCollector.PartitionsFor(topicName));

Context.RecordCollector.Send(
Expand Down
16 changes: 8 additions & 8 deletions core/Stream/IKStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public interface IKStream<K, V>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
/// <exception cref="ArgumentNullException">Throw <see cref="ArgumentNullException"/> if <paramref name="topicName"/> is null</exception>
/// /// <exception cref="ArgumentException">Throw <see cref="ArgumentException"/> if <paramref name="topicName"/> is incorrect</exception>
void To(string topicName, Func<string, K, V, int, Partition> partitioner, string named = null);
void To(string topicName, Func<string, K, V, Partition, int, Partition> partitioner, string named = null);

/// <summary>
/// Materialize this stream to a topic using default serializers specified in the config and producer's.
Expand Down Expand Up @@ -164,7 +164,7 @@ public interface IKStream<K, V>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
/// <exception cref="ArgumentNullException">Throw <see cref="ArgumentNullException"/> if <paramref name="topicName"/> is null</exception>
/// /// <exception cref="ArgumentException">Throw <see cref="ArgumentException"/> if <paramref name="topicName"/> is incorrect</exception>
void To(string topicName, Func<string, K, V, int, Partition> partitioner, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, string named = null);
void To(string topicName, Func<string, K, V, Partition, int, Partition> partitioner, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, string named = null);

/// <summary>
/// Dynamically materialize this stream to topics using default serializers specified in the config and producer's.
Expand All @@ -181,7 +181,7 @@ public interface IKStream<K, V>
/// <param name="topicExtractor">Extractor function to determine the name of the Kafka topic to write to for each record</param>
/// <param name="partitioner">The function used to determine how records are distributed among partitions of the topic</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To(Func<K, V, IRecordContext, string> topicExtractor, Func<string, K, V, int, Partition> partitioner, string named = null);
void To(Func<K, V, IRecordContext, string> topicExtractor, Func<string, K, V, Partition, int, Partition> partitioner, string named = null);

/// <summary>
/// Dynamically materialize this stream to topics using default serializers specified in the config and producer's.
Expand All @@ -202,7 +202,7 @@ public interface IKStream<K, V>
/// <param name="keySerdes">Key serializer</param>
/// <param name="valueSerdes">Value serializer</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To(Func<K, V, IRecordContext, string> topicExtractor, Func<string, K, V, int, Partition> partitioner, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, string named = null);
void To(Func<K, V, IRecordContext, string> topicExtractor, Func<string, K, V, Partition, int, Partition> partitioner, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, string named = null);


/// <summary>
Expand Down Expand Up @@ -252,7 +252,7 @@ public interface IKStream<K, V>
/// <param name="recordTimestampExtractor">Extractor function to determine the timestamp of the record stored in the Kafka topic</param>
/// <param name="partitioner">The function used to determine how records are distributed among partitions of the topic</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To(Func<K, V, IRecordContext, string> topicExtractor, Func<K, V, IRecordContext, long> recordTimestampExtractor, Func<string, K, V, int, Partition> partitioner, string named = null);
void To(Func<K, V, IRecordContext, string> topicExtractor, Func<K, V, IRecordContext, long> recordTimestampExtractor, Func<string, K, V, Partition, int, Partition> partitioner, string named = null);

/// <summary>
/// Dynamically materialize this stream to topics using default serializers specified in the config and producer's.
Expand Down Expand Up @@ -337,7 +337,7 @@ public interface IKStream<K, V>
/// <param name="topicName">the topic name</param>
/// <param name="partitioner">The function used to determine how records are distributed among partitions of the topic</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To<KS, VS>(string topicName, Func<string, K, V, int, Partition> partitioner, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new();
void To<KS, VS>(string topicName, Func<string, K, V, Partition, int, Partition> partitioner, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new();

/// <summary>
/// Dynamically materialize this stream to a topic using <typeparamref name="KS"/> and <typeparamref name="VS"/> serializers specified in the method parameters.
Expand All @@ -348,7 +348,7 @@ public interface IKStream<K, V>
/// <param name="topicExtractor">Extractor function to determine the name of the Kafka topic to write to for each record</param>
/// <param name="partitioner">The function used to determine how records are distributed among partitions of the topic</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To<KS, VS>(Func<K, V, IRecordContext, string> topicExtractor, Func<string, K, V, int, Partition> partitioner, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new();
void To<KS, VS>(Func<K, V, IRecordContext, string> topicExtractor, Func<string, K, V, Partition, int, Partition> partitioner, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new();

/// <summary>
/// Dynamically materialize this stream to a topic using <typeparamref name="KS"/> and <typeparamref name="VS"/> serializers specified in the method parameters.
Expand Down Expand Up @@ -402,7 +402,7 @@ public interface IKStream<K, V>
/// <param name="recordTimestampExtractor">Extractor function to determine the timestamp of the record stored in the Kafka topic</param>
/// <param name="partitioner">The function used to determine how records are distributed among partitions of the topic</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To<KS, VS>(Func<K, V, IRecordContext, string> topicExtractor, Func<K, V, IRecordContext, long> recordTimestampExtractor, Func<string, K, V, int, Partition> partitioner, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new();
void To<KS, VS>(Func<K, V, IRecordContext, string> topicExtractor, Func<K, V, IRecordContext, long> recordTimestampExtractor, Func<string, K, V, Partition, int, Partition> partitioner, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new();

/// <summary>
/// Dynamically materialize this stream to a topic using <typeparamref name="KS"/> and <typeparamref name="VS"/> serializers specified in the method parameters.
Expand Down
16 changes: 8 additions & 8 deletions core/Stream/Internal/KStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void To(string topicName, string named = null)
To(new StaticTopicNameExtractor<K, V>(topicName), new DefaultStreamPartitioner<K, V>(), named);
}

public void To(string topicName, Func<string, K, V, int, Partition> partitioner, string named = null)
public void To(string topicName, Func<string, K, V, Partition, int, Partition> partitioner, string named = null)
{
if (string.IsNullOrEmpty(topicName))
{
Expand Down Expand Up @@ -221,7 +221,7 @@ public void To(string topicName, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, s
To(new StaticTopicNameExtractor<K, V>(topicName), new DefaultStreamPartitioner<K, V>(), keySerdes, valueSerdes, named);
}

public void To(string topicName, Func<string, K, V, int, Partition> partitioner, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, string named = null)
public void To(string topicName, Func<string, K, V, Partition, int, Partition> partitioner, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, string named = null)
{
if (string.IsNullOrEmpty(topicName))
throw new ArgumentException("topicName must be empty");
Expand All @@ -232,7 +232,7 @@ public void To(string topicName, Func<string, K, V, int, Partition> partitioner,
}

public void To(Func<K, V, IRecordContext, string> topicExtractor,
Func<string, K, V, int, Partition> partitioner, ISerDes<K> keySerdes, ISerDes<V> valueSerdes,
Func<string, K, V, Partition, int, Partition> partitioner, ISerDes<K> keySerdes, ISerDes<V> valueSerdes,
string named = null)
=> To(
new WrapperTopicNameExtractor<K, V>(topicExtractor),
Expand Down Expand Up @@ -270,7 +270,7 @@ public void To(Func<K, V, IRecordContext, string> topicExtractor, string named =
=> To(new WrapperTopicNameExtractor<K, V>(topicExtractor), named);

public void To(Func<K, V, IRecordContext, string> topicExtractor,
Func<string, K, V, int, Partition> partitioner, string named = null)
Func<string, K, V, Partition, int, Partition> partitioner, string named = null)
=> To(new WrapperTopicNameExtractor<K, V>(topicExtractor),
new WrapperStreamPartitioner<K, V>(partitioner),
named);
Expand All @@ -288,7 +288,7 @@ public void To(Func<K, V, IRecordContext, string> topicExtractor,
named);

public void To(Func<K, V, IRecordContext, string> topicExtractor,
Func<K, V, IRecordContext, long> recordTimestampExtractor, Func<string, K, V, int, Partition> partitioner,
Func<K, V, IRecordContext, long> recordTimestampExtractor, Func<string, K, V, Partition, int, Partition> partitioner,
string named = null)
=> To(
new WrapperTopicNameExtractor<K, V>(topicExtractor),
Expand Down Expand Up @@ -345,7 +345,7 @@ public void To(ITopicNameExtractor<K, V> topicExtractor, ISerDes<K> keySerdes, I
partitioner,
Produced<K, V>.Create(keySerdes, valueSerdes).WithName(named));

public void To<KS, VS>(Func<K, V, IRecordContext, string> topicExtractor, Func<string, K, V, int, Partition> partitioner, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new()
public void To<KS, VS>(Func<K, V, IRecordContext, string> topicExtractor, Func<string, K, V, Partition, int, Partition> partitioner, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new()
=> To<KS, VS>(
new WrapperTopicNameExtractor<K, V>(topicExtractor),
new WrapperStreamPartitioner<K, V>(partitioner),
Expand All @@ -361,7 +361,7 @@ public void To<KS, VS>(string topicName, string named = null)
where VS : ISerDes<V>, new()
=> To<KS, VS>(new StaticTopicNameExtractor<K, V>(topicName), named);

public void To<KS, VS>(string topicName, Func<string, K, V, int, Partition> partitioner, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new()
public void To<KS, VS>(string topicName, Func<string, K, V, Partition, int, Partition> partitioner, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new()
=> To<KS, VS>(new StaticTopicNameExtractor<K, V>(topicName),
new WrapperStreamPartitioner<K, V>(partitioner),
named);
Expand All @@ -383,7 +383,7 @@ public void To<KS, VS>(Func<K, V, IRecordContext, string> topicExtractor, Func<K
where VS : ISerDes<V>, new()
=> To<KS, VS>(new WrapperTopicNameExtractor<K, V>(topicExtractor), new WrapperRecordTimestampExtractor<K, V>(recordTimestampExtractor), named);

public void To<KS, VS>(Func<K, V, IRecordContext, string> topicExtractor, Func<K, V, IRecordContext, long> recordTimestampExtractor, Func<string, K, V, int, Partition> partitioner, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new()
public void To<KS, VS>(Func<K, V, IRecordContext, string> topicExtractor, Func<K, V, IRecordContext, long> recordTimestampExtractor, Func<string, K, V, Partition, int, Partition> partitioner, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new()
=> To<KS, VS>(new WrapperTopicNameExtractor<K, V>(topicExtractor), new WrapperRecordTimestampExtractor<K, V>(recordTimestampExtractor), new WrapperStreamPartitioner<K, V>(partitioner), named);

public void To<KS, VS>(ITopicNameExtractor<K, V> topicExtractor,
Expand Down
4 changes: 2 additions & 2 deletions core/Stream/Repartitioned.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static Repartitioned<K, V> Create<KS, VS>()
/// </summary>
/// <param name="streamPartitioner">the function used to determine how records are distributed among partitions of the topic</param>
/// <returns>A new instance of <see cref="Repartitioned{K,V}"/></returns>
public static Repartitioned<K, V> Partitioner(Func<string, K, V, int, Partition> streamPartitioner)
public static Repartitioned<K, V> Partitioner(Func<string, K, V, Partition, int, Partition> streamPartitioner)
=> Partitioner(new WrapperStreamPartitioner<K, V>(streamPartitioner));


Expand Down Expand Up @@ -150,7 +150,7 @@ public Repartitioned<K, V> WithStreamPartitioner(IStreamPartitioner<K, V> stream
/// </summary>
/// <param name="streamPartitioner">Function to determine the partition where the repartition record will be persist</param>
/// <returns>this</returns>
public Repartitioned<K, V> WithStreamPartitioner(Func<string, K, V, int, Partition> streamPartitioner)
public Repartitioned<K, V> WithStreamPartitioner(Func<string, K, V, Partition, int, Partition> streamPartitioner)
=> WithStreamPartitioner(new WrapperStreamPartitioner<K, V>(streamPartitioner));


Expand Down
13 changes: 6 additions & 7 deletions samples/sample-stream/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Prometheus;
using Streamiz.Kafka.Net.Stream;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Metrics.Prometheus;
using Streamiz.Kafka.Net.Table;

namespace sample_stream
Expand All @@ -22,9 +20,6 @@ public static async Task Main(string[] args)
ApplicationId = $"test-app",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin,
Partitioner = Partitioner.ConsistentRandom,
Debug = "broker,topic,msg",
Logger = LoggerFactory.Create((b) =>
{
b.AddConsole();
Expand All @@ -49,7 +44,7 @@ private static Topology BuildTopology()
TimeSpan windowSize = TimeSpan.FromHours(1);

var builder = new StreamBuilder();
builder.Stream<string, string>("input")
/*builder.Stream<string, string>("input")
.GroupByKey()
.WindowedBy(TumblingWindowOptions.Of(windowSize))
.Count(RocksDbWindows.As<string, long>("count-store")
Expand All @@ -60,7 +55,11 @@ private static Topology BuildTopology()
.Map((k,v) => new KeyValuePair<string,string>(k.ToString(), v.ToString()))
.To("output",
new StringSerDes(),
new StringSerDes());
new StringSerDes());*/

builder.Stream<string, string>("input")
.To(
"output");//, (s, s1, arg3, arg4) => new Partition(0));


return builder.Build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void RepartitionWithPartitioner()
{
builder
.Stream<string, string>("topic")
.Repartition(Repartitioned<string, string>.Empty().WithStreamPartitioner((t,k,v, c) => 0))
.Repartition(Repartitioned<string, string>.Empty().WithStreamPartitioner((t,k,v,_, c) => 0))
.To("output");
}, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, 10);

Expand Down
Loading

0 comments on commit c601bd5

Please sign in to comment.