Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[issue-370] Fix all javadoc "no return" warnings #430

Merged
merged 1 commit into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ coverage:
status:
project:
default:
threshold: 20%
threshold: 5%
patch:
default:
threshold: 20%
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ protected AbstractReaderBuilder() {
* The default client configuration is obtained from {@code PravegaConfig.fromDefaults()}.
*
* @param pravegaConfig the configuration to use.
* @return A builder to configure and create a reader.
*/
public B withPravegaConfig(PravegaConfig pravegaConfig) {
this.pravegaConfig = pravegaConfig;
Expand Down Expand Up @@ -124,6 +125,8 @@ public B forStream(final Stream stream) {

/**
* Gets the Pravega configuration.
*
* @return the instance of {@link PravegaConfig}.
*/
public PravegaConfig getPravegaConfig() {
Preconditions.checkState(pravegaConfig != null, "A Pravega configuration must be supplied.");
Expand All @@ -132,6 +135,8 @@ public PravegaConfig getPravegaConfig() {

/**
* Resolves the streams to be provided to the reader, based on the configured default scope.
*
* @return A list of {@link StreamWithBoundaries} that is ready to be read
*/
protected List<StreamWithBoundaries> resolveStreams() {
Preconditions.checkState(!streams.isEmpty(), "At least one stream must be supplied.");
Expand All @@ -154,6 +159,8 @@ public B enableMetrics(boolean enable) {

/**
* getter to fetch the metrics flag.
*
* @return A boolean if metrics is enabled
*/
public boolean isMetricsEnabled() {
return enableMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ protected AbstractStreamingReaderBuilder() {
* The default value is generated based on other inputs.
*
* @param uid the uid to use.
* @return A builder to configure and create a streaming reader.
*/
public B uid(String uid) {
this.uid = uid;
Expand All @@ -69,6 +70,7 @@ public B uid(String uid) {
* The default value is taken from the {@link PravegaConfig} {@code defaultScope} property.
*
* @param scope the scope name.
* @return A builder to configure and create a streaming reader.
*/
public B withReaderGroupScope(String scope) {
this.readerGroupScope = Preconditions.checkNotNull(scope);
Expand All @@ -79,6 +81,7 @@ public B withReaderGroupScope(String scope) {
* Configures the reader group name.
*
* @param readerGroupName the reader group name.
* @return A builder to configure and create a streaming reader.
*/
public B withReaderGroupName(String readerGroupName) {
this.readerGroupName = Preconditions.checkNotNull(readerGroupName);
Expand All @@ -89,6 +92,7 @@ public B withReaderGroupName(String readerGroupName) {
* Sets the group refresh time, with a default of 1 second.
*
* @param groupRefreshTime The group refresh time
* @return A builder to configure and create a streaming reader.
*/
public B withReaderGroupRefreshTime(Time groupRefreshTime) {
this.readerGroupRefreshTime = groupRefreshTime;
Expand All @@ -99,6 +103,7 @@ public B withReaderGroupRefreshTime(Time groupRefreshTime) {
* Sets the timeout for initiating a checkpoint in Pravega.
*
* @param checkpointInitiateTimeout The timeout
* @return A builder to configure and create a streaming reader.
*/
public B withCheckpointInitiateTimeout(Time checkpointInitiateTimeout) {
Preconditions.checkArgument(checkpointInitiateTimeout.getSize() > 0, "timeout must be > 0");
Expand All @@ -111,6 +116,7 @@ public B withCheckpointInitiateTimeout(Time checkpointInitiateTimeout) {
* expires (without an event being returned), another call will be made.
*
* @param eventReadTimeout The timeout
* @return A builder to configure and create a streaming reader.
*/
public B withEventReadTimeout(Time eventReadTimeout) {
Preconditions.checkArgument(eventReadTimeout.getSize() > 0, "timeout must be > 0");
Expand All @@ -126,6 +132,7 @@ public B withEventReadTimeout(Time eventReadTimeout) {
* This configuration is particularly relevant when multiple checkpoint requests need to be honored (e.g., frequent savepoint requests being triggered concurrently).
*
* @param maxOutstandingCheckpointRequest maximum outstanding checkpoint request.
* @return A builder to configure and create a streaming reader.
*/
public B withMaxOutstandingCheckpointRequest(int maxOutstandingCheckpointRequest) {
this.maxOutstandingCheckpointRequest = maxOutstandingCheckpointRequest;
Expand Down Expand Up @@ -201,6 +208,8 @@ private boolean isReaderGroupNameAutoGenerated(String readerGroupName) {
* 1. be stable across savepoints for the same inputs
* 2. disambiguate one source from another (e.g. in a program that uses numerous instances of {@link FlinkPravegaReader})
* 3. allow for reconfiguration of the stream cuts and timeouts
*
* @return A generated reader group ID.
*/
public String generateUid() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ protected AbstractStreamingWriterBuilder() {
* Sets the writer mode to provide at-least-once or exactly-once guarantees.
*
* @param writerMode The writer mode of {@code BEST_EFFORT}, {@code ATLEAST_ONCE}, or {@code EXACTLY_ONCE}.
* @return A builder to configure and create a streaming writer.
*/
public B withWriterMode(PravegaWriterMode writerMode) {
this.writerMode = writerMode;
Expand All @@ -52,6 +53,7 @@ public B withWriterMode(PravegaWriterMode writerMode) {
* Enable watermark.
*
* @param enableWatermark boolean
* @return A builder to configure and create a streaming writer.
*/
public B enableWatermark(boolean enableWatermark) {
this.enableWatermark = enableWatermark;
Expand All @@ -67,6 +69,7 @@ public B enableWatermark(boolean enableWatermark) {
* This configuration setting sets the lease renewal period. The default value is 30 seconds.
*
* @param period the lease renewal period
* @return A builder to configure and create a streaming writer.
*/
public B withTxnLeaseRenewalPeriod(Time period) {
Preconditions.checkArgument(period.getSize() > 0, "The timeout must be a positive value.");
Expand All @@ -79,6 +82,7 @@ public B withTxnLeaseRenewalPeriod(Time period) {
*
* @param serializationSchema the deserialization schema to use.
* @param eventRouter the event router to use.
* @return An instance of {@link FlinkPravegaWriter}.
*/
protected FlinkPravegaWriter<T> createSinkFunction(SerializationSchema<T> serializationSchema, PravegaEventRouter<T> eventRouter) {
Preconditions.checkNotNull(serializationSchema, "serializationSchema");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public AbstractWriterBuilder() {
* The default client configuration is obtained from {@code PravegaConfig.fromDefaults()}.
*
* @param pravegaConfig the configuration to use.
* @return A builder to configure and create a writer.
*/
public B withPravegaConfig(PravegaConfig pravegaConfig) {
this.pravegaConfig = pravegaConfig;
Expand Down Expand Up @@ -70,6 +71,8 @@ public B forStream(final Stream stream) {

/**
* Gets the Pravega configuration.
*
* @return the instance of {@link PravegaConfig}.
*/
public PravegaConfig getPravegaConfig() {
Preconditions.checkState(pravegaConfig != null, "A Pravega configuration must be supplied.");
Expand All @@ -78,6 +81,8 @@ public PravegaConfig getPravegaConfig() {

/**
* Resolves the stream to be provided to the writer, based on the configured default scope.
*
* @return the resolved stream instance.
*/
public Stream resolveStream() {
Preconditions.checkState(stream != null, "A stream must be supplied.");
Expand All @@ -98,6 +103,8 @@ public B enableMetrics(boolean enable) {

/**
* getter to fetch the metrics flag.
*
* @return A boolean if metrics is enabled
*/
public boolean isMetricsEnabled() {
return enableMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ public void close() throws IOException {

/**
* Gets a builder {@link FlinkPravegaInputFormat} to read Pravega streams using the Flink batch API.
*
* @param <T> the element type.
* @return A builder to configure and create a batch reader.
*/
public static <T> Builder<T> builder() {
return new Builder<>();
Expand All @@ -197,6 +199,7 @@ protected Builder<T> builder() {
* Sets the deserialization schema.
*
* @param deserializationSchema The deserialization schema
* @return A builder to configure and create a batch reader.
*/
public Builder<T> withDeserializationSchema(DeserializationSchema<T> deserializationSchema) {
this.deserializationSchema = deserializationSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ public static class Builder<T> extends AbstractWriterBuilder<Builder<T>> {
* Sets the serialization schema.
*
* @param serializationSchema The serialization schema
* @return A builder to configure and create a batch writer.
*/
public Builder<T> withSerializationSchema(SerializationSchema<T> serializationSchema) {
this.serializationSchema = serializationSchema;
Expand All @@ -253,6 +254,7 @@ public Builder<T> withSerializationSchema(SerializationSchema<T> serializationSc
* Sets the event router.
*
* @param eventRouter the event router which produces a key per event.
* @return A builder to configure and create a batch writer.
*/
public Builder<T> withEventRouter(PravegaEventRouter<T> eventRouter) {
this.eventRouter = eventRouter;
Expand All @@ -266,6 +268,8 @@ protected Builder<T> builder() {

/**
* Builds the {@link FlinkPravegaOutputFormat}.
*
* @return An instance of {@link FlinkPravegaOutputFormat}
*/
public FlinkPravegaOutputFormat<T> build() {
Preconditions.checkNotNull(serializationSchema, "serializationSchema");
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,8 @@ private ReaderGroup createReaderGroup() {

/**
* Create the {@link ReaderGroupManager} for the current configuration.
*
* @return An instance of {@link ReaderGroupManager}
*/
protected ReaderGroupManager createReaderGroupManager() {
if (readerGroupManager == null) {
Expand All @@ -620,6 +622,8 @@ protected ReaderGroupManager createReaderGroupManager() {

/**
* Create the {@link EventStreamClientFactory} for the current configuration.
*
* @return An instance of {@link EventStreamClientFactory}
*/
protected EventStreamClientFactory createEventStreamClientFactory() {
if (eventStreamClientFactory == null) {
Expand All @@ -631,7 +635,9 @@ protected EventStreamClientFactory createEventStreamClientFactory() {

/**
* Create the {@link EventStreamReader} for the current configuration.
*
* @param readerId the readerID to use.
* @return An instance of {@link EventStreamReader}
*/
protected EventStreamReader<T> createEventStreamReader(String readerId) {
return createPravegaReader(
Expand All @@ -648,7 +654,9 @@ protected EventStreamReader<T> createEventStreamReader(String readerId) {

/**
* Gets a builder for {@link FlinkPravegaReader} to read Pravega streams using the Flink streaming API.
*
* @param <T> the element type.
* @return A new builder of {@link FlinkPravegaReader}
*/
public static <T> FlinkPravegaReader.Builder<T> builder() {
return new Builder<>();
Expand Down Expand Up @@ -709,7 +717,9 @@ protected SerializedValue<AssignerWithTimeWindows<T>> getAssignerWithTimeWindows

/**
* Builds a {@link FlinkPravegaReader} based on the configuration.
*
* @throws IllegalStateException if the configuration is invalid.
* @return An instance of {@link FlinkPravegaReader}
*/
public FlinkPravegaReader<T> build() {
FlinkPravegaReader<T> reader = buildSourceFunction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected FlinkPravegaTableSink(Function<TableSchema, FlinkPravegaWriter<Row>> w
/**
* Creates a copy of the sink for configuration purposes.
*/
protected FlinkPravegaTableSink createCopy() {
private FlinkPravegaTableSink createCopy() {
return new FlinkPravegaTableSink(writerFactory, outputFormatFactory, schema);
}

Expand Down
27 changes: 18 additions & 9 deletions src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,13 @@ public class FlinkPravegaWriter<T>
private final long txnLeaseRenewalPeriod;

// The sink's mode of operation. This is used to provide different guarantees for the written events.
private PravegaWriterMode writerMode;
private final PravegaWriterMode writerMode;

// flag to enable/disable watermark
private boolean enableWatermark;
private final boolean enableWatermark;

// Pravega Writer prefix that will be used by all Pravega Writers in this Sink
private final String writerIdPrefix;

// Client factory for PravegaWriter instances
private transient EventStreamClientFactory clientFactory = null;
Expand All @@ -112,9 +115,6 @@ public class FlinkPravegaWriter<T>
// Transactional Pravega writer instance
private transient TransactionalEventStreamWriter<T> transactionalWriter = null;

// Pravega Writer prefix that will be used by all Pravega Writers in this Sink
private String writerIdPrefix;

/**
* The flink pravega writer instance which can be added as a sink to a Flink job.
*
Expand Down Expand Up @@ -152,6 +152,8 @@ protected FlinkPravegaWriter(

/**
* Gets the associated event router.
*
* @return The {@link PravegaEventRouter} of the writer
*/
public PravegaEventRouter<T> getEventRouter() {
return this.eventRouter;
Expand All @@ -160,14 +162,14 @@ public PravegaEventRouter<T> getEventRouter() {
/**
* Gets this writer's operating mode.
*/
public PravegaWriterMode getPravegaWriterMode() {
PravegaWriterMode getPravegaWriterMode() {
return this.writerMode;
}

/**
* Gets this enable watermark flag.
*/
public boolean getEnableWatermark() {
boolean getEnableWatermark() {
return this.enableWatermark;
}

Expand All @@ -183,6 +185,7 @@ public void open(Configuration configuration) throws Exception {
}

@Override
@SuppressWarnings("unchecked")
protected void invoke(PravegaTransactionState transaction, T event, Context context) throws Exception {
checkWriteError();

Expand All @@ -204,7 +207,7 @@ protected void invoke(PravegaTransactionState transaction, T event, Context cont
future.whenCompleteAsync(
(result, e) -> {
if (e != null) {
log.warn("Detected a write failure: {}", e);
log.warn("Detected a write failure", e);

// We will record only the first error detected, since this will mostly likely help with
// finding the root cause. Storing all errors will not be feasible.
Expand Down Expand Up @@ -258,6 +261,7 @@ protected void preCommit(PravegaTransactionState transaction) throws Exception {
protected void commit(PravegaTransactionState transaction) {
switch (writerMode) {
case EXACTLY_ONCE:
@SuppressWarnings("unchecked")
final Transaction<T> txn = transaction.getTransaction() != null ? transaction.getTransaction() :
transactionalWriter.getTxn(UUID.fromString(transaction.transactionId));
final Transaction.Status status = txn.checkStatus();
Expand Down Expand Up @@ -294,6 +298,7 @@ protected void recoverAndCommit(PravegaTransactionState transaction) {
protected void abort(PravegaTransactionState transaction) {
switch (writerMode) {
case EXACTLY_ONCE:
@SuppressWarnings("unchecked")
final Transaction<T> txn = transaction.getTransaction() != null ? transaction.getTransaction() :
transactionalWriter.getTxn(UUID.fromString(transaction.transactionId));
txn.abort();
Expand All @@ -320,7 +325,7 @@ public void close() throws Exception {
// Current transaction will be aborted with this method
super.close();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
exception = e;
}

if (writer != null) {
Expand Down Expand Up @@ -688,6 +693,7 @@ protected Builder<T> builder() {
* Sets the serialization schema.
*
* @param serializationSchema The serialization schema
* @return Builder instance.
*/
public Builder<T> withSerializationSchema(SerializationSchema<T> serializationSchema) {
this.serializationSchema = serializationSchema;
Expand All @@ -698,6 +704,7 @@ public Builder<T> withSerializationSchema(SerializationSchema<T> serializationSc
* Sets the event router.
*
* @param eventRouter the event router which produces a key per event.
* @return Builder instance.
*/
public Builder<T> withEventRouter(PravegaEventRouter<T> eventRouter) {
this.eventRouter = eventRouter;
Expand All @@ -706,6 +713,8 @@ public Builder<T> withEventRouter(PravegaEventRouter<T> eventRouter) {

/**
* Builds the {@link FlinkPravegaWriter}.
*
* @return An instance of {@link FlinkPravegaWriter}
*/
public FlinkPravegaWriter<T> build() {
Preconditions.checkState(eventRouter != null, "Event router must be supplied.");
Expand Down
Loading