Skip to content

Commit

Permalink
[CONNECTOR] rewrite connector.Definition by java 17 record (#1753)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored May 17, 2023
1 parent 126c5e6 commit 63e5ee4
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 122 deletions.
96 changes: 38 additions & 58 deletions connector/src/main/java/org/astraea/connector/Definition.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,32 @@

import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public interface Definition {
public record Definition(
String name,
Optional<Object> defaultValue,
String documentation,
Type type,
BiConsumer<String, Object> validator) {

@Override
public Optional<Object> defaultValue() {
// ConfigDef.NO_DEFAULT_VALUE is a placeholder used to represent the lack of a default value.
return defaultValue.filter(v -> v != ConfigDef.NO_DEFAULT_VALUE);
}

/**
* @return true if the configuration is required, and it has no default value.
*/
public boolean required() {
return defaultValue.filter(v -> v == ConfigDef.NO_DEFAULT_VALUE).isPresent();
}

static Builder builder() {
public static Builder builder() {
return new Builder();
}

Expand All @@ -35,32 +54,20 @@ static ConfigDef toConfigDef(Collection<Definition> defs) {
def.define(
d.name(),
ConfigDef.Type.valueOf(d.type().name()),
d.defaultValue(),
d.validator() == null
? null
: (n, o) -> {
try {
d.validator().accept(n, o);
} catch (Exception e) {
throw new ConfigException(n, o, e.getMessage());
}
},
d.required() ? ConfigDef.NO_DEFAULT_VALUE : d.defaultValue().orElse(null),
(n, o) -> {
try {
d.validator().accept(n, o);
} catch (Exception e) {
throw new ConfigException(n, o, e.getMessage());
}
},
ConfigDef.Importance.MEDIUM,
d.documentation()));
return def;
}

String name();

Object defaultValue();

String documentation();

Type type();

BiConsumer<String, Object> validator();

enum Type {
public enum Type {
BOOLEAN,
STRING,
INT,
Expand All @@ -72,14 +79,14 @@ enum Type {
PASSWORD
}

class Builder {
public static class Builder {
private String name;
private Object defaultValue;

private String documentation = "";
private Type type = Type.STRING;

private BiConsumer<String, Object> validator;
private BiConsumer<String, Object> validator = (l, h) -> {};

private Builder() {}

Expand Down Expand Up @@ -113,39 +120,12 @@ public Builder validator(BiConsumer<String, Object> validator) {
}

public Definition build() {
return new Definition() {
private final String name = Objects.requireNonNull(Builder.this.name);
private final String documentation = Objects.requireNonNull(Builder.this.documentation);
private final Object defaultValue = Builder.this.defaultValue;
private final Type type = Objects.requireNonNull(Builder.this.type);

private final BiConsumer<String, Object> validator = Builder.this.validator;

@Override
public String name() {
return name;
}

@Override
public Object defaultValue() {
return defaultValue;
}

@Override
public String documentation() {
return documentation;
}

@Override
public Type type() {
return type;
}

@Override
public BiConsumer<String, Object> validator() {
return validator;
}
};
return new Definition(
Objects.requireNonNull(name),
Optional.ofNullable(defaultValue),
Objects.requireNonNull(documentation),
Objects.requireNonNull(type),
validator);
}
}
}
34 changes: 19 additions & 15 deletions connector/src/main/java/org/astraea/connector/backup/Exporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.astraea.connector.backup;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -79,21 +80,25 @@ public class Exporter extends SinkConnector {
.documentation("the path required for file storage.")
.required()
.build();

static DataSize SIZE_DEFAULT = DataSize.MB.of(100);
static Definition SIZE_KEY =
Definition.builder()
.name("size")
.type(Definition.Type.STRING)
.validator((name, obj) -> DataSize.of(obj.toString()))
.defaultValue("100MB")
.defaultValue(SIZE_DEFAULT.toString())
.documentation("is the maximum number of the size will be included in each file.")
.build();

static Duration TIME_DEFAULT = Duration.ofSeconds(3);

static Definition TIME_KEY =
Definition.builder()
.name("roll.duration")
.type(Definition.Type.STRING)
.validator((name, obj) -> Utils.toDuration(obj.toString()))
.defaultValue("3s")
.defaultValue(TIME_DEFAULT.toSeconds() + "s")
.documentation("the maximum time before a new archive file is rolling out.")
.build();

Expand All @@ -104,14 +109,16 @@ public class Exporter extends SinkConnector {
.documentation("a value that needs to be overridden in the file system.")
.build();

static DataSize BUFFER_SIZE_DEFAULT = DataSize.MB.of(300);

static Definition BUFFER_SIZE_KEY =
Definition.builder()
.name("writer.buffer.size")
.type(Definition.Type.STRING)
.validator((name, obj) -> DataSize.of(obj.toString()))
.documentation(
"a value that represents the capacity of a blocking queue from which the writer can take records.")
.defaultValue("300MB")
.defaultValue(BUFFER_SIZE_DEFAULT.toString())
.build();
private Configuration configs;

Expand Down Expand Up @@ -254,23 +261,20 @@ List<Record<byte[], byte[]>> recordsFromBuffer() {
protected void init(Configuration configuration) {
this.topicName = configuration.requireString(TOPICS_KEY);
this.path = configuration.requireString(PATH_KEY.name());
this.size =
DataSize.of(
configuration.string(SIZE_KEY.name()).orElse(SIZE_KEY.defaultValue().toString()));
this.size = configuration.string(SIZE_KEY.name()).map(DataSize::of).orElse(SIZE_DEFAULT);
this.interval =
Utils.toDuration(
configuration.string(TIME_KEY.name()).orElse(TIME_KEY.defaultValue().toString()))
configuration
.string(TIME_KEY.name())
.map(Utils::toDuration)
.orElse(TIME_DEFAULT)
.toMillis();

this.bufferSize.reset();

this.bufferSizeLimit =
DataSize.of(
configuration
.string(BUFFER_SIZE_KEY.name())
.orElse(BUFFER_SIZE_KEY.defaultValue().toString()))
configuration
.string(BUFFER_SIZE_KEY.name())
.map(DataSize::of)
.orElse(BUFFER_SIZE_DEFAULT)
.bytes();

this.fs = FileSystem.of(configuration.requireString(SCHEMA_KEY.name()), configuration);
this.writerFuture = CompletableFuture.runAsync(createWriter());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,12 @@ public class Importer extends SourceConnector {
.documentation("The root directory of the file that needs to be imported.")
.required()
.build();
static String CLEAN_SOURCE_DEFAULT = "off";
static Definition CLEAN_SOURCE_KEY =
Definition.builder()
.name("clean.source")
.type(Definition.Type.STRING)
.defaultValue("off")
.defaultValue(CLEAN_SOURCE_DEFAULT)
.documentation(
"Clean source policy. Available policies: \"off\", \"delete\", \"archive\". Default: off")
.build();
Expand Down Expand Up @@ -149,10 +150,7 @@ protected void init(Configuration configuration, MetadataStorage storage) {
this.rootDir = configuration.requireString(PATH_KEY.name());
this.tasksCount = configuration.requireInteger(TASKS_COUNT_KEY);
this.paths = new LinkedList<>();
this.cleanSource =
configuration
.string(CLEAN_SOURCE_KEY.name())
.orElse(CLEAN_SOURCE_KEY.defaultValue().toString());
this.cleanSource = configuration.string(CLEAN_SOURCE_KEY.name()).orElse(CLEAN_SOURCE_DEFAULT);
this.archiveDir = configuration.string(ARCHIVE_DIR_KEY.name());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import org.astraea.connector.SinkTask;

public class PerfSink extends SinkConnector {

static Duration FREQUENCY_DEFAULT = Duration.ofMillis(300);

static Definition FREQUENCY_DEF =
Definition.builder()
.name("frequency")
.type(Definition.Type.STRING)
.defaultValue("300ms")
.defaultValue(FREQUENCY_DEFAULT.toMillis() + "ms")
.validator((name, value) -> Utils.toDuration(value.toString()))
.build();

Expand Down Expand Up @@ -59,7 +62,7 @@ protected List<Definition> definitions() {

public static class Task extends SinkTask {

private Duration frequency = Utils.toDuration(FREQUENCY_DEF.defaultValue().toString());
private Duration frequency = FREQUENCY_DEFAULT;

private volatile long lastPut = System.currentTimeMillis();

Expand Down
Loading

0 comments on commit 63e5ee4

Please sign in to comment.