-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(plugins): add new plugin tasks for EventHubs (#38) #56
feat(plugins): add new plugin tasks for EventHubs (#38) #56
Conversation
8d8f63b
to
7368497
Compare
Nice! I see you implemented functionality to consume messages from the EventHubs. Could that be implemented as a trigger in a similar fashion as we support SQS, PubSub or Kafka triggers? |
409db3d
to
72540c3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't look everything but there is a lot of interfaces/asbtract classes that are only implemented once, it would be great to remove them to avoid too many classes if they are not that much important.
Also, we have some package naming convention, please move all your classes from io.kestra.plugin.azure.messaging.eventhubs
to io.kestra.plugin.azure.eventhubs;
.
io.kestra.plugin.azure
id the Azure group of plugins and io.kestra.plugin.azure.eventhubs
the subgroup of plugins, we didn't use anymore hierarchy and as the FQCN of the task is used it's important not to have too long packages.
src/main/java/io/kestra/plugin/azure/messaging/eventhubs/config/BlobContainerClientConfig.java
Outdated
Show resolved
Hide resolved
src/main/java/io/kestra/plugin/azure/messaging/eventhubs/client/AzureAbstractClientFactory.java
Outdated
Show resolved
Hide resolved
...in/java/io/kestra/plugin/azure/messaging/eventhubs/service/consumer/CheckpointStoreType.java
Outdated
Show resolved
Hide resolved
InputStream is = null; | ||
if (this.getFrom() instanceof String data) { | ||
is = reader.read(data); | ||
} | ||
|
||
if (this.getFrom() instanceof Map data) { | ||
is = reader.read(data); | ||
} | ||
|
||
if (this.getFrom() instanceof List data) { | ||
is = reader.read(data); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We usually produce one message by 'line' as we handle data in micro-batch.
If it's a List: one message by element.
If it's a Map, we considered it is an object so a single message.
If it's a String, we consider it is an ION file so one message by line. In this case you should use the FileSerde.read() to read lines and read it in a reactive manner to avoid mounting everything in memory in case the file is big.
You can have a look at an example here: https://github.com/kestra-io/plugin-aws/blob/master/src/main/java/io/kestra/plugin/aws/sns/Publish.java#L68
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, basically the InputDataReader is only responsible for providing an IntputStream
of ION file whatever the type of input.
Then the EventHubProducerService
uses a reactive approach to read that file and produce one event per line.
ad0096a
to
7668250
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two important points:
- All plugin properties must be annotated with
@PluginProperty
even if not setting any attributes or they will not be properly documented. - All logs are stored in database even if not displayed on the UI so you must restrict for too much logging. I think you can remove some logs
Speaking of logs, you sometimes use if(log.isLevelEnabled) but we're using SLF4J that didn't instanciate strings if the level is not enabled so unless you're competing an expansive parameter you should not do that.
src/main/java/io/kestra/plugin/azure/client/DynamicAzureClientConfig.java
Outdated
Show resolved
Hide resolved
src/main/java/io/kestra/plugin/azure/client/DynamicAzureClientConfig.java
Outdated
Show resolved
Hide resolved
src/main/java/io/kestra/plugin/azure/client/DynamicAzureClientConfig.java
Outdated
Show resolved
Hide resolved
src/main/java/io/kestra/plugin/azure/eventhubs/model/EventDataObject.java
Outdated
Show resolved
Hide resolved
src/main/java/io/kestra/plugin/azure/client/AzureClientConfig.java
Outdated
Show resolved
Hide resolved
src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java
Outdated
Show resolved
Hide resolved
|
||
Map<EventHubNamePartition, Integer> result = service.poll( | ||
consumerContext, | ||
new EventHubConsumerService.EventProcessorListener() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems overly complicated to use a listener.
Can't we do without it? After all, the service can return the URI and that it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's perfectly possible to do things differently, but I'm not sure it will be any simpler than it already is. The listener allows to only have to look to the task to understand how events are captured and stored without the need to digging into the consumer polling loop.
a8f5ddc
to
ca6b7cf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I spotted some missing @PluginProperty
please add the before merging.
@SuperBuilder | ||
@Getter | ||
@NoArgsConstructor | ||
public class Produce extends AbstractEventHubTask implements RunnableTask<Produce.Output> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's missing some @PluginProperty
ca6b7cf
to
b86c976
Compare
This commit adds the following tasks types: * io.kesta.plugin.azure.messaging.eventhubs.Produce * io.kesta.plugin.azure.messaging.eventhubs.Consume * io.kesta.plugin.azure.messaging.eventhubs.Trigger
b86c976
to
9426b80
Compare
This commit adds the following tasks types:
What changes are being made and why?
How the changes have been QAed?
Setup Instructions