Skip to content

Commit

Permalink
feat: add new tasks for Azure EventHubs (#38)
Browse files Browse the repository at this point in the history
This commit adds the following tasks types:
* io.kesta.plugin.azure.messaging.eventhubs.Produce
* io.kesta.plugin.azure.messaging.eventhubs.Consume
  • Loading branch information
fhussonnois committed Jan 12, 2024
1 parent 9ac891d commit 8d8f63b
Show file tree
Hide file tree
Showing 46 changed files with 2,421 additions and 11 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ dependencies {
api group: 'com.azure', name: 'azure-data-tables'
api group: 'com.microsoft.azure', name: 'azure-batch', version: '10.1.0'
api group: 'com.microsoft.azure', name: 'azure-storage', version: '8.6.6'
api group: 'com.azure', name: 'azure-messaging-eventhubs-checkpointstore-blob', version: '1.18.1'
api group: 'com.azure', name: 'azure-messaging-eventhubs', version: '5.17.1'
}


Expand Down Expand Up @@ -96,6 +98,7 @@ dependencies {
testImplementation "org.junit.jupiter:junit-jupiter-engine"
testImplementation "org.hamcrest:hamcrest:2.2"
testImplementation "org.hamcrest:hamcrest-library:2.2"
testImplementation "org.mockito:mockito-core:5.8.0"
}

/**********************************************************************************************************************\
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package io.kestra.plugin.azure.storage.abstracts;
package io.kestra.plugin.azure;

import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;

public interface AbstractStorageInterface {
/**
* Top-level interface that can be used by plugins to retrieve
* required configuration properties in order to establish connection to Azure services.
*/
public interface AzureClientInterface {
@Schema(
title = "Connection string of the storage account."
)
Expand All @@ -16,7 +20,6 @@ public interface AbstractStorageInterface {
@PluginProperty(dynamic = true)
String getSharedKeyAccountName();


@Schema(
title = "Shared Key access key for authenticating requests"
)
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/io/kestra/plugin/azure/client/AzureClientConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.kestra.plugin.azure.client;

import java.util.Optional;

/**
* Configuration for creating a new Azure EventHub Client.
*/
public interface AzureClientConfig {

Optional<String> connectionString();

Optional<String> sharedKeyAccountName();

Optional<String> sharedKeyAccountAccessKey();

Optional<String> sasToken();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.kestra.plugin.azure.client;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.azure.AzureClientInterface;

import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

/**
* Configuration that uses the {@link RunContext} to render configuration.
*/
public class DynamicAzureClientConfig<T extends AzureClientInterface> implements AzureClientConfig {

protected final RunContext runContext;
protected final T plugin;

/**
* Creates a new {@link DynamicAzureClientConfig} instance.
*
* @param runContext The context. Cannot be null.
* @param plugin The plugin. Cannot be null.
*/
public DynamicAzureClientConfig(final RunContext runContext,
final T plugin) {
this.runContext = Objects.requireNonNull(runContext, "runContext cannot be null");
this.plugin = Objects.requireNonNull(plugin, "plugin cannot be null");
}

@Override
public Optional<String> connectionString() {
String o = plugin.getConnectionString();
if (o != null)
return Optional.ofNullable(renderWithRunContext(o));
else {
return Optional.empty();
}
}

@Override
public Optional<String> sharedKeyAccountName() {
return getOptionalConfig(plugin::getSharedKeyAccountName);
}

@Override
public Optional<String> sharedKeyAccountAccessKey() {
return getOptionalConfig(plugin::getSharedKeyAccountAccessKey);
}

@Override
public Optional<String> sasToken() {
return getOptionalConfig(plugin::getSasToken);
}

protected Optional<String> getOptionalConfig(final Supplier<String> t) {
String config = t.get();
return config != null ? Optional.ofNullable(renderWithRunContext(config)) : Optional.empty();
}

protected String renderWithRunContext(String input) {
try {
return runContext.render(input);
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.kestra.plugin.azure.messaging.eventhubs;

import io.kestra.plugin.azure.messaging.eventhubs.serdes.Serdes;
import io.kestra.plugin.azure.messaging.eventhubs.service.consumer.CheckpointStoreType;
import io.kestra.plugin.azure.messaging.eventhubs.service.consumer.StartingPosition;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.util.Collections;
import java.util.Map;

/**
* Base class for implementing tasks that consume events into EventHubs.
* This class provides all required and optional parameters.
*/
@SuperBuilder
@NoArgsConstructor
@ToString
@EqualsAndHashCode
@Getter
public abstract class AbstractConsumeEventHubTask extends AbstractEventHubTask {

// TASK'S PARAMETERS
@Schema(
title = "The Deserializer to be used for serializing the event value.",
description = "Supported values are: [STRING, JSON]"
)
@Builder.Default
private Serdes bodySerde = Serdes.STRING;

@Schema(
title = "The config properties to be passed to the Deserializer.",
description = "Configs in key/value pairs."
)
@Builder.Default
private Map<String, Object> bodySerdeProperties = Collections.emptyMap();

@Schema(
title = "The consumer group."
)
@Builder.Default
private String consumerGroup = "$Default";

@Schema(
title = "The starting position."
)
@Builder.Default
private StartingPosition partitionStartingPosition = StartingPosition.EARLIEST;

@Schema(
title = "The maximum number of events to consume per event hub partition per poll."
)
@Builder.Default
private Integer maxBatchPartitionEvents = 50;

@Schema(
title = "The max time duration to wait to receive a batch of events up to the `maxPollEvents`."
)
@Builder.Default
private Long maxBatchPartitionWait = 1000L;

@Schema(
title = "The max time duration to wait to receive events from all partitions."
)
@Builder.Default
private Long maxPollWait = 5000L;

@Schema(
title = "The storage to be used as a persistent store for maintaining checkpoints.",
description = "Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.",
anyOf = {Object.class}
)
@Builder.Default
private CheckpointStoreType checkpointStoreType = CheckpointStoreType.BLOB;

@Schema(
title = "The config properties to be passed to the CheckpointStore.",
description = "Configs in key/value pairs."
)
private Map<String, String> checkpointStoreConfig = Collections.emptyMap();

private String namespace;

private String eventHubName;

private String customEndpointAddress;

@SuperBuilder
@NoArgsConstructor
@ToString
@EqualsAndHashCode
@Getter
public static class BlobCheckPointConfig implements BlobContainerClientInterface {

private String connectionString;

private String sharedKeyAccountName;

private String sharedKeyAccountAccessKey;

private String sasToken;

private String containerName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.kestra.plugin.azure.messaging.eventhubs;

import io.kestra.core.models.tasks.Task;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractEventHubTask extends Task implements EventHubClientInterface {

protected String connectionString;

protected String sharedKeyAccountName;

protected String sharedKeyAccountAccessKey;

protected String sasToken;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.kestra.plugin.azure.messaging.eventhubs;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.plugin.azure.messaging.eventhubs.serdes.Serdes;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import javax.validation.constraints.NotNull;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* Base class for implementing tasks that produce events into EventHubs.
* This class provides all required and optional parameters.
*/
@SuperBuilder
@NoArgsConstructor
@ToString
@EqualsAndHashCode
@Getter
public abstract class AbstractProduceEventHubTask extends AbstractEventHubTask {

// TASK'S PARAMETERS
@Schema(
title = "The event properties",
description = "The event properties which may be used for passing metadata associated with the event" +
" body during Event Hubs operations."
)
@Builder.Default
private Map<String, String> eventProperties = Collections.emptyMap();

@Schema(
title = "The content of the message to be sent to EventHub",
description = "Can be an internal storage uri, a map (i.e. a list of key-value pairs) or a list of maps. " +
"The following keys are supported: `from`, `contentType`, `properties`.",
anyOf = {String.class, List.class, Map.class}
)
@NotNull
@PluginProperty(dynamic = true)
private Object from;

@Schema(
title = "The hashing key to be provided for the batches of events.",
description = "Events with the same `partitionKey` are hashed and sent to the same partition. The provided " +
"`partitionKey` will be used for all the events sent by the `Produce` task."
)
private String partitionKey;

@Schema(
title = "The maximum size for batches of events, in bytes.",
description = "The maximum size to allow for the batches of events."
)
private Integer maxBatchSizeInBytes;

@Schema(
title = "The maximum number of events per batches.",
description = "The maximum number of events to allow for the one batch."
)
@Builder.Default
private Integer maxEventsPerBatch = 1000;

@Schema(
title = "The MIME type describing the event data",
description = "The MIME type describing the data contained in event body allowing consumers to make informed" +
" decisions for inspecting and processing the event."
)
private String bodyContentType;

@Schema(
title = "The Serializer to be used for serializing the event value.",
description = "Supported values are: [STRING, JSON]"
)
@Builder.Default
private Serdes bodySerde = Serdes.STRING;

@Schema(
title = "The config properties to be passed to the Serializer.",
description = "Configs in key/value pairs."
)
@Builder.Default
protected Map<String, Object> bodySerdeProperties = Collections.emptyMap();

private String namespace;

private String eventHubName;

private String customEndpointAddress;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.kestra.plugin.azure.messaging.eventhubs;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.plugin.azure.AzureClientInterface;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

public interface BlobContainerClientInterface extends AzureClientInterface {

@Schema(
title = "The blob container name."
)
@PluginProperty(dynamic = true)
String getContainerName();


@SuperBuilder
@NoArgsConstructor
@Getter
class Default implements BlobContainerClientInterface {

private String connectionString;
private String sharedKeyAccountName;
private String sharedKeyAccountAccessKey;
private String sasToken;
private String containerName;
}
}

Loading

0 comments on commit 8d8f63b

Please sign in to comment.