Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CONNECTOR] rewrite connector.Definition by java 17 record #1753

Merged
merged 3 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 38 additions & 58 deletions connector/src/main/java/org/astraea/connector/Definition.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,32 @@

import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public interface Definition {
public record Definition(
String name,
Optional<Object> defaultValue,
String documentation,
Type type,
BiConsumer<String, Object> validator) {

@Override
public Optional<Object> defaultValue() {
// ConfigDef.NO_DEFAULT_VALUE is a placeholder used to represent the lack of a default value.
return defaultValue.filter(v -> v != ConfigDef.NO_DEFAULT_VALUE);
}

/**
* @return true if the configuration is required, and it has no default value.
*/
public boolean required() {
return defaultValue.filter(v -> v == ConfigDef.NO_DEFAULT_VALUE).isPresent();
}

static Builder builder() {
public static Builder builder() {
return new Builder();
}

Expand All @@ -35,32 +54,20 @@ static ConfigDef toConfigDef(Collection<Definition> defs) {
def.define(
d.name(),
ConfigDef.Type.valueOf(d.type().name()),
d.defaultValue(),
d.validator() == null
? null
: (n, o) -> {
try {
d.validator().accept(n, o);
} catch (Exception e) {
throw new ConfigException(n, o, e.getMessage());
}
},
d.required() ? ConfigDef.NO_DEFAULT_VALUE : d.defaultValue().orElse(null),
(n, o) -> {
try {
d.validator().accept(n, o);
} catch (Exception e) {
throw new ConfigException(n, o, e.getMessage());
}
},
ConfigDef.Importance.MEDIUM,
d.documentation()));
return def;
}

String name();

Object defaultValue();

String documentation();

Type type();

BiConsumer<String, Object> validator();

enum Type {
public enum Type {
BOOLEAN,
STRING,
INT,
Expand All @@ -72,14 +79,14 @@ enum Type {
PASSWORD
}

class Builder {
public static class Builder {
private String name;
private Object defaultValue;

private String documentation = "";
private Type type = Type.STRING;

private BiConsumer<String, Object> validator;
private BiConsumer<String, Object> validator = (l, h) -> {};

private Builder() {}

Expand Down Expand Up @@ -113,39 +120,12 @@ public Builder validator(BiConsumer<String, Object> validator) {
}

public Definition build() {
return new Definition() {
private final String name = Objects.requireNonNull(Builder.this.name);
private final String documentation = Objects.requireNonNull(Builder.this.documentation);
private final Object defaultValue = Builder.this.defaultValue;
private final Type type = Objects.requireNonNull(Builder.this.type);

private final BiConsumer<String, Object> validator = Builder.this.validator;

@Override
public String name() {
return name;
}

@Override
public Object defaultValue() {
return defaultValue;
}

@Override
public String documentation() {
return documentation;
}

@Override
public Type type() {
return type;
}

@Override
public BiConsumer<String, Object> validator() {
return validator;
}
};
return new Definition(
Objects.requireNonNull(name),
Optional.ofNullable(defaultValue),
Objects.requireNonNull(documentation),
Objects.requireNonNull(type),
validator);
}
}
}
34 changes: 19 additions & 15 deletions connector/src/main/java/org/astraea/connector/backup/Exporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.astraea.connector.backup;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -80,21 +81,25 @@ public class Exporter extends SinkConnector {
.documentation("the path required for file storage.")
.required()
.build();

static DataSize SIZE_DEFAULT = DataSize.MB.of(100);
static Definition SIZE_KEY =
Definition.builder()
.name("size")
.type(Definition.Type.STRING)
.validator((name, obj) -> DataSize.of(obj.toString()))
.defaultValue("100MB")
.defaultValue(SIZE_DEFAULT.toString())
.documentation("is the maximum number of the size will be included in each file.")
.build();

static Duration TIME_DEFAULT = Duration.ofSeconds(3);

static Definition TIME_KEY =
Definition.builder()
.name("roll.duration")
.type(Definition.Type.STRING)
.validator((name, obj) -> Utils.toDuration(obj.toString()))
.defaultValue("3s")
.defaultValue(TIME_DEFAULT.toSeconds() + "s")
.documentation("the maximum time before a new archive file is rolling out.")
.build();

Expand All @@ -105,14 +110,16 @@ public class Exporter extends SinkConnector {
.documentation("a value that needs to be overridden in the file system.")
.build();

static DataSize BUFFER_SIZE_DEFAULT = DataSize.MB.of(300);

static Definition BUFFER_SIZE_KEY =
Definition.builder()
.name("writer.buffer.size")
.type(Definition.Type.STRING)
.validator((name, obj) -> DataSize.of(obj.toString()))
.documentation(
"a value that represents the capacity of a blocking queue from which the writer can take records.")
.defaultValue("300MB")
.defaultValue(BUFFER_SIZE_DEFAULT.toString())
.build();
private Configuration configs;

Expand Down Expand Up @@ -255,23 +262,20 @@ List<Record<byte[], byte[]>> recordsFromBuffer() {
protected void init(Configuration configuration) {
this.topicName = configuration.requireString(TOPICS_KEY);
this.path = configuration.requireString(PATH_KEY.name());
this.size =
DataSize.of(
configuration.string(SIZE_KEY.name()).orElse(SIZE_KEY.defaultValue().toString()));
this.size = configuration.string(SIZE_KEY.name()).map(DataSize::of).orElse(SIZE_DEFAULT);
this.interval =
Utils.toDuration(
configuration.string(TIME_KEY.name()).orElse(TIME_KEY.defaultValue().toString()))
configuration
.string(TIME_KEY.name())
.map(Utils::toDuration)
.orElse(TIME_DEFAULT)
.toMillis();

this.bufferSize.reset();

this.bufferSizeLimit =
DataSize.of(
configuration
.string(BUFFER_SIZE_KEY.name())
.orElse(BUFFER_SIZE_KEY.defaultValue().toString()))
configuration
.string(BUFFER_SIZE_KEY.name())
.map(DataSize::of)
.orElse(BUFFER_SIZE_DEFAULT)
.bytes();

this.fs = FileSystem.of(configuration.requireString(SCHEMA_KEY.name()), configuration);
this.writerFuture = CompletableFuture.runAsync(createWriter());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,12 @@ public class Importer extends SourceConnector {
.documentation("The root directory of the file that needs to be imported.")
.required()
.build();
static String CLEAN_SOURCE_DEFAULT = "off";
static Definition CLEAN_SOURCE_KEY =
Definition.builder()
.name("clean.source")
.type(Definition.Type.STRING)
.defaultValue("off")
.defaultValue(CLEAN_SOURCE_DEFAULT)
.documentation(
"Clean source policy. Available policies: \"off\", \"delete\", \"archive\". Default: off")
.build();
Expand Down Expand Up @@ -150,10 +151,7 @@ protected void init(Configuration configuration, MetadataStorage storage) {
this.rootDir = configuration.requireString(PATH_KEY.name());
this.tasksCount = configuration.requireInteger(TASKS_COUNT_KEY);
this.paths = new LinkedList<>();
this.cleanSource =
configuration
.string(CLEAN_SOURCE_KEY.name())
.orElse(CLEAN_SOURCE_KEY.defaultValue().toString());
this.cleanSource = configuration.string(CLEAN_SOURCE_KEY.name()).orElse(CLEAN_SOURCE_DEFAULT);
this.archiveDir = configuration.string(ARCHIVE_DIR_KEY.name());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import org.astraea.connector.SinkTask;

public class PerfSink extends SinkConnector {

static Duration FREQUENCY_DEFAULT = Duration.ofMillis(300);

static Definition FREQUENCY_DEF =
Definition.builder()
.name("frequency")
.type(Definition.Type.STRING)
.defaultValue("300ms")
.defaultValue(FREQUENCY_DEFAULT.toMillis() + "ms")
.validator((name, value) -> Utils.toDuration(value.toString()))
.build();

Expand Down Expand Up @@ -60,7 +63,7 @@ protected List<Definition> definitions() {

public static class Task extends SinkTask {

private Duration frequency = Utils.toDuration(FREQUENCY_DEF.defaultValue().toString());
private Duration frequency = FREQUENCY_DEFAULT;

private volatile long lastPut = System.currentTimeMillis();

Expand Down
Loading