Skip to content

Commit

Permalink
Merge pull request #358 from LGouellec/int/kafka-native
Browse files Browse the repository at this point in the history
Integration Test - Kafka native docker image
  • Loading branch information
LGouellec authored Aug 8, 2024
2 parents 81d3d49 + 4a70a76 commit dbea20d
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
<PackageIconUrl>https://raw.githubusercontent.com/LGouellec/kafka-streams-dotnet/master/resources/logo-kafka-stream-net.png</PackageIconUrl>
</PropertyGroup>

<ItemGroup>
<None Include="..\..\key.snk" Link="key.snk" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\core\Streamiz.Kafka.Net.csproj" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System.Security.Principal;
using System.Text;
using Docker.DotNet.Models;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Configurations;
using Testcontainers.Kafka;

namespace Streamiz.Kafka.Net.IntegrationTests.Fixtures;

public class ApacheKafkaBuilder
: ContainerBuilder<ApacheKafkaBuilder, KafkaContainer, KafkaConfiguration>
{
public const string DEFAULT_IMAGE_NAME = $"apache/kafka:{DOCKER_VERSION}";
public const string APACHE_KAFKA_NATIVE_IMAGE_NAME = $"apache/kafka-native:{DOCKER_VERSION}";
public const string DOCKER_VERSION = "3.8.0";

public const string KafkaImage = DEFAULT_IMAGE_NAME;
public const ushort KafkaPort = 9092;
public const ushort BrokerPort = 9093;
public const string StartupScriptFilePath = "/testcontainers.sh";

private const string DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";

/// <summary>
/// Initializes a new instance of the <see cref="KafkaBuilder" /> class.
/// </summary>
public ApacheKafkaBuilder()
: this(new KafkaConfiguration())
{
DockerResourceConfiguration = Init().DockerResourceConfiguration;
}

/// <summary>
/// Initializes a new instance of the <see cref="KafkaBuilder" /> class.
/// </summary>
/// <param name="resourceConfiguration">The Docker resource configuration.</param>
private ApacheKafkaBuilder(KafkaConfiguration resourceConfiguration)
: base(resourceConfiguration)
{
DockerResourceConfiguration = resourceConfiguration;
}

/// <inheritdoc />
protected override KafkaConfiguration DockerResourceConfiguration { get; }

/// <inheritdoc />
public override KafkaContainer Build()
{
Validate();
return new KafkaContainer(DockerResourceConfiguration);
}

/// <inheritdoc />
protected sealed override ApacheKafkaBuilder Init()
{
return base.Init()
.WithImage(KafkaImage)
.WithPortBinding(KafkaPort, true)
.WithPortBinding(BrokerPort, true)
.WithEnvironment("CLUSTER_ID", DEFAULT_CLUSTER_ID)
.WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KafkaPort + ",BROKER://0.0.0.0:" + BrokerPort + ",CONTROLLER://0.0.0.0:9094")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
.WithEnvironment("KAFKA_PROCESS_ROLES", "broker,controller")
.WithEnvironment("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
.WithEnvironment("KAFKA_NODE_ID", "1")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString())
.WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
.WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:9094")
.WithEntrypoint("/bin/sh", "-c")
.WithCommand("while [ ! -f " + StartupScriptFilePath + " ]; do sleep 0.1; done; " + StartupScriptFilePath)
.WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("\\[KafkaRaftServer nodeId=\\d+\\] Kafka Server started"))
.WithStartupCallback((container, ct) =>
{
const char lf = '\n';
var startupScript = new StringBuilder();
startupScript.Append("#!/bin/bash");
startupScript.Append(lf);
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort);
startupScript.Append(lf);
startupScript.Append("/etc/kafka/docker/run");
return container.CopyAsync(Encoding.Default.GetBytes(startupScript.ToString()), StartupScriptFilePath, Unix.FileMode755, ct);
});
}

/// <inheritdoc />
protected override ApacheKafkaBuilder Clone(IResourceConfiguration<CreateContainerParameters> resourceConfiguration)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
}

/// <inheritdoc />
protected override ApacheKafkaBuilder Clone(IContainerConfiguration resourceConfiguration)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
}

/// <inheritdoc />
protected override ApacheKafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfiguration newValue)
{
return new ApacheKafkaBuilder(new KafkaConfiguration(oldValue, newValue));
}
}
28 changes: 17 additions & 11 deletions test/Streamiz.Kafka.Net.IntegrationTests/Fixtures/KafkaFixture.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Testcontainers.Kafka;
using Testcontainers;

namespace Streamiz.Kafka.Net.IntegrationTests.Fixtures
{
Expand All @@ -16,8 +17,8 @@ public class KafkaFixture

public KafkaFixture()
{
container = new KafkaBuilder()
.WithImage("registry.hub.docker.com/confluentinc/cp-kafka:7.4.0")
container = new ApacheKafkaBuilder()
.WithImage(ApacheKafkaBuilder.APACHE_KAFKA_NATIVE_IMAGE_NAME)
.WithPortBinding(9092)
.WithName("kafka-streamiz-integration-tests")
.Build();
Expand Down Expand Up @@ -148,15 +149,20 @@ internal void ProduceRandomData(string topic, int numberResult)

public async Task CreateTopic(string name, int partitions = 1)
{
await container.ExecAsync(new List<string>()
ClientConfig config = new ClientConfig();
config.BootstrapServers = BootstrapServers;
config.ClientId = "create-topic-client";
AdminClientBuilder builder = new AdminClientBuilder(config);

using IAdminClient client = builder.Build();
var metadata = client.GetMetadata(name, TimeSpan.FromSeconds(10));
if(metadata.Topics.Any(t => t.Topic.Contains(name)))
await client.DeleteTopicsAsync(new List<string> { name });
await client.CreateTopicsAsync(new List<TopicSpecification>{new TopicSpecification
{
"/bin/sh",
"-c",
$"/usr/bin/kafka-topics --create --bootstrap-server {container.IpAddress}:9092 " +
"--replication-factor 1 " +
$"--partitions {partitions} " +
$"--topic {name}"
});
Name = name,
NumPartitions = partitions
}});
}

public Task DisposeAsync() => container.DisposeAsync().AsTask();
Expand Down
3 changes: 0 additions & 3 deletions test/Streamiz.Kafka.Net.IntegrationTests/PerformanceTests.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using NUnit.Framework;
using RocksDbSharp;
using Streamiz.Kafka.Net.Crosscutting;
using Streamiz.Kafka.Net.IntegrationTests.Fixtures;
using Streamiz.Kafka.Net.Mock;
using Streamiz.Kafka.Net.SerDes;
using RocksDb = Streamiz.Kafka.Net.Table.RocksDb;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Testcontainers" Version="3.3.0" />
<PackageReference Include="Testcontainers" Version="3.9.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.7.1" />
<PackageReference Include="nunit" Version="3.13.2" />
<PackageReference Include="coverlet.collector" Version="3.1.2">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="NUnit3TestAdapter" Version="4.2.1" />
<PackageReference Include="Testcontainers.Kafka" Version="3.3.0" />
<PackageReference Include="Testcontainers.Kafka" Version="3.9.0" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit dbea20d

Please sign in to comment.