
Commit

Merge branch 'develop'
LGouellec committed Dec 13, 2024
2 parents 93a803c + 843e4fe commit d9ee9ca
Showing 399 changed files with 15,082 additions and 4,336 deletions.
6 changes: 3 additions & 3 deletions .circleci/config.yml
@@ -6,10 +6,10 @@
jobs:
build:
docker:
- image: mcr.microsoft.com/dotnet/sdk:6.0
- image: mcr.microsoft.com/dotnet/sdk:8.0
steps:
- checkout
- run: dotnet tool install --global dotnet-sonarscanner --version 5.11.0
- run: dotnet tool install --global dotnet-sonarscanner --version 9.0.0
- run: echo 'export PATH="$PATH:/root/.dotnet/tools"' >> $BASH_ENV
#- run: echo "deb http://ftp.us.debian.org/debian stretch main contrib non-free" >> /etc/apt/sources.list
- run: echo "deb http://ftp.debian.org/debian stable main contrib non-free" >> /etc/apt/sources.list
@@ -26,7 +26,7 @@
- run: export JAVA_HOME
- run: dotnet sonarscanner begin /k:LGouellec_kafka-streams-dotnet /o:kafka-streams-dotnet /d:sonar.login=${SONAR_TOKEN} /d:sonar.host.url=https://sonarcloud.io /d:sonar.cs.opencover.reportsPaths="**\coverage*.opencover.xml" /d:sonar.coverage.exclusions="**sample*/*.cs,**test*/*.cs,**Tests*.cs,**Mock*.cs,**State/Cache/Internal/*.cs"
- run: dotnet build
- run: dotnet test --no-restore --no-build --verbosity normal -f net6.0 --collect:"XPlat Code Coverage" /p:CollectCoverage=true /p:CoverletOutputFormat=opencover test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj
- run: dotnet test --no-restore --no-build --verbosity normal -f net8.0 --collect:"XPlat Code Coverage" /p:CollectCoverage=true /p:CoverletOutputFormat=opencover test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj
- run: dotnet sonarscanner end /d:sonar.login=${SONAR_TOKEN}

workflows:
8 changes: 8 additions & 0 deletions .github/workflows/build-ci.yml
@@ -25,6 +25,14 @@ jobs:
uses: actions/setup-dotnet@v1
with:
dotnet-version: 6.0.202
- name: Setup .NET 7.0
uses: actions/setup-dotnet@v1
with:
dotnet-version: 7.0.410
- name: Setup .NET 8.0
uses: actions/setup-dotnet@v1
with:
dotnet-version: 8.0.404
# BEGIN Dependencies for RocksDB
- run: sudo apt install -y libc6-dev libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev libzstd-dev
- run: sudo apt install -y bzip2 lz4 librocksdb-dev
6 changes: 3 additions & 3 deletions .github/workflows/integration.yml
@@ -2,7 +2,7 @@ name: integration

on:
push:
branches: [ integration ]
branches: [ master ]

jobs:
build:
@@ -11,10 +11,10 @@ jobs:

steps:
- uses: actions/checkout@v2
- name: Setup .NET 6.0
- name: Setup .NET 8.0
uses: actions/setup-dotnet@v1
with:
dotnet-version: 6.0.202
dotnet-version: 8.0.10
# BEGIN Dependencies for RocksDB
- run: sudo apt install -y libc6-dev libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev libzstd-dev
- run: sudo apt install -y bzip2 lz4 librocksdb-dev
10 changes: 10 additions & 0 deletions .github/workflows/release.yml
@@ -22,6 +22,14 @@ jobs:
uses: actions/setup-dotnet@v1
with:
dotnet-version: 6.0.202
- name: Setup .NET 7.0
uses: actions/setup-dotnet@v1
with:
dotnet-version: 7.0.20
- name: Setup .NET 8.0
uses: actions/setup-dotnet@v1
with:
dotnet-version: 8.0.10
- name: Install dependencies
run: dotnet restore
- name: Build
@@ -42,5 +50,7 @@
run: dotnet pack metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Streamiz.Kafka.Net.Metrics.Prometheus.csproj --configuration Release --no-build --no-restore
- name: Pack Metrics OpenTelemetry
run: dotnet pack metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Streamiz.Kafka.Net.Metrics.OpenTelemetry.csproj --configuration Release --no-build --no-restore
- name: Pack Azure Remote Storage
run: dotnet pack remote/Streamiz.Kafka.Net.Azure.RemoteStorage/Streamiz.Kafka.Net.Azure.RemoteStorage.csproj --configuration Release --no-build --no-restore
- name: Publish in nuget.org
run: dotnet nuget push **/*.nupkg -k ${{ secrets.NUGET_PACKAGE_TOKEN }} -s https://api.nuget.org/v3/index.json -n -d
5 changes: 4 additions & 1 deletion .gitignore
@@ -9,4 +9,7 @@ build
TestResults

.idea/
.vscode/
.vscode/

confidential
roadmap.md
2 changes: 1 addition & 1 deletion .gitpod.yml
@@ -22,7 +22,7 @@ tasks:
echo "1" > run
echo "🚀 Enjoy Streamiz the .NET Stream processing library for Apache Kafka (TM)";
sleep 2
dotnet run -f net6.0 --project samples/sample-stream-demo/sample-stream-demo.csproj --no-build --no-restore
dotnet run -f net6.0 --project launcher/sample-stream-demo/sample-stream-demo.csproj --no-build --no-restore
- name: producer
command: while [ ! -f run ]; do sleep 1; done; docker-compose -f environment/docker-compose.yml exec broker kafka-console-producer --bootstrap-server broker:29092 --topic input
- name: consumer
4 changes: 0 additions & 4 deletions Pointers.md

This file was deleted.

45 changes: 11 additions & 34 deletions README.md
@@ -1,4 +1,8 @@
# .NET Stream Processing Library for Apache Kafka <sup>TM</sup> &middot; [![GitHub license](https://img.shields.io/badge/license-MIT-green.svg)](https://github.com/LGouellec/streamiz-kafka-net/blob/master/LICENSE) &middot; [![Join the chat at https://discord.gg/J7Jtxum](https://img.shields.io/discord/704268523169382421.svg?logoColor=white)](https://discord.gg/J7Jtxum) ![build](https://github.com/LGouellec/kafka-streams-dotnet/workflows/build/badge.svg?branch=master)
> [!IMPORTANT]
We’re excited to hear from you and would love to get your feedback on the product. Your insights are invaluable and will help us shape the future of our product to better meet your needs. The [survey](https://docs.google.com/forms/d/1OVISLOQY0FLcvh9KhSEPZ5K2Lgqzl7NkciCyd6w9_kU/) will only take a few minutes, and your responses will be completely confidential.


# .NET Stream Processing Library for Apache Kafka <sup>TM</sup> &middot; [![GitHub license](https://img.shields.io/badge/license-MIT-green.svg)](https://github.com/LGouellec/streamiz/blob/master/LICENSE) &middot; [![Join the chat at https://discord.gg/J7Jtxum](https://img.shields.io/discord/704268523169382421.svg?logoColor=white)](https://discord.gg/J7Jtxum) ![build](https://github.com/LGouellec/streamiz/workflows/build/badge.svg?branch=master)

| Package | Nuget version | Downloads |
|---|---|---|
@@ -19,13 +23,6 @@
[![Sonar Cloud Maintainability Rate](https://sonarcloud.io/api/project_badges/measure?branch=master&project=LGouellec_kafka-streams-dotnet&metric=sqale_rating)](https://sonarcloud.io/dashboard?branch=master&id=LGouellec_kafka-streams-dotnet)
[![Sonar Cloud Duplicated Code](https://sonarcloud.io/api/project_badges/measure?branch=master&project=LGouellec_kafka-streams-dotnet&metric=duplicated_lines_density)](https://sonarcloud.io/dashboard?branch=master&id=LGouellec_kafka-streams-dotnet)

## Project Statistics
<div>
<img alt="GitHub issues" src="https://img.shields.io/github/issues/LGouellec/kafka-streams-dotnet">
<img alt="GitHub pull requests" src="https://img.shields.io/github/issues-pr/LGouellec/kafka-streams-dotnet">
</div>
<br/>

<img src="./resources/logo-kafka-stream-net.png" width="150">

----
@@ -40,7 +37,7 @@ affiliation with and is not endorsed by The Apache Software Foundation.

# Try it with Gitpod

[![Open in Gitpod](https://gitpod.io/button/open-in-gitpod.svg)](https://gitpod.io/#https://github.com/LGouellec/kafka-streams-dotnet)
[![Open in Gitpod](https://gitpod.io/button/open-in-gitpod.svg)](https://gitpod.io/#https://github.com/LGouellec/streamiz)

## Step 1

@@ -57,7 +54,7 @@ Switch to the `consumer` terminal and check the aggregation result

# Documentation

Read the full documentation on https://lgouellec.github.io/kafka-streams-dotnet/
Read the full documentation on https://lgouellec.github.io/streamiz/

# Installation

@@ -119,31 +116,11 @@ static async System.Threading.Tasks.Task Main(string[] args)
| Sliding window | X | | No plan for now |
| Session window | X | | No plan for now |
| Cache | X | X | EA 1.6.0 |
| Suppress(..) | X | | No plan for now |
| Suppress(..) | X | X | Plan for 1.7.0 |
| Interactive Queries | X | | No plan for now |
| State store batch restoring | X | | No plan for now |
| Exactly Once (v1 and v2) | X | X | EOS V1 supported, EOS V2 not supported yet |

# Contributing

Maintainers:

- [lgouellec](https://github.com/LGouellec)

**Streamiz Kafka .Net** is a community project. We invite your participation through issues and pull requests! You can peruse the [contributing guidelines](CONTRIBUTING.md).

When adding or changing a service, please add tests and documentation.

# Support

You can find support [here](https://discord.gg/J7Jtxum)
| Exactly Once v2 | X | X | |

# Star History
# Community Support

<a href="https://star-history.com/#LGouellec/kafka-streams-dotnet&Date">
<picture>
<source media="(prefers-color-scheme: dark)" srcset="https://api.star-history.com/svg?repos=LGouellec/kafka-streams-dotnet&type=Date&theme=dark" />
<source media="(prefers-color-scheme: light)" srcset="https://api.star-history.com/svg?repos=LGouellec/kafka-streams-dotnet&type=Date" />
<img alt="Star History Chart" src="https://api.star-history.com/svg?repos=LGouellec/kafka-streams-dotnet&type=Date" />
</picture>
</a>
Feel free to reach out to our community support [here](https://discord.gg/J7Jtxum) anytime; we're here to help you with any questions or issues you may have!
32 changes: 32 additions & 0 deletions core/Crosscutting/ByteBuffer.cs
@@ -88,6 +88,31 @@ public ByteBuffer PutInt(int @int)

#region ReadOperation

public long GetLong()
{
var bytes = reader.ReadBytes(sizeof(long));

if (BitConverter.IsLittleEndian && bigEndian)
bytes = bytes.Reverse().ToArray();

return BitConverter.ToInt64(bytes, 0);
}

public int GetInt()
{
var bytes = reader.ReadBytes(sizeof(int));

if (BitConverter.IsLittleEndian && bigEndian)
bytes = bytes.Reverse().ToArray();

return BitConverter.ToInt32(bytes, 0);
}

public byte[] GetBytes(int size)
{
return reader.ReadBytes(size);
}

public long GetLong(int offset)
{
reader.BaseStream.Seek(offset, SeekOrigin.Begin);
@@ -116,6 +141,13 @@ public byte[] GetBytes(int offset, int size)
return reader.ReadBytes(size);
}

public byte[] GetNullableSizePrefixedArray()
{
var size = GetInt();
if (size == -1)
return null;
return GetBytes(size);
}
#endregion

public byte[] ToArray()
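For orientation, here is a small round-trip sketch of how the new read operations pair with the write side. It is illustrative only: the `ByteBuffer.Build(...)` factory and the `PutLong` writer used below are assumed names, since only `PutInt` and the `Get*` readers appear in this diff.

```csharp
// Illustrative sketch — Build(...) and PutLong(...) are assumed API names.
// Values are read back in the same order they were written.
var written = ByteBuffer.Build(capacity: 64, bigEndian: true) // hypothetical factory
    .PutLong(1702425600000L)   // e.g. a timestamp
    .PutInt(-1);               // size prefix: -1 encodes a null payload

var reader  = ByteBuffer.Build(written.ToArray(), bigEndian: true);
long   ts      = reader.GetLong();                       // 1702425600000
byte[] payload = reader.GetNullableSizePrefixedArray();  // reads the -1 prefix -> null
```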
11 changes: 10 additions & 1 deletion core/Crosscutting/Bytes.cs
@@ -77,11 +77,16 @@ public class Bytes : IEquatable<Bytes>, IComparable<Bytes>
/// Create a Bytes using the byte array.
/// </summary>
/// <param name="bytes">This array becomes the backing storage for the object.</param>
[Obsolete("Will be removed last release version")]
[Obsolete("Will be removed in 1.8.0")]
public Bytes(byte[] bytes)
{
Get = bytes;
}

/// <summary>
/// Create an empty Bytes instance with no backing array.
/// </summary>
public Bytes(){}

/// <summary>
///
@@ -127,6 +132,10 @@ internal static Bytes Wrap(byte[] bytes)
public virtual int CompareTo(Bytes other)
{
BytesComparer comparer = new BytesComparer();
if (other == null || other.Get == null)
return 1;
if (Get == null)
return -1;
return comparer.Compare(this, other);
}
}
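The added guards change the comparison semantics when one side has no backing array — a case the new parameterless constructor makes possible. A short sketch of the resulting ordering (illustrative, not part of this diff; `Bytes.Wrap` is the internal factory shown above):

```csharp
var empty    = new Bytes();                      // parameterless ctor: Get stays null
var someData = Bytes.Wrap(new byte[] { 0x01 });  // internal factory, usable inside the library

// With the null checks in CompareTo:
//   someData.CompareTo(null)  == 1   -> instances with data sort after null
//   someData.CompareTo(empty) == 1   -> ...and after instances without a backing array
//   empty.CompareTo(someData) == -1
```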
23 changes: 21 additions & 2 deletions core/Crosscutting/DictionaryExtensions.cs
@@ -1,8 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using Confluent.Kafka;

namespace Streamiz.Kafka.Net.Crosscutting
{
@@ -32,6 +30,26 @@ public static bool AddOrUpdate<K, V>(this IDictionary<K, V> map, K key, V value)
return true;
}

/// <summary>
/// Add the element if the key doesn't exist
/// </summary>
/// <typeparam name="K">Key type</typeparam>
/// <typeparam name="V">Value type</typeparam>
/// <param name="map">Source dictionary</param>
/// <param name="key">New key</param>
/// <param name="value">Value</param>
/// <returns>Return true if the key|value was added, false otherwise</returns>
public static bool AddIfNotExist<K, V>(this IDictionary<K, V> map, K key, V value)
{
if (!map.ContainsKey(key))
{
map.Add(key, value);
return true;
}
return false;
}

#if NETSTANDARD2_0 || NET5_0 || NET6_0 || NET7_0
/// <summary>
/// Convert enumerable of <see cref="KeyValuePair{K, V}"/> to <see cref="IDictionary{K, V}"/>
/// </summary>
@@ -46,6 +64,7 @@ public static IDictionary<K, V> ToDictionary<K, V>(this IEnumerable<KeyValuePair
r.Add(s.Key, s.Value);
return r;
}
#endif


/// <summary>
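A minimal usage sketch for the new `AddIfNotExist` extension (illustrative, not part of this diff): unlike `AddOrUpdate`, it never overwrites an existing entry and reports whether anything was written.

```csharp
using System.Collections.Generic;
using Streamiz.Kafka.Net.Crosscutting;

var offsets = new Dictionary<string, long> { ["topic-0"] = 42L };

bool added   = offsets.AddIfNotExist("topic-1", 0L);   // true:  key was missing, entry inserted
bool skipped = offsets.AddIfNotExist("topic-0", 100L); // false: key exists, 42L is preserved

// offsets now contains { "topic-0" = 42, "topic-1" = 0 }
```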
33 changes: 19 additions & 14 deletions core/Crosscutting/SortedDictionaryExtensions.cs
@@ -1,49 +1,54 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;

namespace Streamiz.Kafka.Net.Crosscutting
{
internal static class SortedDictionaryExtensions
{
internal static IEnumerable<KeyValuePair<K, V>> HeadMap<K, V>(this SortedDictionary<K, V> sortedDic, K key, bool inclusive)
internal static IEnumerable<KeyValuePair<K, V>> HeadMap<K, V>(this IEnumerable<KeyValuePair<K, V>> enumerable, K key, bool inclusive)
where K : IComparable<K>
{
foreach (K k in sortedDic.Keys) {
int r = sortedDic.Comparer.Compare(key, k);
foreach (var kv in enumerable)
{
int r = key.CompareTo(kv.Key);
if ((inclusive && r >= 0) || (!inclusive && r > 0))
yield return new KeyValuePair<K, V>(k, sortedDic[k]);
yield return new KeyValuePair<K, V>(kv.Key, kv.Value);
else
break;
}
}

internal static IEnumerable<KeyValuePair<K, V>> SubMap<K, V>(this SortedDictionary<K, V> sortedDic, K keyFrom, K keyTo , bool inclusiveFrom, bool inclusiveTo)
internal static IEnumerable<KeyValuePair<K, V>> SubMap<K, V>(this IEnumerable<KeyValuePair<K, V>> enumerable, K keyFrom, K keyTo , bool inclusiveFrom, bool inclusiveTo)
where K : IComparable<K>
{
foreach (K k in sortedDic.Keys)
foreach (var kv in enumerable)
{
int rF = sortedDic.Comparer.Compare(keyFrom, k);
int rT = sortedDic.Comparer.Compare(keyTo, k);
int rF = keyFrom.CompareTo(kv.Key);
int rT = keyTo.CompareTo(kv.Key);

if((inclusiveFrom && rF <= 0) || (!inclusiveFrom && rF < 0))
{
if ((inclusiveTo && rT >= 0) || (!inclusiveTo && rT > 0))
{
yield return new KeyValuePair<K, V>(k, sortedDic[k]);
yield return new KeyValuePair<K, V>(kv.Key, kv.Value);
}
else
break;
}
}
}

internal static IEnumerable<KeyValuePair<K, V>> TailMap<K, V>(this SortedDictionary<K, V> sortedDic, K keyFrom,
internal static IEnumerable<KeyValuePair<K, V>> TailMap<K, V>(this IEnumerable<KeyValuePair<K, V>> enumerable, K keyFrom,
bool inclusive)
where K : IComparable<K>
{
foreach (K k in sortedDic.Keys)
foreach (var kv in enumerable)
{
int rT = sortedDic.Comparer.Compare(keyFrom, k);
int rT = keyFrom.CompareTo(kv.Key);

if ((inclusive && rT <= 0) || (!inclusive && rT < 0))
{
yield return new KeyValuePair<K, V>(k, sortedDic[k]);
yield return new KeyValuePair<K, V>(kv.Key, kv.Value);
}
}
}
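These three helpers are internal and now accept any `IEnumerable<KeyValuePair<K, V>>`, relying on `IComparable<K>` instead of the dictionary's comparer; they still assume the sequence is ordered ascending by key, since `HeadMap` and `SubMap` stop at the first key past the bound. A sketch of the semantics (illustrative, not part of this diff):

```csharp
using System.Collections.Generic;
using Streamiz.Kafka.Net.Crosscutting;

// Any key-ordered sequence works; a SortedDictionary enumerates in key order.
var entries = new SortedDictionary<long, string> { [10] = "a", [20] = "b", [30] = "c", [40] = "d" };

var head = entries.HeadMap(30L, inclusive: true);             // keys <= 30      -> 10, 20, 30
var sub  = entries.SubMap(15L, 40L,
                          inclusiveFrom: false,
                          inclusiveTo: true);                  // 15 < key <= 40  -> 20, 30, 40
var tail = entries.TailMap(20L, inclusive: true);             // keys >= 20      -> 20, 30, 40
```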
17 changes: 17 additions & 0 deletions core/Errors/StreamProducerException.cs
@@ -0,0 +1,17 @@
using System;
using Confluent.Kafka;

namespace Streamiz.Kafka.Net.Errors
{
internal class StreamProducerException : Exception
{
public ProduceException<byte[], byte[]> OriginalProduceException { get; set; }
public ProductionExceptionHandlerResponse Response { get; set; }

public StreamProducerException(ProduceException<byte[], byte[]> originalProduceException, ProductionExceptionHandlerResponse response)
{
OriginalProduceException = originalProduceException;
Response = response;
}
}
}
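`StreamProducerException` is internal, so the snippet below only sketches the intent: it bundles the original Confluent `ProduceException` with the `ProductionExceptionHandlerResponse` chosen for it, so both travel together when the error is rethrown inside the library. The call site and the `FAIL` member name are assumptions, not code from this commit.

```csharp
// Hypothetical flow inside the library (illustrative only).
ProduceException<byte[], byte[]> produceError = null; // would come from a failed delivery report
var decision = ProductionExceptionHandlerResponse.FAIL; // member name assumed

try
{
    throw new StreamProducerException(produceError, decision);
}
catch (StreamProducerException e)
{
    var kafkaError      = e.OriginalProduceException?.Error; // original Confluent error travels along
    var handlerDecision = e.Response;                        // the handler's decision travels along
}
```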