Skip to content

Commit

Permalink
feat(adls): implement Azure Data Lake Storage (#152)
Browse files Browse the repository at this point in the history
Implement kestra tasks for ADLS
    Read
    Reads
    Append
    Delete
    DeleteFiles
    Lease
    List
    SharedAccess
    Trigger
    Upload
  • Loading branch information
mgabelle authored Nov 22, 2024
1 parent 2181b38 commit 2129c33
Show file tree
Hide file tree
Showing 36 changed files with 1,935 additions and 22 deletions.
10 changes: 9 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ dependencies {
exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-jsr310'
exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-guava'
}
// Azure Data Lake Storage Gen2 client, used by the io.kestra.plugin.azure.storage.adls tasks
api (group: 'com.azure', name: 'azure-storage-file-datalake') {
    // exclude Jackson libraries already provided by Kestra (avoids duplicate/conflicting versions on the classpath)
    exclude group: 'com.fasterxml.jackson.core'
    exclude group: 'com.fasterxml.jackson.dataformat', module: 'jackson-dataformat-xml'
    exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-jdk8'
    exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-jsr310'
    exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-guava'
}
api (group: 'com.azure', name: 'azure-data-tables') {
// exclude libraries already provided by Kestra
exclude group: 'com.fasterxml.jackson.core'
Expand Down Expand Up @@ -110,7 +118,7 @@ dependencies {
exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-jsr310'
exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-guava'
}
api (group: 'com.azure', name: 'azure-messaging-eventhubs') {
api (group: 'com.azure', name: 'azure-storage-file-datalake') {
// exclude libraries already provided by Kestra
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'com.fasterxml.jackson.dataformat', module: 'jackson-dataformat-xml'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
package io.kestra.plugin.azure;

import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;

/**
* Top-level interface that can be used by plugins to retrieve
* required configuration properties in order to establish connection to Azure services.
*/
public interface AzureClientWithSasInterface extends AzureClientInterface {
@Schema(
title = "The SAS token to use for authenticating requests.",
description = "This string should only be the query parameters (with or without a leading '?') and not a full URL."
)
@PluginProperty(dynamic = true)
String getSasToken();
}
/**
 * Top-level interface for Azure clients that authenticate with a SAS token.
 * Combines the base connection properties ({@link AzureClientInterface}) with
 * the SAS token property ({@link AzureSasTokenInterface}).
 */
public interface AzureClientWithSasInterface extends AzureClientInterface, AzureSasTokenInterface {

}
17 changes: 17 additions & 0 deletions src/main/java/io/kestra/plugin/azure/AzureSasTokenInterface.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.kestra.plugin.azure;

import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;

/**
* Top-level interface that can be used by plugins to retrieve
* required configuration properties in order to establish connection to Azure services.
*/
public interface AzureSasTokenInterface {
    /**
     * Returns the Shared Access Signature (SAS) token used to authenticate requests.
     * The value is a dynamic plugin property and may contain Pebble expressions
     * (e.g. {@code {{ secret('SAS_TOKEN') }}}) rendered at runtime.
     */
    @Schema(
        title = "The SAS token to use for authenticating requests.",
        description = "This string should only be the query parameters (with or without a leading '?') and not a full URL."
    )
    @PluginProperty(dynamic = true)
    String getSasToken();
}
73 changes: 73 additions & 0 deletions src/main/java/io/kestra/plugin/azure/storage/adls/Append.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.kestra.plugin.azure.storage.adls;

import com.azure.core.util.BinaryData;
import com.azure.storage.file.datalake.DataLakeFileClient;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.azure.storage.adls.abstracts.AbstractDataLakeWithFileName;
import io.kestra.plugin.azure.storage.adls.models.AdlsFile;
import io.kestra.plugin.azure.storage.adls.services.DataLakeService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.net.URI;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
    examples = {
        @Example(
            full = true,
            code = """
                id: azure_storage_datalake_append
                namespace: company.team
                tasks:
                  - id: append_file
                    type: io.kestra.plugin.azure.storage.adls.Append
                    endpoint: "https://yourblob.blob.core.windows.net"
                    sasToken: "{{ secret('SAS_TOKEN') }}"
                    fileSystem: "mydata"
                    fileName: "path/to/myfile"
                    data: "Text to append"
                """
        )
    }
)
@Schema(
    title = "Append data to an existing file from Azure Data Lake Storage."
)
public class Append extends AbstractDataLakeWithFileName implements RunnableTask<VoidOutput> {
    @Schema(title = "Data to append to the end of the file.")
    @NotNull
    protected Property<String> data;

    /**
     * Appends the rendered {@code data} to the end of the target file and flushes it.
     *
     * @param runContext the task run context, used to render properties and emit metrics
     * @return {@code null} — this task produces no output ({@link VoidOutput})
     * @throws Exception if rendering fails or the Azure SDK reports an error
     *                   (e.g. the target file does not exist)
     */
    @Override
    public VoidOutput run(RunContext runContext) throws Exception {
        DataLakeFileClient client = this.dataLakeFileClient(runContext);

        final BinaryData binaryData = BinaryData.fromString(runContext.render(data).as(String.class).orElseThrow());
        // Current end-of-file offset: ADLS appends are positional, so new data starts here.
        final long fileSize = client.getProperties().getFileSize();

        // Stage the data at the current EOF, then flush up to the new total length to commit it.
        client.append(binaryData, fileSize);
        client.flush(fileSize + binaryData.getLength(), true);

        runContext.metric(Counter.of("file.size", fileSize));
        runContext.metric(Counter.of("data.size", binaryData.getLength()));

        return null;
    }
}
71 changes: 71 additions & 0 deletions src/main/java/io/kestra/plugin/azure/storage/adls/Delete.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.kestra.plugin.azure.storage.adls;

import com.azure.storage.file.datalake.DataLakeFileClient;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.azure.storage.adls.abstracts.AbstractDataLakeWithFileName;
import io.kestra.plugin.azure.storage.adls.models.AdlsFile;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
    examples = {
        @Example(
            full = true,
            code = """
                id: azure_storage_datalake_delete
                namespace: company.team
                tasks:
                  - id: delete_file
                    type: io.kestra.plugin.azure.storage.adls.Delete
                    endpoint: "https://yourblob.blob.core.windows.net"
                    sasToken: "{{ secret('SAS_TOKEN') }}"
                    fileSystem: "mydata"
                    fileName: "path/to/myfile"
                """
        )
    }
)
@Schema(
    title = "Delete a file from Azure Data Lake Storage."
)
public class Delete extends AbstractDataLakeWithFileName implements RunnableTask<Delete.Output> {

    /**
     * Deletes the target file, capturing its metadata beforehand so it can be
     * returned as output.
     *
     * @param runContext the task run context
     * @return the deleted file's metadata, or {@code null} if the file did not
     *         exist when metadata was captured
     * @throws Exception if the Azure SDK reports an error — note that
     *                   {@code delete()} is called unconditionally, so a missing
     *                   file makes the SDK throw rather than silently succeed
     */
    @Override
    public Delete.Output run(RunContext runContext) throws Exception {
        DataLakeFileClient fileClient = this.dataLakeFileClient(runContext);

        // Capture metadata before deletion — it is unavailable afterwards.
        Output output = null;
        if (Boolean.TRUE.equals(fileClient.exists())) {
            output = Output
                .builder()
                .file(AdlsFile.of(fileClient))
                .build();
        }
        fileClient.delete();

        return output;
    }

    @SuperBuilder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The deleted file."
        )
        private final AdlsFile file;
    }
}
163 changes: 163 additions & 0 deletions src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package io.kestra.plugin.azure.storage.adls;

import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.azure.storage.adls.abstracts.AbstractDataLakeConnection;
import io.kestra.plugin.azure.storage.adls.abstracts.AbstractDataLakeStorageInterface;
import io.kestra.plugin.azure.storage.adls.models.AdlsFile;
import io.kestra.plugin.azure.storage.adls.services.DataLakeService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

import java.util.NoSuchElementException;
import java.util.function.Function;

import static io.kestra.core.utils.Rethrow.throwConsumer;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
    examples = {
        @Example(
            full = true,
            code = """
                id: azure_storage_datalake_delete_files
                namespace: company.team
                tasks:
                  - id: delete_files
                    type: io.kestra.plugin.azure.storage.adls.DeleteFiles
                    endpoint: "https://yourblob.blob.core.windows.net"
                    sasToken: "{{ secret('SAS_TOKEN') }}"
                    fileSystem: "fileSystem"
                    directoryName: "path/to/mydirectory"
                """
        )
    }
)
@Schema(
    title = "Delete a list of files from Azure Data Lake Storage."
)
public class DeleteFiles extends AbstractDataLakeConnection implements RunnableTask<DeleteFiles.Output>, AbstractDataLakeStorageInterface {
    protected String fileSystem;

    @Schema(title = "Directory Name")
    @PluginProperty(dynamic = true)
    @NotNull
    protected String directoryName;

    @Min(2)
    @Schema(
        title = "Number of concurrent parallel deletions."
    )
    @PluginProperty(dynamic = false)
    private Integer concurrent;

    @Schema(
        title = "Whether to raise an error if no file was found to delete."
    )
    @PluginProperty(dynamic = true)
    @Builder.Default
    private final Boolean errorOnEmpty = false;

    /**
     * Lists the files under {@code directoryName} and deletes them, optionally in
     * parallel, accumulating a count and total byte size of deleted files.
     *
     * @param runContext the task run context
     * @return the number of deleted files and their cumulative size in bytes
     * @throws NoSuchElementException if {@code errorOnEmpty} is true and nothing was deleted
     * @throws Exception              on rendering or Azure SDK failures
     */
    @Override
    public Output run(RunContext runContext) throws Exception {
        Logger logger = runContext.logger();

        DataLakeServiceClient client = this.dataLakeServiceClient(runContext);
        DataLakeFileSystemClient fileSystemClient = client.getFileSystemClient(runContext.render(this.fileSystem));

        // Emit each listed file into a Flux so deletion can be parallelized below.
        // NOTE(review): directoryName is passed un-rendered here but rendered in the
        // error message below — presumably DataLakeService.list renders it via
        // runContext; confirm, otherwise dynamic expressions won't be resolved.
        Flux<AdlsFile> flowable = Flux
            .create(throwConsumer(emitter -> {
                DataLakeService
                    .list(runContext, fileSystemClient, directoryName)
                    .forEach(emitter::next);

                emitter.complete();
            }), FluxSink.OverflowStrategy.BUFFER);

        Flux<Long> result;

        if (this.concurrent != null) {
            // Delete on a bounded-elastic scheduler with the requested parallelism.
            result = flowable
                .parallel(this.concurrent)
                .runOn(Schedulers.boundedElastic())
                .map(delete(logger, fileSystemClient))
                .sequential();
        } else {
            result = flowable
                .map(delete(logger, fileSystemClient));
        }

        // Fold the per-file sizes into (count, totalSize).
        Pair<Long, Long> finalResult = result
            .reduce(Pair.of(0L, 0L), (pair, size) -> Pair.of(pair.getLeft() + 1, pair.getRight() + size))
            .blockOptional()
            .orElse(Pair.of(0L, 0L));

        runContext.metric(Counter.of("count", finalResult.getLeft()));
        runContext.metric(Counter.of("size", finalResult.getRight()));

        if (Boolean.TRUE.equals(errorOnEmpty) && finalResult.getLeft() == 0) {
            throw new NoSuchElementException("Unable to find any files to delete on " +
                runContext.render(this.fileSystem) + " " +
                "with directoryName='" + runContext.render(this.directoryName)
            );
        }

        logger.info("Deleted {} keys for {} bytes", finalResult.getLeft(), finalResult.getRight());

        return Output
            .builder()
            .count(finalResult.getLeft())
            .size(finalResult.getRight())
            .build();
    }

    /** Deletes a single file, returning its size (in bytes) for aggregation. */
    private static Function<AdlsFile, Long> delete(Logger logger, DataLakeFileSystemClient fileSystemClient) {
        return o -> {
            logger.debug("Deleting '{}'", o.getName());

            DataLakeFileClient fileClient = fileSystemClient.getFileClient(o.getName());
            // Read the size before deletion — it is unavailable afterwards.
            long fileSize = fileClient.getProperties().getFileSize();

            fileClient.delete();

            return fileSize;
        };
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Builder.Default
        @Schema(
            title = "The count of deleted files."
        )
        private final long count = 0;

        @Builder.Default
        @Schema(
            title = "The size of all the deleted files."
        )
        private final long size = 0;
    }
}
Loading

0 comments on commit 2129c33

Please sign in to comment.