diff --git a/build.gradle b/build.gradle index 06c733b..beeded4 100644 --- a/build.gradle +++ b/build.gradle @@ -61,6 +61,8 @@ dependencies { api group: 'com.azure', name: 'azure-data-tables' api group: 'com.microsoft.azure', name: 'azure-batch', version: '10.1.0' api group: 'com.microsoft.azure', name: 'azure-storage', version: '8.6.6' + api group: 'com.azure', name: 'azure-messaging-eventhubs-checkpointstore-blob', version: '1.18.1' + api group: 'com.azure', name: 'azure-messaging-eventhubs', version: '5.17.1' } @@ -96,6 +98,7 @@ dependencies { testImplementation "org.junit.jupiter:junit-jupiter-engine" testImplementation "org.hamcrest:hamcrest:2.2" testImplementation "org.hamcrest:hamcrest-library:2.2" + testImplementation "org.mockito:mockito-core:5.8.0" } /**********************************************************************************************************************\ diff --git a/src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorageInterface.java b/src/main/java/io/kestra/plugin/azure/AzureClientInterface.java similarity index 78% rename from src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorageInterface.java rename to src/main/java/io/kestra/plugin/azure/AzureClientInterface.java index 1678b4d..3b88b90 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorageInterface.java +++ b/src/main/java/io/kestra/plugin/azure/AzureClientInterface.java @@ -1,9 +1,13 @@ -package io.kestra.plugin.azure.storage.abstracts; +package io.kestra.plugin.azure; import io.kestra.core.models.annotations.PluginProperty; import io.swagger.v3.oas.annotations.media.Schema; -public interface AbstractStorageInterface { +/** + * Top-level interface that can be used by plugins to retrieve + * required configuration properties in order to establish connection to Azure services. + */ +public interface AzureClientInterface { @Schema( title = "Connection string of the storage account." ) @@ -16,7 +20,6 @@ public interface AbstractStorageInterface { @PluginProperty(dynamic = true) String getSharedKeyAccountName(); - @Schema( title = "Shared Key access key for authenticating requests" ) diff --git a/src/main/java/io/kestra/plugin/azure/client/AzureClientConfig.java b/src/main/java/io/kestra/plugin/azure/client/AzureClientConfig.java new file mode 100644 index 0000000..746df94 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/client/AzureClientConfig.java @@ -0,0 +1,18 @@ +package io.kestra.plugin.azure.client; + +import java.util.Optional; + +/** + * Configuration for creating a new Azure EventHub Client. + */ +public interface AzureClientConfig { + + Optional connectionString(); + + Optional sharedKeyAccountName(); + + Optional sharedKeyAccountAccessKey(); + + Optional sasToken(); + +} diff --git a/src/main/java/io/kestra/plugin/azure/client/DynamicAzureClientConfig.java b/src/main/java/io/kestra/plugin/azure/client/DynamicAzureClientConfig.java new file mode 100644 index 0000000..01713f1 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/client/DynamicAzureClientConfig.java @@ -0,0 +1,68 @@ +package io.kestra.plugin.azure.client; + +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.AzureClientInterface; + +import java.util.Objects; +import java.util.Optional; +import java.util.function.Supplier; + +/** + * Configuration that uses the {@link RunContext} to render configuration. + */ +public class DynamicAzureClientConfig implements AzureClientConfig { + + protected final RunContext runContext; + protected final T plugin; + + /** + * Creates a new {@link DynamicAzureClientConfig} instance. + * + * @param runContext The context. Cannot be null. + * @param plugin The plugin. Cannot be null. + */ + public DynamicAzureClientConfig(final RunContext runContext, + final T plugin) { + this.runContext = Objects.requireNonNull(runContext, "runContext cannot be null"); + this.plugin = Objects.requireNonNull(plugin, "plugin cannot be null"); + } + + @Override + public Optional connectionString() { + String o = plugin.getConnectionString(); + if (o != null) + return Optional.ofNullable(renderWithRunContext(o)); + else { + return Optional.empty(); + } + } + + @Override + public Optional sharedKeyAccountName() { + return getOptionalConfig(plugin::getSharedKeyAccountName); + } + + @Override + public Optional sharedKeyAccountAccessKey() { + return getOptionalConfig(plugin::getSharedKeyAccountAccessKey); + } + + @Override + public Optional sasToken() { + return getOptionalConfig(plugin::getSasToken); + } + + protected Optional getOptionalConfig(final Supplier t) { + String config = t.get(); + return config != null ? Optional.ofNullable(renderWithRunContext(config)) : Optional.empty(); + } + + protected String renderWithRunContext(String input) { + try { + return runContext.render(input); + } catch (IllegalVariableEvaluationException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/AbstractConsumeEventHubTask.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/AbstractConsumeEventHubTask.java new file mode 100644 index 0000000..81f8b07 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/AbstractConsumeEventHubTask.java @@ -0,0 +1,110 @@ +package io.kestra.plugin.azure.messaging.eventhubs; + +import io.kestra.plugin.azure.messaging.eventhubs.serdes.Serdes; +import io.kestra.plugin.azure.messaging.eventhubs.service.consumer.CheckpointStoreType; +import io.kestra.plugin.azure.messaging.eventhubs.service.consumer.StartingPosition; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +import java.util.Collections; +import java.util.Map; + +/** + * Base class for implementing tasks that consume events into EventHubs. + * This class provides all required and optional parameters. + */ +@SuperBuilder +@NoArgsConstructor +@ToString +@EqualsAndHashCode +@Getter +public abstract class AbstractConsumeEventHubTask extends AbstractEventHubTask { + + // TASK'S PARAMETERS + @Schema( + title = "The Deserializer to be used for serializing the event value.", + description = "Supported values are: [STRING, JSON]" + ) + @Builder.Default + private Serdes bodySerde = Serdes.STRING; + + @Schema( + title = "The config properties to be passed to the Deserializer.", + description = "Configs in key/value pairs." + ) + @Builder.Default + private Map bodySerdeProperties = Collections.emptyMap(); + + @Schema( + title = "The consumer group." + ) + @Builder.Default + private String consumerGroup = "$Default"; + + @Schema( + title = "The starting position." + ) + @Builder.Default + private StartingPosition partitionStartingPosition = StartingPosition.EARLIEST; + + @Schema( + title = "The maximum number of events to consume per event hub partition per poll." + ) + @Builder.Default + private Integer maxBatchPartitionEvents = 50; + + @Schema( + title = "The max time duration to wait to receive a batch of events up to the `maxPollEvents`." + ) + @Builder.Default + private Long maxBatchPartitionWait = 1000L; + + @Schema( + title = "The max time duration to wait to receive events from all partitions." + ) + @Builder.Default + private Long maxPollWait = 5000L; + + @Schema( + title = "The storage to be used as a persistent store for maintaining checkpoints.", + description = "Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.", + anyOf = {Object.class} + ) + @Builder.Default + private CheckpointStoreType checkpointStoreType = CheckpointStoreType.BLOB; + + @Schema( + title = "The config properties to be passed to the CheckpointStore.", + description = "Configs in key/value pairs." + ) + private Map checkpointStoreConfig = Collections.emptyMap(); + + private String namespace; + + private String eventHubName; + + private String customEndpointAddress; + + @SuperBuilder + @NoArgsConstructor + @ToString + @EqualsAndHashCode + @Getter + public static class BlobCheckPointConfig implements BlobContainerClientInterface { + + private String connectionString; + + private String sharedKeyAccountName; + + private String sharedKeyAccountAccessKey; + + private String sasToken; + + private String containerName; + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/AbstractEventHubTask.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/AbstractEventHubTask.java new file mode 100644 index 0000000..6908bf8 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/AbstractEventHubTask.java @@ -0,0 +1,24 @@ +package io.kestra.plugin.azure.messaging.eventhubs; + +import io.kestra.core.models.tasks.Task; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +public abstract class AbstractEventHubTask extends Task implements EventHubClientInterface { + + protected String connectionString; + + protected String sharedKeyAccountName; + + protected String sharedKeyAccountAccessKey; + + protected String sasToken; +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/AbstractProduceEventHubTask.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/AbstractProduceEventHubTask.java new file mode 100644 index 0000000..3947099 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/AbstractProduceEventHubTask.java @@ -0,0 +1,95 @@ +package io.kestra.plugin.azure.messaging.eventhubs; + +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.plugin.azure.messaging.eventhubs.serdes.Serdes; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +import javax.validation.constraints.NotNull; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Base class for implementing tasks that produce events into EventHubs. + * This class provides all required and optional parameters. + */ +@SuperBuilder +@NoArgsConstructor +@ToString +@EqualsAndHashCode +@Getter +public abstract class AbstractProduceEventHubTask extends AbstractEventHubTask { + + // TASK'S PARAMETERS + @Schema( + title = "The event properties", + description = "The event properties which may be used for passing metadata associated with the event" + + " body during Event Hubs operations." + ) + @Builder.Default + private Map eventProperties = Collections.emptyMap(); + + @Schema( + title = "The content of the message to be sent to EventHub", + description = "Can be an internal storage uri, a map (i.e. a list of key-value pairs) or a list of maps. " + + "The following keys are supported: `from`, `contentType`, `properties`.", + anyOf = {String.class, List.class, Map.class} + ) + @NotNull + @PluginProperty(dynamic = true) + private Object from; + + @Schema( + title = "The hashing key to be provided for the batches of events.", + description = "Events with the same `partitionKey` are hashed and sent to the same partition. The provided " + + "`partitionKey` will be used for all the events sent by the `Produce` task." + ) + private String partitionKey; + + @Schema( + title = "The maximum size for batches of events, in bytes.", + description = "The maximum size to allow for the batches of events." + ) + private Integer maxBatchSizeInBytes; + + @Schema( + title = "The maximum number of events per batches.", + description = "The maximum number of events to allow for the one batch." + ) + @Builder.Default + private Integer maxEventsPerBatch = 1000; + + @Schema( + title = "The MIME type describing the event data", + description = "The MIME type describing the data contained in event body allowing consumers to make informed" + + " decisions for inspecting and processing the event." + ) + private String bodyContentType; + + @Schema( + title = "The Serializer to be used for serializing the event value.", + description = "Supported values are: [STRING, JSON]" + ) + @Builder.Default + private Serdes bodySerde = Serdes.STRING; + + @Schema( + title = "The config properties to be passed to the Serializer.", + description = "Configs in key/value pairs." + ) + @Builder.Default + protected Map bodySerdeProperties = Collections.emptyMap(); + + private String namespace; + + private String eventHubName; + + private String customEndpointAddress; +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/BlobContainerClientInterface.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/BlobContainerClientInterface.java new file mode 100644 index 0000000..cd3c40e --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/BlobContainerClientInterface.java @@ -0,0 +1,32 @@ +package io.kestra.plugin.azure.messaging.eventhubs; + +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.plugin.azure.AzureClientInterface; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +public interface BlobContainerClientInterface extends AzureClientInterface { + + @Schema( + title = "The blob container name." + ) + @PluginProperty(dynamic = true) + String getContainerName(); + + + @SuperBuilder + @NoArgsConstructor + @Getter + class Default implements BlobContainerClientInterface { + + private String connectionString; + private String sharedKeyAccountName; + private String sharedKeyAccountAccessKey; + private String sasToken; + private String containerName; + } +} + diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/Consume.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/Consume.java new file mode 100644 index 0000000..8c124c1 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/Consume.java @@ -0,0 +1,209 @@ +package io.kestra.plugin.azure.messaging.eventhubs; + +import com.azure.messaging.eventhubs.CheckpointStore; +import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; +import com.azure.messaging.eventhubs.models.PartitionContext; +import com.azure.storage.blob.BlobContainerAsyncClient; +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.tasks.Output; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.core.serializers.FileSerde; +import io.kestra.plugin.azure.messaging.eventhubs.client.DefaultAzureClientFactory; +import io.kestra.plugin.azure.messaging.eventhubs.client.EventHubClientFactory; +import io.kestra.plugin.azure.messaging.eventhubs.config.BlobContainerClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.config.DynamicBlobContainerClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.config.DynamicEventHubConsumerConfig; +import io.kestra.plugin.azure.messaging.eventhubs.config.EventHubClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.config.EventHubConsumerConfig; +import io.kestra.plugin.azure.messaging.eventhubs.service.EventDataObjectConverter; +import io.kestra.plugin.azure.messaging.eventhubs.service.consumer.CheckpointStoreType; +import io.kestra.plugin.azure.messaging.eventhubs.service.consumer.EventHubConsumerService; +import io.kestra.plugin.azure.messaging.eventhubs.service.consumer.EventHubNamePartition; +import io.kestra.plugin.azure.messaging.eventhubs.model.EventDataObject; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * The {@link RunnableTask} can be used for consuming batches of events from Azure Event Hubs. + */ +@Plugin(examples = { + @Example( + title = "Consume data events from Azure EventHubs.", + full = true, + code = { + """ + id: ConsumeDataEventsFromAzureEventHubs + namespace: company.team + - id: consumeFromEventHubs + type: io.kestra.plugin.azure.messaging.eventhubs.Consume + eventHubName: my-eventhub + connectionString:"{{ secrets.azure.eventhubs.connectionString }}" + bodySerde: STRING + consumerGroup: "$Default" + blobCheckpointStoreConfig: + containerName: kestra + connectionString: "{{ secrets.azure.blob.connectionString }}" + """ + } + ) +}) +@Schema( + title = "Consume events from Azure Event Hubs." +) +@Slf4j +@SuperBuilder +@ToString +@EqualsAndHashCode +public class Consume extends AbstractConsumeEventHubTask implements RunnableTask { + + // SERVICES + private final EventHubClientFactory clientFactory; + private final EventHubClientConfig.Factory clientConfigFactory; + + /** + * Empty constructor. Required by Kestra. + */ + Consume() { + this(new DefaultAzureClientFactory(), new DynamicEventHubConsumerConfig.Factory()); + } + + /** + * Creates a new {@link Produce} instance using the specified client factory. + * + * @param clientFactory The client factory. + */ + Consume(final EventHubClientFactory clientFactory, + final EventHubClientConfig.Factory clientConfigFactory) { + this.clientFactory = Objects.requireNonNull(clientFactory, "clientFactory should not be null"); + this.clientConfigFactory = Objects.requireNonNull(clientConfigFactory, "clientConfigFactory should not be null"); + } + + /** + * {@inheritDoc} + **/ + @Override + public Output run(RunContext runContext) throws Exception { + + final EventHubConsumerConfig config = clientConfigFactory.create(runContext, this); + + final EventHubConsumerService service = new EventHubConsumerService( + clientFactory, + config, + new EventDataObjectConverter(getBodySerde().create(getBodySerdeProperties())), + () -> getBlobCheckpointStore(runContext) + ); + + File tempFile = runContext.tempFile(".ion").toFile(); + try ( + BufferedOutputStream output = new BufferedOutputStream(new FileOutputStream(tempFile)) + ) { + + final AtomicReference uri = new AtomicReference<>(); + + Map result = service.poll( + getMaxBatchPartitionEvents(), + Duration.ofMillis(getMaxBatchPartitionWait()), + Duration.ofMillis(getMaxPollWait()), + new EventHubConsumerService.EventProcessorListener() { + @Override + public void onStart() { + log.info("Starting EventHub processor"); + } + + @Override + public void onEvent(EventDataObject event, PartitionContext context) { + try { + FileSerde.write(output, event); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onStop() { + log.info("Stopping EventHub processor"); + try { + output.flush(); + output.close(); + uri.set(runContext.putTempFile(tempFile)); + log.info("Data copied on storage"); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + }); + + int numEvents = result.entrySet().stream() + .peek(entry -> { + Counter counter = Counter.of( + "records", + entry.getValue(), + "eventHubName", + entry.getKey().eventHubName(), + "partitionId", + entry.getKey().partitionId() + ); + runContext.metric(counter); + }) + .map(Map.Entry::getValue) + .reduce(Integer::sum) + .orElse(0); + + return new Output(numEvents, uri.get()); + } + } + + private CheckpointStore getBlobCheckpointStore(final RunContext runContext) { + CheckpointStoreType storeType = getCheckpointStoreType(); + return switch (storeType) { + case BLOB -> { + DefaultAzureClientFactory factory = new DefaultAzureClientFactory(); + BlobContainerClientInterface.Default config = BlobContainerClientInterface.Default.builder() + .containerName(getCheckpointStoreConfig().get("containerName")) + .connectionString(getCheckpointStoreConfig().get("connectionString")) + .sharedKeyAccountAccessKey(getCheckpointStoreConfig().get("sharedKeyAccountAccessKey")) + .sharedKeyAccountName(getCheckpointStoreConfig().get("sharedKeyAccountName")) + .build(); + BlobContainerAsyncClient client = factory.createBlobContainerAsyncClient( + new DynamicBlobContainerClientConfig.Factory().create( + runContext, + config + )); + yield new BlobCheckpointStore(client); + } + }; + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "Number of events consumed from Azure Event Hubs." + ) + private final Integer eventsCount; + + @Schema( + title = "URI of a kestra internal storage file containing the messages." + ) + private URI uri; + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/EventHubClientInterface.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/EventHubClientInterface.java new file mode 100644 index 0000000..393d394 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/EventHubClientInterface.java @@ -0,0 +1,31 @@ +package io.kestra.plugin.azure.messaging.eventhubs; + +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.plugin.azure.AzureClientInterface; +import io.swagger.v3.oas.annotations.media.Schema; + +import javax.validation.constraints.NotNull; + +public interface EventHubClientInterface extends AzureClientInterface { + + @Schema( + title = "Custom endpoint address when connecting to the Event Hubs service." + ) + @PluginProperty(dynamic = true) + String getCustomEndpointAddress(); + + @NotNull + @Schema( + title = "Namespace name of the event hub to connect to." + ) + @PluginProperty(dynamic = true) + String getNamespace(); + + @NotNull + @Schema( + title = "The event hub to read from." + ) + @PluginProperty(dynamic = true) + String getEventHubName(); +} + diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/Produce.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/Produce.java new file mode 100644 index 0000000..54dbe08 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/Produce.java @@ -0,0 +1,204 @@ +package io.kestra.plugin.azure.messaging.eventhubs; + +import com.azure.messaging.eventhubs.models.CreateBatchOptions; +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.messaging.eventhubs.client.DefaultAzureClientFactory; +import io.kestra.plugin.azure.messaging.eventhubs.client.EventHubClientFactory; +import io.kestra.plugin.azure.messaging.eventhubs.config.DynamicEventHubClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.config.EventHubClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.internal.InputDataReader; +import io.kestra.plugin.azure.messaging.eventhubs.service.EventDataObjectConverter; +import io.kestra.plugin.azure.messaging.eventhubs.service.producer.EventDataBatchFactory; +import io.kestra.plugin.azure.messaging.eventhubs.service.producer.EventHubProducerService; +import io.kestra.plugin.azure.messaging.eventhubs.service.producer.ProducerOptions; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * The {@link RunnableTask} can be used for producing batches of events to Azure Event Hubs. + */ +@Plugin(examples = { + @Example( + title = "Publish a file as events into Azure EventHubs.", + full = true, + code = { + """ + id: SendEventsIntoAzureEventHubs + namespace: company.team + inputs: + - type: FILE + name: file + description: a CSV file with columns id, username, tweet, and timestamp + tasks: + - id: readCsvFile + type: io.kestra.plugin.serdes.csv.CsvReader + from: "{{ inputs.file }}" + - id: transformRowToJson + type: io.kestra.plugin.scripts.nashorn.FileTransform + from: "{{ outputs.readCsvFile.uri }}" + script: | + var result = { + "body": { + "username": row.username, + "tweet": row.tweet + } + }; + row = result + - id: sendToEventHubs + type: io.kestra.plugin.azure.messaging.eventhubs.Produce + from: "{{ outputs.rowToJson.uri }}" + bodySerde: "JSON" + eventHubName: from-kestra + connectionString: "{{ secrets.azure.eventhubs.connectionString }}" + maxSizeInBytes: 4096 + maxEventsPerBatch: 100 + bodyContentType: application/json + eventProperties: + source: kestra + """ + } + ) +}) +@Schema( + title = "Publish events to Azure Event Hubs." +) +@Slf4j +@SuperBuilder +@ToString +@EqualsAndHashCode +public class Produce extends AbstractProduceEventHubTask implements RunnableTask { + + // TASK'S METRICS + private static final String METRIC_SENT_EVENTS_NAME = "total-sent-events"; + private static final String METRIC_SENT_BATCHES_NAME = "total-sent-batches"; + + // SERVICES + private final EventHubClientFactory clientFactory; + private final EventHubClientConfig.Factory clientConfigFactory; + + /** + * Empty constructor. Required by Kestra. + */ + Produce() { + this(new DefaultAzureClientFactory(), new DynamicEventHubClientConfig.Factory<>()); + } + + /** + * Creates a new {@link Produce} instance using the specified client factory. + * + * @param clientFactory The client factory. + */ + Produce(final EventHubClientFactory clientFactory, + final EventHubClientConfig.Factory clientConfigFactory) { + this.clientFactory = Objects.requireNonNull(clientFactory, "clientFactory should not be null"); + this.clientConfigFactory = Objects.requireNonNull(clientConfigFactory, "clientConfigFactory should not be null"); + } + + /** + * {@inheritDoc} + **/ + @Override + @SuppressWarnings("unchecked") + public Output run(RunContext runContext) throws Exception { + final InputDataReader reader = new InputDataReader(runContext); + + InputStream is = null; + if (this.getFrom() instanceof String data) { + is = reader.read(data); + } + + if (this.getFrom() instanceof Map data) { + is = reader.read(data); + } + + if (this.getFrom() instanceof List data) { + is = reader.read(data); + } + + return Optional.ofNullable(is) + .map(stream -> send(runContext, stream)) + .orElseThrow(() -> new IllegalArgumentException( + "Unsupported type for task-property `from`: " + this.getFrom().getClass().getSimpleName() + )); + } + + private Output send(final RunContext runContext, final InputStream stream) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) { + // Sends + ProducerOptions options = new ProducerOptions( + getBodyContentType(), + getEventProperties(), + getMaxEventsPerBatch() + ); + EventHubProducerService.Result result = newSender(runContext).sendEvents( + reader, + options + ); + + // metrics + runContext.metric(Counter.of(METRIC_SENT_EVENTS_NAME, result.totalSentEvents())); + runContext.metric(Counter.of(METRIC_SENT_BATCHES_NAME, result.totalSentBatches())); + + return new Output( + result.totalSentEvents(), + result.totalSentBatches() + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private EventHubProducerService newSender(final RunContext runContext) { + return new EventHubProducerService( + clientFactory, + clientConfigFactory.create(runContext, this), + new EventDataObjectConverter(getBodySerde().create(getBodySerdeProperties())), + new EventDataBatchFactory.Default(getCreateBatchOptions()) + ); + } + + private CreateBatchOptions getCreateBatchOptions() { + CreateBatchOptions options = new CreateBatchOptions(); + if (getMaxBatchSizeInBytes() != null) { + options.setMaximumSizeInBytes(getMaxBatchSizeInBytes()); + } + + if (getPartitionKey() != null) { + options.setPartitionKey(getPartitionKey()); + } + return options; + } + + @AllArgsConstructor + @Getter + public static final class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "Total number of events processed by the task." + ) + private final Integer eventsCount; + + @Schema( + title = "Total number of batches sent to an Azure EventHubs." + ) + private final Integer sendBatchesCount; + + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/AzureAbstractClientFactory.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/AzureAbstractClientFactory.java new file mode 100644 index 0000000..f46059b --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/AzureAbstractClientFactory.java @@ -0,0 +1,4 @@ +package io.kestra.plugin.azure.messaging.eventhubs.client; + +public class AzureAbstractClientFactory { +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/BlobContainerFactory.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/BlobContainerFactory.java new file mode 100644 index 0000000..d633ade --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/BlobContainerFactory.java @@ -0,0 +1,9 @@ +package io.kestra.plugin.azure.messaging.eventhubs.client; + +import com.azure.storage.blob.BlobContainerAsyncClient; +import io.kestra.plugin.azure.messaging.eventhubs.config.BlobContainerClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.config.EventHubClientConfig; + +public interface BlobContainerFactory { + BlobContainerAsyncClient createBlobContainerAsyncClient(BlobContainerClientConfig config); +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/DefaultAzureClientFactory.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/DefaultAzureClientFactory.java new file mode 100644 index 0000000..1512292 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/DefaultAzureClientFactory.java @@ -0,0 +1,166 @@ +package io.kestra.plugin.azure.messaging.eventhubs.client; + +import com.azure.core.credential.AzureNamedKeyCredential; +import com.azure.core.credential.AzureSasCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.messaging.eventhubs.EventHubClientBuilder; +import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; +import com.azure.messaging.eventhubs.EventProcessorClientBuilder; +import com.azure.storage.blob.BlobContainerAsyncClient; +import com.azure.storage.blob.BlobContainerClientBuilder; +import io.kestra.plugin.azure.client.AzureClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.config.BlobContainerClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.config.EventHubClientConfig; +import lombok.extern.slf4j.Slf4j; + +import java.util.Objects; +import java.util.Optional; + +/** + * Factory to create new EventHub clients. + */ +@Slf4j +public final class DefaultAzureClientFactory implements EventHubClientFactory, BlobContainerFactory { + + /** + * {@inheritDoc} + **/ + @Override + public EventHubProducerAsyncClient createAsyncProducerClient(final EventHubClientConfig config) { + + Objects.requireNonNull(config, "config should not be null"); + return createBuilder(config).buildAsyncProducerClient(); + } + + + private Optional fullyQualifiedNamespace(final EventHubClientConfig config) { + return config.namespace().map(ns -> ns + ".servicebus.windows.net"); + } + + /** + * {@inheritDoc} + **/ + @Override + public EventProcessorClientBuilder createEventProcessorClientBuilder(final EventHubClientConfig config) { + Objects.requireNonNull(config, "config should not be null"); + EventProcessorClientBuilder builder = new EventProcessorClientBuilder() + .eventHubName(config.eventHubName()) + .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME); + + config.customEndpointAddress().ifPresent(builder::customEndpointAddress); + fullyQualifiedNamespace(config).ifPresent(builder::fullyQualifiedNamespace); + + Optional connectionString = connectionString(config, "EventHubClient"); + if (connectionString.isPresent()) { + return builder.connectionString(connectionString.get()); + } + + Optional azureNamedKeyCredential = namedKeyCredential(config, "EventHubClient"); + if (azureNamedKeyCredential.isPresent()) { + return builder.credential(azureNamedKeyCredential.get()); + } + + Optional azureSasCredential = sasCredential(config, "EventHubClient"); + if (azureSasCredential.isPresent()) { + return builder.credential(azureSasCredential.get()); + } + + return builder.credential(new DefaultAzureCredentialBuilder().build()); + } + + /** + * Factory method for constructing a new {@link EventHubClientBuilder} for the given config. + * + * @param config The configuration. + * @return a new {@link EventHubClientBuilder} object. + */ + private EventHubClientBuilder createBuilder(EventHubClientConfig config) { + + EventHubClientBuilder builder = new EventHubClientBuilder() + .eventHubName(config.eventHubName()); + + config.customEndpointAddress().ifPresent(builder::customEndpointAddress); + fullyQualifiedNamespace(config).ifPresent(builder::fullyQualifiedNamespace); + + Optional connectionString = connectionString(config, "EventHubClient"); + if (connectionString.isPresent()) { + return builder.connectionString(connectionString.get()); + } + + Optional azureNamedKeyCredential = namedKeyCredential(config, "EventHubClient"); + if (azureNamedKeyCredential.isPresent()) { + return builder.credential(azureNamedKeyCredential.get()); + } + + Optional azureSasCredential = sasCredential(config, "EventHubClient"); + if (azureSasCredential.isPresent()) { + return builder.credential(azureSasCredential.get()); + } + + return builder.credential(new DefaultAzureCredentialBuilder().build()); + } + + /** + * {@inheritDoc} + **/ + @Override + public BlobContainerAsyncClient createBlobContainerAsyncClient(BlobContainerClientConfig config) { + BlobContainerClientBuilder builder = new BlobContainerClientBuilder() + .containerName(config.containerName()); + + Optional connectionString = connectionString(config, "BlobContainerClient"); + if (connectionString.isPresent()) { + return builder.connectionString(connectionString.get()) + .buildAsyncClient(); + } + + Optional azureNamedKeyCredential = namedKeyCredential(config, "BlobContainerClient"); + if (azureNamedKeyCredential.isPresent()) { + return builder.credential(azureNamedKeyCredential.get()) + .buildAsyncClient(); + } + + Optional azureSasCredential = sasCredential(config, "BlobContainerClient"); + if (azureSasCredential.isPresent()) { + return builder.credential(azureSasCredential.get()) + .buildAsyncClient(); + } + + return builder.credential(new DefaultAzureCredentialBuilder().build()).buildAsyncClient(); + } + + private Optional connectionString(final AzureClientConfig config, + final String clientName) { + Optional optionalConnectionString = config.connectionString(); + if (optionalConnectionString.isPresent()) { + log.debug("Creating new {} using the `connectionString` that was passed " + + "through the task's configuration", clientName); + } + return optionalConnectionString; + } + + private Optional sasCredential(final AzureClientConfig config, + final String clientName) { + if (config.sasToken().isPresent()) { + log.debug("Creating new {} using the `sasToken`" + + " that was passed through the task's configuration", clientName); + AzureSasCredential credential = new AzureSasCredential(config.sasToken().get()); + return Optional.of(credential); + } + return Optional.empty(); + } + + private Optional namedKeyCredential(final AzureClientConfig config, + final String clientName) { + if (config.sharedKeyAccountAccessKey().isPresent() && config.sharedKeyAccountName().isPresent()) { + log.debug("Creating new {} using the `sharedKeyAccountName` and `sharedKeyAccountAccessKey`" + + " that was passed through the task's configuration", clientName); + AzureNamedKeyCredential credential = new AzureNamedKeyCredential( + config.sharedKeyAccountName().get(), + config.sharedKeyAccountAccessKey().get() + ); + return Optional.of(credential); + } + return Optional.empty(); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/EventHubClientFactory.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/EventHubClientFactory.java new file mode 100644 index 0000000..191c25d --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/EventHubClientFactory.java @@ -0,0 +1,28 @@ +package io.kestra.plugin.azure.messaging.eventhubs.client; + +import com.azure.messaging.eventhubs.EventHubClientBuilder; +import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; +import com.azure.messaging.eventhubs.EventProcessorClientBuilder; +import io.kestra.plugin.azure.messaging.eventhubs.config.EventHubClientConfig; + +/** + * Service interface for constructing EventHubs clients. + */ +public interface EventHubClientFactory { + + /** + * Factory method for constructing a new {@link EventHubClientBuilder} for the given config. + * + * @param config The configuration. Cannot be {@code null}. + * @return a new {@link EventHubClientBuilder} object. + */ + EventHubProducerAsyncClient createAsyncProducerClient(EventHubClientConfig config); + + /** + * Factory method for constructing a new {@link EventProcessorClientBuilder} for the given config. + * + * @param config The configuration. Cannot be {@code null}. + * @return a new {@link EventProcessorClientBuilder} object. + */ + EventProcessorClientBuilder createEventProcessorClientBuilder(EventHubClientConfig config); +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/BlobContainerClientConfig.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/BlobContainerClientConfig.java new file mode 100644 index 0000000..95edd18 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/BlobContainerClientConfig.java @@ -0,0 +1,44 @@ +package io.kestra.plugin.azure.messaging.eventhubs.config; + +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.client.AzureClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.BlobContainerClientInterface; + +import java.util.Optional; + +/** + * Configuration for creating a new Azure EventHub Client. + */ +public interface BlobContainerClientConfig extends AzureClientConfig { + + String containerName(); + + /** + * Default EventHubClientConfig + **/ + record Default( + String containerName, + Optional connectionString, + Optional namespace, + Optional sharedKeyAccountName, + Optional sharedKeyAccountAccessKey, + Optional sasToken, + Optional customEndpointAddress + ) implements BlobContainerClientConfig { + } + + /** + * Service interface for constructing new {@link BlobContainerClientConfig} objects. + */ + interface Factory { + + /** + * Creates a new {@link BlobContainerClientConfig} instance for the given context and plugins. + * + * @param runContext The context. Cannot be null. + * @param plugin The plugin. Cannot be null. + * @return a new {@link BlobContainerClientConfig} instance. + */ + O create(RunContext runContext, T plugin); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/DynamicBlobContainerClientConfig.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/DynamicBlobContainerClientConfig.java new file mode 100644 index 0000000..5beb039 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/DynamicBlobContainerClientConfig.java @@ -0,0 +1,42 @@ +package io.kestra.plugin.azure.messaging.eventhubs.config; + +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.client.DynamicAzureClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.BlobContainerClientInterface; + +/** + * Configuration that uses the {@link RunContext} to render configuration. + */ +public class DynamicBlobContainerClientConfig + extends DynamicAzureClientConfig + implements BlobContainerClientConfig { + + public static class Factory implements BlobContainerClientConfig.Factory { + /** + * {@inheritDoc} + */ + @Override + public BlobContainerClientConfig create(RunContext runContext, BlobContainerClientInterface plugin) { + return new DynamicBlobContainerClientConfig(runContext, plugin); + } + } + + /** + * Creates a new {@link DynamicBlobContainerClientConfig} instance. + * + * @param runContext The context. Cannot be null. + * @param plugin The plugin. Cannot be null. + */ + public DynamicBlobContainerClientConfig(final RunContext runContext, + final BlobContainerClientInterface plugin) { + super(runContext, plugin); + } + + /** + * {@inheritDoc} + **/ + @Override + public String containerName() { + return renderWithRunContext(plugin.getContainerName()); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/DynamicEventHubClientConfig.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/DynamicEventHubClientConfig.java new file mode 100644 index 0000000..63f1000 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/DynamicEventHubClientConfig.java @@ -0,0 +1,50 @@ +package io.kestra.plugin.azure.messaging.eventhubs.config; + +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.client.DynamicAzureClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.EventHubClientInterface; + +import java.util.Optional; + +/** + * Configuration that uses the {@link RunContext} to render configuration. + */ +public class DynamicEventHubClientConfig + extends DynamicAzureClientConfig + implements EventHubClientConfig { + + public static class Factory implements EventHubClientConfig.Factory { + /** + * {@inheritDoc} + */ + @Override + public EventHubClientConfig create(RunContext runContext, T plugin) { + return new DynamicEventHubClientConfig(runContext, plugin); + } + } + + /** + * Creates a new {@link DynamicEventHubClientConfig} instance. + * + * @param runContext The context. Cannot be null. + * @param plugin The plugin. Cannot be null. + */ + public DynamicEventHubClientConfig(final RunContext runContext, + final T plugin) { + super(runContext, plugin); + } + + public String eventHubName() { + return renderWithRunContext(plugin.getEventHubName()); + } + + @Override + public Optional namespace() { + return getOptionalConfig(plugin::getNamespace); + } + + @Override + public Optional customEndpointAddress() { + return getOptionalConfig(plugin::getCustomEndpointAddress); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/DynamicEventHubConsumerConfig.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/DynamicEventHubConsumerConfig.java new file mode 100644 index 0000000..13af16b --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/DynamicEventHubConsumerConfig.java @@ -0,0 +1,56 @@ +package io.kestra.plugin.azure.messaging.eventhubs.config; + +import com.azure.messaging.eventhubs.models.EventPosition; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.messaging.eventhubs.AbstractConsumeEventHubTask; +import io.kestra.plugin.azure.messaging.eventhubs.service.consumer.EventPositionStrategy; +import io.kestra.plugin.azure.messaging.eventhubs.service.consumer.StartingPosition; + +/** + * Configuration that uses the {@link RunContext} to render configuration. + */ +public final class DynamicEventHubConsumerConfig extends DynamicEventHubClientConfig implements EventHubConsumerConfig { + + public static class Factory implements EventHubClientConfig.Factory { + /** + * {@inheritDoc} + */ + @Override + public EventHubConsumerConfig create(RunContext runContext, + AbstractConsumeEventHubTask plugin) { + return new DynamicEventHubConsumerConfig(runContext, plugin); + } + } + + /** + * Creates a new {@link DynamicEventHubConsumerConfig} instance. + * + * @param runContext The context. Cannot be null. + * @param plugin The plugin. Cannot be null. + */ + public DynamicEventHubConsumerConfig(final RunContext runContext, + final AbstractConsumeEventHubTask plugin) { + super(runContext, plugin); + } + + /** + * {@inheritDoc} + **/ + @Override + public String consumerGroup() { + return renderWithRunContext(plugin.getConsumerGroup()); + } + + /** + * {@inheritDoc} + **/ + @Override + public EventPosition partitionStartingPosition() { + StartingPosition partitionStartingPosition = plugin.getPartitionStartingPosition(); + return switch (partitionStartingPosition) { + case EARLIEST -> new EventPositionStrategy.Earliest().get(); + case LATEST -> new EventPositionStrategy.Latest().get(); + case INSTANT -> null; + }; + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/EventHubClientConfig.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/EventHubClientConfig.java new file mode 100644 index 0000000..a5e102e --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/EventHubClientConfig.java @@ -0,0 +1,48 @@ +package io.kestra.plugin.azure.messaging.eventhubs.config; + +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.client.AzureClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.EventHubClientInterface; + +import java.util.Optional; + +/** + * Configuration for creating a new Azure EventHub Client. + */ +public interface EventHubClientConfig extends AzureClientConfig { + + Optional namespace(); + + Optional customEndpointAddress(); + + String eventHubName(); + + /** + * Default EventHubClientConfig + **/ + record Default( + String eventHubName, + Optional connectionString, + Optional namespace, + Optional sharedKeyAccountName, + Optional sharedKeyAccountAccessKey, + Optional sasToken, + Optional customEndpointAddress + ) implements EventHubClientConfig { + } + + /** + * Service interface for constructing new {@link EventHubClientConfig} objects. + */ + interface Factory { + + /** + * Creates a new {@link EventHubClientConfig} instance for the given context and plugins. + * + * @param runContext The context. Cannot be null. + * @param plugin The plugin. Cannot be null. + * @return a new {@link EventHubClientConfig} instance. + */ + O create(RunContext runContext, T plugin); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/EventHubConsumerConfig.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/EventHubConsumerConfig.java new file mode 100644 index 0000000..67a2a17 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/EventHubConsumerConfig.java @@ -0,0 +1,12 @@ +package io.kestra.plugin.azure.messaging.eventhubs.config; + +import com.azure.messaging.eventhubs.models.EventPosition; + +public interface EventHubConsumerConfig extends EventHubClientConfig { + + String consumerGroup(); + + EventPosition partitionStartingPosition(); + + +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/internal/InputDataReader.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/internal/InputDataReader.java new file mode 100644 index 0000000..0a84e1a --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/internal/InputDataReader.java @@ -0,0 +1,47 @@ +package io.kestra.plugin.azure.messaging.eventhubs.internal; + +import io.kestra.core.runners.RunContext; +import io.kestra.core.serializers.FileSerde; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.List; +import java.util.Map; + +/** + * Class for reading input data as ION InputStream. + */ +public final class InputDataReader { + + private final RunContext context; + + public InputDataReader(final RunContext context) { + this.context = context; + } + + public InputStream read(final String path) { + try { + URI from = new URI(context.render(path)); + return context.uriToInputStream(from); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public InputStream read(final List objects) throws IOException { + final ByteArrayOutputStream os = new ByteArrayOutputStream(); + try (os) { + for (Object o : objects) { + FileSerde.write(os, o); + } + return new ByteArrayInputStream(os.toByteArray()); + } + } + + public InputStream read(final Map object) throws IOException { + return read(List.of(object)); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/model/EventDataObject.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/model/EventDataObject.java new file mode 100644 index 0000000..d8badf2 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/model/EventDataObject.java @@ -0,0 +1,67 @@ +package io.kestra.plugin.azure.messaging.eventhubs.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.NotNull; +import java.beans.ConstructorProperties; +import java.util.Collections; +import java.util.Map; + +/** + * A serializable entity class representing an Event Data + * to be published or consumed from Azure Event Hubs. + * + * @param partitionKey the event data partitionKey. + * @param body the event data body. + * @param contentType the event data content-type. + * @param correlationId The event correlation ID. + * @param messageId the event message ID. + * @param properties the event properties. + * @see com.azure.messaging.eventhubs.EventData + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public record EventDataObject( + @JsonProperty("partitionKey") String partitionKey, + @NotNull @JsonProperty("body") Object body, + @JsonProperty("contentType") String contentType, + @JsonProperty("correlationId") String correlationId, + @JsonProperty("messageId") String messageId, + @JsonProperty("enqueuedTimestamp") Long enqueuedTimestamp, + @JsonProperty("offset") Long offset, + @JsonProperty("sequenceNumber") Long sequenceNumber, + @JsonProperty("properties") Map properties) { + + @ConstructorProperties({ + "partitionKey", + "body", + "contentType", + "correlationId", + "messageId", + "enqueuedTimestamp", + "offset", + "sequenceNumber", + "properties" + + }) + public EventDataObject { + } + + public EventDataObject(@NotNull final String body) { + this(null, body); + } + + public EventDataObject(final String partitionKey, @NotNull final String body) { + this( + partitionKey, + body, + null, + null, + null, + null, + null, + null, + Collections.emptyMap() + ); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/package-info.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/package-info.java new file mode 100644 index 0000000..43c40f5 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/package-info.java @@ -0,0 +1,7 @@ +@PluginSubGroup( + description = "This sub-group of plugins contains tasks for using Azure Event Hubs.", + categories = { PluginSubGroup.PluginCategory.CLOUD, PluginSubGroup.PluginCategory.MESSAGING } +) +package io.kestra.plugin.azure.messaging.eventhubs; + +import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/JsonSerde.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/JsonSerde.java new file mode 100644 index 0000000..1708fcd --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/JsonSerde.java @@ -0,0 +1,49 @@ +package io.kestra.plugin.azure.messaging.eventhubs.serdes; + +import com.fasterxml.jackson.core.json.JsonReadFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; + +/** + * String serializer/deserializer. + */ +public final class JsonSerde implements Serde { + + private final ObjectMapper objectMapper; + + public JsonSerde() { + this.objectMapper = JsonMapper.builder() + .enable(JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS) + .build(); + } + + ObjectMapper objectMapper() { + return objectMapper; + } + + /** {@inheritDoc} **/ + @Override + public byte[] serialize(Object data) { + if (data == null) { + return null; + } + + try { + return objectMapper.writeValueAsBytes(data); + } catch (Exception e) { + throw new RuntimeException("Error serializing JSON message", e); + } + } + + /** {@inheritDoc} **/ + @Override + public JsonNode deserialize(byte[] data) { + if (data == null) return null; + try { + return objectMapper.readTree(data); + } catch (Exception e) { + throw new RuntimeException("Error deserializing JSON message", e); + } + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/Serde.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/Serde.java new file mode 100644 index 0000000..2017ad8 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/Serde.java @@ -0,0 +1,33 @@ +package io.kestra.plugin.azure.messaging.eventhubs.serdes; + +import java.util.Map; + +/** + * Service interface for serializing/deserializing a data vent body. + */ +public interface Serde { + + /** + * Configures this class. + * + * @param configs configs in key/value pairs. + */ + default void configure(Map configs) { + } + + /** + * Serializes the given object into bytes. + * + * @param data The data to be serialized. Can be {@code null}. + * @return the serialized object. + */ + byte[] serialize(Object data); + + /** + * Deserialize the given bytes array into object. + * + * @param data The data to be deserialized. Can be {@code null}. + * @return the deserialized object. + */ + Object deserialize(byte[] data); +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/Serdes.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/Serdes.java new file mode 100644 index 0000000..86ba16a --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/Serdes.java @@ -0,0 +1,37 @@ +package io.kestra.plugin.azure.messaging.eventhubs.serdes; + +import java.util.Map; +import java.util.function.Supplier; + +/** + * Serde factory. + */ +public enum Serdes { + + STRING(StringSerde::new), + + JSON(JsonSerde::new); + + private final Supplier supplier; + + /** + * Creates a new {@link Serdes} instance. + * + * @param supplier the serde supplier. + */ + Serdes(Supplier supplier) { + this.supplier = supplier; + } + + /** + * Creates a new {@link Serde} instance configured the given configs. + * + * @param config configs in key/value pairs. + * @return a new {@link Serde} + */ + public Serde create(Map config) { + Serde serde = supplier.get(); + serde.configure(config); + return serde; + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/StringSerde.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/StringSerde.java new file mode 100644 index 0000000..4eabfd1 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/StringSerde.java @@ -0,0 +1,47 @@ +package io.kestra.plugin.azure.messaging.eventhubs.serdes; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * String serializer/deserializer. + */ +public final class StringSerde implements Serde { + + public static final String SERIALIZER_ENCODING_CONFIG_NAME = "serializer.encoding"; + private Charset encoding; + + public StringSerde() { + this(StandardCharsets.UTF_8); + } + + public StringSerde(final Charset encoding) { + this.encoding = Objects.requireNonNull(encoding, "encoding cannot be null"); + } + + /** {@inheritDoc} **/ + @Override + public void configure(Map configs) { + encoding = Optional.ofNullable(configs.get(SERIALIZER_ENCODING_CONFIG_NAME)) + .map(Object::toString) + .map(Charset::forName) + .orElse(StandardCharsets.UTF_8); + } + + /** {@inheritDoc} **/ + @Override + public byte[] serialize(Object data) { + if (data == null) return new byte[0]; + return data.toString().getBytes(encoding); + } + + /** {@inheritDoc} **/ + @Override + public String deserialize(byte[] data) { + if (data == null) return null; + return new String(data, encoding); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/EventDataObjectConverter.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/EventDataObjectConverter.java new file mode 100644 index 0000000..025d63b --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/EventDataObjectConverter.java @@ -0,0 +1,74 @@ +package io.kestra.plugin.azure.messaging.eventhubs.service; + +import com.azure.messaging.eventhubs.EventData; +import io.kestra.plugin.azure.messaging.eventhubs.model.EventDataObject; +import io.kestra.plugin.azure.messaging.eventhubs.serdes.Serde; + +import java.time.Instant; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Converts {@link EventData} into {@link EventDataObject} and vice versa. + */ +public final class EventDataObjectConverter { + + private final Serde serde; + + public EventDataObjectConverter(final Serde serde) { + this.serde = Objects.requireNonNull(serde, "serde cannot be null"); + } + + /** + * Converts the given list of {@link EventData} into a new list of {@link EventDataObject}. + * + * @param data The list of {@link Collectors} + * @return the new list of {@link EventDataObject}, or {@code null} if the given data is null. + */ + public List convertFromEventData(final List data) { + return data.stream().map(this::convertFromEventData).collect(Collectors.toList()); + } + + /** + * Converts the given {@link EventDataObject} into a new {@link EventData}. + * + * @param data The {@link EventDataObject}. + * @return the new {@link EventData}, or {@code null} if the given data is null. + */ + public EventData convertToEventData(final EventDataObject data) { + if (data == null) return null; + + byte[] value = serde.serialize(data.body()); + + final EventData event = new EventData(value); + Optional.ofNullable(data.contentType()).ifPresent(event::setContentType); + Optional.ofNullable(data.correlationId()).ifPresent(event::setCorrelationId); + Optional.ofNullable(data.messageId()).ifPresent(event::setMessageId); + Optional.ofNullable(data.properties()).ifPresent(props -> event.getProperties().putAll(data.properties())); + return event; + } + + /** + * Converts the given {@link EventDataObject} into a new {@link EventData}. + * + * @param data The {@link EventDataObject} + * @return the new {@link EventData}, or {@code null} if the given data is null. + */ + public EventDataObject convertFromEventData(final EventData data) { + if (data == null) return null; + + return new EventDataObject( + data.getPartitionKey(), + serde.deserialize(data.getBody()), + data.getContentType(), + data.getCorrelationId(), + data.getMessageId(), + Optional.ofNullable(data.getEnqueuedTime()).map(Instant::toEpochMilli).orElse(null), + data.getOffset(), + data.getSequenceNumber(), + data.getProperties() + ); + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/CheckpointStoreType.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/CheckpointStoreType.java new file mode 100644 index 0000000..cdbfeaa --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/CheckpointStoreType.java @@ -0,0 +1,8 @@ +package io.kestra.plugin.azure.messaging.eventhubs.service.consumer; + +/** + * Supported checkpoint type. + */ +public enum CheckpointStoreType { + BLOB; +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/EventHubConsumerService.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/EventHubConsumerService.java new file mode 100644 index 0000000..e600a5d --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/EventHubConsumerService.java @@ -0,0 +1,219 @@ +package io.kestra.plugin.azure.messaging.eventhubs.service.consumer; + +import com.azure.messaging.eventhubs.CheckpointStore; +import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventProcessorClient; +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.EventBatchContext; +import com.azure.messaging.eventhubs.models.EventPosition; +import com.azure.messaging.eventhubs.models.PartitionContext; +import io.kestra.plugin.azure.messaging.eventhubs.client.EventHubClientFactory; +import io.kestra.plugin.azure.messaging.eventhubs.config.BlobContainerClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.config.EventHubClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.config.EventHubConsumerConfig; +import io.kestra.plugin.azure.messaging.eventhubs.model.EventDataObject; +import io.kestra.plugin.azure.messaging.eventhubs.service.EventDataObjectConverter; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +@Slf4j +public final class EventHubConsumerService { + + private final EventHubClientFactory clientFactory; + private final EventHubConsumerConfig config; + private final EventDataObjectConverter converter; + private final Supplier checkpointStoreSupplier; + + /** + * Creates a new {@link EventHubConsumerService} instance. + * + * @param clientFactory The {@link EventHubClientFactory} - Cannot be {@code null}. + * @param consumerConfig The {@link EventHubClientConfig} - Cannot be {@code null}. + * @param consumerConfig The {@link BlobContainerClientConfig} - Cannot be {@code null}. + * @param converter The {@link EventDataObjectConverter} to be used for converting entities to event data. + */ + public EventHubConsumerService(final EventHubClientFactory clientFactory, + final EventHubConsumerConfig consumerConfig, + final EventDataObjectConverter converter, + final Supplier checkpointStoreSupplier) { + this.clientFactory = Objects.requireNonNull(clientFactory, "clientFactory cannot be null"); + this.config = Objects.requireNonNull(consumerConfig, "consumerConfig cannot be null"); + this.converter = Objects.requireNonNull(converter, "converter cannot be null"); + this.checkpointStoreSupplier = Objects.requireNonNull(checkpointStoreSupplier, "checkpointStoreSupplier cannot be null"); + } + + public Map poll(final int maxPollEvents, + final Duration maxPollTime, + final Duration maxTotalWait, + final EventProcessorListener listener) { + + // Create Checkpoint for storing consumer positions + CheckpointStore checkpointStore = checkpointStoreSupplier.get(); + + CountDownLatch latch = new CountDownLatch(1); + + // Create Map that will hold all initialized partitions. + Set partitions = Collections.synchronizedSet(new HashSet<>()); + + // Counter + Map eventsByEventHubNamePartition = new ConcurrentHashMap<>(); + + Map checkpointsByPartitions = new ConcurrentHashMap<>(); + + // Create single EventProcessorClient. + EventProcessorClient client = clientFactory.createEventProcessorClientBuilder(config) + .consumerGroup(config.consumerGroup()) + .checkpointStore(checkpointStore) + // Set the offset reset strategy + .initialPartitionEventPosition(partition -> { + EventPosition position = config.partitionStartingPosition(); + log.info("Initializing partitionId {} with offset={}, sequenceNumber={}, enqueuedDateTime={}", + partition, + position.getOffset(), + position.getSequenceNumber(), + position.getEnqueuedDateTime() + ); + return position; + }) + // Capture the partition to process. + .processPartitionInitialization(context -> { + partitions.add(context.getPartitionContext().getPartitionId()); + }) + // Process Events + .processEventBatch(context -> { + PartitionContext partitionContext = context.getPartitionContext(); + List events = context.getEvents(); + + log.info("Receiving {} events from eventHub: {}, partitionId: {} with consumerGroup: {}", + events.size(), + partitionContext.getEventHubName(), + partitionContext.getPartitionId(), + partitionContext.getConsumerGroup() + ); + + // Convert eventData, and invoke listener. + converter.convertFromEventData(events).forEach(event -> listener.onEvent(event, partitionContext)); + + EventHubNamePartition key = new EventHubNamePartition( + partitionContext.getEventHubName(), + partitionContext.getPartitionId() + ); + + // Keep checkpoint of the last event in the batch + createCheckpoint(context) + .ifPresent(checkpoint -> checkpointsByPartitions.put(key, checkpoint)); + + // Increment event counter for the current partition. + eventsByEventHubNamePartition + .computeIfAbsent(key, ignored -> new AtomicInteger(0)) + .addAndGet(events.size()); + + // Check whether All partitions were polled at-least once. + if (partitions.remove(partitionContext.getPartitionId()) && partitions.isEmpty()) { + // Proactively stop consuming. + latch.countDown(); + } + + }, maxPollEvents, maxPollTime) + // Handle errors + .processError(errorContext -> { + PartitionContext partitionContext = errorContext.getPartitionContext(); + log.error("Failed to process eventHub: {}, partitionId: {} with consumerGroup: {}", + partitionContext.getEventHubName(), + partitionContext.getPartitionId(), + partitionContext.getConsumerGroup(), + errorContext.getThrowable()); + latch.countDown(); // stop processing immediately. + }) + .buildEventProcessorClient(); + + try { + listener.onStart(); + client.start(); + if (!latch.await(maxTotalWait.toMillis(), TimeUnit.MILLISECONDS)) { + log.warn("Timeout before processing EventHub partitions."); + } + } catch (InterruptedException e) { + log.warn("Task was interrupted before processing EventHub partitions."); + Thread.currentThread().interrupt(); + } finally { + client.stop(); + listener.onStop(); + updateCheckpoints(checkpointStore, checkpointsByPartitions.values()); + } + + return eventsByEventHubNamePartition + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, it -> it.getValue().get())); + } + + private void updateCheckpoints(CheckpointStore store, Iterable checkpoints) { + for (Checkpoint checkpoint : checkpoints) { + log.info("Checkpointing position for consumerGroup={}, eventHubName={}, partitionId={}, sequenceNumber={}, and offset={}", + checkpoint.getConsumerGroup(), + checkpoint.getEventHubName(), + checkpoint.getPartitionId(), + checkpoint.getSequenceNumber(), + checkpoint.getOffset() + ); + store.updateCheckpoint(checkpoint).block(); + log.info("Checkpoint completed"); + } + } + + private Optional createCheckpoint(final EventBatchContext context) { + List events = context.getEvents(); + if (events.isEmpty()) { + return Optional.empty(); + } + PartitionContext partitionContext = context.getPartitionContext(); + + return Optional.of(new Checkpoint() + .setFullyQualifiedNamespace(partitionContext.getFullyQualifiedNamespace()) + .setEventHubName(partitionContext.getEventHubName()) + .setConsumerGroup(partitionContext.getConsumerGroup()) + .setPartitionId(partitionContext.getPartitionId()) + .setSequenceNumber(events.get(events.size() - 1).getSequenceNumber()) + .setOffset(events.get(events.size() - 1).getOffset()) + ); + } + + public interface EventProcessorListener { + + /** + * Invokes when the event processor is started + */ + default void onStart() { + } + + /** + * Invokes on each received event. + * + * @param event The event to be processed. + */ + void onEvent(EventDataObject event, PartitionContext context); + + /** + * Invokes when the event processor is stopped. + */ + default void onStop() { + } + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/EventHubNamePartition.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/EventHubNamePartition.java new file mode 100644 index 0000000..fc9e3cf --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/EventHubNamePartition.java @@ -0,0 +1,6 @@ +package io.kestra.plugin.azure.messaging.eventhubs.service.consumer; + +public record EventHubNamePartition(String eventHubName, + String partitionId) { +} + diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/EventPositionStrategy.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/EventPositionStrategy.java new file mode 100644 index 0000000..4524876 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/EventPositionStrategy.java @@ -0,0 +1,74 @@ +package io.kestra.plugin.azure.messaging.eventhubs.service.consumer; + +import com.azure.messaging.eventhubs.models.EventPosition; + +import javax.validation.constraints.NotNull; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; + +/** + * Strategies to initialize an Event Hub consumer if no offsets are stored. + */ +public interface EventPositionStrategy { + + EventPosition get(); + + /** + * Earliest. + * + * @see EventPosition#earliest() + */ + record Earliest() implements EventPositionStrategy { + + /** + * @return an {@link EventPosition} that corresponds to the location of + * the first event present in the partition. + */ + @Override + public EventPosition get() { + return EventPosition.earliest(); + } + } + + /** + * Latest. + * + * @see EventPosition#latest() () + */ + record Latest() implements EventPositionStrategy { + + /** + * @return an {@link EventPosition} that corresponds to the end of the partition, + * where no more events are currently enqueued. + */ + @Override + public EventPosition get() { + return EventPosition.latest(); + } + } + + /** + * EnqueuedTime. + * + * @param dateTime the ISO Date Time + */ + record EnqueuedTime(@NotNull String dateTime) implements EventPositionStrategy { + + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter + .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS") + .withZone(ZoneOffset.UTC); + + /** + * @return an {@link EventPosition} that corresponds to a specific Instant. + */ + @Override + public EventPosition get() { + OffsetDateTime odt = OffsetDateTime.parse(dateTime, DATE_TIME_FORMATTER); + Instant instant = Instant.from(odt); + return EventPosition.fromEnqueuedTime(instant); + } + } + +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/StartingPosition.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/StartingPosition.java new file mode 100644 index 0000000..e3147b8 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/StartingPosition.java @@ -0,0 +1,6 @@ +package io.kestra.plugin.azure.messaging.eventhubs.service.consumer; + +public enum StartingPosition { + + EARLIEST, LATEST, INSTANT; +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/producer/EventDataBatchFactory.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/producer/EventDataBatchFactory.java new file mode 100644 index 0000000..95e11ce --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/producer/EventDataBatchFactory.java @@ -0,0 +1,36 @@ +package io.kestra.plugin.azure.messaging.eventhubs.service.producer; + +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; +import com.azure.messaging.eventhubs.models.CreateBatchOptions; +import reactor.core.publisher.Mono; + +/** + * Service interface for constructing new {@link EventDataBatch} objects. + */ +public interface EventDataBatchFactory { + + /** + * Factry method to create a new batch. + * + * @param client The {@link EventHubProducerAsyncClient}. + * @return a new {@link EventDataBatch}. + */ + Mono createBatch(EventHubProducerAsyncClient client); + + /** + * Default factory for creating bath from the given options and client. + * + * @param options The options to be used for creating new batch. + */ + record Default(CreateBatchOptions options) implements EventDataBatchFactory { + + /** + * {@inheritDoc} + **/ + @Override + public Mono createBatch(EventHubProducerAsyncClient client) { + return client.createBatch(options); + } + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/producer/EventHubProducerService.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/producer/EventHubProducerService.java new file mode 100644 index 0000000..2a296b8 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/producer/EventHubProducerService.java @@ -0,0 +1,161 @@ +package io.kestra.plugin.azure.messaging.eventhubs.service.producer; + +import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; +import io.kestra.core.serializers.FileSerde; +import io.kestra.plugin.azure.messaging.eventhubs.client.EventHubClientFactory; +import io.kestra.plugin.azure.messaging.eventhubs.config.EventHubClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.model.EventDataObject; +import io.kestra.plugin.azure.messaging.eventhubs.service.EventDataObjectConverter; +import io.reactivex.BackpressureStrategy; +import io.reactivex.Flowable; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; + +import java.io.BufferedReader; +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Default service for sending batches of events into Azure Event Hubs. + */ +@Slf4j +public class EventHubProducerService { + + private static final int DEFAULT_MAX_EVENT_PER_BATCH = Integer.MAX_VALUE; + private final EventHubClientFactory clientFactory; + private final EventHubClientConfig config; + private final EventDataObjectConverter adapter; + private final EventDataBatchFactory batchFactory; + + /** + * Creates a new {@link EventHubProducerService} instance. + * + * @param clientFactory The {@link EventHubClientFactory} - Cannot be {@code null}. + * @param config The {@link EventHubClientConfig} - Cannot be {@code null}. + * @param converter The {@link EventDataObjectConverter} to be used for converting entities to event data. + */ + public EventHubProducerService(final EventHubClientFactory clientFactory, + final EventHubClientConfig config, + final EventDataObjectConverter converter, + final EventDataBatchFactory batchFactory) { + this.clientFactory = Objects.requireNonNull(clientFactory, "clientFactory cannot be null"); + this.config = Objects.requireNonNull(config, "config cannot be null"); + this.adapter = Objects.requireNonNull(converter, "converter cannot be null"); + this.batchFactory = Objects.requireNonNull(batchFactory, "dataBatchFactory cannot be null"); + } + + /** + * Publishes the given event stream as events into Event Hubs. + * + * @param eventStream The stream of events to send. + * @return The sender result. + */ + public Result sendEvents(BufferedReader eventStream, ProducerOptions options) { + try (eventStream) { + Flowable flowable = Flowable.create( + FileSerde.reader(eventStream, EventDataObject.class), + BackpressureStrategy.BUFFER + ); + try (EventHubProducerAsyncClient producer = clientFactory.createAsyncProducerClient(config)) { + return sendEvents(producer, adapter, flowable, options); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private Result sendEvents(EventHubProducerAsyncClient producer, + EventDataObjectConverter adapter, + Flowable flowable, + ProducerOptions options) { + + Integer maxEventsPerBatch = Optional + .ofNullable(options.maxEventsPerBatch()) + .orElse(DEFAULT_MAX_EVENT_PER_BATCH); + + final AtomicInteger numSentEvents = new AtomicInteger(0); + final AtomicReference currentBatch = new AtomicReference<>(createBatch(producer).block()); + + Integer numSentBatches = flowable.flatMap(data -> { + EventDataBatch batch = currentBatch.get(); + final EventData event = adapter.convertToEventData(data); + // Set default content-type + Optional.ofNullable(options.bodyContentType()) + .ifPresent(event::setContentType); + // Set default properties + Optional.ofNullable(options.eventProperties()) + .ifPresent(props -> event.getProperties().putAll(props)); + + int batchSizeInEvents = batch.getCount(); + boolean isFull = batchSizeInEvents >= maxEventsPerBatch; + + if (!isFull && batch.tryAdd(event)) { + return Mono.empty(); + } + + // Send the current batch then create another size-limited EventDataBatch + // and try to fit the event into this new batch. + int batchSize = batch.getCount(); + int totalSent = numSentEvents.getAndAccumulate(batchSize, Integer::sum); + if (log.isTraceEnabled()) { + log.trace("Sending new batch of {} events (total-sent-events: {})", batchSize, totalSent); + } + return Mono.when( + producer.send(batch), + createBatch(producer) + .map(newBatch -> { + currentBatch.set(newBatch); + // Try to add the event that did not fit in the previous + // batch into a new empty one. + if (!newBatch.tryAdd(event)) { + throw new IllegalArgumentException(String.format( + "Event is too large for an empty batch. Max size: %s. Event size: %s", + newBatch.getMaxSizeInBytes(), event.getBodyAsBinaryData().getLength())); + } + return newBatch; + }) + ).then(Mono.just(1)); + }) + .reduce(Integer::sum) + .blockingGet(); + + // Eventually send last partial batch. + final EventDataBatch batch = currentBatch.getAndSet(null); + if (batch != null && batch.getCount() > 0) { + int batchSize = batch.getCount(); + int totalSent = numSentEvents.getAndAccumulate(batchSize, Integer::sum); + if (log.isTraceEnabled()) { + log.trace("Sending new batch of {} events (total-sent-events: {})", batchSize, totalSent); + } + producer.send(batch).block(); + } + + numSentBatches = Optional + .ofNullable(numSentBatches) + .map(count -> count + 1) + .orElse(1); // numSentBatches will be NULL when a single and partial batch is sent. + + if (log.isTraceEnabled()) { + log.trace( + "ProduceTask completed (total-sent-events: {}, total-sent-batches: {}).", + numSentEvents.get(), + numSentBatches + ); + } + return new Result(numSentEvents.get(), numSentBatches); + } + + private Mono createBatch(EventHubProducerAsyncClient producer) { + return Optional + .ofNullable(batchFactory.createBatch(producer)) + .orElseThrow(() -> new IllegalArgumentException("batchFactory should not return null batch.")); + } + + public record Result(int totalSentEvents, int totalSentBatches) { + } +} diff --git a/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/producer/ProducerOptions.java b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/producer/ProducerOptions.java new file mode 100644 index 0000000..e2d4380 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/messaging/eventhubs/service/producer/ProducerOptions.java @@ -0,0 +1,16 @@ +package io.kestra.plugin.azure.messaging.eventhubs.service.producer; + +import java.util.Map; + +/** + * Options for publihsing events. + * + * @param bodyContentType The default body content-type. + * @param eventProperties the default properties to add to events. + * @param maxEventsPerBatch The maximum number of events per batch. + */ +public record ProducerOptions(String bodyContentType, + Map eventProperties, + Integer maxEventsPerBatch +) { +} diff --git a/src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorage.java b/src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorage.java index c4004ca..c7e3a66 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorage.java +++ b/src/main/java/io/kestra/plugin/azure/storage/abstracts/AbstractStorage.java @@ -1,6 +1,7 @@ package io.kestra.plugin.azure.storage.abstracts; import io.kestra.plugin.azure.AbstractConnection; +import io.kestra.plugin.azure.AzureClientInterface; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; @@ -12,7 +13,7 @@ @EqualsAndHashCode @Getter @NoArgsConstructor -public abstract class AbstractStorage extends AbstractConnection implements AbstractStorageInterface { +public abstract class AbstractStorage extends AbstractConnection implements AzureClientInterface { protected String connectionString; diff --git a/src/main/java/io/kestra/plugin/azure/storage/blob/Trigger.java b/src/main/java/io/kestra/plugin/azure/storage/blob/Trigger.java index 8c81fab..5b0909b 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/blob/Trigger.java +++ b/src/main/java/io/kestra/plugin/azure/storage/blob/Trigger.java @@ -11,9 +11,8 @@ import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.models.triggers.TriggerOutput; import io.kestra.core.runners.RunContext; -import io.kestra.core.utils.IdUtils; import io.kestra.plugin.azure.AbstractConnectionInterface; -import io.kestra.plugin.azure.storage.abstracts.AbstractStorageInterface; +import io.kestra.plugin.azure.AzureClientInterface; import io.kestra.plugin.azure.storage.blob.abstracts.AbstractBlobStorageContainerInterface; import io.kestra.plugin.azure.storage.blob.abstracts.ActionInterface; import io.kestra.plugin.azure.storage.blob.abstracts.ListInterface; @@ -75,7 +74,7 @@ ) } ) -public class Trigger extends AbstractTrigger implements PollingTriggerInterface, TriggerOutput, AbstractConnectionInterface, ListInterface, AbstractBlobStorageContainerInterface, AbstractStorageInterface { +public class Trigger extends AbstractTrigger implements PollingTriggerInterface, TriggerOutput, AbstractConnectionInterface, ListInterface, AbstractBlobStorageContainerInterface, AzureClientInterface { @Builder.Default private final Duration interval = Duration.ofSeconds(60); diff --git a/src/main/java/io/kestra/plugin/azure/storage/blob/abstracts/AbstractBlobStorage.java b/src/main/java/io/kestra/plugin/azure/storage/blob/abstracts/AbstractBlobStorage.java index f544cb9..c5fcfc7 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/blob/abstracts/AbstractBlobStorage.java +++ b/src/main/java/io/kestra/plugin/azure/storage/blob/abstracts/AbstractBlobStorage.java @@ -7,7 +7,7 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.runners.RunContext; import io.kestra.plugin.azure.storage.abstracts.AbstractStorage; -import io.kestra.plugin.azure.storage.abstracts.AbstractStorageInterface; +import io.kestra.plugin.azure.AzureClientInterface; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; @@ -19,7 +19,7 @@ @EqualsAndHashCode @Getter @NoArgsConstructor -public abstract class AbstractBlobStorage extends AbstractStorage implements AbstractStorageInterface { +public abstract class AbstractBlobStorage extends AbstractStorage implements AzureClientInterface { protected BlobServiceClient client(RunContext runContext) throws IllegalVariableEvaluationException { BlobServiceClientBuilder builder = new BlobServiceClientBuilder() .endpoint(runContext.render(endpoint)); diff --git a/src/main/java/io/kestra/plugin/azure/storage/blob/services/BlobService.java b/src/main/java/io/kestra/plugin/azure/storage/blob/services/BlobService.java index 5d81992..b0b3d53 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/blob/services/BlobService.java +++ b/src/main/java/io/kestra/plugin/azure/storage/blob/services/BlobService.java @@ -10,7 +10,7 @@ import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.runners.RunContext; import io.kestra.plugin.azure.AbstractConnectionInterface; -import io.kestra.plugin.azure.storage.abstracts.AbstractStorageInterface; +import io.kestra.plugin.azure.AzureClientInterface; import io.kestra.plugin.azure.storage.blob.Copy; import io.kestra.plugin.azure.storage.blob.Delete; import io.kestra.plugin.azure.storage.blob.abstracts.ActionInterface; @@ -43,7 +43,7 @@ public static void archive( Copy.CopyObject moveTo, RunContext runContext, AbstractConnectionInterface connectionInterface, - AbstractStorageInterface blobStorageInterface + AzureClientInterface blobStorageInterface ) throws Exception { if (action == ActionInterface.Action.DELETE) { for (Blob object : blobsObjects) { diff --git a/src/main/resources/icons/io.kestra.plugin.azure.messaging.eventhubs.svg b/src/main/resources/icons/io.kestra.plugin.azure.messaging.eventhubs.svg new file mode 100644 index 0000000..dcae869 --- /dev/null +++ b/src/main/resources/icons/io.kestra.plugin.azure.messaging.eventhubs.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/JsonSerdeTest.java b/src/test/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/JsonSerdeTest.java new file mode 100644 index 0000000..56bbf65 --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/JsonSerdeTest.java @@ -0,0 +1,26 @@ +package io.kestra.plugin.azure.messaging.eventhubs.serdes; + +import com.fasterxml.jackson.databind.JsonNode; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; + +class JsonSerdeTest { + + @Test + void shouldSerializeGivenAnyObject() { + // Given + Map input = Map.of("foo", "bar"); + JsonSerde serde = (JsonSerde) Serdes.JSON.create(Collections.emptyMap()); + + // When + byte[] serialized = serde.serialize(input); + + // Then + Assertions.assertNotNull(serialized); + JsonNode node = serde.deserialize(serialized); + Assertions.assertEquals(serde.objectMapper().convertValue(node, Map.class), input); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/StringSerdeTest.java b/src/test/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/StringSerdeTest.java new file mode 100644 index 0000000..05b5593 --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/messaging/eventhubs/serdes/StringSerdeTest.java @@ -0,0 +1,22 @@ +package io.kestra.plugin.azure.messaging.eventhubs.serdes; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +class StringSerdeTest { + + @Test + void shouldSerializeGivenString() { + // Given + String input = "test"; + StringSerde serde = (StringSerde) Serdes.STRING.create(Map.of(StringSerde.SERIALIZER_ENCODING_CONFIG_NAME, "utf-8")); + + // When + byte[] serialized = serde.serialize(input); + + // Then + Assertions.assertEquals(input, serde.deserialize(serialized)); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/azure/messaging/eventhubs/service/EventDataObjectConverterTest.java b/src/test/java/io/kestra/plugin/azure/messaging/eventhubs/service/EventDataObjectConverterTest.java new file mode 100644 index 0000000..af10c41 --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/messaging/eventhubs/service/EventDataObjectConverterTest.java @@ -0,0 +1,36 @@ +package io.kestra.plugin.azure.messaging.eventhubs.service; + +import com.azure.messaging.eventhubs.EventData; +import io.kestra.plugin.azure.messaging.eventhubs.model.EventDataObject; +import io.kestra.plugin.azure.messaging.eventhubs.serdes.StringSerde; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.Map; + +class EventDataObjectConverterTest { + + @Test + void shouldConvertToEventData() { + EventDataObjectConverter converter = new EventDataObjectConverter(new StringSerde()); + long enqueuedTimestamp = Instant.now().toEpochMilli(); + Map prop = Map.of("prop", "value"); + EventData result = converter.convertToEventData(new EventDataObject( + "key", + "value", + "contentType", + "correlationId", + "messageId", + enqueuedTimestamp, + 1L, + 1L, + prop + )); + Assertions.assertEquals("contentType", result.getContentType()); + Assertions.assertEquals("correlationId", result.getCorrelationId()); + Assertions.assertEquals("messageId", result.getMessageId()); + Assertions.assertEquals("value", result.getBodyAsString()); + Assertions.assertEquals(prop, result.getProperties()); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/azure/messaging/eventhubs/service/producer/EventHubProducerServiceTest.java b/src/test/java/io/kestra/plugin/azure/messaging/eventhubs/service/producer/EventHubProducerServiceTest.java new file mode 100644 index 0000000..991834a --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/messaging/eventhubs/service/producer/EventHubProducerServiceTest.java @@ -0,0 +1,182 @@ +package io.kestra.plugin.azure.messaging.eventhubs.service.producer; + +import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; +import io.kestra.core.serializers.FileSerde; +import io.kestra.plugin.azure.messaging.eventhubs.client.EventHubClientFactory; +import io.kestra.plugin.azure.messaging.eventhubs.config.EventHubClientConfig; +import io.kestra.plugin.azure.messaging.eventhubs.model.EventDataObject; +import io.kestra.plugin.azure.messaging.eventhubs.serdes.StringSerde; +import io.kestra.plugin.azure.messaging.eventhubs.service.EventDataObjectConverter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.stubbing.Answer; +import reactor.core.publisher.Mono; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +class EventHubProducerServiceTest { + + static final EventHubClientConfig.Default EMPTY_CONFIG = new EventHubClientConfig.Default( + null, + null, + null, + null, + null, + null, + null); + + @Captor + ArgumentCaptor eventDataBatchArgumentCaptor; + + @Captor + ArgumentCaptor eventDataArgumentCaptor; + + @Mock + private EventHubProducerAsyncClient producer; + @Mock + private EventHubClientFactory factory; + + private AutoCloseable closeable; + private EventDataObjectConverter converter; + + @BeforeEach + public void beforeEach() { + converter = new EventDataObjectConverter(new StringSerde()); + closeable = MockitoAnnotations.openMocks(this); + Mockito.when(factory.createAsyncProducerClient(Mockito.any())).thenReturn(producer); + Mockito.when(producer.send(eventDataBatchArgumentCaptor.capture())).thenReturn(Mono.empty()); + } + + @AfterEach + public void afterEach() throws Exception { + closeable.close(); + } + + @Test + void shouldSendExceptionGivenTooLargeEvent() throws Exception { + // GIVEN + int maxRecordPerBatch = 1; + EventHubProducerService service = createNewSenderService(0); + List events = List.of(new EventDataObject("message")); + byte[] data = getDataAsBytesFor(events); + + // WHEN + try (BufferedReader stream = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data)))) { + ProducerOptions options = new ProducerOptions( + null, null, maxRecordPerBatch + ); + // THEN + Assertions.assertThrows(IllegalArgumentException.class, () -> service.sendEvents(stream, options)); + } + } + + @Test + void shouldSendGivenStreamOfOneEvent() throws Exception { + // GIVEN + int maxRecordPerBatch = 1; + EventHubProducerService service = createNewSenderService(1024); + List events = List.of(new EventDataObject("message")); + byte[] data = getDataAsBytesFor(events); + + // WHEN + try (BufferedReader stream = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data)))) { + + ProducerOptions options = new ProducerOptions( + null, null, maxRecordPerBatch + ); + EventHubProducerService.Result result = service.sendEvents(stream, options); + // THEN + Mockito.verify(producer, Mockito.times(1)).send(eventDataBatchArgumentCaptor.capture()); + Assertions.assertEquals(new EventHubProducerService.Result(1, 1), result); + } + } + + @Test + void shouldSendGivenStreamOfMultipleEvents() throws Exception { + // GIVEN + int maxRecordPerBatch = 2; + int expectedTotalNumRecords = 11; + int expectedTotalNumBatches = (expectedTotalNumRecords / 2) + expectedTotalNumRecords % maxRecordPerBatch; + + EventHubProducerService service = createNewSenderService(1024); + List events = IntStream + .range(0, expectedTotalNumRecords) + .mapToObj(i -> new EventDataObject("message-" + i)) + .collect(Collectors.toList()); + + byte[] data = getDataAsBytesFor(events); + + // WHEN + try (BufferedReader stream = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data)))) { + + ProducerOptions options = new ProducerOptions( + null, null, maxRecordPerBatch + ); + EventHubProducerService.Result result = service.sendEvents(stream, options); + + // THEN + Mockito.verify(producer, Mockito.times(expectedTotalNumBatches)).send(eventDataBatchArgumentCaptor.capture()); + Assertions.assertEquals( + events, + converter.convertFromEventData(eventDataArgumentCaptor.getAllValues().stream().distinct().collect(Collectors.toList())) + ); + Assertions.assertEquals(new EventHubProducerService.Result(expectedTotalNumRecords, expectedTotalNumBatches), result); + } + } + + private static byte[] getDataAsBytesFor(List entities) throws IOException { + final ByteArrayOutputStream os = new ByteArrayOutputStream(); + try (os) { + for (EventDataObject entity : entities) { + FileSerde.write(os, entity); + } + } + return os.toByteArray(); + } + + private EventHubProducerService createNewSenderService(int maxBatchSize) { + + return new EventHubProducerService( + factory, + EMPTY_CONFIG, + converter, + new EventDataBatchFactory() { + @Override + public Mono createBatch(EventHubProducerAsyncClient client) { + final EventDataBatch batch = Mockito.mock(EventDataBatch.class); + final AtomicInteger counter = new AtomicInteger(0); + final AtomicInteger batchSize = new AtomicInteger(0); + Mockito.when(batch.tryAdd(eventDataArgumentCaptor.capture())) + .then((Answer) invocation -> { + EventData value = eventDataArgumentCaptor.getValue(); + int currentBatchSize = batchSize.accumulateAndGet(value.getBody().length, Integer::sum); + if (currentBatchSize < maxBatchSize) { + counter.accumulateAndGet(1, Integer::sum); + return true; + } + return false; + }); + Mockito.when(batch.getCount()).then((Answer) invocation -> counter.get()); + return Mono.just(batch); + } + } + ); + } +} \ No newline at end of file