Skip to content

Commit

Permalink
refactor: adjust some namespaces in connector-csharp
Browse files Browse the repository at this point in the history
  • Loading branch information
VonDerBeck committed Feb 28, 2024
1 parent e5174e7 commit 617d200
Show file tree
Hide file tree
Showing 25 changed files with 125 additions and 86 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:

jobs:
build:
name: Build Java
runs-on: ubuntu-latest
steps:
- name: Checkout
Expand All @@ -26,3 +27,22 @@ jobs:

- name: Run Maven
run: mvn -B clean verify com.mycila:license-maven-plugin:check
build-net:
name: Build .NET
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- name: Pull Zeebe
run: docker pull ghcr.io/camunda-community-hub/zeebe-with-redis-exporter:latest
- name: Pull Redis
run: docker pull redis:7-alpine
- name: Checkout
uses: actions/checkout@v4
- name: Setup
uses: actions/[email protected]
with:
dotnet-version: 8.x
- name: Build
run: dotnet build --configuration Release
- name: Test
run: dotnet test --configuration Release --no-build
1 change: 1 addition & 0 deletions .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
- 'main'
paths-ignore:
- 'README.md'
- 'connector-csharp/**'
release:
types: [published]
jobs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
using Io.Zeebe.Exporter.Proto;
using Microsoft.Extensions.DependencyInjection;
using Io.Zeebe.Redis.Connect.Csharp;
using NLog.Extensions.Hosting;
using Io.Zeebe.Redis.Connect.Csharp.Hosting;
using NLog.Extensions.Hosting;

using static PleaseWait.Dsl;
using static PleaseWait.TimeUnit;
Expand Down
9 changes: 5 additions & 4 deletions connector-csharp/zeebe-redis-connector/ZeebeRedis.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Io.Zeebe.Exporter.Proto;
using Io.Zeebe.Redis.Connect.Csharp.Consumer;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
Expand All @@ -7,8 +8,6 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using zeebe_redis_connector;
using zeebe_redis_connector.consumer;

namespace Io.Zeebe.Redis.Connect.Csharp
{
Expand All @@ -17,7 +16,7 @@ public class ZeebeRedis
private readonly ILogger? _logger;
private CancellationTokenSource? _cancellationTokenSource = null;
private readonly ConnectionMultiplexer _redisConnection;
private readonly String _consumerGroup;
private readonly string _consumerGroup;
private readonly bool _deleteConsumerGroupOnDispose = false;
private readonly IDatabase _database;
private readonly List<StreamPosition> _streamPositions = new List<StreamPosition>();
Expand All @@ -31,7 +30,7 @@ public class ZeebeRedis
// Constructors
//---------------------------------------------------------------------

public ZeebeRedis(ConnectionMultiplexer redisConnection, String? consumerGroup = null, ILoggerFactory? loggerFactory = null, int pollIntervalMillis = 500, bool closeRedisConnectionOnDispose = false)
public ZeebeRedis(ConnectionMultiplexer redisConnection, string? consumerGroup = null, ILoggerFactory? loggerFactory = null, int pollIntervalMillis = 500, bool closeRedisConnectionOnDispose = false)
{
_redisConnection = redisConnection ?? throw new ArgumentNullException(nameof(redisConnection));
_consumerGroup = consumerGroup ?? Guid.NewGuid().ToString();
Expand Down Expand Up @@ -247,8 +246,10 @@ public async Task StartConsumeEvents(CancellationTokenSource? cancellationTokenS
id = entry.Id;
var record = Record.Parser.ParseFrom(entry.Values.First().Value);

#pragma warning disable CS8600, CS8602
_consumer.TryGetValue(result.Key, out IRecordConsumer consumer);
consumer.Consume(record);
#pragma warning restore CS8600, CS8602
if (!string.IsNullOrEmpty(id))
{
_database.StreamAcknowledge(result.Key, _consumerGroup, id);
Expand Down
8 changes: 5 additions & 3 deletions connector-csharp/zeebe-redis-connector/ZeebeRedisOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ public class ZeebeRedisOptions
private string _redisConfigString = "localhost";
public virtual string RedisConfigString
{
#pragma warning disable CS8603 // Possible null reference return.
get { return GetEnvironmentVariable("REDIS_CONFIG_STRING", _redisConfigString); }
#pragma warning restore CS8603 // Possible null reference return.
set { _redisConfigString = value; }
}

private string _redisConsumerGroup = Guid.NewGuid().ToString();
public virtual string RedisConsumerGroup
private string? _redisConsumerGroup = null;
public virtual string? RedisConsumerGroup
{
get { return GetEnvironmentVariable("REDIS_CONSUMER_GROUP", _redisConsumerGroup); }
set { _redisConsumerGroup = value; }
Expand All @@ -39,7 +41,7 @@ public bool Validate()
return true;
}

public static string GetEnvironmentVariable(string name, string defaultValue)
public static string? GetEnvironmentVariable(string name, string? defaultValue)
=> Environment.GetEnvironmentVariable(name) is string v && v.Length > 0 ? v : defaultValue;

public static int GetEnvironmentVariable(string name, int defaultValue)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Io.Zeebe.Exporter.Proto;
using System;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public class DeploymentDistributionRecordConsumer : IRecordConsumer
{
public static String STREAM = "zeebe:DEPLOYMENT_DISTRIBUTION";
public static string STREAM = "zeebe:DEPLOYMENT_DISTRIBUTION";

private readonly Action<DeploymentDistributionRecord> _consumer;

public DeploymentDistributionRecordConsumer(Action<DeploymentDistributionRecord> action) {
this._consumer = action;
public DeploymentDistributionRecordConsumer(Action<DeploymentDistributionRecord> action)
{
_consumer = action;
}

public void Consume(Record record)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Io.Zeebe.Exporter.Proto;
using System;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public class DeploymentRecordConsumer : IRecordConsumer
{
public static String STREAM = "zeebe:DEPLOYMENT";
public static string STREAM = "zeebe:DEPLOYMENT";

private readonly Action<DeploymentRecord> _consumer;

public DeploymentRecordConsumer(Action<DeploymentRecord> action) {
this._consumer = action;
public DeploymentRecordConsumer(Action<DeploymentRecord> action)
{
_consumer = action;
}

public void Consume(Record record)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Io.Zeebe.Exporter.Proto;
using System;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public class ErrorRecordConsumer : IRecordConsumer
{
public static String STREAM = "zeebe:ERROR";
public static string STREAM = "zeebe:ERROR";

private readonly Action<ErrorRecord> _consumer;

public ErrorRecordConsumer(Action<ErrorRecord> action) {
this._consumer = action;
public ErrorRecordConsumer(Action<ErrorRecord> action)
{
_consumer = action;
}

public void Consume(Record record)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using Io.Zeebe.Exporter.Proto;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public interface IRecordConsumer
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Io.Zeebe.Exporter.Proto;
using System;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public class IncidentRecordConsumer : IRecordConsumer
{
public static String STREAM = "zeebe:INCIDENT";
public static string STREAM = "zeebe:INCIDENT";

private readonly Action<IncidentRecord> _consumer;

public IncidentRecordConsumer(Action<IncidentRecord> action) {
this._consumer = action;
public IncidentRecordConsumer(Action<IncidentRecord> action)
{
_consumer = action;
}

public void Consume(Record record)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Io.Zeebe.Exporter.Proto;
using System;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public class JobBatchRecordConsumer : IRecordConsumer
{
public static String STREAM = "zeebe:JOB_BATCH";
public static string STREAM = "zeebe:JOB_BATCH";

private readonly Action<JobBatchRecord> _consumer;

public JobBatchRecordConsumer(Action<JobBatchRecord> action) {
this._consumer = action;
public JobBatchRecordConsumer(Action<JobBatchRecord> action)
{
_consumer = action;
}

public void Consume(Record record)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Io.Zeebe.Exporter.Proto;
using System;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public class JobRecordConsumer : IRecordConsumer
{
public static String STREAM = "zeebe:JOB";
public static string STREAM = "zeebe:JOB";

private readonly Action<JobRecord> _consumer;

public JobRecordConsumer(Action<JobRecord> action) {
this._consumer = action;
public JobRecordConsumer(Action<JobRecord> action)
{
_consumer = action;
}

public void Consume(Record record)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Io.Zeebe.Exporter.Proto;
using System;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public class MessageRecordConsumer : IRecordConsumer
{
public static String STREAM = "zeebe:MESSAGE";
public static string STREAM = "zeebe:MESSAGE";

private readonly Action<MessageRecord> _consumer;

public MessageRecordConsumer(Action<MessageRecord> action) {
this._consumer = action;
public MessageRecordConsumer(Action<MessageRecord> action)
{
_consumer = action;
}

public void Consume(Record record)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Io.Zeebe.Exporter.Proto;
using System;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public class MessageStartEventSubscriptionRecordConsumer : IRecordConsumer
{
public static String STREAM = "zeebe:MESSAGE_START_EVENT_SUBSCRIPTION";
public static string STREAM = "zeebe:MESSAGE_START_EVENT_SUBSCRIPTION";

private readonly Action<MessageStartEventSubscriptionRecord> _consumer;

public MessageStartEventSubscriptionRecordConsumer(Action<MessageStartEventSubscriptionRecord> action) {
this._consumer = action;
public MessageStartEventSubscriptionRecordConsumer(Action<MessageStartEventSubscriptionRecord> action)
{
_consumer = action;
}

public void Consume(Record record)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Io.Zeebe.Exporter.Proto;
using System;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public class MessageSubscriptionRecordConsumer : IRecordConsumer
{
public static String STREAM = "zeebe:MESSAGE_SUBSCRIPTION";
public static string STREAM = "zeebe:MESSAGE_SUBSCRIPTION";

private readonly Action<MessageSubscriptionRecord> _consumer;

public MessageSubscriptionRecordConsumer(Action<MessageSubscriptionRecord> action) {
this._consumer = action;
public MessageSubscriptionRecordConsumer(Action<MessageSubscriptionRecord> action)
{
_consumer = action;
}

public void Consume(Record record)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Io.Zeebe.Exporter.Proto;
using System;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public class ProcessEventRecordConsumer : IRecordConsumer
{
public static String STREAM = "zeebe:PROCESS_EVENT";
public static string STREAM = "zeebe:PROCESS_EVENT";

private readonly Action<ProcessEventRecord> _consumer;

public ProcessEventRecordConsumer(Action<ProcessEventRecord> action) {
this._consumer = action;
public ProcessEventRecordConsumer(Action<ProcessEventRecord> action)
{
_consumer = action;
}

public void Consume(Record record)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Io.Zeebe.Exporter.Proto;
using System;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public class ProcessInstanceCreationRecordConsumer : IRecordConsumer
{
public static String STREAM = "zeebe:PROCESS_INSTANCE_CREATION";
public static string STREAM = "zeebe:PROCESS_INSTANCE_CREATION";

private readonly Action<ProcessInstanceCreationRecord> _consumer;

public ProcessInstanceCreationRecordConsumer(Action<ProcessInstanceCreationRecord> action) {
this._consumer = action;
public ProcessInstanceCreationRecordConsumer(Action<ProcessInstanceCreationRecord> action)
{
_consumer = action;
}

public void Consume(Record record)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Io.Zeebe.Exporter.Proto;
using System;

namespace zeebe_redis_connector.consumer
namespace Io.Zeebe.Redis.Connect.Csharp.Consumer
{
public class ProcessInstanceRecordConsumer : IRecordConsumer
{
public static String STREAM = "zeebe:PROCESS_INSTANCE";
public static string STREAM = "zeebe:PROCESS_INSTANCE";

private readonly Action<ProcessInstanceRecord> _consumer;

public ProcessInstanceRecordConsumer(Action<ProcessInstanceRecord> action) {
this._consumer = action;
public ProcessInstanceRecordConsumer(Action<ProcessInstanceRecord> action)
{
_consumer = action;
}

public void Consume(Record record)
Expand Down
Loading

0 comments on commit 617d200

Please sign in to comment.