Skip to content
This repository has been archived by the owner on Dec 12, 2022. It is now read-only.

Commit

Permalink
Merge pull request #78 from imburseag/AB1937-fix-batch-flushing
Browse files Browse the repository at this point in the history
AB1937 Re-structure code in line with confluent kafka and periodic batch sink documentation
  • Loading branch information
awithers-imburse authored Oct 12, 2021
2 parents 02b9117 + 0d957a8 commit a033a98
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 122 deletions.
174 changes: 54 additions & 120 deletions src/Serilog.Sinks.Kafka/KafkaSink.cs
Original file line number Diff line number Diff line change
@@ -1,89 +1,91 @@
using Confluent.Kafka;
using Serilog.Configuration;
using Serilog.Events;
using Serilog.Formatting;
using Serilog.Sinks.PeriodicBatching;
using System;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Serilog.Events;
using Serilog.Formatting;
using Serilog.Sinks.PeriodicBatching;

namespace Serilog.Sinks.Kafka
{
public class KafkaSink : PeriodicBatchingSink
public class KafkaSink : IBatchedLogEventSink
{
private TopicPartition topic;
private IProducer<Null, byte[]> producer;
private ITextFormatter formatter;
private const int FlushTimeoutSecs = 10;

private readonly TopicPartition _globalTopicPartition;
private readonly ITextFormatter _formatter;
private readonly Func<LogEvent, string> _topicDecider;
private IProducer<Null, byte[]> _producer;

public KafkaSink(
string bootstrapServers,
int batchSizeLimit,
int period,
SecurityProtocol securityProtocol,
SaslMechanism saslMechanism,
string topic,
string saslUsername,
string saslPassword,
string sslCaLocation,
ITextFormatter formatter = null) : base(batchSizeLimit, TimeSpan.FromSeconds(period))
string topic = null,
Func<LogEvent, string> topicDecider = null,
ITextFormatter formatter = null)
{
ConfigureKafkaConnection(bootstrapServers, securityProtocol, saslMechanism, saslUsername,
saslPassword, sslCaLocation);

this.formatter = formatter ?? new Formatting.Json.JsonFormatter(renderMessage: true);

this.topic = new TopicPartition(topic, Partition.Any);
}
ConfigureKafkaConnection(
bootstrapServers,
securityProtocol,
saslMechanism,
saslUsername,
saslPassword,
sslCaLocation);

public KafkaSink(
string bootstrapServers,
int batchSizeLimit,
int period,
SecurityProtocol securityProtocol,
SaslMechanism saslMechanism,
Func<LogEvent, string> topicDecider,
string saslUsername,
string saslPassword,
string sslCaLocation,
ITextFormatter formatter = null) : base(batchSizeLimit, TimeSpan.FromSeconds(period))
{
ConfigureKafkaConnection(bootstrapServers, securityProtocol, saslMechanism, saslUsername,
saslPassword, sslCaLocation);
_formatter = formatter ?? new Formatting.Json.JsonFormatter(renderMessage: true);

this.formatter = formatter ?? new Formatting.Json.JsonFormatter(renderMessage: true);
if (topic != null)
_globalTopicPartition = new TopicPartition(topic, Partition.Any);

this._topicDecider = topicDecider;
if (topicDecider != null)
_topicDecider = topicDecider;
}

protected override async Task EmitBatchAsync(IEnumerable<LogEvent> events)
{
var tasks = new List<Task>();
public Task OnEmptyBatchAsync() => Task.CompletedTask;

foreach (var logEvent in events)
public Task EmitBatchAsync(IEnumerable<LogEvent> batch)
{
foreach (var logEvent in batch)
{
Message<Null, byte[]> message;

var topicPartition = _topicDecider == null
? _globalTopicPartition
: new TopicPartition(_topicDecider(logEvent), Partition.Any);

using (var render = new StringWriter(CultureInfo.InvariantCulture))
{
formatter.Format(logEvent, render);
var message = new Message<Null, byte[]> { Value = Encoding.UTF8.GetBytes(render.ToString()) };
_formatter.Format(logEvent, render);

var kakfaTopicPartition = _topicDecider != null
? new TopicPartition(_topicDecider(logEvent), Partition.Any)
: topic;

tasks.Add(producer.ProduceAsync(kakfaTopicPartition, message));
message = new Message<Null, byte[]>
{
Value = Encoding.UTF8.GetBytes(render.ToString())
};
}

_producer.Produce(topicPartition, message);
}

await Task.WhenAll(tasks);
_producer.Flush(TimeSpan.FromSeconds(FlushTimeoutSecs));

return Task.CompletedTask;
}

private void ConfigureKafkaConnection(string bootstrapServers, SecurityProtocol securityProtocol,
SaslMechanism saslMechanism, string saslUsername, string saslPassword, string sslCaLocation)
private void ConfigureKafkaConnection(
string bootstrapServers,
SecurityProtocol securityProtocol,
SaslMechanism saslMechanism,
string saslUsername,
string saslPassword,
string sslCaLocation)
{
var config = new ProducerConfig()
.SetValue("ApiVersionFallbackMs", 0)
Expand All @@ -99,76 +101,8 @@ private void ConfigureKafkaConnection(string bootstrapServers, SecurityProtocol
.SetValue("SaslUsername", saslUsername)
.SetValue("SaslPassword", saslPassword);

producer = new ProducerBuilder<Null, byte[]>(config)
_producer = new ProducerBuilder<Null, byte[]>(config)
.Build();
}
}

public static class LoggerConfigurationKafkaExtensions
{
/// <summary>
/// Adds a sink that writes log events to a Kafka topic in the broker endpoints.
/// </summary>
/// <param name="loggerConfiguration">The logger configuration.</param>
/// <param name="batchSizeLimit">The maximum number of events to include in a single batch.</param>
/// <param name="period">The time in seconds to wait between checking for event batches.</param>
/// <param name="bootstrapServers">The list of bootstrapServers separated by comma.</param>
/// <param name="topic">The topic name.</param>
/// <returns></returns>
public static LoggerConfiguration Kafka(
this LoggerSinkConfiguration loggerConfiguration,
string bootstrapServers = "localhost:9092",
int batchSizeLimit = 50,
int period = 5,
SecurityProtocol securityProtocol = SecurityProtocol.Plaintext,
SaslMechanism saslMechanism = SaslMechanism.Plain,
string topic = "logs",
string saslUsername = null,
string saslPassword = null,
string sslCaLocation = null,
ITextFormatter formatter = null)
{
var sink = new KafkaSink(
bootstrapServers,
batchSizeLimit,
period,
securityProtocol,
saslMechanism,
topic,
saslUsername,
saslPassword,
sslCaLocation,
formatter);

return loggerConfiguration.Sink(sink);
}

public static LoggerConfiguration Kafka(
this LoggerSinkConfiguration loggerConfiguration,
Func<LogEvent, string> topicDecider,
string bootstrapServers = "localhost:9092",
int batchSizeLimit = 50,
int period = 5,
SecurityProtocol securityProtocol = SecurityProtocol.Plaintext,
SaslMechanism saslMechanism = SaslMechanism.Plain,
string saslUsername = null,
string saslPassword = null,
string sslCaLocation = null,
ITextFormatter formatter = null)
{
var sink = new KafkaSink(
bootstrapServers,
batchSizeLimit,
period,
securityProtocol,
saslMechanism,
topicDecider,
saslUsername,
saslPassword,
sslCaLocation,
formatter);

return loggerConfiguration.Sink(sink);
}
}
}
114 changes: 114 additions & 0 deletions src/Serilog.Sinks.Kafka/LoggerConfigurationExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using System;
using Confluent.Kafka;
using Serilog.Configuration;
using Serilog.Events;
using Serilog.Formatting;
using Serilog.Sinks.PeriodicBatching;

namespace Serilog.Sinks.Kafka
{
public static class LoggerConfigurationExtensions
{
/// <summary>
/// Adds a sink that writes log events to a Kafka topic in the broker endpoints.
/// </summary>
/// <param name="loggerConfiguration">The logger configuration.</param>
/// <param name="batchSizeLimit">The maximum number of events to include in a single batch.</param>
/// <param name="period">The time in seconds to wait between checking for event batches.</param>
/// <param name="bootstrapServers">The list of bootstrapServers separated by comma.</param>
/// <param name="topic">The topic name.</param>
/// <returns></returns>
public static LoggerConfiguration Kafka(
this LoggerSinkConfiguration loggerConfiguration,
string bootstrapServers = "localhost:9092",
int batchSizeLimit = 50,
int period = 5,
SecurityProtocol securityProtocol = SecurityProtocol.Plaintext,
SaslMechanism saslMechanism = SaslMechanism.Plain,
string topic = "logs",
string saslUsername = null,
string saslPassword = null,
string sslCaLocation = null,
ITextFormatter formatter = null)
{
return loggerConfiguration.Kafka(
bootstrapServers,
batchSizeLimit,
period,
securityProtocol,
saslMechanism,
saslUsername,
saslPassword,
sslCaLocation,
topic,
topicDecider: null,
formatter);
}

public static LoggerConfiguration Kafka(
this LoggerSinkConfiguration loggerConfiguration,
Func<LogEvent, string> topicDecider,
string bootstrapServers = "localhost:9092",
int batchSizeLimit = 50,
int period = 5,
SecurityProtocol securityProtocol = SecurityProtocol.Plaintext,
SaslMechanism saslMechanism = SaslMechanism.Plain,
string saslUsername = null,
string saslPassword = null,
string sslCaLocation = null,
ITextFormatter formatter = null)
{
return loggerConfiguration.Kafka(
bootstrapServers,
batchSizeLimit,
period,
securityProtocol,
saslMechanism,
saslUsername,
saslPassword,
sslCaLocation,
topic: null,
topicDecider,
formatter);
}

private static LoggerConfiguration Kafka(
this LoggerSinkConfiguration loggerConfiguration,
string bootstrapServers,
int batchSizeLimit,
int period,
SecurityProtocol securityProtocol,
SaslMechanism saslMechanism,
string saslUsername,
string saslPassword,
string sslCaLocation,
string topic,
Func<LogEvent, string> topicDecider,
ITextFormatter formatter)
{
var kafkaSink = new KafkaSink(
bootstrapServers,
securityProtocol,
saslMechanism,
saslUsername,
saslPassword,
sslCaLocation,
topic,
topicDecider,
formatter);

var batchingOptions = new PeriodicBatchingSinkOptions
{
BatchSizeLimit = batchSizeLimit,
Period = TimeSpan.FromSeconds(period)
};

var batchingSink = new PeriodicBatchingSink(
kafkaSink,
batchingOptions);

return loggerConfiguration
.Sink(batchingSink);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace Serilog.Sinks.Kafka
{
public static class ExtensionMethods
public static class ProducerConfigExtensions
{
private const string SerilogEnvVar = "SERILOG__KAFKA__";

Expand Down Expand Up @@ -40,7 +40,7 @@ public static ProducerConfig SetValue(this ProducerConfig config, string key, ob
return config;
}

public static void SetValues(object obj, string propertyName, string stringValue)
private static void SetValues(object obj, string propertyName, string stringValue)
{
if (string.IsNullOrEmpty(stringValue))
return;
Expand Down

0 comments on commit a033a98

Please sign in to comment.