Skip to content

Commit

Permalink
feat: add new tasks for Azure EventHubs (#38)
Browse files Browse the repository at this point in the history
This commit adds the following tasks types:
* io.kesta.plugin.azure.messaging.eventhubs.Produce
* io.kesta.plugin.azure.messaging.eventhubs.Consume
* io.kesta.plugin.azure.messaging.eventhubs.Trigger
  • Loading branch information
fhussonnois committed Jan 26, 2024
1 parent da6c414 commit 9426b80
Show file tree
Hide file tree
Showing 47 changed files with 2,769 additions and 12 deletions.
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ dependencies {
implementation group: "io.kestra.plugin", name: "plugin-script", version: kestraVersion

// azure
api platform("com.azure:azure-sdk-bom:1.2.12")
api platform("com.azure:azure-sdk-bom:1.2.19")
api group: 'com.azure', name: 'azure-identity'
api group: 'com.azure', name: 'azure-storage-blob'
api group: 'com.azure', name: 'azure-data-tables'
api group: 'com.microsoft.azure', name: 'azure-batch', version: '10.1.0'
api group: 'com.microsoft.azure', name: 'azure-storage', version: '8.6.6'
api group: 'com.azure', name: 'azure-messaging-eventhubs-checkpointstore-blob'
api group: 'com.azure', name: 'azure-messaging-eventhubs'
}


Expand Down Expand Up @@ -96,6 +98,8 @@ dependencies {
testImplementation "org.junit.jupiter:junit-jupiter-engine"
testImplementation "org.hamcrest:hamcrest:2.2"
testImplementation "org.hamcrest:hamcrest-library:2.2"
testImplementation "org.mockito:mockito-core:5.8.0"
testImplementation "org.mockito:mockito-junit-jupiter:5.8.0"
}

/**********************************************************************************************************************\
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package io.kestra.plugin.azure.storage.abstracts;
package io.kestra.plugin.azure;

import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;

public interface AbstractStorageInterface {
/**
* Top-level interface that can be used by plugins to retrieve
* required configuration properties in order to establish connection to Azure services.
*/
public interface AzureClientInterface {
@Schema(
title = "Connection string of the Storage Account."
)
Expand All @@ -16,7 +20,6 @@ public interface AbstractStorageInterface {
@PluginProperty(dynamic = true)
String getSharedKeyAccountName();


@Schema(
title = "Shared Key access key for authenticating requests."
)
Expand All @@ -29,4 +32,5 @@ public interface AbstractStorageInterface {
)
@PluginProperty(dynamic = true)
String getSasToken();

}
52 changes: 52 additions & 0 deletions src/main/java/io/kestra/plugin/azure/client/AzureClientConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.kestra.plugin.azure.client;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.azure.AzureClientInterface;

import java.util.Optional;
import java.util.function.Supplier;

import static io.kestra.core.utils.Rethrow.throwFunction;

/**
* Configuration for creating a new Azure Client.
*/
public class AzureClientConfig<T extends AzureClientInterface> {

protected final RunContext runContext;
protected final T plugin;

/**
* Creates a new {@link AzureClientConfig} instance.
*
* @param runContext The context.
* @param plugin The plugin.
*/
public AzureClientConfig(final RunContext runContext,
final T plugin) {
this.runContext = runContext;
this.plugin = plugin;
}

public Optional<String> connectionString() throws IllegalVariableEvaluationException {
return getOptionalConfig(plugin::getConnectionString);
}

public Optional<String> sharedKeyAccountName() throws IllegalVariableEvaluationException {
return getOptionalConfig(plugin::getSharedKeyAccountName);
}

public Optional<String> sharedKeyAccountAccessKey() throws IllegalVariableEvaluationException {
return getOptionalConfig(plugin::getSharedKeyAccountAccessKey);
}

public Optional<String> sasToken() throws IllegalVariableEvaluationException {
return getOptionalConfig(plugin::getSasToken);
}

protected Optional<String> getOptionalConfig(final Supplier<String> supplier) throws IllegalVariableEvaluationException {
return Optional.ofNullable(supplier.get()).map(throwFunction(runContext::render));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.kestra.plugin.azure.eventhubs;

import io.kestra.core.models.tasks.Task;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

@NoArgsConstructor
@SuperBuilder
@Getter
public class AbstractEventHubTask extends Task implements EventHubClientInterface {

private String connectionString;

private String sharedKeyAccountName;

private String sharedKeyAccountAccessKey;

private String sasToken;

@Builder.Default
private Integer clientMaxRetries = 5;

@Builder.Default
private Long clientRetryDelay = 500L;

private String namespace;

private String eventHubName;

private String customEndpointAddress;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.kestra.plugin.azure.eventhubs;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.plugin.azure.AzureClientInterface;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.experimental.SuperBuilder;

/**
* This class is suffixed 'Interface' as it used to capture parameters from task properties.
*/
@SuperBuilder
@Getter
public final class BlobContainerClientInterface implements AzureClientInterface {

private String connectionString;
private String sharedKeyAccountName;
private String sharedKeyAccountAccessKey;
private String sasToken;
@Schema(
title = "The blob container name."
)
@PluginProperty(dynamic = true)
private String containerName;
}

Loading

0 comments on commit 9426b80

Please sign in to comment.