From 9426b801e5a640523dae7690a39626228ae209a6 Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Tue, 9 Jan 2024 00:25:28 +0100 Subject: [PATCH] feat: add new tasks for Azure EventHubs (#38) This commit adds the following task types: * io.kestra.plugin.azure.eventhubs.Produce * io.kestra.plugin.azure.eventhubs.Consume * io.kestra.plugin.azure.eventhubs.Trigger --- build.gradle | 6 +- ...terface.java => AzureClientInterface.java} | 10 +- .../azure/client/AzureClientConfig.java | 52 ++++ .../azure/eventhubs/AbstractEventHubTask.java | 33 +++ .../BlobContainerClientInterface.java | 26 ++ .../plugin/azure/eventhubs/Consume.java | 236 ++++++++++++++++ .../eventhubs/EventHubClientInterface.java | 43 +++ .../eventhubs/EventHubConsumerInterface.java | 75 ++++++ .../plugin/azure/eventhubs/Produce.java | 255 ++++++++++++++++++ .../plugin/azure/eventhubs/Trigger.java | 149 ++++++++++ .../client/EventHubClientFactory.java | 183 +++++++++++++ .../config/BlobContainerClientConfig.java | 27 ++ .../config/EventHubClientConfig.java | 45 ++++ .../config/EventHubConsumerConfig.java | 38 +++ .../internal/InputStreamProvider.java | 47 ++++ .../eventhubs/model/EventDataObject.java | 47 ++++ .../plugin/azure/eventhubs/package-info.java | 7 + .../eventhubs/serdes/ByteArraySerde.java | 35 +++ .../azure/eventhubs/serdes/IonSerde.java | 49 ++++ .../azure/eventhubs/serdes/JsonSerde.java | 48 ++++ .../plugin/azure/eventhubs/serdes/Serde.java | 33 +++ .../plugin/azure/eventhubs/serdes/Serdes.java | 40 +++ .../azure/eventhubs/serdes/StringSerde.java | 54 ++++ .../service/EventDataObjectConverter.java | 74 +++++ .../service/consumer/ConsumerContext.java | 19 ++ .../consumer/EventHubConsumerService.java | 215 +++++++++++++++ .../consumer/EventHubNamePartition.java | 6 + .../consumer/EventPositionStrategy.java | 74 +++++ .../service/consumer/StartingPosition.java | 6 + .../producer/EventDataBatchFactory.java | 36 +++ .../producer/EventHubProducerService.java | 148 
++++++++++ .../service/producer/ProducerContext.java | 19 ++ .../storage/abstracts/AbstractStorage.java | 3 +- .../plugin/azure/storage/blob/Trigger.java | 5 +- .../blob/abstracts/AbstractBlobStorage.java | 4 +- .../storage/blob/services/BlobService.java | 4 +- .../io.kestra.plugin.azure.eventhubs.svg | 1 + .../plugin/azure/eventhubs/ConsumeTest.java | 84 ++++++ .../plugin/azure/eventhubs/ProduceTest.java | 107 ++++++++ .../plugin/azure/eventhubs/TriggerTest.java | 120 +++++++++ .../azure/eventhubs/serdes/JsonSerdeTest.java | 30 +++ .../eventhubs/serdes/StringSerdeTest.java | 22 ++ .../service/EventDataObjectConverterTest.java | 36 +++ .../consumer/EventPositionStrategyTest.java | 31 +++ .../producer/EventHubProducerServiceTest.java | 173 ++++++++++++ src/test/resources/application.yml | 7 +- .../resources/flows/eventshubs-trigger.yaml | 19 ++ 47 files changed, 2769 insertions(+), 12 deletions(-) rename src/main/java/io/kestra/plugin/azure/{storage/abstracts/AbstractStorageInterface.java => AzureClientInterface.java} (78%) create mode 100644 src/main/java/io/kestra/plugin/azure/client/AzureClientConfig.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/AbstractEventHubTask.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/BlobContainerClientInterface.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/EventHubClientInterface.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/config/BlobContainerClientConfig.java create mode 100644 
src/main/java/io/kestra/plugin/azure/eventhubs/config/EventHubClientConfig.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/config/EventHubConsumerConfig.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/internal/InputStreamProvider.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/model/EventDataObject.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/package-info.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/serdes/ByteArraySerde.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/serdes/IonSerde.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/serdes/JsonSerde.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/serdes/Serde.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/serdes/Serdes.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/serdes/StringSerde.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/service/EventDataObjectConverter.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubNamePartition.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventPositionStrategy.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/StartingPosition.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventDataBatchFactory.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java create mode 100644 src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java create mode 100644 
src/main/resources/icons/io.kestra.plugin.azure.eventhubs.svg create mode 100644 src/test/java/io/kestra/plugin/azure/eventhubs/ConsumeTest.java create mode 100644 src/test/java/io/kestra/plugin/azure/eventhubs/ProduceTest.java create mode 100644 src/test/java/io/kestra/plugin/azure/eventhubs/TriggerTest.java create mode 100644 src/test/java/io/kestra/plugin/azure/eventhubs/serdes/JsonSerdeTest.java create mode 100644 src/test/java/io/kestra/plugin/azure/eventhubs/serdes/StringSerdeTest.java create mode 100644 src/test/java/io/kestra/plugin/azure/eventhubs/service/EventDataObjectConverterTest.java create mode 100644 src/test/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventPositionStrategyTest.java create mode 100644 src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java create mode 100644 src/test/resources/flows/eventshubs-trigger.yaml diff --git a/build.gradle b/build.gradle index 06c733b..7718050 100644 --- a/build.gradle +++ b/build.gradle @@ -55,12 +55,14 @@ dependencies { implementation group: "io.kestra.plugin", name: "plugin-script", version: kestraVersion // azure - api platform("com.azure:azure-sdk-bom:1.2.12") + api platform("com.azure:azure-sdk-bom:1.2.19") api group: 'com.azure', name: 'azure-identity' api group: 'com.azure', name: 'azure-storage-blob' api group: 'com.azure', name: 'azure-data-tables' api group: 'com.microsoft.azure', name: 'azure-batch', version: '10.1.0' api group: 'com.microsoft.azure', name: 'azure-storage', version: '8.6.6' + api group: 'com.azure', name: 'azure-messaging-eventhubs-checkpointstore-blob' + api group: 'com.azure', name: 'azure-messaging-eventhubs' } @@ -96,6 +98,8 @@ dependencies { testImplementation "org.junit.jupiter:junit-jupiter-engine" testImplementation "org.hamcrest:hamcrest:2.2" testImplementation "org.hamcrest:hamcrest-library:2.2" + testImplementation "org.mockito:mockito-core:5.8.0" + testImplementation "org.mockito:mockito-junit-jupiter:5.8.0" } 
/**********************************************************************************************************************\ diff --git a/src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorageInterface.java b/src/main/java/io/kestra/plugin/azure/AzureClientInterface.java similarity index 78% rename from src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorageInterface.java rename to src/main/java/io/kestra/plugin/azure/AzureClientInterface.java index cde3681..56630c8 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorageInterface.java +++ b/src/main/java/io/kestra/plugin/azure/AzureClientInterface.java @@ -1,9 +1,13 @@ -package io.kestra.plugin.azure.storage.abstracts; +package io.kestra.plugin.azure; import io.kestra.core.models.annotations.PluginProperty; import io.swagger.v3.oas.annotations.media.Schema; -public interface AbstractStorageInterface { +/** + * Top-level interface that can be used by plugins to retrieve + * required configuration properties in order to establish connection to Azure services. + */ +public interface AzureClientInterface { @Schema( title = "Connection string of the Storage Account." ) @@ -16,7 +20,6 @@ public interface AbstractStorageInterface { @PluginProperty(dynamic = true) String getSharedKeyAccountName(); - @Schema( title = "Shared Key access key for authenticating requests." 
) @@ -29,4 +32,5 @@ public interface AbstractStorageInterface { ) @PluginProperty(dynamic = true) String getSasToken(); + } diff --git a/src/main/java/io/kestra/plugin/azure/client/AzureClientConfig.java b/src/main/java/io/kestra/plugin/azure/client/AzureClientConfig.java new file mode 100644 index 0000000..72c6c82 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/client/AzureClientConfig.java @@ -0,0 +1,52 @@ +package io.kestra.plugin.azure.client; + +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.AzureClientInterface; + +import java.util.Optional; +import java.util.function.Supplier; + +import static io.kestra.core.utils.Rethrow.throwFunction; + +/** + * Configuration for creating a new Azure Client. + */ +public class AzureClientConfig { + + protected final RunContext runContext; + protected final T plugin; + + /** + * Creates a new {@link AzureClientConfig} instance. + * + * @param runContext The context. + * @param plugin The plugin. 
+ */ + public AzureClientConfig(final RunContext runContext, + final T plugin) { + this.runContext = runContext; + this.plugin = plugin; + } + + public Optional connectionString() throws IllegalVariableEvaluationException { + return getOptionalConfig(plugin::getConnectionString); + } + + public Optional sharedKeyAccountName() throws IllegalVariableEvaluationException { + return getOptionalConfig(plugin::getSharedKeyAccountName); + } + + public Optional sharedKeyAccountAccessKey() throws IllegalVariableEvaluationException { + return getOptionalConfig(plugin::getSharedKeyAccountAccessKey); + } + + public Optional sasToken() throws IllegalVariableEvaluationException { + return getOptionalConfig(plugin::getSasToken); + } + + protected Optional getOptionalConfig(final Supplier supplier) throws IllegalVariableEvaluationException { + return Optional.ofNullable(supplier.get()).map(throwFunction(runContext::render)); + } + +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/AbstractEventHubTask.java b/src/main/java/io/kestra/plugin/azure/eventhubs/AbstractEventHubTask.java new file mode 100644 index 0000000..c901fcb --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/AbstractEventHubTask.java @@ -0,0 +1,33 @@ +package io.kestra.plugin.azure.eventhubs; + +import io.kestra.core.models.tasks.Task; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +@NoArgsConstructor +@SuperBuilder +@Getter +public class AbstractEventHubTask extends Task implements EventHubClientInterface { + + private String connectionString; + + private String sharedKeyAccountName; + + private String sharedKeyAccountAccessKey; + + private String sasToken; + + @Builder.Default + private Integer clientMaxRetries = 5; + + @Builder.Default + private Long clientRetryDelay = 500L; + + private String namespace; + + private String eventHubName; + + private String customEndpointAddress; +} diff --git 
a/src/main/java/io/kestra/plugin/azure/eventhubs/BlobContainerClientInterface.java b/src/main/java/io/kestra/plugin/azure/eventhubs/BlobContainerClientInterface.java new file mode 100644 index 0000000..d98a1a4 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/BlobContainerClientInterface.java @@ -0,0 +1,26 @@ +package io.kestra.plugin.azure.eventhubs; + +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.plugin.azure.AzureClientInterface; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Getter; +import lombok.experimental.SuperBuilder; + +/** + * This class is suffixed 'Interface' as it is used to capture parameters from task properties. + */ +@SuperBuilder +@Getter +public final class BlobContainerClientInterface implements AzureClientInterface { + + private String connectionString; + private String sharedKeyAccountName; + private String sharedKeyAccountAccessKey; + private String sasToken; + @Schema( + title = "The blob container name." 
+ ) + @PluginProperty(dynamic = true) + private String containerName; +} + diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java new file mode 100644 index 0000000..d4123e8 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java @@ -0,0 +1,236 @@ +package io.kestra.plugin.azure.eventhubs; + +import com.azure.messaging.eventhubs.CheckpointStore; +import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; +import com.azure.messaging.eventhubs.models.PartitionContext; +import com.azure.storage.blob.BlobContainerAsyncClient; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.core.serializers.FileSerde; +import io.kestra.plugin.azure.eventhubs.client.EventHubClientFactory; +import io.kestra.plugin.azure.eventhubs.config.BlobContainerClientConfig; +import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; +import io.kestra.plugin.azure.eventhubs.model.EventDataObject; +import io.kestra.plugin.azure.eventhubs.serdes.Serde; +import io.kestra.plugin.azure.eventhubs.serdes.Serdes; +import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; +import io.kestra.plugin.azure.eventhubs.service.consumer.ConsumerContext; +import io.kestra.plugin.azure.eventhubs.service.consumer.EventHubConsumerService; +import io.kestra.plugin.azure.eventhubs.service.consumer.EventHubNamePartition; +import io.kestra.plugin.azure.eventhubs.service.consumer.StartingPosition; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AccessLevel; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; 
+import lombok.ToString; +import lombok.experimental.SuperBuilder; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.net.URI; +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + * The {@link RunnableTask} can be used for consuming batches of events from Azure Event Hubs. + */ +@Plugin(examples = { + @Example( + title = "Consume data events from Azure EventHubs.", + full = true, + code = { + """ + id: ConsumeDataEventsFromAzureEventHubs + namespace: company.team + tasks: + - id: consumeFromEventHubs + type: io.kestra.plugin.azure.eventhubs.Consume + eventHubName: my-eventhub + connectionString:"{{ secret('EVENTHUBS_CONNECTION') }}" + bodyDeserializer: STRING + consumerGroup: "$Default" + checkpointStoreProperties: + containerName: kestra + connectionString: "{{ secret('BLOB_CONNECTION') }}" + """ + } + ) +}) +@Schema( + title = "Consume events from Azure Event Hubs." 
+) +@Slf4j +@SuperBuilder +@NoArgsConstructor +@Getter +@ToString +@EqualsAndHashCode +public class Consume extends AbstractEventHubTask implements EventHubConsumerInterface, RunnableTask { + // TASK'S PARAMETERS + @Builder.Default + private Serdes bodyDeserializer = Serdes.STRING; + + @Builder.Default + private Map bodyDeserializerProperties = Collections.emptyMap(); + + @Builder.Default + private String consumerGroup = "$Default"; + + @Builder.Default + private StartingPosition partitionStartingPosition = StartingPosition.EARLIEST; + + private String EnqueueTime; + + @Builder.Default + private Integer maxBatchSizePerPartition = 50; + + @Builder.Default + private Duration maxWaitTimePerPartition = Duration.ofSeconds(5); + + @Builder.Default + private Duration maxDuration = Duration.ofSeconds(10); + + @Builder.Default + private Map checkpointStoreProperties = Collections.emptyMap(); + + // SERVICES + @Getter(AccessLevel.NONE) + @Builder.Default + private final EventHubClientFactory clientFactory = new EventHubClientFactory(); + + /** + * {@inheritDoc} + **/ + @Override + public Output run(RunContext runContext) throws Exception { + return run(runContext, this); + } + + /** + * Runs the consumer task using the specified context and plugin interface. + * + * @param runContext The context. + * @param task The plugin interface. + * @return The output. + * @throws Exception if something wrong happens. 
+ */ + Output run(RunContext runContext, EventHubConsumerInterface task) throws Exception { + + final EventHubConsumerConfig config = new EventHubConsumerConfig(runContext, task); + + // Create converter + Serdes serdes = task.getBodyDeserializer(); + Serde serde = serdes.create(task.getBodyDeserializerProperties()); + EventDataObjectConverter converter = new EventDataObjectConverter(serde); + + final EventHubConsumerService service = new EventHubConsumerService( + clientFactory, + config, + converter, + getBlobCheckpointStore(runContext, task, clientFactory) + ); + + File tempFile = runContext.tempFile(".ion").toFile(); + try ( + BufferedOutputStream output = new BufferedOutputStream(new FileOutputStream(tempFile)) + ) { + + final AtomicReference uri = new AtomicReference<>(); + + Logger contextLogger = runContext.logger(); + + final ConsumerContext consumerContext = new ConsumerContext( + task.getMaxBatchSizePerPartition(), + task.getMaxWaitTimePerPartition(), + task.getMaxDuration(), + contextLogger + ); + + Map result = service.poll( + consumerContext, + new EventHubConsumerService.EventProcessorListener() { + @Override + public void onEvent(EventDataObject event, PartitionContext context) throws Exception { + if (contextLogger.isTraceEnabled()) { + contextLogger.trace( + "Received new event from eventHub {} and partitionId={} [offset={}, sequenceId={}]", + context.getEventHubName(), + context.getPartitionId(), + event.offset(), + event.sequenceNumber() + ); + } + FileSerde.write(output, event); + } + + @Override + public void onStop() throws Exception { + output.flush(); + output.close(); + contextLogger.debug("Copying data to storage."); + uri.set(runContext.putTempFile(tempFile)); + contextLogger.debug("Copy on storage completed."); + + } + }); + + int numEvents = result.entrySet().stream() + .peek(entry -> { + Counter counter = Counter.of( + "records", + entry.getValue(), + "eventHubName", + entry.getKey().eventHubName(), + "partitionId", + 
entry.getKey().partitionId() + ); + runContext.metric(counter); + }) + .map(Map.Entry::getValue) + .reduce(Integer::sum) + .orElse(0); + + return new Output(numEvents, uri.get()); + } + } + + private CheckpointStore getBlobCheckpointStore(final RunContext runContext, + final EventHubConsumerInterface pluginConfig, + final EventHubClientFactory factory) throws IllegalVariableEvaluationException { + BlobContainerClientInterface config = BlobContainerClientInterface.builder() + .containerName(pluginConfig.getCheckpointStoreProperties().get("containerName")) + .connectionString(pluginConfig.getCheckpointStoreProperties().get("connectionString")) + .sharedKeyAccountAccessKey(pluginConfig.getCheckpointStoreProperties().get("sharedKeyAccountAccessKey")) + .sharedKeyAccountName(pluginConfig.getCheckpointStoreProperties().get("sharedKeyAccountName")) + .build(); + BlobContainerAsyncClient client = factory.createBlobContainerAsyncClient( + new BlobContainerClientConfig(runContext, config) + ); + return new BlobCheckpointStore(client); + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "Number of events consumed from Azure Event Hubs." + ) + private final Integer eventsCount; + + @Schema( + title = "URI of a kestra internal storage file containing the messages." 
+ ) + private URI uri; + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubClientInterface.java b/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubClientInterface.java new file mode 100644 index 0000000..f3f49ee --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubClientInterface.java @@ -0,0 +1,43 @@ +package io.kestra.plugin.azure.eventhubs; + +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.plugin.azure.AzureClientInterface; +import io.swagger.v3.oas.annotations.media.Schema; + +import javax.validation.constraints.NotNull; + +public interface EventHubClientInterface extends AzureClientInterface { + + @Schema( + title = "Custom endpoint address when connecting to the Event Hubs service." + ) + @PluginProperty(dynamic = true) + String getCustomEndpointAddress(); + + @NotNull + @Schema( + title = "Namespace name of the event hub to connect to." + ) + @PluginProperty(dynamic = true) + String getNamespace(); + + @NotNull + @Schema( + title = "The event hub to read from." + ) + @PluginProperty(dynamic = true) + String getEventHubName(); + + @Schema( + title = "The maximum number of retry attempts before considering a client operation to have failed." + ) + @PluginProperty + Integer getClientMaxRetries(); + + @Schema( + title = "The maximum permissible delay between retry attempts in milliseconds." 
+ ) + @PluginProperty + Long getClientRetryDelay(); +} + diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java b/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java new file mode 100644 index 0000000..e071cce --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java @@ -0,0 +1,75 @@ +package io.kestra.plugin.azure.eventhubs; + +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.plugin.azure.eventhubs.serdes.Serdes; +import io.kestra.plugin.azure.eventhubs.service.consumer.StartingPosition; +import io.swagger.v3.oas.annotations.media.Schema; + +import java.time.Duration; +import java.util.Map; + +/** + * Base interface for implementing tasks that consume events from EventHubs. + * This interface provides all required and optional parameters. + */ +public interface EventHubConsumerInterface extends EventHubClientInterface { + + // TASK'S PARAMETERS + @Schema( + title = "The Deserializer to be used for serializing the event value." + ) + @PluginProperty + Serdes getBodyDeserializer(); + + @Schema( + title = "The config properties to be passed to the Deserializer.", + description = "Configs in key/value pairs." + ) + @PluginProperty + Map getBodyDeserializerProperties(); + + @Schema( + title = "The consumer group." + ) + @PluginProperty + String getConsumerGroup(); + + @Schema( + title = "The starting position." + ) + @PluginProperty + StartingPosition getPartitionStartingPosition(); + + @Schema( + title = "The ISO Datetime to be used when `PartitionStartingPosition` is configured to `INSTANT`.", + description = "Configs in key/value pairs." + ) + @PluginProperty + String getEnqueueTime(); + + @Schema( + title = "The maximum number of events to consume per event hub partition per poll." 
+ ) + @PluginProperty + Integer getMaxBatchSizePerPartition(); + + @Schema( + title = "The max time duration to wait to receive a batch of events up to the `maxBatchSizePerPartition`." + ) + @PluginProperty + Duration getMaxWaitTimePerPartition(); + + @Schema( + title = "The max time duration to wait to receive events from all partitions." + ) + @PluginProperty + Duration getMaxDuration(); + + @Schema( + title = "The config properties to be used for configuring the BlobCheckpointStore.", + description = "Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs." + ) + @PluginProperty + Map getCheckpointStoreProperties(); + +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java new file mode 100644 index 0000000..70c4ab3 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java @@ -0,0 +1,255 @@ +package io.kestra.plugin.azure.eventhubs; + +import com.azure.messaging.eventhubs.models.CreateBatchOptions; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.eventhubs.client.EventHubClientFactory; +import io.kestra.plugin.azure.eventhubs.config.EventHubClientConfig; +import io.kestra.plugin.azure.eventhubs.internal.InputStreamProvider; +import io.kestra.plugin.azure.eventhubs.serdes.Serdes; +import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; +import io.kestra.plugin.azure.eventhubs.service.producer.EventDataBatchFactory; +import io.kestra.plugin.azure.eventhubs.service.producer.EventHubProducerService; +import 
io.kestra.plugin.azure.eventhubs.service.producer.ProducerContext; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import lombok.extern.slf4j.Slf4j; + +import javax.validation.constraints.NotNull; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * The {@link RunnableTask} can be used for producing batches of events to Azure Event Hubs. + */ +@Plugin(examples = { + @Example( + title = "Publish a file as events into Azure EventHubs.", + full = true, + code = { + """ + id: SendEventsIntoAzureEventHubs + namespace: company.team + inputs: + - type: FILE + name: file + description: a CSV file with columns id, username, tweet, and timestamp + tasks: + - id: readCsvFile + type: io.kestra.plugin.serdes.csv.CsvReader + from: "{{ inputs.file }}" + - id: transformRowToJson + type: io.kestra.plugin.scripts.nashorn.FileTransform + from: "{{ outputs.readCsvFile.uri }}" + script: | + var result = { + "body": { + "username": row.username, + "tweet": row.tweet + } + }; + row = result + - id: sendToEventHubs + type: io.kestra.plugin.azure.eventhubs.Produce + from: "{{ outputs.transformRowToJson.uri }}" + eventHubName: my-eventhub + connectionString: "secret('EVENTHUBS_CONNECTION')" + maxBatchSizeInBytes: 4096 + maxEventsPerBatch: 100 + bodySerializer: "JSON" + bodyContentType: application/json + eventProperties: + source: kestra + """ + } + ) +}) +@Schema( + title = "Publish events to Azure Event Hubs." 
+) +@Slf4j +@SuperBuilder +@Getter +@NoArgsConstructor +public class Produce extends AbstractEventHubTask implements RunnableTask { + + // TASK'S METRICS + private static final String METRIC_SENT_EVENTS_NAME = "total-sent-events"; + private static final String METRIC_SENT_BATCHES_NAME = "total-sent-batches"; + + // TASK'S PARAMETERS + @Schema( + title = "The event properties", + description = "The event properties which may be used for passing metadata associated with the event" + + " body during Event Hubs operations." + ) + @PluginProperty + @Builder.Default + private Map eventProperties = Collections.emptyMap(); + + @Schema( + title = "The content of the message to be sent to EventHub", + description = "Can be an internal storage URI, a map (i.e. a list of key-value pairs) or a list of maps. " + + "The following keys are supported: `from`, `contentType`, `properties`.", + anyOf = {String.class, List.class, Map.class} + ) + @NotNull + @PluginProperty(dynamic = true) + private Object from; + + @Schema( + title = "The hashing key to be provided for the batches of events.", + description = "Events with the same `partitionKey` are hashed and sent to the same partition. The provided " + + "`partitionKey` will be used for all the events sent by the `Produce` task." + ) + @PluginProperty + private String partitionKey; + + @Schema( + title = "The maximum size for batches of events, in bytes." + ) + @PluginProperty + private Integer maxBatchSizeInBytes; + + @Schema( + title = "The maximum number of events per batches." + ) + @PluginProperty + @Builder.Default + private Integer maxEventsPerBatch = 1000; + + @Schema( + title = "The MIME type describing the event data", + description = "The MIME type describing the data contained in event body allowing consumers to make informed" + + " decisions for inspecting and processing the event." + ) + @PluginProperty + private String bodyContentType; + + @Schema( + title = "The Serializer to be used for serializing the event value." 
+ ) + @PluginProperty + @Builder.Default + private Serdes bodySerializer = Serdes.STRING; + + @Schema( + title = "The config properties to be passed to the Serializer.", + description = "Configs in key/value pairs." + ) + @PluginProperty + @Builder.Default + private Map bodySerializerProperties = Collections.emptyMap(); + + // SERVICES + @Getter(AccessLevel.NONE) + @Builder.Default + private EventHubClientFactory clientFactory = new EventHubClientFactory(); + + /** + * {@inheritDoc} + **/ + @Override + public Output run(RunContext runContext) throws Exception { + EventHubProducerService service = new EventHubProducerService( + clientFactory, + new EventHubClientConfig<>(runContext, this), + new EventDataObjectConverter(getBodySerializer().create(getBodySerializerProperties())), + new EventDataBatchFactory.Default(getCreateBatchOptions()) + ); + + return run(runContext, service); + + } + + // VisibleForTesting + @SuppressWarnings("unchecked") + Output run(final RunContext runContext, final EventHubProducerService service) throws Exception { + + final InputStreamProvider reader = new InputStreamProvider(runContext); + + InputStream is; + if (this.getFrom() instanceof String uri) { + is = reader.get(runContext.render(uri)); + } else if (this.getFrom() instanceof Map data) { + is = reader.get(data); + } else if (this.getFrom() instanceof List data) { + is = reader.get(data); + } else { + throw new IllegalArgumentException( + "Unsupported type for task-property `from`: " + this.getFrom().getClass().getSimpleName() + ); + } + return send(runContext, service, is); + } + + private Output send(final RunContext runContext, + final EventHubProducerService service, + final InputStream is) throws IllegalVariableEvaluationException { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) { + // Sends + ProducerContext options = new ProducerContext( + getBodyContentType(), + getEventProperties(), + getMaxEventsPerBatch(), + runContext.logger() + ); + 
EventHubProducerService.Result result = service.sendEvents( + reader, + options + ); + + // metrics + runContext.metric(Counter.of(METRIC_SENT_EVENTS_NAME, result.totalSentEvents())); + runContext.metric(Counter.of(METRIC_SENT_BATCHES_NAME, result.totalSentBatches())); + + return new Output( + result.totalSentEvents(), + result.totalSentBatches() + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private CreateBatchOptions getCreateBatchOptions() { + CreateBatchOptions options = new CreateBatchOptions(); + if (getMaxBatchSizeInBytes() != null) { + options.setMaximumSizeInBytes(getMaxBatchSizeInBytes()); + } + + if (getPartitionKey() != null) { + options.setPartitionKey(getPartitionKey()); + } + return options; + } + + @AllArgsConstructor + @Getter + public static final class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "Total number of events processed by the task." + ) + private final Integer eventsCount; + + @Schema( + title = "Total number of batches sent to an Azure EventHubs." 
+ ) + private final Integer sendBatchesCount; + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java new file mode 100644 index 0000000..3e96329 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java @@ -0,0 +1,149 @@ +package io.kestra.plugin.azure.eventhubs; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.conditions.ConditionContext; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.ExecutionTrigger; +import io.kestra.core.models.flows.State; +import io.kestra.core.models.triggers.AbstractTrigger; +import io.kestra.core.models.triggers.PollingTriggerInterface; +import io.kestra.core.models.triggers.TriggerContext; +import io.kestra.core.models.triggers.TriggerOutput; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.eventhubs.serdes.Serdes; +import io.kestra.plugin.azure.eventhubs.service.consumer.StartingPosition; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +/** + * The {@link Trigger} can be used for triggering flow based on events received from Azure Event Hubs. + */ +@Plugin(examples = { + @Example( + title = "Trigger flow based on events received from Azure Event Hubs.", + full = true, + code = { + """ + id: TriggerFromAzureEventHubs + namespace: company.team + tasks: + - id: hello + type: io.kestra.core.tasks.log.Log + message: Hello there! I received {{trigger.eventsCount}} from Azure EventHubs! 
+                triggers:
+                  - id: readFromEventHubs
+                    type: "io.kestra.plugin.azure.eventhubs.Trigger"
+                    interval: PT30S
+                    eventHubName: my-eventhub
+                    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
+                    bodyDeserializer: STRING
+                    consumerGroup: "$Default"
+                    checkpointStoreProperties:
+                      containerName: kestra
+                      connectionString: "{{ secret('BLOB_CONNECTION') }}"
+            """
+        }
+    )
+})
+@Schema(
+    title = "Trigger flow based on events received from Azure Event Hubs."
+)
+@Slf4j
+@NoArgsConstructor
+@SuperBuilder
+@ToString
+@EqualsAndHashCode
+@Getter
+public class Trigger extends AbstractTrigger implements EventHubConsumerInterface, PollingTriggerInterface, TriggerOutput {
+
+    // TRIGGER'S PROPERTIES
+    // Defaults to 60 seconds when not set through the builder.
+    @Builder.Default
+    private Duration interval = Duration.ofSeconds(60);
+
+    // TASK'S PARAMETERS
+    protected String connectionString;
+
+    protected String sharedKeyAccountName;
+
+    protected String sharedKeyAccountAccessKey;
+
+    protected String sasToken;
+
+    @Builder.Default
+    protected Integer clientMaxRetries = 5;
+
+    @Builder.Default
+    protected Long clientRetryDelay = 500L;
+
+    @Builder.Default
+    private Serdes bodyDeserializer = Serdes.STRING;
+
+    @Builder.Default
+    private Map bodyDeserializerProperties = Collections.emptyMap();
+
+    @Builder.Default
+    private String consumerGroup = "$Default";
+
+    @Builder.Default
+    private StartingPosition partitionStartingPosition = StartingPosition.EARLIEST;
+
+    private String enqueueTime;
+
+    @Builder.Default
+    private Integer maxBatchSizePerPartition = 50;
+
+    @Builder.Default
+    private Duration maxWaitTimePerPartition = Duration.ofSeconds(5);
+
+    @Builder.Default
+    private Duration maxDuration = Duration.ofSeconds(10);
+
+    @Builder.Default
+    private Map checkpointStoreProperties = Collections.emptyMap();
+
+    private String namespace;
+
+    private String eventHubName;
+
+    private String customEndpointAddress;
+
+    /**
+     * {@inheritDoc}
+     **/
+    @Override
+    public Optional evaluate(ConditionContext conditionContext,
+                             TriggerContext context) throws
Exception { + RunContext runContext = conditionContext.getRunContext(); + + final Consume task = new Consume(); + final Consume.Output output = task.run(runContext, this); + + if (output.getEventsCount() == 0) { + return Optional.empty(); + } + + ExecutionTrigger executionTrigger = ExecutionTrigger.of(this, output); + Execution execution = Execution.builder() + .id(runContext.getTriggerExecutionId()) + .namespace(context.getNamespace()) + .flowId(context.getFlowId()) + .flowRevision(context.getFlowRevision()) + .state(new State()) + .trigger(executionTrigger) + .build(); + + return Optional.of(execution); + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java b/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java new file mode 100644 index 0000000..5448e77 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java @@ -0,0 +1,183 @@ +package io.kestra.plugin.azure.eventhubs.client; + +import com.azure.core.amqp.AmqpRetryMode; +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.credential.AzureNamedKeyCredential; +import com.azure.core.credential.AzureSasCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.messaging.eventhubs.EventHubClientBuilder; +import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; +import com.azure.messaging.eventhubs.EventProcessorClientBuilder; +import com.azure.storage.blob.BlobContainerAsyncClient; +import com.azure.storage.blob.BlobContainerClientBuilder; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.plugin.azure.client.AzureClientConfig; +import io.kestra.plugin.azure.eventhubs.config.BlobContainerClientConfig; +import io.kestra.plugin.azure.eventhubs.config.EventHubClientConfig; +import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; +import lombok.extern.slf4j.Slf4j; + +import 
java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+@Slf4j
+public class EventHubClientFactory {
+
+    /**
+     * Factory method for constructing a new {@link EventHubProducerAsyncClient} for the given config.
+     *
+     * @param config The configuration. Cannot be {@code null}.
+     * @return a new {@link EventHubProducerAsyncClient} object.
+     * @throws NullPointerException if the given config is {@code null}.
+     */
+    public EventHubProducerAsyncClient createAsyncProducerClient(final EventHubClientConfig config) throws IllegalVariableEvaluationException {
+        Objects.requireNonNull(config, "config should not be null");
+        return createBuilder(config).buildAsyncProducerClient();
+    }
+
+    /**
+     * Builds the fully qualified namespace by appending {@code .servicebus.windows.net}
+     * to the configured namespace, when one is configured.
+     */
+    private Optional fullyQualifiedNamespace(final EventHubClientConfig config) throws IllegalVariableEvaluationException {
+        return config.namespace().map(ns -> ns + ".servicebus.windows.net");
+    }
+
+    /**
+     * Factory method for constructing a new {@link EventProcessorClientBuilder} for the given config.
+     *
+     * @param config The configuration. Cannot be {@code null}.
+     * @return a new {@link EventProcessorClientBuilder} object.
+ */ + public EventProcessorClientBuilder createEventProcessorClientBuilder(final EventHubConsumerConfig config) throws IllegalVariableEvaluationException { + Objects.requireNonNull(config, "config should not be null"); + EventProcessorClientBuilder builder = new EventProcessorClientBuilder() + .eventHubName(config.eventHubName()) + .consumerGroup(config.consumerGroup()) + .retryOptions(getRetryOptions(config)); + + config.customEndpointAddress().ifPresent(builder::customEndpointAddress); + fullyQualifiedNamespace(config).ifPresent(builder::fullyQualifiedNamespace); + + Optional connectionString = connectionString(config, "EventHubClient"); + + if (connectionString.isPresent()) { + return builder.connectionString(connectionString.get()); + } + + Optional azureNamedKeyCredential = namedKeyCredential(config, "EventHubClient"); + if (azureNamedKeyCredential.isPresent()) { + return builder.credential(azureNamedKeyCredential.get()); + } + + Optional azureSasCredential = sasCredential(config, "EventHubClient"); + if (azureSasCredential.isPresent()) { + return builder.credential(azureSasCredential.get()); + } + + return builder.credential(new DefaultAzureCredentialBuilder().build()); + } + + /** + * Factory method for constructing a new {@link EventHubClientBuilder} for the given config. + * + * @param config The configuration. + * @return a new {@link EventHubClientBuilder} object. 
+ */ + private EventHubClientBuilder createBuilder(EventHubClientConfig config) throws IllegalVariableEvaluationException { + + EventHubClientBuilder builder = new EventHubClientBuilder() + .eventHubName(config.eventHubName()) + .retryOptions(getRetryOptions(config)); + + config.customEndpointAddress().ifPresent(builder::customEndpointAddress); + fullyQualifiedNamespace(config).ifPresent(builder::fullyQualifiedNamespace); + + Optional connectionString = connectionString(config, "EventHubClient"); + if (connectionString.isPresent()) { + return builder.connectionString(connectionString.get()); + } + + Optional azureNamedKeyCredential = namedKeyCredential(config, "EventHubClient"); + if (azureNamedKeyCredential.isPresent()) { + return builder.credential(azureNamedKeyCredential.get()); + } + + Optional azureSasCredential = sasCredential(config, "EventHubClient"); + if (azureSasCredential.isPresent()) { + return builder.credential(azureSasCredential.get()); + } + + return builder.credential(new DefaultAzureCredentialBuilder().build()); + } + + private static AmqpRetryOptions getRetryOptions(EventHubClientConfig config) throws IllegalVariableEvaluationException { + return new AmqpRetryOptions() + .setMode(AmqpRetryMode.EXPONENTIAL) + .setDelay(config.clientRetryDelay().map(Duration::ofMillis).orElse(Duration.ofMillis(500))) + .setMaxRetries(config.clientMaxRetries().orElse(5)) + .setMaxDelay(Duration.ofMinutes(1)); + } + + /** + * Factory method for constructing a new {@link BlobContainerAsyncClient} for the given config. + * + * @param config The configuration. Cannot be {@code null}. + * @return a new {@link BlobContainerAsyncClient} object. 
+ */ + public BlobContainerAsyncClient createBlobContainerAsyncClient(BlobContainerClientConfig config) throws IllegalVariableEvaluationException { + BlobContainerClientBuilder builder = new BlobContainerClientBuilder() + .containerName(config.containerName()); + + Optional connectionString = connectionString(config, "BlobContainerClient"); + if (connectionString.isPresent()) { + return builder.connectionString(connectionString.get()) + .buildAsyncClient(); + } + + Optional azureNamedKeyCredential = namedKeyCredential(config, "BlobContainerClient"); + if (azureNamedKeyCredential.isPresent()) { + return builder.credential(azureNamedKeyCredential.get()) + .buildAsyncClient(); + } + + Optional azureSasCredential = sasCredential(config, "BlobContainerClient"); + if (azureSasCredential.isPresent()) { + return builder.credential(azureSasCredential.get()) + .buildAsyncClient(); + } + + return builder.credential(new DefaultAzureCredentialBuilder().build()).buildAsyncClient(); + } + + private Optional connectionString(final AzureClientConfig config, + final String clientName) throws IllegalVariableEvaluationException { + Optional optionalConnectionString = config.connectionString(); + if (optionalConnectionString.isPresent()) { + log.debug("Creating new {} using the `connectionString` that was passed " + + "through the task's configuration", clientName); + } + return optionalConnectionString; + } + + private Optional sasCredential(final AzureClientConfig config, + final String clientName) throws IllegalVariableEvaluationException { + if (config.sasToken().isPresent()) { + log.debug("Creating new {} using the `sasToken`" + + " that was passed through the task's configuration", clientName); + AzureSasCredential credential = new AzureSasCredential(config.sasToken().get()); + return Optional.of(credential); + } + return Optional.empty(); + } + + private Optional namedKeyCredential(final AzureClientConfig config, + final String clientName) throws 
IllegalVariableEvaluationException { + if (config.sharedKeyAccountAccessKey().isPresent() && config.sharedKeyAccountName().isPresent()) { + log.debug("Creating new {} using the `sharedKeyAccountName` and `sharedKeyAccountAccessKey`" + + " that was passed through the task's configuration", clientName); + AzureNamedKeyCredential credential = new AzureNamedKeyCredential( + config.sharedKeyAccountName().get(), + config.sharedKeyAccountAccessKey().get() + ); + return Optional.of(credential); + } + return Optional.empty(); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/config/BlobContainerClientConfig.java b/src/main/java/io/kestra/plugin/azure/eventhubs/config/BlobContainerClientConfig.java new file mode 100644 index 0000000..8521191 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/config/BlobContainerClientConfig.java @@ -0,0 +1,27 @@ +package io.kestra.plugin.azure.eventhubs.config; + +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.client.AzureClientConfig; +import io.kestra.plugin.azure.eventhubs.BlobContainerClientInterface; + +/** + * Configuration that uses the {@link RunContext} to render configuration. + */ +public class BlobContainerClientConfig extends AzureClientConfig { + + /** + * Creates a new {@link BlobContainerClientConfig} instance. + * + * @param runContext The context. Cannot be null. + * @param plugin The plugin. Cannot be null. 
+ */ + public BlobContainerClientConfig(final RunContext runContext, + final BlobContainerClientInterface plugin) { + super(runContext, plugin); + } + + public String containerName() throws IllegalVariableEvaluationException { + return runContext.render(plugin.getContainerName()); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/config/EventHubClientConfig.java b/src/main/java/io/kestra/plugin/azure/eventhubs/config/EventHubClientConfig.java new file mode 100644 index 0000000..5e6abc1 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/config/EventHubClientConfig.java @@ -0,0 +1,45 @@ +package io.kestra.plugin.azure.eventhubs.config; + +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.client.AzureClientConfig; +import io.kestra.plugin.azure.eventhubs.EventHubClientInterface; + +import java.util.Optional; + +/** + * Configuration that uses the {@link RunContext} to render configuration. + */ +public class EventHubClientConfig extends AzureClientConfig { + + /** + * Creates a new {@link EventHubClientConfig} instance. + * + * @param runContext The context. Cannot be null. + * @param plugin The plugin. Cannot be null. 
+ */ + public EventHubClientConfig(final RunContext runContext, + final T plugin) { + super(runContext, plugin); + } + + public String eventHubName() throws IllegalVariableEvaluationException { + return runContext.render(plugin.getEventHubName()); + } + + public Optional clientMaxRetries() { + return Optional.ofNullable(plugin.getClientMaxRetries()); + } + + public Optional clientRetryDelay() { + return Optional.ofNullable(plugin.getClientRetryDelay()); + } + + public Optional namespace() throws IllegalVariableEvaluationException { + return getOptionalConfig(plugin::getNamespace); + } + + public Optional customEndpointAddress() throws IllegalVariableEvaluationException { + return getOptionalConfig(plugin::getCustomEndpointAddress); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/config/EventHubConsumerConfig.java b/src/main/java/io/kestra/plugin/azure/eventhubs/config/EventHubConsumerConfig.java new file mode 100644 index 0000000..ba64977 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/config/EventHubConsumerConfig.java @@ -0,0 +1,38 @@ +package io.kestra.plugin.azure.eventhubs.config; + +import com.azure.messaging.eventhubs.models.EventPosition; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.eventhubs.EventHubConsumerInterface; +import io.kestra.plugin.azure.eventhubs.service.consumer.EventPositionStrategy; +import io.kestra.plugin.azure.eventhubs.service.consumer.StartingPosition; + +/** + * Configuration that uses the {@link RunContext} to render configuration. + */ +public final class EventHubConsumerConfig extends EventHubClientConfig { + + /** + * Creates a new {@link EventHubConsumerConfig} instance. + * + * @param runContext The context. Cannot be null. + * @param plugin The plugin. Cannot be null. 
+ */ + public EventHubConsumerConfig(final RunContext runContext, + final EventHubConsumerInterface plugin) { + super(runContext, plugin); + } + + public String consumerGroup() throws IllegalVariableEvaluationException { + return runContext.render(plugin.getConsumerGroup()); + } + + public EventPosition partitionStartingPosition() { + StartingPosition partitionStartingPosition = plugin.getPartitionStartingPosition(); + return switch (partitionStartingPosition) { + case EARLIEST -> new EventPositionStrategy.Earliest().get(); + case LATEST -> new EventPositionStrategy.Latest().get(); + case INSTANT -> new EventPositionStrategy.EnqueuedTime(plugin.getEnqueueTime()).get(); + }; + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/internal/InputStreamProvider.java b/src/main/java/io/kestra/plugin/azure/eventhubs/internal/InputStreamProvider.java new file mode 100644 index 0000000..f8c99ce --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/internal/InputStreamProvider.java @@ -0,0 +1,47 @@ +package io.kestra.plugin.azure.eventhubs.internal; + +import io.kestra.core.runners.RunContext; +import io.kestra.core.serializers.FileSerde; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.List; +import java.util.Map; + +/** + * Class for getting input data as ION InputStream. 
+ */ +public final class InputStreamProvider { + + private final RunContext context; + + public InputStreamProvider(final RunContext context) { + this.context = context; + } + + public InputStream get(final String path) { + try { + URI from = new URI(context.render(path)); + return context.uriToInputStream(from); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public InputStream get(final List objects) throws IOException { + final ByteArrayOutputStream os = new ByteArrayOutputStream(); + try (os) { + for (Object o : objects) { + FileSerde.write(os, o); + } + return new ByteArrayInputStream(os.toByteArray()); + } + } + + public InputStream get(final Map object) throws IOException { + return get(List.of(object)); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/model/EventDataObject.java b/src/main/java/io/kestra/plugin/azure/eventhubs/model/EventDataObject.java new file mode 100644 index 0000000..ea7da25 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/model/EventDataObject.java @@ -0,0 +1,47 @@ +package io.kestra.plugin.azure.eventhubs.model; + +import javax.validation.constraints.NotNull; +import java.util.Collections; +import java.util.Map; + +/** + * A serializable entity class representing an Event Data + * to be published or consumed from Azure Event Hubs. + * + * @param partitionKey the event data partitionKey. + * @param body the event data body. + * @param contentType the event data content-type. + * @param correlationId The event correlation ID. + * @param messageId the event message ID. + * @param properties the event properties. 
+ * @see com.azure.messaging.eventhubs.EventData + */ +public record EventDataObject( + String partitionKey, + Object body, + String contentType, + String correlationId, + String messageId, + Long enqueuedTimestamp, + Long offset, + Long sequenceNumber, + Map properties) { + + public EventDataObject(@NotNull final String body) { + this(null, body); + } + + public EventDataObject(final String partitionKey, @NotNull final String body) { + this( + partitionKey, + body, + null, + null, + null, + null, + null, + null, + Collections.emptyMap() + ); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/package-info.java b/src/main/java/io/kestra/plugin/azure/eventhubs/package-info.java new file mode 100644 index 0000000..e373f8a --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/package-info.java @@ -0,0 +1,7 @@ +@PluginSubGroup( + description = "This sub-group of plugins contains tasks for using Azure Event Hubs.", + categories = {PluginSubGroup.PluginCategory.CLOUD, PluginSubGroup.PluginCategory.MESSAGING} +) +package io.kestra.plugin.azure.eventhubs; + +import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/ByteArraySerde.java b/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/ByteArraySerde.java new file mode 100644 index 0000000..b79ba92 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/ByteArraySerde.java @@ -0,0 +1,35 @@ +package io.kestra.plugin.azure.eventhubs.serdes; + +import java.nio.ByteBuffer; + +/** + * A {@link Serde} for serializing/deserializing bytes array. 
+ */
+public class ByteArraySerde implements Serde {
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Accepts either a {@code byte[]}, returned as is, or a {@link ByteBuffer}, whose remaining
+     * bytes are copied into a new array. A {@code null} input yields {@code null}.
+     *
+     * @throws RuntimeException if the given data is of any other type.
+     **/
+    @Override
+    public byte[] serialize(Object data) {
+        if (data == null) return null;
+
+        if (data instanceof ByteBuffer buffer) {
+            // Copy the readable region through a duplicate so the caller's buffer is left untouched.
+            // Unlike ByteBuffer#array(), this also works for read-only and direct buffers and
+            // honours the buffer's position, limit and array offset.
+            final byte[] bytes = new byte[buffer.remaining()];
+            buffer.duplicate().get(bytes);
+            return bytes;
+        }
+        if (data instanceof byte[] array) {
+            return array;
+        }
+        // Report the offending type rather than the value's toString().
+        throw new RuntimeException(
+            "Cannot serialize object of type '" + data.getClass().getName() + "' into bytes array."
+        );
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Wraps the given array in a {@link ByteBuffer} without copying it.
+     **/
+    @Override
+    public ByteBuffer deserialize(byte[] data) {
+        if (data == null) return null;
+        return ByteBuffer.wrap(data);
+    }
+}
diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/IonSerde.java b/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/IonSerde.java
new file mode 100644
index 0000000..6085b43
--- /dev/null
+++ b/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/IonSerde.java
@@ -0,0 +1,49 @@
+package io.kestra.plugin.azure.eventhubs.serdes;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.kestra.core.serializers.JacksonMapper;
+
+import java.io.IOException;
+
+/**
+ * A {@link Serde} for serializing/deserializing objects from and to Amazon Ion format.
+ */ + public IonSerde() { + } + + /** + * {@inheritDoc} + **/ + @Override + public byte[] serialize(Object data) { + if (data == null) return null; + try { + return OBJECT_MAPPER.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + throw new RuntimeException("Error serializing data object into Ion.", e); + } + } + + /** + * {@inheritDoc} + **/ + @Override + public Object deserialize(byte[] data) { + if (data == null) return null; + try { + return OBJECT_MAPPER.readTree(data); + } catch (IOException e) { + throw new RuntimeException("Error deserializing Ion into object.", e); + } + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/JsonSerde.java b/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/JsonSerde.java new file mode 100644 index 0000000..35a9f17 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/JsonSerde.java @@ -0,0 +1,48 @@ +package io.kestra.plugin.azure.eventhubs.serdes; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; + +import java.io.IOException; + +/** + * A {@link Serde} for JSON. + */ +public class JsonSerde implements Serde { + + private static final ObjectMapper OBJECT_MAPPER = JsonMapper.builder().build(); + + /** + * Creates a new {@link JsonSerde} instance. 
+ */ + public JsonSerde() { + } + + /** + * {@inheritDoc} + **/ + @Override + public byte[] serialize(Object data) { + if (data == null) return null; + try { + return OBJECT_MAPPER.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + throw new RuntimeException("Error serializing JSON message", e); + } + } + + /** + * {@inheritDoc} + **/ + @Override + public JsonNode deserialize(byte[] data) { + if (data == null) return null; + try { + return OBJECT_MAPPER.readTree(data); + } catch (IOException e) { + throw new RuntimeException("Error deserializing JSON message", e); + } + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/Serde.java b/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/Serde.java new file mode 100644 index 0000000..8d8f1c5 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/Serde.java @@ -0,0 +1,33 @@ +package io.kestra.plugin.azure.eventhubs.serdes; + +import java.util.Map; + +/** + * Service interface for serializing/deserializing data objects. + */ +public interface Serde { + + /** + * Configures this class. + * + * @param configs configs in key/value pairs. + */ + default void configure(Map configs) { + } + + /** + * Method that can be used to serialize a data object into a specific byte array data format. + * + * @param data The data to be serialized. Can be {@code null}. + * @return the serialized object. + */ + byte[] serialize(Object data); + + /** + * Method that can be used to deserialize a specific byte array data format into object. + * + * @param data The data to be deserialized. Can be {@code null}. + * @return the deserialized object. 
+ */ + Object deserialize(byte[] data); +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/Serdes.java b/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/Serdes.java new file mode 100644 index 0000000..e3f563b --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/Serdes.java @@ -0,0 +1,40 @@ +package io.kestra.plugin.azure.eventhubs.serdes; + +import java.util.Map; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Factory class for constructing built-in {@link Serde}. + */ +public enum Serdes { + + STRING(StringSerde::new), + BINARY(ByteArraySerde::new), + ION(IonSerde::new), + JSON(JsonSerde::new); + + private final Supplier supplier; + + /** + * Creates a new {@link Serdes} instance. + * + * @param supplier the serde supplier. + */ + Serdes(Supplier supplier) { + this.supplier = supplier; + } + + /** + * Factory method for constructing a new {@link Serde} instance configured with the properties configs. + * + * @param properties configs in key/value pairs. + * @return a new {@link Serde} + */ + public Serde create(final Map properties) { + Objects.requireNonNull(properties, "Cannot create 'Serde' with null properties."); + Serde dataFormat = supplier.get(); + dataFormat.configure(properties); + return dataFormat; + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/StringSerde.java b/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/StringSerde.java new file mode 100644 index 0000000..c03c0cf --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/serdes/StringSerde.java @@ -0,0 +1,54 @@ +package io.kestra.plugin.azure.eventhubs.serdes; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * A {@link Serde} for serializing/deserializing String objects. 
+ */
+public class StringSerde implements Serde {
+
+    public static final String SERIALIZER_ENCODING_CONFIG_NAME = "serializer.encoding";
+    private Charset encoding;
+
+    /**
+     * Creates a new {@link StringSerde} instance using UTF-8.
+     */
+    public StringSerde() {
+        this(StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Creates a new {@link StringSerde} instance using the given charset.
+     *
+     * @param encoding the charset used for encoding and decoding. Cannot be {@code null}.
+     */
+    public StringSerde(final Charset encoding) {
+        this.encoding = Objects.requireNonNull(encoding, "encoding cannot be null");
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads the charset name from the {@code serializer.encoding} key. When the configs are
+     * {@code null} or the key is absent, the current charset is kept.
+     **/
+    @Override
+    public void configure(Map configs) {
+        if (configs == null) {
+            return;
+        }
+        // Fall back to the current charset, not UTF-8, so that a charset passed to the
+        // constructor is not silently discarded when no encoding is configured.
+        encoding = Optional
+            .ofNullable(configs.get(SERIALIZER_ENCODING_CONFIG_NAME))
+            .map(Object::toString)
+            .map(Charset::forName)
+            .orElse(encoding);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Encodes {@code data.toString()} with the configured charset.
+     **/
+    @Override
+    public byte[] serialize(Object data) {
+        if (data == null) return null;
+        return data.toString().getBytes(encoding);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Decodes the given bytes with the configured charset.
+     **/
+    @Override
+    public String deserialize(byte[] data) {
+        if (data == null) return null;
+        return new String(data, encoding);
+    }
+}
diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/EventDataObjectConverter.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/EventDataObjectConverter.java
new file mode 100644
index 0000000..f0bd374
--- /dev/null
+++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/EventDataObjectConverter.java
@@ -0,0 +1,74 @@
+package io.kestra.plugin.azure.eventhubs.service;
+
+import com.azure.messaging.eventhubs.EventData;
+import io.kestra.plugin.azure.eventhubs.model.EventDataObject;
+import io.kestra.plugin.azure.eventhubs.serdes.Serde;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Converts {@link EventData} into {@link EventDataObject} and vice versa.
+ */
+public final class EventDataObjectConverter {
+
+    private final Serde serde;
+
+    /**
+     * Creates a new {@link EventDataObjectConverter} instance.
+     *
+     * @param serde the {@link Serde} used for the event body. Cannot be {@code null}.
+     */
+    public EventDataObjectConverter(final Serde serde) {
+        this.serde = Objects.requireNonNull(serde, "serde cannot be null");
+    }
+
+    /**
+     * Converts the given list of {@link EventData} into a new list of {@link EventDataObject}.
+     *
+     * @param data The list of {@link EventData}.
+     * @return the new list of {@link EventDataObject}, or {@code null} if the given data is null.
+     */
+    public List convertFromEventData(final List data) {
+        // Honour the documented contract instead of failing with a NullPointerException.
+        if (data == null) return null;
+
+        return data.stream().map(this::convertFromEventData).collect(Collectors.toList());
+    }
+
+    /**
+     * Converts the given {@link EventDataObject} into a new {@link EventData}.
+     *
+     * @param data The {@link EventDataObject}.
+     * @return the new {@link EventData}, or {@code null} if the given data is null.
+     */
+    public EventData convertToEventData(final EventDataObject data) {
+        if (data == null) return null;
+
+        byte[] value = serde.serialize(data.body());
+
+        final EventData event = new EventData(value);
+        // Only set the optional attributes that are present on the source object.
+        Optional.ofNullable(data.contentType()).ifPresent(event::setContentType);
+        Optional.ofNullable(data.correlationId()).ifPresent(event::setCorrelationId);
+        Optional.ofNullable(data.messageId()).ifPresent(event::setMessageId);
+        Optional.ofNullable(data.properties()).ifPresent(props -> event.getProperties().putAll(props));
+        return event;
+    }
+
+    /**
+     * Converts the given {@link EventData} into a new {@link EventDataObject}.
+     *
+     * @param data The {@link EventData}
+     * @return the new {@link EventDataObject}, or {@code null} if the given data is null.
+ */ + public EventDataObject convertFromEventData(final EventData data) { + if (data == null) return null; + + return new EventDataObject( + data.getPartitionKey(), + serde.deserialize(data.getBody()), + data.getContentType(), + data.getCorrelationId(), + data.getMessageId(), + Optional.ofNullable(data.getEnqueuedTime()).map(Instant::toEpochMilli).orElse(null), + data.getOffset(), + data.getSequenceNumber(), + data.getProperties() + ); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java new file mode 100644 index 0000000..05a7c94 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java @@ -0,0 +1,19 @@ +package io.kestra.plugin.azure.eventhubs.service.consumer; + +import org.slf4j.Logger; + +import java.time.Duration; + +/** + * Context for consuming events from Azure Event Hubs. + * + * @param maxPollEvents The maximum number of events handed to the batch handler at once. + * @param maxBatchPartitionWait The maximum time to wait for a partition batch before the handler is invoked. + * @param maxDuration The maximum time to wait for the poll to complete before the consumer is stopped. + * @param logger The logger used to report consumer activity. + */ +public record ConsumerContext(int maxPollEvents, + Duration maxBatchPartitionWait, + Duration maxDuration, + Logger logger) { +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java new file mode 100644 index 0000000..7a38c10 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java @@ -0,0 +1,215 @@ +package io.kestra.plugin.azure.eventhubs.service.consumer; + +import com.azure.messaging.eventhubs.CheckpointStore; +import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventProcessorClient; +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.EventBatchContext; +import
com.azure.messaging.eventhubs.models.EventPosition; +import com.azure.messaging.eventhubs.models.PartitionContext; +import io.kestra.plugin.azure.eventhubs.client.EventHubClientFactory; +import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; +import io.kestra.plugin.azure.eventhubs.model.EventDataObject; +import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +@Slf4j +public final class EventHubConsumerService { + + private final EventHubClientFactory clientFactory; + private final EventHubConsumerConfig config; + private final EventDataObjectConverter converter; + private final CheckpointStore checkpointStore; + + /** + * Creates a new {@link EventHubConsumerService} instance. + * + * @param clientFactory The {@link EventHubClientFactory} - Cannot be {@code null}. + * @param consumerConfig The {@link EventHubConsumerConfig} - Cannot be {@code null}. + * @param converter The {@link EventDataObjectConverter} to be used for converting entities to event data. + * @param checkpointStore The {@link CheckpointStore}. 
+ */ + public EventHubConsumerService(final EventHubClientFactory clientFactory, + final EventHubConsumerConfig consumerConfig, + final EventDataObjectConverter converter, + final CheckpointStore checkpointStore) { + this.clientFactory = Objects.requireNonNull(clientFactory, "clientFactory cannot be null"); + this.config = Objects.requireNonNull(consumerConfig, "consumerConfig cannot be null"); + this.converter = Objects.requireNonNull(converter, "converter cannot be null"); + this.checkpointStore = Objects.requireNonNull(checkpointStore, "checkpointStoreSupplier cannot be null"); + } + + public Map poll(final ConsumerContext consumerContext, + final EventProcessorListener listener) throws Exception { + + Logger logger = consumerContext.logger(); + + CountDownLatch latch = new CountDownLatch(1); + + // Create Set that will hold all initialized partitions. + Set partitions = Collections.synchronizedSet(new HashSet<>()); + + // Counter + Map eventsByEventHubNamePartition = new ConcurrentHashMap<>(); + + Map checkpointsByPartitions = new ConcurrentHashMap<>(); + + // Create single EventProcessorClient. + EventProcessorClient client = clientFactory.createEventProcessorClientBuilder(config) + .consumerGroup(config.consumerGroup()) + .checkpointStore(checkpointStore) + // Set the offset reset strategy + .initialPartitionEventPosition(partition -> { + EventPosition position = config.partitionStartingPosition(); + if (logger.isInfoEnabled()) { + logger.info("Initializing partitionId {} with offset={}, sequenceNumber={}, enqueuedDateTime={} if no checkpoint exist.", + partition, + position.getOffset(), + position.getSequenceNumber(), + position.getEnqueuedDateTime() + ); + } + return position; + }) + // Capture the partition to process.
+ .processPartitionInitialization(context -> { + partitions.add(context.getPartitionContext().getPartitionId()); + }) + // Process Events + .processEventBatch(context -> { + PartitionContext partitionContext = context.getPartitionContext(); + if (!partitions.remove(partitionContext.getPartitionId())) { + if (logger.isTraceEnabled()) { + logger.trace( + "PartitionId={} has already been consumed once. Rejecting events.", + partitionContext.getPartitionId() + ); + } + return; + } + List events = context.getEvents(); + + // Convert eventData, and invoke listener. + for (EventDataObject event : converter.convertFromEventData(events)) { + try { + listener.onEvent(event, partitionContext); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + EventHubNamePartition key = new EventHubNamePartition( + partitionContext.getEventHubName(), + partitionContext.getPartitionId() + ); + + // Keep checkpoint of the last event in the batch + createCheckpoint(context) + .ifPresent(checkpoint -> checkpointsByPartitions.put(key, checkpoint)); + + // Increment event counter for the current partition. + eventsByEventHubNamePartition + .computeIfAbsent(key, ignored -> new AtomicInteger(0)) + .addAndGet(events.size()); + + // Check whether all partitions were polled at-least once. + if (partitions.isEmpty()) { + // Proactively stop consuming. + latch.countDown(); + } + + }, consumerContext.maxPollEvents(), consumerContext.maxBatchPartitionWait()) + // Handle errors + .processError(errorContext -> { + PartitionContext partitionContext = errorContext.getPartitionContext(); + logger.error("Failed to process eventHub: {}, partitionId: {} with consumerGroup: {}", + partitionContext.getEventHubName(), + partitionContext.getPartitionId(), + partitionContext.getConsumerGroup(), + errorContext.getThrowable()); + latch.countDown(); // stop processing immediately. 
+ }) + .buildEventProcessorClient(); + + try { + client.start(); + if (!latch.await(consumerContext.maxDuration().toMillis(), TimeUnit.MILLISECONDS)) { + logger.debug("Reached `maxDuration`({}ms) before receiving events from EventHub {}.", + consumerContext.maxDuration().toMillis(), + config.eventHubName()); + } + } finally { + client.stop(); + listener.onStop(); + updateCheckpoints(checkpointStore, checkpointsByPartitions.values(), logger); + } + + return eventsByEventHubNamePartition + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, it -> it.getValue().get())); + } + + private void updateCheckpoints(CheckpointStore store, + Collection checkpoints, + Logger logger) { + for (Checkpoint checkpoint : checkpoints) { + logger.debug("Checkpointing position for consumerGroup={}, eventHubName={}, partitionId={}, sequenceNumber={}, and offset={}.", + checkpoint.getConsumerGroup(), + checkpoint.getEventHubName(), + checkpoint.getPartitionId(), + checkpoint.getSequenceNumber(), + checkpoint.getOffset() + ); + store.updateCheckpoint(checkpoint).block(); + } + } + + private Optional createCheckpoint(final EventBatchContext context) { + List events = context.getEvents(); + if (events.isEmpty()) { + return Optional.empty(); + } + PartitionContext partitionContext = context.getPartitionContext(); + + return Optional.of(new Checkpoint() + .setFullyQualifiedNamespace(partitionContext.getFullyQualifiedNamespace()) + .setEventHubName(partitionContext.getEventHubName()) + .setConsumerGroup(partitionContext.getConsumerGroup()) + .setPartitionId(partitionContext.getPartitionId()) + .setSequenceNumber(events.get(events.size() - 1).getSequenceNumber()) + .setOffset(events.get(events.size() - 1).getOffset()) + ); + } + + public interface EventProcessorListener { + + /** + * Invokes on each received event. + * + * @param event The event to be processed. 
+ */ + void onEvent(EventDataObject event, PartitionContext context) throws Exception; + + /** + * Invokes when the event processor is stopped. + */ + default void onStop() throws Exception { + } + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubNamePartition.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubNamePartition.java new file mode 100644 index 0000000..d9223ba --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubNamePartition.java @@ -0,0 +1,6 @@ +package io.kestra.plugin.azure.eventhubs.service.consumer; + +public record EventHubNamePartition(String eventHubName, + String partitionId) { +} + diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventPositionStrategy.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventPositionStrategy.java new file mode 100644 index 0000000..5acdf20 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventPositionStrategy.java @@ -0,0 +1,74 @@ +package io.kestra.plugin.azure.eventhubs.service.consumer; + +import com.azure.messaging.eventhubs.models.EventPosition; + +import javax.validation.constraints.NotNull; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; + +/** + * Strategies to initialize an Event Hub consumer if no offsets are stored. + */ +public interface EventPositionStrategy { + + EventPosition get(); + + /** + * Earliest. + * + * @see EventPosition#earliest() + */ + record Earliest() implements EventPositionStrategy { + + /** + * @return an {@link EventPosition} that corresponds to the location of + * the first event present in the partition. + */ + @Override + public EventPosition get() { + return EventPosition.earliest(); + } + } + + /** + * Latest. 
+ * + * @see EventPosition#latest() + */ + record Latest() implements EventPositionStrategy { + + /** + * @return an {@link EventPosition} that corresponds to the end of the partition, + * where no more events are currently enqueued. + */ + @Override + public EventPosition get() { + return EventPosition.latest(); + } + } + + /** + * EnqueuedTime. + * + * @param dateTime the date-time in UTC, formatted as {@code yyyy-MM-dd'T'HH:mm:ss.SSS} + */ + record EnqueuedTime(@NotNull String dateTime) implements EventPositionStrategy { + + static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter + .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS") + .withZone(ZoneOffset.UTC); + + /** + * @return an {@link EventPosition} that corresponds to a specific Instant. + */ + @Override + public EventPosition get() { + OffsetDateTime odt = OffsetDateTime.parse(dateTime, DATE_TIME_FORMATTER); + Instant instant = Instant.from(odt); + return EventPosition.fromEnqueuedTime(instant); + } + } + +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/StartingPosition.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/StartingPosition.java new file mode 100644 index 0000000..30dad09 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/StartingPosition.java @@ -0,0 +1,6 @@ +package io.kestra.plugin.azure.eventhubs.service.consumer; + +public enum StartingPosition { + + EARLIEST, LATEST, INSTANT; +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventDataBatchFactory.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventDataBatchFactory.java new file mode 100644 index 0000000..738c453 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventDataBatchFactory.java @@ -0,0 +1,36 @@ +package io.kestra.plugin.azure.eventhubs.service.producer; + +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; +import
com.azure.messaging.eventhubs.models.CreateBatchOptions; +import reactor.core.publisher.Mono; + +/** + * Service interface for constructing new {@link EventDataBatch} objects. + */ +public interface EventDataBatchFactory { + + /** + * Factory method to create a new batch. + * + * @param client The {@link EventHubProducerAsyncClient}. + * @return a {@link Mono} emitting a new {@link EventDataBatch}. + */ + Mono createBatch(EventHubProducerAsyncClient client); + + /** + * Default factory for creating batches from the given options and client. + * + * @param options The options to be used for creating new batches. + */ + record Default(CreateBatchOptions options) implements EventDataBatchFactory { + + /** + * {@inheritDoc} + **/ + @Override + public Mono createBatch(EventHubProducerAsyncClient client) { + return client.createBatch(options); + } + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java new file mode 100644 index 0000000..caed152 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java @@ -0,0 +1,148 @@ +package io.kestra.plugin.azure.eventhubs.service.producer; + +import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.serializers.FileSerde; +import io.kestra.plugin.azure.eventhubs.client.EventHubClientFactory; +import io.kestra.plugin.azure.eventhubs.config.EventHubClientConfig; +import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; +import io.kestra.plugin.azure.eventhubs.model.EventDataObject; +import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; +import io.reactivex.BackpressureStrategy; +import io.reactivex.Flowable; +import
org.slf4j.Logger; +import reactor.core.publisher.Mono; + +import java.io.BufferedReader; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Default service for sending batches of events into Azure Event Hubs. + */ +public class EventHubProducerService { + + private static final int DEFAULT_MAX_EVENT_PER_BATCH = Integer.MAX_VALUE; + private final EventHubClientFactory clientFactory; + private final EventHubClientConfig config; + private final EventDataObjectConverter adapter; + private final EventDataBatchFactory batchFactory; + + /** + * Creates a new {@link EventHubProducerService} instance. + * + * @param clientFactory The {@link EventHubClientFactory} - Cannot be {@code null}. + * @param config The {@link EventHubClientConfig} - Cannot be {@code null}. + * @param converter The {@link EventDataObjectConverter} to be used for converting entities to event data. + */ + public EventHubProducerService(final EventHubClientFactory clientFactory, + final EventHubClientConfig config, + final EventDataObjectConverter converter, + final EventDataBatchFactory batchFactory) { + this.clientFactory = Objects.requireNonNull(clientFactory, "clientFactory cannot be null"); + this.config = Objects.requireNonNull(config, "config cannot be null"); + this.adapter = Objects.requireNonNull(converter, "converter cannot be null"); + this.batchFactory = Objects.requireNonNull(batchFactory, "dataBatchFactory cannot be null"); + } + + /** + * Publishes the given event stream as events into Event Hubs. + * + * @param eventStream The stream of events to send. + * @return The sender result.
+ */ + public Result sendEvents(BufferedReader eventStream, ProducerContext context) throws IllegalVariableEvaluationException { + Flowable flowable = Flowable.create( + FileSerde.reader(eventStream, EventDataObject.class), + BackpressureStrategy.BUFFER + ); + try (EventHubProducerAsyncClient producer = clientFactory.createAsyncProducerClient(config)) { + return sendEvents(producer, adapter, flowable, context); + } + } + + private Result sendEvents(EventHubProducerAsyncClient producer, + EventDataObjectConverter adapter, + Flowable flowable, + ProducerContext context + ) { + + Logger logger = context.logger(); + + Integer maxEventsPerBatch = Optional + .ofNullable(context.maxEventsPerBatch()) + .orElse(DEFAULT_MAX_EVENT_PER_BATCH); + + final AtomicInteger numSentEvents = new AtomicInteger(0); + EventDataBatch firstBatch = batchFactory.createBatch(producer).block(); + final AtomicReference currentBatch = new AtomicReference<>(firstBatch); + + Integer numSentBatches = flowable.flatMap(data -> { + EventDataBatch batch = currentBatch.get(); + final EventData event = adapter.convertToEventData(data); + // Set default content-type + Optional.ofNullable(context.bodyContentType()) + .ifPresent(event::setContentType); + // Set default properties + Optional.ofNullable(context.eventProperties()) + .ifPresent(props -> event.getProperties().putAll(props)); + + int batchSizeInEvents = batch.getCount(); + boolean isFull = batchSizeInEvents >= maxEventsPerBatch; + + if (!isFull && batch.tryAdd(event)) { + return Mono.empty(); + } + + // Send the current batch then create another size-limited EventDataBatch + // and try to fit the event into this new batch. 
+ int batchSize = batch.getCount(); + int totalSent = numSentEvents.getAndAccumulate(batchSize, Integer::sum); + if (logger.isTraceEnabled()) { + logger.trace("Sending new batch of {} events (total-sent-events: {})", batchSize, totalSent); + } + return Mono.when( + producer.send(batch), + batchFactory.createBatch(producer) + .map(newBatch -> { + currentBatch.set(newBatch); + // Try to add the event that did not fit in the previous + // batch into a new empty one. + if (!newBatch.tryAdd(event)) { + throw new IllegalArgumentException(String.format( + "Event is too large for an empty batch. Max size: %s. Event size: %s", + newBatch.getMaxSizeInBytes(), event.getBodyAsBinaryData().getLength())); + } + return newBatch; + }) + ).then(Mono.just(1)); + }) + .reduce(Integer::sum) + .blockingGet(); + + // Eventually send last partial batch. + final EventDataBatch batch = currentBatch.getAndSet(null); + if (batch != null && batch.getCount() > 0) { + int batchSize = batch.getCount(); + int totalSent = numSentEvents.getAndAccumulate(batchSize, Integer::sum); + if (logger.isTraceEnabled()) { + logger.trace("Sending new batch of {} events (total-sent-events: {})", batchSize, totalSent); + } + producer.send(batch).block(); + } + + numSentBatches = Optional + .ofNullable(numSentBatches) + .map(count -> count + 1) + .orElse(1); // numSentBatches will be NULL when a single and partial batch is sent. 
+ + return new Result(numSentEvents.get(), numSentBatches); + } + + public record Result(int totalSentEvents, int totalSentBatches) { + } +} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java new file mode 100644 index 0000000..192045a --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java @@ -0,0 +1,19 @@ +package io.kestra.plugin.azure.eventhubs.service.producer; + +import org.slf4j.Logger; + +import java.util.Map; + +/** + * Options for publishing events. + * + * @param bodyContentType The default body content-type. + * @param eventProperties The default properties to add to events. + * @param maxEventsPerBatch The maximum number of events per batch. + */ +public record ProducerContext(String bodyContentType, + Map eventProperties, + Integer maxEventsPerBatch, + Logger logger +) { +} diff --git a/src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorage.java b/src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorage.java index c4004ca..c7e3a66 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorage.java +++ b/src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorage.java @@ -1,6 +1,7 @@ package io.kestra.plugin.azure.storage.abstracts; import io.kestra.plugin.azure.AbstractConnection; +import io.kestra.plugin.azure.AzureClientInterface; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; @@ -12,7 +13,7 @@ @EqualsAndHashCode @Getter @NoArgsConstructor -public abstract class AbstractStorage extends AbstractConnection implements AbstractStorageInterface { +public abstract class AbstractStorage extends AbstractConnection implements AzureClientInterface { protected String connectionString; diff --git a/src/main/java/io/kestra/plugin/azure/storage/blob/Trigger.java
b/src/main/java/io/kestra/plugin/azure/storage/blob/Trigger.java index c1316d2..abb64b6 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/blob/Trigger.java +++ b/src/main/java/io/kestra/plugin/azure/storage/blob/Trigger.java @@ -12,7 +12,7 @@ import io.kestra.core.models.triggers.TriggerOutput; import io.kestra.core.runners.RunContext; import io.kestra.plugin.azure.AbstractConnectionInterface; -import io.kestra.plugin.azure.storage.abstracts.AbstractStorageInterface; +import io.kestra.plugin.azure.AzureClientInterface; import io.kestra.plugin.azure.storage.blob.abstracts.AbstractBlobStorageContainerInterface; import io.kestra.plugin.azure.storage.blob.abstracts.ActionInterface; import io.kestra.plugin.azure.storage.blob.abstracts.ListInterface; @@ -110,7 +110,8 @@ ) } ) -public class Trigger extends AbstractTrigger implements PollingTriggerInterface, TriggerOutput, AbstractConnectionInterface, ListInterface, ActionInterface, AbstractBlobStorageContainerInterface, AbstractStorageInterface { +public class Trigger extends AbstractTrigger implements PollingTriggerInterface, TriggerOutput, AbstractConnectionInterface, ListInterface, ActionInterface, AbstractBlobStorageContainerInterface, AzureClientInterface { + @Builder.Default private final Duration interval = Duration.ofSeconds(60); diff --git a/src/main/java/io/kestra/plugin/azure/storage/blob/abstracts/AbstractBlobStorage.java b/src/main/java/io/kestra/plugin/azure/storage/blob/abstracts/AbstractBlobStorage.java index f544cb9..c5fcfc7 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/blob/abstracts/AbstractBlobStorage.java +++ b/src/main/java/io/kestra/plugin/azure/storage/blob/abstracts/AbstractBlobStorage.java @@ -7,7 +7,7 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.runners.RunContext; import io.kestra.plugin.azure.storage.abstracts.AbstractStorage; -import io.kestra.plugin.azure.storage.abstracts.AbstractStorageInterface; +import 
io.kestra.plugin.azure.AzureClientInterface; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; @@ -19,7 +19,7 @@ @EqualsAndHashCode @Getter @NoArgsConstructor -public abstract class AbstractBlobStorage extends AbstractStorage implements AbstractStorageInterface { +public abstract class AbstractBlobStorage extends AbstractStorage implements AzureClientInterface { protected BlobServiceClient client(RunContext runContext) throws IllegalVariableEvaluationException { BlobServiceClientBuilder builder = new BlobServiceClientBuilder() .endpoint(runContext.render(endpoint)); diff --git a/src/main/java/io/kestra/plugin/azure/storage/blob/services/BlobService.java b/src/main/java/io/kestra/plugin/azure/storage/blob/services/BlobService.java index 5d81992..b0b3d53 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/blob/services/BlobService.java +++ b/src/main/java/io/kestra/plugin/azure/storage/blob/services/BlobService.java @@ -10,7 +10,7 @@ import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.runners.RunContext; import io.kestra.plugin.azure.AbstractConnectionInterface; -import io.kestra.plugin.azure.storage.abstracts.AbstractStorageInterface; +import io.kestra.plugin.azure.AzureClientInterface; import io.kestra.plugin.azure.storage.blob.Copy; import io.kestra.plugin.azure.storage.blob.Delete; import io.kestra.plugin.azure.storage.blob.abstracts.ActionInterface; @@ -43,7 +43,7 @@ public static void archive( Copy.CopyObject moveTo, RunContext runContext, AbstractConnectionInterface connectionInterface, - AbstractStorageInterface blobStorageInterface + AzureClientInterface blobStorageInterface ) throws Exception { if (action == ActionInterface.Action.DELETE) { for (Blob object : blobsObjects) { diff --git a/src/main/resources/icons/io.kestra.plugin.azure.eventhubs.svg b/src/main/resources/icons/io.kestra.plugin.azure.eventhubs.svg new file mode 100644 index 0000000..a37b5cd --- /dev/null +++ 
b/src/main/resources/icons/io.kestra.plugin.azure.eventhubs.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/ConsumeTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/ConsumeTest.java new file mode 100644 index 0000000..612fc6b --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/ConsumeTest.java @@ -0,0 +1,84 @@ +package io.kestra.plugin.azure.eventhubs; + +import com.google.common.collect.ImmutableMap; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.utils.TestsUtils; +import io.kestra.plugin.azure.eventhubs.serdes.Serdes; +import io.micronaut.context.annotation.Value; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.List; +import java.util.Map; + +@MicronautTest +class ConsumeTest { + + @Inject + private RunContextFactory runContextFactory; + + @Value("${kestra.variables.globals.azure.eventhubs.connection-string}") + protected String connectionString; + + @Value("${kestra.variables.globals.azure.eventhubs.checkpointstore.connection-string}") + protected String checkPointStoreConnectionString; + + @Value("${kestra.variables.globals.azure.eventhubs.checkpointstore.container-name}") + protected String checkPointStoreContainerName; + + @Value("${kestra.variables.globals.azure.eventhubs.eventhub-name}") + protected String eventHubName; + + @Disabled + @Test + void testConsumeTask() throws Exception { + // Given + RunContext runContext = runContextFactory.of(); + + Consume task = Consume.builder() + .bodyDeserializer(Serdes.STRING) + .eventHubName(eventHubName) + .connectionString(connectionString) + .checkpointStoreProperties(Map.of( + "connectionString", checkPointStoreConnectionString, + "containerName", 
checkPointStoreContainerName + ) + ) + .consumerGroup("$Default") + .maxBatchSizePerPartition(10) + .maxWaitTimePerPartition(Duration.ofSeconds(5)) + .maxDuration(Duration.ofSeconds(10)) + .build(); + + // When + produceEvents(); + Consume.Output runOutput = task.run(runContext); + + // Then + Assertions.assertEquals(2, runOutput.getEventsCount()); + } + + private void produceEvents() throws Exception { + Produce task = Produce.builder() + .id(ConsumeTest.class.getSimpleName()) + .type(Produce.class.getName()) + .bodySerializer(Serdes.STRING) + .eventHubName(eventHubName) + .connectionString(connectionString) + .from(List.of( + ImmutableMap.builder() + .put("body", "event-1") + .build(), + ImmutableMap.builder() + .put("body", "event-2") + .build() + )) + .build(); + task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of())); + } +} diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/ProduceTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/ProduceTest.java new file mode 100644 index 0000000..f9c7d28 --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/ProduceTest.java @@ -0,0 +1,107 @@ +package io.kestra.plugin.azure.eventhubs; + +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; +import com.azure.messaging.eventhubs.models.CreateBatchOptions; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.plugin.azure.eventhubs.client.EventHubClientFactory; +import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; +import io.kestra.plugin.azure.eventhubs.serdes.StringSerde; +import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; +import io.kestra.plugin.azure.eventhubs.service.producer.EventDataBatchFactory; +import io.kestra.plugin.azure.eventhubs.service.producer.EventHubProducerService; +import 
io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Mono; + +import java.util.Map; + +@MicronautTest +@ExtendWith(MockitoExtension.class) +class ProduceTest { + + static final EventHubConsumerConfig EMPTY_CONFIG = new EventHubConsumerConfig(null, null); + + @Inject + private RunContextFactory runContextFactory; + + @Mock + private EventHubProducerAsyncClient client; + @Mock + private EventHubClientFactory factory; + private EventDataObjectConverter converter; + + @BeforeEach + public void beforeEach() throws IllegalVariableEvaluationException { + converter = new EventDataObjectConverter(new StringSerde()); + Mockito.when(factory.createAsyncProducerClient(Mockito.any())).thenReturn(client); + Mockito.when(client.send(Mockito.any(EventDataBatch.class))).thenReturn(Mono.empty()); + } + + @Test + void testGivenFromMap() throws Exception { + // Given + RunContext runContext = runContextFactory.of(); + + Produce task = Produce.builder() + .from(Map.of("body", "msg")) + .eventHubName("test") + .build(); + // create mocks + EventDataBatch batch = Mockito.mock(EventDataBatch.class); + Mockito.when(batch.tryAdd(Mockito.any())).thenReturn(true); + Mockito.when(batch.getCount()).thenReturn(1); + Mockito.when(client.createBatch(Mockito.any())).thenReturn(Mono.just(batch)); + + EventHubProducerService service = new EventHubProducerService( + factory, + EMPTY_CONFIG, + converter, + new EventDataBatchFactory.Default(new CreateBatchOptions())) { + }; + + // When + Produce.Output runOutput = task.run(runContext, service); + + // Then + Assertions.assertEquals(1, runOutput.getEventsCount()); + } + + @Test + void testGivenFromList() throws 
Exception { + // Given + RunContext runContext = runContextFactory.of(); + + Produce task = Produce.builder() + .from(Map.of("body", "msg")) + .eventHubName("test") + .build(); + // create mocks + EventDataBatch batch = Mockito.mock(EventDataBatch.class); + Mockito.when(batch.tryAdd(Mockito.any())).thenReturn(true); + Mockito.when(batch.getCount()).thenReturn(2); + Mockito.when(client.createBatch(Mockito.any())).thenReturn(Mono.just(batch)); + + EventHubProducerService service = new EventHubProducerService( + factory, + EMPTY_CONFIG, + converter, + new EventDataBatchFactory.Default(new CreateBatchOptions())) { + }; + + // When + Produce.Output runOutput = task.run(runContext, service); + + // Then + Assertions.assertEquals(2, runOutput.getEventsCount()); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/TriggerTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/TriggerTest.java new file mode 100644 index 0000000..17e5a04 --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/TriggerTest.java @@ -0,0 +1,120 @@ +package io.kestra.plugin.azure.eventhubs; + +import com.google.common.collect.ImmutableMap; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.queues.QueueFactoryInterface; +import io.kestra.core.queues.QueueInterface; +import io.kestra.core.repositories.LocalFlowRepositoryLoader; +import io.kestra.core.runners.FlowListeners; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.runners.Worker; +import io.kestra.core.schedulers.AbstractScheduler; +import io.kestra.core.schedulers.DefaultScheduler; +import io.kestra.core.schedulers.SchedulerTriggerStateInterface; +import io.kestra.core.utils.TestsUtils; +import io.kestra.plugin.azure.eventhubs.serdes.Serdes; +import io.micronaut.context.ApplicationContext; +import io.micronaut.context.annotation.Value; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; 
+import jakarta.inject.Named; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; + +@MicronautTest +class TriggerTest { + + @Inject + private ApplicationContext applicationContext; + + @Inject + private SchedulerTriggerStateInterface triggerState; + + @Inject + private FlowListeners flowListenersService; + + @Inject + @Named(QueueFactoryInterface.EXECUTION_NAMED) + private QueueInterface executionQueue; + + @Inject + private RunContextFactory runContextFactory; + + @Inject + protected LocalFlowRepositoryLoader repositoryLoader; + + @Value("${kestra.variables.globals.azure.eventhubs.connection-string}") + protected String connectionString; + + @Value("${kestra.variables.globals.azure.eventhubs.eventhub-name}") + protected String eventHubName; + + @Disabled + @Test + void testTrigger() throws Exception { + // mock flow listeners + CountDownLatch queueCount = new CountDownLatch(1); + + // scheduler + Worker worker = new Worker(applicationContext, 8, null); + try ( + AbstractScheduler scheduler = new DefaultScheduler( + this.applicationContext, + this.flowListenersService, + this.triggerState + ); + ) { + AtomicReference last = new AtomicReference<>(); + + // wait for execution + executionQueue.receive(TriggerTest.class, execution -> { + last.set(execution.getLeft()); + + queueCount.countDown(); + assertThat(execution.getLeft().getFlowId(), is("trigger")); + }); + + worker.run(); + scheduler.run(); + + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows"))); + + produceEvents(); + + queueCount.await(1, TimeUnit.MINUTES); + + Integer trigger = (Integer) 
last.get().getTrigger().getVariables().get("eventsCount"); + + assertThat(trigger, greaterThanOrEqualTo(2)); + } + } + + private void produceEvents() throws Exception { + Produce task = Produce.builder() + .id(ConsumeTest.class.getSimpleName()) + .type(Produce.class.getName()) + .bodySerializer(Serdes.STRING) + .eventHubName(eventHubName) + .connectionString(connectionString) + .from(List.of( + ImmutableMap.builder() + .put("body", "event-1") + .build(), + ImmutableMap.builder() + .put("body", "event-2") + .build() + )) + .build(); + task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of())); + } +} diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/serdes/JsonSerdeTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/serdes/JsonSerdeTest.java new file mode 100644 index 0000000..6a0c4db --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/serdes/JsonSerdeTest.java @@ -0,0 +1,30 @@ +package io.kestra.plugin.azure.eventhubs.serdes; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; + +class JsonSerdeTest { + + static ObjectMapper OBJECT_MAPPER = JsonMapper.builder().build(); + + @Test + void shouldSerializeGivenAnyObject() { + // Given + Map input = Map.of("foo", "bar"); + JsonSerde serde = (JsonSerde) Serdes.JSON.create(Collections.emptyMap()); + + // When + byte[] serialized = serde.serialize(input); + + // Then + Assertions.assertNotNull(serialized); + JsonNode node = serde.deserialize(serialized); + Assertions.assertEquals(OBJECT_MAPPER.convertValue(node, Map.class), input); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/serdes/StringSerdeTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/serdes/StringSerdeTest.java new 
file mode 100644 index 0000000..ca9c9c2 --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/serdes/StringSerdeTest.java @@ -0,0 +1,22 @@ +package io.kestra.plugin.azure.eventhubs.serdes; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +class StringSerdeTest { + + @Test + void shouldSerializeGivenString() { + // Given + String input = "test"; + StringSerde serde = (StringSerde) Serdes.STRING.create(Map.of(StringSerde.SERIALIZER_ENCODING_CONFIG_NAME, "utf-8")); + + // When + byte[] serialized = serde.serialize(input); + + // Then + Assertions.assertEquals(input, serde.deserialize(serialized)); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/service/EventDataObjectConverterTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/service/EventDataObjectConverterTest.java new file mode 100644 index 0000000..1181efd --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/service/EventDataObjectConverterTest.java @@ -0,0 +1,36 @@ +package io.kestra.plugin.azure.eventhubs.service; + +import com.azure.messaging.eventhubs.EventData; +import io.kestra.plugin.azure.eventhubs.model.EventDataObject; +import io.kestra.plugin.azure.eventhubs.serdes.StringSerde; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.Map; + +class EventDataObjectConverterTest { + + @Test + void shouldConvertToEventData() { + EventDataObjectConverter converter = new EventDataObjectConverter(new StringSerde()); + long enqueuedTimestamp = Instant.now().toEpochMilli(); + Map prop = Map.of("prop", "value"); + EventData result = converter.convertToEventData(new EventDataObject( + "key", + "value", + "contentType", + "correlationId", + "messageId", + enqueuedTimestamp, + 1L, + 1L, + prop + )); + Assertions.assertEquals("contentType", result.getContentType()); + Assertions.assertEquals("correlationId", 
result.getCorrelationId()); + Assertions.assertEquals("messageId", result.getMessageId()); + Assertions.assertEquals("value", result.getBodyAsString()); + Assertions.assertEquals(prop, result.getProperties()); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventPositionStrategyTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventPositionStrategyTest.java new file mode 100644 index 0000000..a3bb02f --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventPositionStrategyTest.java @@ -0,0 +1,31 @@ +package io.kestra.plugin.azure.eventhubs.service.consumer; + +import com.azure.messaging.eventhubs.models.EventPosition; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +class EventPositionStrategyTest { + + @Test + void shouldGetLatest() { + Assertions.assertEquals(new EventPositionStrategy.Latest().get(), EventPosition.latest()); + } + + @Test + void shouldGetEarliest() { + Assertions.assertEquals(new EventPositionStrategy.Earliest().get(), EventPosition.earliest()); + } + + @Test + void shouldGetEnqueueTime() { + Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + String ISO = EventPositionStrategy.EnqueuedTime.DATE_TIME_FORMATTER.format(now); + Assertions.assertEquals( + new EventPositionStrategy.EnqueuedTime(ISO).get(), + EventPosition.fromEnqueuedTime(now) + ); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java new file mode 100644 index 0000000..716ec11 --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java @@ -0,0 +1,173 @@ +package io.kestra.plugin.azure.eventhubs.service.producer; + 
+import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.serializers.FileSerde; +import io.kestra.plugin.azure.eventhubs.client.EventHubClientFactory; +import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; +import io.kestra.plugin.azure.eventhubs.model.EventDataObject; +import io.kestra.plugin.azure.eventhubs.serdes.StringSerde; +import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@ExtendWith(MockitoExtension.class) +class EventHubProducerServiceTest { + + private static final Logger LOG = LoggerFactory.getLogger(EventHubProducerServiceTest.class); + + @Captor + ArgumentCaptor eventDataBatchArgumentCaptor; + + @Captor + ArgumentCaptor eventDataArgumentCaptor; + + @Mock + private EventHubProducerAsyncClient producer; + @Mock + private EventHubClientFactory factory; + private EventDataObjectConverter converter; + + @BeforeEach + public void beforeEach() throws IllegalVariableEvaluationException { + converter = new 
EventDataObjectConverter(new StringSerde()); + Mockito.when(factory.createAsyncProducerClient(Mockito.any())).thenReturn(producer); + Mockito.when(producer.send(eventDataBatchArgumentCaptor.capture())).thenReturn(Mono.empty()); + } + + @Test + void shouldSendExceptionGivenTooLargeEvent() throws Exception { + // GIVEN + int maxRecordPerBatch = 1; + EventHubProducerService service = createNewSenderService(0); + List events = List.of(new EventDataObject("message")); + byte[] data = getDataAsBytesFor(events); + + // WHEN + try (BufferedReader stream = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data)))) { + ProducerContext options = new ProducerContext( + null, null, maxRecordPerBatch, LOG + ); + // THEN + Assertions.assertThrows(IllegalArgumentException.class, () -> service.sendEvents(stream, options)); + } + } + + @Test + void shouldSendGivenStreamOfOneEvent() throws Exception { + // GIVEN + int maxRecordPerBatch = 1; + EventHubProducerService service = createNewSenderService(1024); + List events = List.of(new EventDataObject("message")); + byte[] data = getDataAsBytesFor(events); + + // WHEN + try (BufferedReader stream = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data)))) { + + ProducerContext options = new ProducerContext( + null, null, maxRecordPerBatch, LOG + ); + EventHubProducerService.Result result = service.sendEvents(stream, options); + // THEN + Mockito.verify(producer, Mockito.times(1)).send(eventDataBatchArgumentCaptor.capture()); + Assertions.assertEquals(new EventHubProducerService.Result(1, 1), result); + } + } + + @Test + void shouldSendGivenStreamOfMultipleEvents() throws Exception { + // GIVEN + int maxRecordPerBatch = 2; + int expectedTotalNumRecords = 11; + int expectedTotalNumBatches = (expectedTotalNumRecords / 2) + expectedTotalNumRecords % maxRecordPerBatch; + + EventHubProducerService service = createNewSenderService(1024); + List events = IntStream + .range(0, expectedTotalNumRecords) + 
.mapToObj(i -> new EventDataObject("message-" + i)) + .collect(Collectors.toList()); + + byte[] data = getDataAsBytesFor(events); + + // WHEN + try (BufferedReader stream = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data)))) { + + ProducerContext options = new ProducerContext( + null, null, maxRecordPerBatch, LOG + ); + EventHubProducerService.Result result = service.sendEvents(stream, options); + + // THEN + Mockito.verify(producer, Mockito.times(expectedTotalNumBatches)).send(eventDataBatchArgumentCaptor.capture()); + Assertions.assertEquals( + events, + converter.convertFromEventData(eventDataArgumentCaptor.getAllValues().stream().distinct().collect(Collectors.toList())) + ); + Assertions.assertEquals(new EventHubProducerService.Result(expectedTotalNumRecords, expectedTotalNumBatches), result); + } + } + + private static byte[] getDataAsBytesFor(List entities) throws IOException { + final ByteArrayOutputStream os = new ByteArrayOutputStream(); + try (os) { + for (EventDataObject entity : entities) { + FileSerde.write(os, entity); + } + } + return os.toByteArray(); + } + + private EventHubProducerService createNewSenderService(int maxBatchSize) { + + return new EventHubProducerService( + factory, + new EventHubConsumerConfig(null, null), + converter, + new EventDataBatchFactory() { + @Override + public Mono createBatch(EventHubProducerAsyncClient client) { + final EventDataBatch batch = Mockito.mock(EventDataBatch.class); + final AtomicInteger counter = new AtomicInteger(0); + final AtomicInteger batchSize = new AtomicInteger(0); + Mockito.when(batch.tryAdd(eventDataArgumentCaptor.capture())) + .then((Answer) invocation -> { + EventData value = eventDataArgumentCaptor.getValue(); + int currentBatchSize = batchSize.accumulateAndGet(value.getBody().length, Integer::sum); + if (currentBatchSize < maxBatchSize) { + counter.accumulateAndGet(1, Integer::sum); + return true; + } + return false; + }); + if (maxBatchSize > 0) { // this necessary for 
not having UnnecessaryStubbingException + Mockito.when(batch.getCount()).then((Answer) invocation -> counter.get()); + } + return Mono.just(batch); + } + } + ); + } +} \ No newline at end of file diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index ff1c4d9..beb1f63 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -21,4 +21,9 @@ kestra: account: batchunitest pool-id: azure-unit-test job-id: job-unit-test - + eventhubs: + eventhub-name: "" + connection-string: "" + checkpointstore: + connection-string: "" + container-name: "" diff --git a/src/test/resources/flows/eventshubs-trigger.yaml b/src/test/resources/flows/eventshubs-trigger.yaml new file mode 100644 index 0000000..883184a --- /dev/null +++ b/src/test/resources/flows/eventshubs-trigger.yaml @@ -0,0 +1,19 @@ +id: eventhubs-listen +namespace: io.kestra.tests + +triggers: + - id: watch + type: io.kestra.plugin.azure.eventhubs.Trigger + interval: PT10S + eventHubName: "{{ globals.azure.eventhubs['eventhub-name'] }}" + connectionString: "{{ globals.azure.eventhubs['connection-string'] }}" + bodyDeserializer: STRING + consumerGroup: "$Default" + checkpointStoreProperties: + containerName: "{{ globals.azure.eventhubs.checkpointstore['container-name'] }}" + connectionString: "{{ globals.azure.eventhubs.checkpointstore['connection-string'] }}" + +tasks: + - id: end + type: io.kestra.core.tasks.debugs.Return + format: "{{task.id}} > {{taskrun.startDate}}"