diff --git a/.codecov.yml b/.codecov.yml index 7600d5a9..ffc4c782 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -11,4 +11,7 @@ coverage: status: project: default: - threshold: 20% + threshold: 5% + patch: + default: + threshold: 20% \ No newline at end of file diff --git a/src/main/java/io/pravega/connectors/flink/AbstractReaderBuilder.java b/src/main/java/io/pravega/connectors/flink/AbstractReaderBuilder.java index 7467984f..4fc1ff9c 100644 --- a/src/main/java/io/pravega/connectors/flink/AbstractReaderBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/AbstractReaderBuilder.java @@ -48,6 +48,7 @@ protected AbstractReaderBuilder() { * The default client configuration is obtained from {@code PravegaConfig.fromDefaults()}. * * @param pravegaConfig the configuration to use. + * @return A builder to configure and create a reader. */ public B withPravegaConfig(PravegaConfig pravegaConfig) { this.pravegaConfig = pravegaConfig; @@ -124,6 +125,8 @@ public B forStream(final Stream stream) { /** * Gets the Pravega configuration. + * + * @return the instance of {@link PravegaConfig}. */ public PravegaConfig getPravegaConfig() { Preconditions.checkState(pravegaConfig != null, "A Pravega configuration must be supplied."); @@ -132,6 +135,8 @@ public PravegaConfig getPravegaConfig() { /** * Resolves the streams to be provided to the reader, based on the configured default scope. + * + * @return A list of {@link StreamWithBoundaries} that is ready to be read */ protected List resolveStreams() { Preconditions.checkState(!streams.isEmpty(), "At least one stream must be supplied."); @@ -154,6 +159,8 @@ public B enableMetrics(boolean enable) { /** * getter to fetch the metrics flag. + * + * @return A boolean if metrics is enabled */ public boolean isMetricsEnabled() { return enableMetrics; diff --git a/src/main/java/io/pravega/connectors/flink/AbstractStreamingReaderBuilder.java b/src/main/java/io/pravega/connectors/flink/AbstractStreamingReaderBuilder.java index e50ef23f..7be2931a 100644 --- a/src/main/java/io/pravega/connectors/flink/AbstractStreamingReaderBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/AbstractStreamingReaderBuilder.java @@ -57,6 +57,7 @@ protected AbstractStreamingReaderBuilder() { * The default value is generated based on other inputs. * * @param uid the uid to use. + * @return A builder to configure and create a streaming reader. */ public B uid(String uid) { this.uid = uid; @@ -69,6 +70,7 @@ public B uid(String uid) { * The default value is taken from the {@link PravegaConfig} {@code defaultScope} property. * * @param scope the scope name. + * @return A builder to configure and create a streaming reader. */ public B withReaderGroupScope(String scope) { this.readerGroupScope = Preconditions.checkNotNull(scope); @@ -79,6 +81,7 @@ public B withReaderGroupScope(String scope) { * Configures the reader group name. * * @param readerGroupName the reader group name. + * @return A builder to configure and create a streaming reader. */ public B withReaderGroupName(String readerGroupName) { this.readerGroupName = Preconditions.checkNotNull(readerGroupName); @@ -89,6 +92,7 @@ public B withReaderGroupName(String readerGroupName) { * Sets the group refresh time, with a default of 1 second. * * @param groupRefreshTime The group refresh time + * @return A builder to configure and create a streaming reader. */ public B withReaderGroupRefreshTime(Time groupRefreshTime) { this.readerGroupRefreshTime = groupRefreshTime; @@ -99,6 +103,7 @@ public B withReaderGroupRefreshTime(Time groupRefreshTime) { * Sets the timeout for initiating a checkpoint in Pravega. * * @param checkpointInitiateTimeout The timeout + * @return A builder to configure and create a streaming reader. */ public B withCheckpointInitiateTimeout(Time checkpointInitiateTimeout) { Preconditions.checkArgument(checkpointInitiateTimeout.getSize() > 0, "timeout must be > 0"); @@ -111,6 +116,7 @@ public B withCheckpointInitiateTimeout(Time checkpointInitiateTimeout) { * expires (without an event being returned), another call will be made. * * @param eventReadTimeout The timeout + * @return A builder to configure and create a streaming reader. */ public B withEventReadTimeout(Time eventReadTimeout) { Preconditions.checkArgument(eventReadTimeout.getSize() > 0, "timeout must be > 0"); @@ -126,6 +132,7 @@ public B withEventReadTimeout(Time eventReadTimeout) { * This configuration is particularly relevant when multiple checkpoint requests need to be honored (e.g., frequent savepoint requests being triggered concurrently). * * @param maxOutstandingCheckpointRequest maximum outstanding checkpoint request. + * @return A builder to configure and create a streaming reader. */ public B withMaxOutstandingCheckpointRequest(int maxOutstandingCheckpointRequest) { this.maxOutstandingCheckpointRequest = maxOutstandingCheckpointRequest; @@ -201,6 +208,8 @@ private boolean isReaderGroupNameAutoGenerated(String readerGroupName) { * 1. be stable across savepoints for the same inputs * 2. disambiguate one source from another (e.g. in a program that uses numerous instances of {@link FlinkPravegaReader}) * 3. allow for reconfiguration of the stream cuts and timeouts + * + * @return A generated reader group ID. */ public String generateUid() { StringBuilder sb = new StringBuilder(); diff --git a/src/main/java/io/pravega/connectors/flink/AbstractStreamingWriterBuilder.java b/src/main/java/io/pravega/connectors/flink/AbstractStreamingWriterBuilder.java index 933c0296..b791d218 100644 --- a/src/main/java/io/pravega/connectors/flink/AbstractStreamingWriterBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/AbstractStreamingWriterBuilder.java @@ -42,6 +42,7 @@ protected AbstractStreamingWriterBuilder() { * Sets the writer mode to provide at-least-once or exactly-once guarantees. * * @param writerMode The writer mode of {@code BEST_EFFORT}, {@code ATLEAST_ONCE}, or {@code EXACTLY_ONCE}. + * @return A builder to configure and create a streaming writer. */ public B withWriterMode(PravegaWriterMode writerMode) { this.writerMode = writerMode; @@ -52,6 +53,7 @@ public B withWriterMode(PravegaWriterMode writerMode) { * Enable watermark. * * @param enableWatermark boolean + * @return A builder to configure and create a streaming writer. */ public B enableWatermark(boolean enableWatermark) { this.enableWatermark = enableWatermark; @@ -67,6 +69,7 @@ public B enableWatermark(boolean enableWatermark) { * This configuration setting sets the lease renewal period. The default value is 30 seconds. * * @param period the lease renewal period + * @return A builder to configure and create a streaming writer. */ public B withTxnLeaseRenewalPeriod(Time period) { Preconditions.checkArgument(period.getSize() > 0, "The timeout must be a positive value."); @@ -79,6 +82,7 @@ public B withTxnLeaseRenewalPeriod(Time period) { * * @param serializationSchema the deserialization schema to use. * @param eventRouter the event router to use. + * @return An instance of {@link FlinkPravegaWriter}. */ protected FlinkPravegaWriter createSinkFunction(SerializationSchema serializationSchema, PravegaEventRouter eventRouter) { Preconditions.checkNotNull(serializationSchema, "serializationSchema"); diff --git a/src/main/java/io/pravega/connectors/flink/AbstractWriterBuilder.java b/src/main/java/io/pravega/connectors/flink/AbstractWriterBuilder.java index f2314cf3..1dccf3aa 100644 --- a/src/main/java/io/pravega/connectors/flink/AbstractWriterBuilder.java +++ b/src/main/java/io/pravega/connectors/flink/AbstractWriterBuilder.java @@ -40,6 +40,7 @@ public AbstractWriterBuilder() { * The default client configuration is obtained from {@code PravegaConfig.fromDefaults()}. * * @param pravegaConfig the configuration to use. + * @return A builder to configure and create a writer. */ public B withPravegaConfig(PravegaConfig pravegaConfig) { this.pravegaConfig = pravegaConfig; @@ -70,6 +71,8 @@ public B forStream(final Stream stream) { /** * Gets the Pravega configuration. + * + * @return the instance of {@link PravegaConfig}. */ public PravegaConfig getPravegaConfig() { Preconditions.checkState(pravegaConfig != null, "A Pravega configuration must be supplied."); @@ -78,6 +81,8 @@ public PravegaConfig getPravegaConfig() { /** * Resolves the stream to be provided to the writer, based on the configured default scope. + * + * @return the resolved stream instance. */ public Stream resolveStream() { Preconditions.checkState(stream != null, "A stream must be supplied."); @@ -98,6 +103,8 @@ public B enableMetrics(boolean enable) { /** * getter to fetch the metrics flag. + * + * @return A boolean if metrics is enabled */ public boolean isMetricsEnabled() { return enableMetrics; diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaInputFormat.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaInputFormat.java index b9bea0cc..c9f73c68 100644 --- a/src/main/java/io/pravega/connectors/flink/FlinkPravegaInputFormat.java +++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaInputFormat.java @@ -174,7 +174,9 @@ public void close() throws IOException { /** * Gets a builder {@link FlinkPravegaInputFormat} to read Pravega streams using the Flink batch API. + * * @param the element type. + * @return A builder to configure and create a batch reader. */ public static Builder builder() { return new Builder<>(); @@ -197,6 +199,7 @@ protected Builder builder() { * Sets the deserialization schema. * * @param deserializationSchema The deserialization schema + * @return A builder to configure and create a batch reader. */ public Builder withDeserializationSchema(DeserializationSchema deserializationSchema) { this.deserializationSchema = deserializationSchema; diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaOutputFormat.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaOutputFormat.java index a7c2f927..7013a9e3 100644 --- a/src/main/java/io/pravega/connectors/flink/FlinkPravegaOutputFormat.java +++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaOutputFormat.java @@ -243,6 +243,7 @@ public static class Builder extends AbstractWriterBuilder> { * Sets the serialization schema. * * @param serializationSchema The serialization schema + * @return A builder to configure and create a batch writer. */ public Builder withSerializationSchema(SerializationSchema serializationSchema) { this.serializationSchema = serializationSchema; @@ -253,6 +254,7 @@ public Builder withSerializationSchema(SerializationSchema serializationSc * Sets the event router. * * @param eventRouter the event router which produces a key per event. + * @return A builder to configure and create a batch writer. */ public Builder withEventRouter(PravegaEventRouter eventRouter) { this.eventRouter = eventRouter; @@ -266,6 +268,8 @@ protected Builder builder() { /** * Builds the {@link FlinkPravegaOutputFormat}. + * + * @return An instance of {@link FlinkPravegaOutputFormat} */ public FlinkPravegaOutputFormat build() { Preconditions.checkNotNull(serializationSchema, "serializationSchema"); diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java index 010ba09f..2ee88a23 100644 --- a/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java +++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java @@ -609,6 +609,8 @@ private ReaderGroup createReaderGroup() { /** * Create the {@link ReaderGroupManager} for the current configuration. + * + * @return An instance of {@link ReaderGroupManager} */ protected ReaderGroupManager createReaderGroupManager() { if (readerGroupManager == null) { @@ -620,6 +622,8 @@ protected ReaderGroupManager createReaderGroupManager() { /** * Create the {@link EventStreamClientFactory} for the current configuration. + * + * @return An instance of {@link EventStreamClientFactory} */ protected EventStreamClientFactory createEventStreamClientFactory() { if (eventStreamClientFactory == null) { @@ -631,7 +635,9 @@ protected EventStreamClientFactory createEventStreamClientFactory() { /** * Create the {@link EventStreamReader} for the current configuration. + * * @param readerId the readerID to use. + * @return An instance of {@link EventStreamReader} */ protected EventStreamReader createEventStreamReader(String readerId) { return createPravegaReader( @@ -648,7 +654,9 @@ protected EventStreamReader createEventStreamReader(String readerId) { /** * Gets a builder for {@link FlinkPravegaReader} to read Pravega streams using the Flink streaming API. + * * @param the element type. + * @return A new builder of {@link FlinkPravegaReader} */ public static FlinkPravegaReader.Builder builder() { return new Builder<>(); @@ -709,7 +717,9 @@ protected SerializedValue> getAssignerWithTimeWindows /** * Builds a {@link FlinkPravegaReader} based on the configuration. + * * @throws IllegalStateException if the configuration is invalid. + * @return An instance of {@link FlinkPravegaReader} */ public FlinkPravegaReader build() { FlinkPravegaReader reader = buildSourceFunction(); diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaTableSink.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaTableSink.java index dfae8161..667ab054 100644 --- a/src/main/java/io/pravega/connectors/flink/FlinkPravegaTableSink.java +++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaTableSink.java @@ -67,7 +67,7 @@ protected FlinkPravegaTableSink(Function> w /** * Creates a copy of the sink for configuration purposes. */ - protected FlinkPravegaTableSink createCopy() { + private FlinkPravegaTableSink createCopy() { return new FlinkPravegaTableSink(writerFactory, outputFormatFactory, schema); } diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java index 57898576..d86a5f46 100644 --- a/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java +++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java @@ -98,10 +98,13 @@ public class FlinkPravegaWriter private final long txnLeaseRenewalPeriod; // The sink's mode of operation. This is used to provide different guarantees for the written events. - private PravegaWriterMode writerMode; + private final PravegaWriterMode writerMode; // flag to enable/disable watermark - private boolean enableWatermark; + private final boolean enableWatermark; + + // Pravega Writer prefix that will be used by all Pravega Writers in this Sink + private final String writerIdPrefix; // Client factory for PravegaWriter instances private transient EventStreamClientFactory clientFactory = null; @@ -112,9 +115,6 @@ public class FlinkPravegaWriter // Transactional Pravega writer instance private transient TransactionalEventStreamWriter transactionalWriter = null; - // Pravega Writer prefix that will be used by all Pravega Writers in this Sink - private String writerIdPrefix; - /** * The flink pravega writer instance which can be added as a sink to a Flink job. * @@ -152,6 +152,8 @@ protected FlinkPravegaWriter( /** * Gets the associated event router. + * + * @return The {@link PravegaEventRouter} of the writer */ public PravegaEventRouter getEventRouter() { return this.eventRouter; @@ -160,14 +162,14 @@ public PravegaEventRouter getEventRouter() { /** * Gets this writer's operating mode. */ - public PravegaWriterMode getPravegaWriterMode() { + PravegaWriterMode getPravegaWriterMode() { return this.writerMode; } /** * Gets this enable watermark flag. */ - public boolean getEnableWatermark() { + boolean getEnableWatermark() { return this.enableWatermark; } @@ -183,6 +185,7 @@ public void open(Configuration configuration) throws Exception { } @Override + @SuppressWarnings("unchecked") protected void invoke(PravegaTransactionState transaction, T event, Context context) throws Exception { checkWriteError(); @@ -204,7 +207,7 @@ protected void invoke(PravegaTransactionState transaction, T event, Context cont future.whenCompleteAsync( (result, e) -> { if (e != null) { - log.warn("Detected a write failure: {}", e); + log.warn("Detected a write failure", e); // We will record only the first error detected, since this will mostly likely help with // finding the root cause. Storing all errors will not be feasible. @@ -258,6 +261,7 @@ protected void preCommit(PravegaTransactionState transaction) throws Exception { protected void commit(PravegaTransactionState transaction) { switch (writerMode) { case EXACTLY_ONCE: + @SuppressWarnings("unchecked") final Transaction txn = transaction.getTransaction() != null ? transaction.getTransaction() : transactionalWriter.getTxn(UUID.fromString(transaction.transactionId)); final Transaction.Status status = txn.checkStatus(); @@ -294,6 +298,7 @@ protected void recoverAndCommit(PravegaTransactionState transaction) { protected void abort(PravegaTransactionState transaction) { switch (writerMode) { case EXACTLY_ONCE: + @SuppressWarnings("unchecked") final Transaction txn = transaction.getTransaction() != null ? transaction.getTransaction() : transactionalWriter.getTxn(UUID.fromString(transaction.transactionId)); txn.abort(); @@ -320,7 +325,7 @@ public void close() throws Exception { // Current transaction will be aborted with this method super.close(); } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); + exception = e; } if (writer != null) { @@ -688,6 +693,7 @@ protected Builder builder() { * Sets the serialization schema. * * @param serializationSchema The serialization schema + * @return Builder instance. */ public Builder withSerializationSchema(SerializationSchema serializationSchema) { this.serializationSchema = serializationSchema; @@ -698,6 +704,7 @@ public Builder withSerializationSchema(SerializationSchema serializationSc * Sets the event router. * * @param eventRouter the event router which produces a key per event. + * @return Builder instance. */ public Builder withEventRouter(PravegaEventRouter eventRouter) { this.eventRouter = eventRouter; @@ -706,6 +713,8 @@ public Builder withEventRouter(PravegaEventRouter eventRouter) { /** * Builds the {@link FlinkPravegaWriter}. + * + * @return An instance of {@link FlinkPravegaWriter} */ public FlinkPravegaWriter build() { Preconditions.checkState(eventRouter != null, "Event router must be supplied."); diff --git a/src/main/java/io/pravega/connectors/flink/PravegaConfig.java b/src/main/java/io/pravega/connectors/flink/PravegaConfig.java index c0967e3c..a270d417 100644 --- a/src/main/java/io/pravega/connectors/flink/PravegaConfig.java +++ b/src/main/java/io/pravega/connectors/flink/PravegaConfig.java @@ -49,6 +49,8 @@ public class PravegaConfig implements Serializable { /** * Gets a configuration based on defaults obtained from the local environment. + * + * @return A default instance of {@link PravegaConfig} */ public static PravegaConfig fromDefaults() { return new PravegaConfig(System.getProperties(), System.getenv(), ParameterTool.fromMap(Collections.emptyMap())); @@ -58,6 +60,7 @@ public static PravegaConfig fromDefaults() { * Gets a configuration based on defaults obtained from the local environment plus the given program parameters. * * @param params the parameters to use. + * @return An instance of {@link PravegaConfig} */ public static PravegaConfig fromParams(ParameterTool params) { return new PravegaConfig(System.getProperties(), System.getenv(), params); @@ -67,6 +70,8 @@ public static PravegaConfig fromParams(ParameterTool params) { /** * Gets the {@link ClientConfig} to use with the Pravega client. + * + * @return The Pravega {@link ClientConfig} */ public ClientConfig getClientConfig() { ClientConfig.ClientConfigBuilder builder = ClientConfig.builder() @@ -137,6 +142,7 @@ public Stream resolve(String streamSpec) { * Configures the Pravega controller RPC URI. * * @param controllerURI The URI. + * @return current instance of PravegaConfig. */ public PravegaConfig withControllerURI(URI controllerURI) { this.controllerURI = controllerURI; @@ -157,6 +163,7 @@ public PravegaConfig withTrustStore(String trustStore) { * Configures the default Pravega scope, to resolve unqualified stream names and to support reader groups. * * @param scope The scope to use (with lowest priority). + * @return current instance of PravegaConfig. */ public PravegaConfig withDefaultScope(String scope) { if (this.defaultScope == null) { @@ -169,6 +176,7 @@ public PravegaConfig withDefaultScope(String scope) { * Configures the self-defined Pravega scope. * * @param scope The scope to use (with highest priority). + * @return current instance of PravegaConfig. */ public PravegaConfig withScope(String scope) { this.defaultScope = scope; @@ -177,6 +185,8 @@ public PravegaConfig withScope(String scope) { /** * Gets the default Pravega scope. + * + * @return current default scope name. */ @Nullable public String getDefaultScope() { @@ -191,6 +201,7 @@ public String getDefaultScope() { * Configures the Pravega credentials to use. * * @param credentials a credentials object. + * @return current instance of PravegaConfig. */ public PravegaConfig withCredentials(Credentials credentials) { this.credentials = credentials; @@ -201,6 +212,7 @@ public PravegaConfig withCredentials(Credentials credentials) { * Enables or disables TLS hostname validation (default: true). * * @param validateHostname a boolean indicating whether to validate the hostname on incoming requests. + * @return current instance of PravegaConfig. */ public PravegaConfig withHostnameValidation(boolean validateHostname) { this.validateHostname = validateHostname; diff --git a/src/main/java/io/pravega/connectors/flink/serialization/WrappingSerializer.java b/src/main/java/io/pravega/connectors/flink/serialization/WrappingSerializer.java index f047644d..a4bc802f 100644 --- a/src/main/java/io/pravega/connectors/flink/serialization/WrappingSerializer.java +++ b/src/main/java/io/pravega/connectors/flink/serialization/WrappingSerializer.java @@ -18,6 +18,8 @@ public interface WrappingSerializer { /** * Gets the wrapped Pravega Serializer. + * + * @return a Pravega {@link Serializer}. */ Serializer getWrappedSerializer(); } diff --git a/src/main/java/io/pravega/connectors/flink/table/descriptors/Pravega.java b/src/main/java/io/pravega/connectors/flink/table/descriptors/Pravega.java index bba96d3d..6dc0a042 100644 --- a/src/main/java/io/pravega/connectors/flink/table/descriptors/Pravega.java +++ b/src/main/java/io/pravega/connectors/flink/table/descriptors/Pravega.java @@ -338,7 +338,9 @@ public static class TableSinkWriterBuilder serializationSchema) { this.serializationSchema = serializationSchema; @@ -361,7 +365,9 @@ protected TableSinkWriterBuilder builder() { /** * Creates the sink function based on the given table schema and current builder state. + * * @param tableSchema the schema of the sink table + * @return An instance of {@link FlinkPravegaWriter}. */ public FlinkPravegaWriter createSinkFunction(TableSchema tableSchema) { Preconditions.checkState(routingKeyFieldName != null, "The routing key field must be provided."); @@ -372,7 +378,9 @@ public FlinkPravegaWriter createSinkFunction(TableSchema tableSchema) { /** * Creates FlinkPravegaOutputFormat based on the given table schema and current builder state. + * * @param tableSchema the schema of the sink table + * @return An instance of {@link FlinkPravegaOutputFormat}. */ public FlinkPravegaOutputFormat createOutputFormat(TableSchema tableSchema) { Preconditions.checkState(routingKeyFieldName != null, "The routing key field must be provided.");