Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PRODUCER] rewrite producer.Record by java 17 record #1699

Merged
merged 1 commit into from
May 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[PRODUCER] rewrite producer.Record by java 17 record
chia7712 committed May 5, 2023
commit b7fcd9df6032de59e14385c8450bda521712c3d0
80 changes: 19 additions & 61 deletions common/src/main/java/org/astraea/common/producer/Record.java
Original file line number Diff line number Diff line change
@@ -22,31 +22,21 @@
import org.astraea.common.Header;
import org.astraea.common.admin.TopicPartition;

public interface Record<Key, Value> {

static Builder<byte[], byte[]> builder() {
public record Record<Key, Value>(
String topic,
List<Header> headers,
Key key,
Value value,
// timestamp of record
Optional<Long> timestamp,
// expected partition, or null if you don't care for it.
Optional<Integer> partition) {

public static Builder<byte[], byte[]> builder() {
return new Builder<>();
}

String topic();

List<Header> headers();

Key key();

Value value();

/**
* @return timestamp of record
*/
Optional<Long> timestamp();

/**
* @return expected partition, or null if you don't care for it.
*/
Optional<Integer> partition();

class Builder<Key, Value> {
public static class Builder<Key, Value> {
private Object key;
private Object value;
private String topic;
@@ -106,45 +96,13 @@ public Builder<Key, Value> headers(List<Header> headers) {

@SuppressWarnings("unchecked")
public Record<Key, Value> build() {
return new Record<>() {
private final Key key = (Key) Builder.this.key;
private final Value value = (Value) Builder.this.value;
private final String topic =
Objects.requireNonNull(Builder.this.topic, "topic must be defined");
private final Optional<Integer> partition = Builder.this.partition;
private final Optional<Long> timestamp = Builder.this.timestamp;
private final List<Header> headers = Objects.requireNonNull(Builder.this.headers);

@Override
public String topic() {
return topic;
}

@Override
public List<Header> headers() {
return headers;
}

@Override
public Key key() {
return key;
}

@Override
public Value value() {
return value;
}

@Override
public Optional<Long> timestamp() {
return timestamp;
}

@Override
public Optional<Integer> partition() {
return partition;
}
};
return new Record<>(
Objects.requireNonNull(topic, "topic must be defined"),
Objects.requireNonNull(headers),
(Key) key,
(Value) value,
timestamp,
partition);
}
}
}
100 changes: 39 additions & 61 deletions connector/src/main/java/org/astraea/connector/SourceRecord.java
Original file line number Diff line number Diff line change
@@ -18,88 +18,58 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.astraea.common.Header;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.producer.Record;

public class SourceRecord implements Record<byte[], byte[]> {
public record SourceRecord(
String topic,
List<Header> headers,
byte[] key,
byte[] value,
// timestamp of record
Optional<Long> timestamp,
// expected partition, or null if you don't care for it.
Optional<Integer> partition,
Map<String, String> metadataIndex,
Map<String, String> metadata) {

public static Builder builder() {
return new Builder();
}

private final Record<byte[], byte[]> record;
private final Map<String, String> metadataIndex;
private final Map<String, String> metadata;

private SourceRecord(
Record<byte[], byte[]> record,
Map<String, String> metadataIndex,
Map<String, String> metadata) {
this.record = record;
this.metadataIndex = metadataIndex;
this.metadata = metadata;
}

@Override
public String topic() {
return record.topic();
}

@Override
public List<Header> headers() {
return record.headers();
}

@Override
public byte[] key() {
return record.key();
}

@Override
public byte[] value() {
return record.value();
}

@Override
public Optional<Long> timestamp() {
return record.timestamp();
}

@Override
public Optional<Integer> partition() {
return record.partition();
}

public Map<String, String> metadataIndex() {
return metadataIndex;
}

public Map<String, String> metadata() {
return metadata;
}

public static class Builder {

private final Record.Builder<byte[], byte[]> builder = Record.builder();
private byte[] key;
private byte[] value;
private String topic;
private Optional<Integer> partition = Optional.empty();
private Optional<Long> timestamp = Optional.empty();
private List<Header> headers = List.of();
private Map<String, String> metadataIndex = Map.of();
private Map<String, String> metadata = Map.of();

private Builder() {}

public Builder record(Record<byte[], byte[]> record) {
builder.record(record);
key(record.key());
value(record.value());
topic(record.topic());
record.partition().ifPresent(this::partition);
record.timestamp().ifPresent(this::timestamp);
headers(record.headers());
return this;
}

public Builder key(byte[] key) {
builder.key(key);
this.key = key;
return this;
}

public Builder value(byte[] value) {
builder.value(value);
this.value = value;
return this;
}

@@ -109,22 +79,22 @@ public Builder topicPartition(TopicPartition topicPartition) {
}

public Builder topic(String topic) {
builder.topic(topic);
this.topic = Objects.requireNonNull(topic);
return this;
}

public Builder partition(int partition) {
builder.partition(partition);
if (partition >= 0) this.partition = Optional.of(partition);
return this;
}

public Builder timestamp(long timestamp) {
builder.timestamp(timestamp);
this.timestamp = Optional.of(timestamp);
return this;
}

public Builder headers(List<Header> headers) {
builder.headers(headers);
this.headers = headers;
return this;
}

@@ -139,7 +109,15 @@ public Builder metadata(Map<String, String> metadata) {
}

public SourceRecord build() {
return new SourceRecord(builder.build(), metadataIndex, metadata);
return new SourceRecord(
Objects.requireNonNull(topic, "topic must be defined"),
Objects.requireNonNull(headers),
key,
value,
timestamp,
partition,
metadataIndex,
metadata);
}
}
}
38 changes: 15 additions & 23 deletions connector/src/main/java/org/astraea/connector/SourceTask.java
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ public abstract class SourceTask extends org.apache.kafka.connect.source.SourceT
* use {@link Record#builder()} or {@link SourceRecord#builder()} to construct the returned
* records
*/
protected abstract Collection<Record<byte[], byte[]>> take() throws InterruptedException;
protected abstract Collection<SourceRecord> take() throws InterruptedException;

protected void commit(Metadata metadata) throws InterruptedException {
// empty
@@ -62,28 +62,20 @@ public final List<org.apache.kafka.connect.source.SourceRecord> poll()
if (records == null || records.isEmpty()) return null;
return records.stream()
.map(
r -> {
Map<String, ?> sp = null;
Map<String, ?> so = null;
if (r instanceof SourceRecord) {
var sr = (SourceRecord) r;
if (!sr.metadataIndex().isEmpty()) sp = sr.metadataIndex();
if (!sr.metadata().isEmpty()) so = sr.metadata();
}
return new org.apache.kafka.connect.source.SourceRecord(
sp,
so,
r.topic(),
r.partition().orElse(null),
r.key() == null ? null : Schema.BYTES_SCHEMA,
r.key(),
r.value() == null ? null : Schema.BYTES_SCHEMA,
r.value(),
r.timestamp().orElse(null),
r.headers().stream()
.map(h -> new HeaderImpl(h.key(), null, h.value()))
.collect(Collectors.toList()));
})
r ->
new org.apache.kafka.connect.source.SourceRecord(
r.metadataIndex(),
r.metadata(),
r.topic(),
r.partition().orElse(null),
r.key() == null ? null : Schema.BYTES_SCHEMA,
r.key(),
r.value() == null ? null : Schema.BYTES_SCHEMA,
r.value(),
r.timestamp().orElse(null),
r.headers().stream()
.map(h -> new HeaderImpl(h.key(), null, h.value()))
.collect(Collectors.toList())))
.collect(Collectors.toList());
}

Original file line number Diff line number Diff line change
@@ -32,10 +32,10 @@
import org.astraea.common.Utils;
import org.astraea.common.backup.RecordReader;
import org.astraea.common.backup.RecordWriter;
import org.astraea.common.producer.Record;
import org.astraea.connector.Definition;
import org.astraea.connector.MetadataStorage;
import org.astraea.connector.SourceConnector;
import org.astraea.connector.SourceRecord;
import org.astraea.connector.SourceTask;
import org.astraea.fs.FileSystem;
import org.astraea.fs.Type;
@@ -158,21 +158,21 @@ protected void init(Configuration configuration, MetadataStorage storage) {
}

@Override
protected Collection<Record<byte[], byte[]>> take() {
protected Collection<SourceRecord> take() {
if (paths.isEmpty()) {
paths = getFileSet(addedPaths, rootDir, tasksCount, fileSet);
}
addedPaths.addAll(paths);
var currentPath = ((LinkedList<String>) paths).poll();
if (currentPath != null) {
var records = new ArrayList<Record<byte[], byte[]>>();
var records = new ArrayList<SourceRecord>();
var inputStream = Client.read(currentPath);
var reader = RecordReader.builder(inputStream).build();
while (reader.hasNext()) {
var record = reader.next();
if (record.key() == null && record.value() == null) continue;
records.add(
Record.builder()
SourceRecord.builder()
.topic(record.topic())
.partition(record.partition())
.key(record.key())
Original file line number Diff line number Diff line change
@@ -29,11 +29,11 @@
import org.astraea.common.DistributionType;
import org.astraea.common.Utils;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.producer.Record;
import org.astraea.common.producer.RecordGenerator;
import org.astraea.connector.Definition;
import org.astraea.connector.MetadataStorage;
import org.astraea.connector.SourceConnector;
import org.astraea.connector.SourceRecord;
import org.astraea.connector.SourceTask;

public class PerfSource extends SourceConnector {
@@ -253,10 +253,11 @@ protected void init(Configuration configuration, MetadataStorage storage) {
}

@Override
protected Collection<Record<byte[], byte[]>> take() {
protected Collection<SourceRecord> take() {
return specifyPartitions.stream()
.flatMap(tp -> recordGenerator.apply(tp).stream())
.collect(Collectors.toUnmodifiableList());
.map(r -> SourceRecord.builder().record(r).build())
.toList();
}
}
}
Original file line number Diff line number Diff line change
@@ -29,7 +29,6 @@
import org.astraea.common.Utils;
import org.astraea.common.connector.ConnectorClient;
import org.astraea.common.connector.ConnectorConfigs;
import org.astraea.common.producer.Record;
import org.astraea.it.Service;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -152,7 +151,7 @@ protected void close() {
}

@Override
protected Collection<Record<byte[], byte[]>> take() {
protected Collection<SourceRecord> take() {
return List.of();
}
}
Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@
import org.astraea.common.Utils;
import org.astraea.common.connector.ConnectorClient;
import org.astraea.common.connector.ConnectorConfigs;
import org.astraea.common.producer.Record;
import org.astraea.it.Service;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -139,7 +138,7 @@ protected void init(Configuration configuration, MetadataStorage storage) {
}

@Override
protected Collection<Record<byte[], byte[]>> take() {
protected Collection<SourceRecord> take() {
if (isDone) return List.of();
isDone = true;
return topics.stream()
Original file line number Diff line number Diff line change
@@ -31,7 +31,6 @@
import org.astraea.common.connector.ConnectorConfigs;
import org.astraea.common.consumer.Consumer;
import org.astraea.common.consumer.ConsumerConfigs;
import org.astraea.common.producer.Record;
import org.astraea.it.Service;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -148,7 +147,7 @@ protected void init(Configuration configuration, MetadataStorage storage) {
}

@Override
protected Collection<Record<byte[], byte[]>> take() throws InterruptedException {
protected Collection<SourceRecord> take() throws InterruptedException {
if (isDone) return List.of();
isDone = true;
return topics.stream()