Skip to content

Commit

Permalink
refactor(docs): Add doc for ADLS and refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Nov 25, 2024
1 parent 2129c33 commit 9ca7584
Show file tree
Hide file tree
Showing 23 changed files with 186 additions and 137 deletions.
6 changes: 3 additions & 3 deletions src/main/java/io/kestra/plugin/azure/storage/adls/Append.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@
@Example(
full = true,
code = """
id: azure_storage_datalake_read
id: azure_storage_datalake_append
namespace: company.team
tasks:
- id: read_file
type: io.kestra.plugin.azure.storage.adls.Read
type: io.kestra.plugin.azure.storage.adls.Append
endpoint: "https://yourblob.blob.core.windows.net"
sasToken: "{{ secret('SAS_TOKEN') }}"
fileSystem: "mydata"
fileName: "path/to/myfile"
filePath: "path/to/myfile"
data: "Text to append"
"""
)
Expand Down
15 changes: 10 additions & 5 deletions src/main/java/io/kestra/plugin/azure/storage/adls/Delete.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,22 @@
examples = {
@Example(
full = true,
title = "Delete an existing file in Azure Data Lake Storage.",
code = """
id: azure_storage_datalake_delete
namespace: company.team
pluginDefaults:
- type: io.kestra.plugin.azure.storage.adls
values:
connectionString: "{{ secret('AZURE_CONNECTION_STRING') }}"
fileSystem: "tasks"
endpoint: "https://yourblob.blob.core.windows.net"
tasks:
- id: read_file
- id: delete_file
type: io.kestra.plugin.azure.storage.adls.Delete
endpoint: "https://yourblob.blob.core.windows.net"
sasToken: "{{ secret('SAS_TOKEN') }}"
fileSystem: "mydata"
fileName: "path/to/myfile"
filePath: "full/path/to/file.txt"
"""
)
}
Expand Down
43 changes: 33 additions & 10 deletions src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,40 @@
examples = {
@Example(
full = true,
title = "Download files from a remote server, upload them to Azure Data Lake Storage, finally delete them all at one",
code = """
id: azure_storage_blob_delete_list
namespace: company.team
pluginDefaults:
- type: io.kestra.plugin.azure.storage.adls
values:
connectionString: "{{ secret('AZURE_CONNECTION_STRING') }}"
fileSystem: "tasks"
endpoint: "https://yourblob.blob.core.windows.net"
tasks:
- id: delete_list
type: io.kestra.plugin.azure.storage.blob.DeleteList
endpoint: "https://yourblob.blob.core.windows.net"
sasToken: "{{ secret('SAS_TOKEN') }}"
fileSystem: "fileSystem"
directoryName: "path/to/mydirectory"
- id: for_each
type: io.kestra.plugin.core.flow.EachSequential
value: ["pikachu", "charmander"]
tasks:
- id: download_request
type: io.kestra.plugin.core.http.Download
uri: https://pokeapi.co/api/v2/pokemon/{{ taskrun.value }}
- id: to_ion
type: io.kestra.plugin.serdes.json.JsonToIon
from: "{{ currentEachOutput(outputs.download_request).uri }}"
- id: upload_file
type: io.kestra.plugin.azure.storage.adls.Upload
fileName: "adls/pokemon/{{ taskrun.value }}.json"
from: "{{ currentEachOutput(outputs.to_ion).uri }}"
- id: delete_file
type: io.kestra.plugin.azure.storage.adls.DeleteFiles
concurrent: 2
directoryPath: "adls/pokemon/"
"""
)
}
Expand All @@ -62,7 +85,7 @@ public class DeleteFiles extends AbstractDataLakeConnection implements RunnableT
@Schema(title = "Directory Name")
@PluginProperty(dynamic = true)
@NotNull
protected String directoryName;
protected String directoryPath;

@Min(2)
@Schema(
Expand All @@ -88,7 +111,7 @@ public Output run(RunContext runContext) throws Exception {
Flux<AdlsFile> flowable = Flux
.create(throwConsumer(emitter -> {
DataLakeService
.list(runContext, fileSystemClient, directoryName)
.list(runContext, fileSystemClient, directoryPath)
.forEach(emitter::next);

emitter.complete();
Expand Down Expand Up @@ -118,7 +141,7 @@ public Output run(RunContext runContext) throws Exception {
if (Boolean.TRUE.equals(errorOnEmpty) && finalResult.getLeft() == 0) {
throw new NoSuchElementException("Unable to find any files to delete on " +
runContext.render(this.fileSystem) + " " +
"with directoryName='" + runContext.render(this.directoryName)
"with directoryPath='" + runContext.render(this.directoryPath)
);
}

Expand All @@ -133,7 +156,7 @@ public Output run(RunContext runContext) throws Exception {

private static Function<AdlsFile, Long> delete(Logger logger, DataLakeFileSystemClient fileSystemClient) {
return o -> {
logger.debug("Deleting '{}'", o.getName());
logger.info("Deleting '{}'", o.getName());

DataLakeFileClient fileClient = fileSystemClient.getFileClient(o.getName());
long fileSize = fileClient.getProperties().getFileSize();
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/io/kestra/plugin/azure/storage/adls/Lease.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,24 @@
@Example(
full = true,
code = """
id: azure_storage_datalake_read
id: azure_storage_datalake_lease
namespace: company.team
tasks:
- id: read_file
type: io.kestra.plugin.azure.storage.adls.Read
- id: lease_file
type: io.kestra.plugin.azure.storage.adls.Lease
endpoint: "https://yourblob.blob.core.windows.net"
sasToken: "{{ secret('SAS_TOKEN') }}"
fileSystem: "mydata"
fileName: "path/to/myfile"
filePath: "path/to/myfile"
leaseDuration: 20
action: ACQUIRE
"""
)
}
)
@Schema(
title = "Read a file from Azure Data Lake Storage."
title = "Lease a file from Azure Data Lake Storage."
)
public class Lease extends AbstractDataLakeWithFileName implements RunnableTask<Lease.Output> {

Expand Down
32 changes: 23 additions & 9 deletions src/main/java/io/kestra/plugin/azure/storage/adls/List.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,31 @@
examples = {
@Example(
full = true,
title = "List all files and directories in a specific Azure Data Lake Storage directory and log each file data output.",
code = """
id: azure_storage_datalake_list
id: azure_data_lake_storage_list
namespace: company.team
pluginDefaults:
- type: io.kestra.plugin.azure.storage.adls
values:
connectionString: "{{ secret('AZURE_CONNECTION_STRING') }}"
fileSystem: "tasks"
endpoint: "https://yourblob.blob.core.windows.net"
tasks:
- id: read_file
- id: list_files_in_dir
type: io.kestra.plugin.azure.storage.adls.List
endpoint: "https://yourblob.blob.core.windows.net"
sasToken: "{{ secret('SAS_TOKEN') }}"
fileSystem: "mydata"
directoryName: "myDirectory"
directoryPath: "path/to/my/directory/"
- id: for_each_file
type: io.kestra.plugin.core.flow.EachParallel
value: "{{ outputs.list_files_in_dir.files }}"
tasks:
- id: log_file_name
type: io.kestra.plugin.core.debug.Echo
level: DEBUG
format: "{{ taskrun.value }}"
"""
)
}
Expand All @@ -47,10 +61,10 @@
title = "Upload a file to the Azure Data Lake Storage."
)
public class List extends AbstractDataLakeConnection implements RunnableTask<List.Output>, AbstractDataLakeStorageInterface {
@Schema(title = "Directory Name")
@Schema(title = "Directory path", description = "Full path to the directory")
@PluginProperty(dynamic = true)
@NotNull
protected String directoryName;
protected String directoryPath;

protected String fileSystem;

Expand All @@ -59,7 +73,7 @@ public List.Output run(RunContext runContext) throws Exception {
DataLakeServiceClient dataLakeServiceClient = this.dataLakeServiceClient(runContext);
DataLakeFileSystemClient fileSystemClient = dataLakeServiceClient.getFileSystemClient(runContext.render(fileSystem));

java.util.List<AdlsFile> fileList = DataLakeService.list(runContext, fileSystemClient, directoryName);
java.util.List<AdlsFile> fileList = DataLakeService.list(runContext, fileSystemClient, directoryPath);

return Output.builder()
.files(fileList)
Expand Down
17 changes: 13 additions & 4 deletions src/main/java/io/kestra/plugin/azure/storage/adls/Read.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,22 @@
id: azure_storage_datalake_read
namespace: company.team
pluginDefaults:
- type: io.kestra.plugin.azure.storage.adls
values:
connectionString: "{{ secret('AZURE_CONNECTION_STRING') }}"
fileSystem: "tasks"
endpoint: "https://yourblob.blob.core.windows.net"
tasks:
- id: read_file
type: io.kestra.plugin.azure.storage.adls.Read
endpoint: "https://yourblob.blob.core.windows.net"
sasToken: "{{ secret('SAS_TOKEN') }}"
fileSystem: "mydata"
fileName: "path/to/myfile"
filePath: "full/path/to/file.txt"
- id: log_size
type: io.kestra.plugin.core.debug.Echo
level: INFO
format: " {{ outputs.read_file.file.size }}"
"""
)
}
Expand Down
20 changes: 12 additions & 8 deletions src/main/java/io/kestra/plugin/azure/storage/adls/Reads.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@
@Example(
full = true,
code = """
id: azure_storage_datalake_read
id: azure_storage_datalake_readq
namespace: company.team
pluginDefaults:
- type: io.kestra.plugin.azure.storage.adls
values:
connectionString: "{{ secret('AZURE_CONNECTION_STRING') }}"
fileSystem: "tasks"
endpoint: "https://yourblob.blob.core.windows.net"
tasks:
- id: read_file
type: io.kestra.plugin.azure.storage.adls.Read
endpoint: "https://yourfile.file.core.windows.net"
sasToken: "{{ secret('SAS_TOKEN') }}"
fileSystem: "mydata"
directoryName: "mydirectory"
type: io.kestra.plugin.azure.storage.adls.Reads
directoryPath: "path/to/my/directory/"
"""
)
}
Expand All @@ -55,7 +59,7 @@ public class Reads extends AbstractDataLakeConnection implements RunnableTask<Re
@Schema(title = "Directory Name")
@PluginProperty(dynamic = true)
@NotNull
protected String directoryName;
protected String directoryPath;

protected String fileSystem;

Expand All @@ -66,7 +70,7 @@ public Reads.Output run(RunContext runContext) throws Exception {
.type(io.kestra.plugin.azure.storage.adls.List.class.getName())
.endpoint(this.endpoint)
.fileSystem(this.fileSystem)
.directoryName(this.directoryName)
.directoryPath(this.directoryPath)
.sasToken(this.sasToken)
.connectionString(this.connectionString)
.sharedKeyAccountName(this.sharedKeyAccountName)
Expand Down
Loading

0 comments on commit 9ca7584

Please sign in to comment.