Skip to content

Commit

Permalink
[PRODUCER] rewrite producer.Record by java 17 record (#1699)
Browse files Browse the repository at this point in the history
chia7712 authored May 5, 2023
1 parent e463cda commit 668d5a4
Showing 8 changed files with 84 additions and 158 deletions.
80 changes: 19 additions & 61 deletions common/src/main/java/org/astraea/common/producer/Record.java
Original file line number Diff line number Diff line change
@@ -22,31 +22,21 @@
import org.astraea.common.Header;
import org.astraea.common.admin.TopicPartition;

public interface Record<Key, Value> {

static Builder<byte[], byte[]> builder() {
public record Record<Key, Value>(
String topic,
List<Header> headers,
Key key,
Value value,
// timestamp of record
Optional<Long> timestamp,
// expected partition, or null if you don't care for it.
Optional<Integer> partition) {

public static Builder<byte[], byte[]> builder() {
return new Builder<>();
}

String topic();

List<Header> headers();

Key key();

Value value();

/**
* @return timestamp of record
*/
Optional<Long> timestamp();

/**
* @return expected partition, or null if you don't care for it.
*/
Optional<Integer> partition();

class Builder<Key, Value> {
public static class Builder<Key, Value> {
private Object key;
private Object value;
private String topic;
@@ -106,45 +96,13 @@ public Builder<Key, Value> headers(List<Header> headers) {

@SuppressWarnings("unchecked")
public Record<Key, Value> build() {
return new Record<>() {
private final Key key = (Key) Builder.this.key;
private final Value value = (Value) Builder.this.value;
private final String topic =
Objects.requireNonNull(Builder.this.topic, "topic must be defined");
private final Optional<Integer> partition = Builder.this.partition;
private final Optional<Long> timestamp = Builder.this.timestamp;
private final List<Header> headers = Objects.requireNonNull(Builder.this.headers);

@Override
public String topic() {
return topic;
}

@Override
public List<Header> headers() {
return headers;
}

@Override
public Key key() {
return key;
}

@Override
public Value value() {
return value;
}

@Override
public Optional<Long> timestamp() {
return timestamp;
}

@Override
public Optional<Integer> partition() {
return partition;
}
};
return new Record<>(
Objects.requireNonNull(topic, "topic must be defined"),
Objects.requireNonNull(headers),
(Key) key,
(Value) value,
timestamp,
partition);
}
}
}
100 changes: 39 additions & 61 deletions connector/src/main/java/org/astraea/connector/SourceRecord.java
Original file line number Diff line number Diff line change
@@ -18,88 +18,58 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.astraea.common.Header;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.producer.Record;

public class SourceRecord implements Record<byte[], byte[]> {
public record SourceRecord(
String topic,
List<Header> headers,
byte[] key,
byte[] value,
// timestamp of record
Optional<Long> timestamp,
// expected partition, or null if you don't care for it.
Optional<Integer> partition,
Map<String, String> metadataIndex,
Map<String, String> metadata) {

public static Builder builder() {
return new Builder();
}

private final Record<byte[], byte[]> record;
private final Map<String, String> metadataIndex;
private final Map<String, String> metadata;

private SourceRecord(
Record<byte[], byte[]> record,
Map<String, String> metadataIndex,
Map<String, String> metadata) {
this.record = record;
this.metadataIndex = metadataIndex;
this.metadata = metadata;
}

@Override
public String topic() {
return record.topic();
}

@Override
public List<Header> headers() {
return record.headers();
}

@Override
public byte[] key() {
return record.key();
}

@Override
public byte[] value() {
return record.value();
}

@Override
public Optional<Long> timestamp() {
return record.timestamp();
}

@Override
public Optional<Integer> partition() {
return record.partition();
}

public Map<String, String> metadataIndex() {
return metadataIndex;
}

public Map<String, String> metadata() {
return metadata;
}

public static class Builder {

private final Record.Builder<byte[], byte[]> builder = Record.builder();
private byte[] key;
private byte[] value;
private String topic;
private Optional<Integer> partition = Optional.empty();
private Optional<Long> timestamp = Optional.empty();
private List<Header> headers = List.of();
private Map<String, String> metadataIndex = Map.of();
private Map<String, String> metadata = Map.of();

private Builder() {}

public Builder record(Record<byte[], byte[]> record) {
builder.record(record);
key(record.key());
value(record.value());
topic(record.topic());
record.partition().ifPresent(this::partition);
record.timestamp().ifPresent(this::timestamp);
headers(record.headers());
return this;
}

public Builder key(byte[] key) {
builder.key(key);
this.key = key;
return this;
}

public Builder value(byte[] value) {
builder.value(value);
this.value = value;
return this;
}

@@ -109,22 +79,22 @@ public Builder topicPartition(TopicPartition topicPartition) {
}

public Builder topic(String topic) {
builder.topic(topic);
this.topic = Objects.requireNonNull(topic);
return this;
}

public Builder partition(int partition) {
builder.partition(partition);
if (partition >= 0) this.partition = Optional.of(partition);
return this;
}

public Builder timestamp(long timestamp) {
builder.timestamp(timestamp);
this.timestamp = Optional.of(timestamp);
return this;
}

public Builder headers(List<Header> headers) {
builder.headers(headers);
this.headers = headers;
return this;
}

@@ -139,7 +109,15 @@ public Builder metadata(Map<String, String> metadata) {
}

public SourceRecord build() {
return new SourceRecord(builder.build(), metadataIndex, metadata);
return new SourceRecord(
Objects.requireNonNull(topic, "topic must be defined"),
Objects.requireNonNull(headers),
key,
value,
timestamp,
partition,
metadataIndex,
metadata);
}
}
}
38 changes: 15 additions & 23 deletions connector/src/main/java/org/astraea/connector/SourceTask.java
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ public abstract class SourceTask extends org.apache.kafka.connect.source.SourceT
* use {@link Record#builder()} or {@link SourceRecord#builder()} to construct the returned
* records
*/
protected abstract Collection<Record<byte[], byte[]>> take() throws InterruptedException;
protected abstract Collection<SourceRecord> take() throws InterruptedException;

protected void commit(Metadata metadata) throws InterruptedException {
// empty
@@ -62,28 +62,20 @@ public final List<org.apache.kafka.connect.source.SourceRecord> poll()
if (records == null || records.isEmpty()) return null;
return records.stream()
.map(
r -> {
Map<String, ?> sp = null;
Map<String, ?> so = null;
if (r instanceof SourceRecord) {
var sr = (SourceRecord) r;
if (!sr.metadataIndex().isEmpty()) sp = sr.metadataIndex();
if (!sr.metadata().isEmpty()) so = sr.metadata();
}
return new org.apache.kafka.connect.source.SourceRecord(
sp,
so,
r.topic(),
r.partition().orElse(null),
r.key() == null ? null : Schema.BYTES_SCHEMA,
r.key(),
r.value() == null ? null : Schema.BYTES_SCHEMA,
r.value(),
r.timestamp().orElse(null),
r.headers().stream()
.map(h -> new HeaderImpl(h.key(), null, h.value()))
.collect(Collectors.toList()));
})
r ->
new org.apache.kafka.connect.source.SourceRecord(
r.metadataIndex(),
r.metadata(),
r.topic(),
r.partition().orElse(null),
r.key() == null ? null : Schema.BYTES_SCHEMA,
r.key(),
r.value() == null ? null : Schema.BYTES_SCHEMA,
r.value(),
r.timestamp().orElse(null),
r.headers().stream()
.map(h -> new HeaderImpl(h.key(), null, h.value()))
.collect(Collectors.toList())))
.collect(Collectors.toList());
}

Original file line number Diff line number Diff line change
@@ -32,10 +32,10 @@
import org.astraea.common.Utils;
import org.astraea.common.backup.RecordReader;
import org.astraea.common.backup.RecordWriter;
import org.astraea.common.producer.Record;
import org.astraea.connector.Definition;
import org.astraea.connector.MetadataStorage;
import org.astraea.connector.SourceConnector;
import org.astraea.connector.SourceRecord;
import org.astraea.connector.SourceTask;
import org.astraea.fs.FileSystem;
import org.astraea.fs.Type;
@@ -158,21 +158,21 @@ protected void init(Configuration configuration, MetadataStorage storage) {
}

@Override
protected Collection<Record<byte[], byte[]>> take() {
protected Collection<SourceRecord> take() {
if (paths.isEmpty()) {
paths = getFileSet(addedPaths, rootDir, tasksCount, fileSet);
}
addedPaths.addAll(paths);
var currentPath = ((LinkedList<String>) paths).poll();
if (currentPath != null) {
var records = new ArrayList<Record<byte[], byte[]>>();
var records = new ArrayList<SourceRecord>();
var inputStream = Client.read(currentPath);
var reader = RecordReader.builder(inputStream).build();
while (reader.hasNext()) {
var record = reader.next();
if (record.key() == null && record.value() == null) continue;
records.add(
Record.builder()
SourceRecord.builder()
.topic(record.topic())
.partition(record.partition())
.key(record.key())
Original file line number Diff line number Diff line change
@@ -29,11 +29,11 @@
import org.astraea.common.DistributionType;
import org.astraea.common.Utils;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.producer.Record;
import org.astraea.common.producer.RecordGenerator;
import org.astraea.connector.Definition;
import org.astraea.connector.MetadataStorage;
import org.astraea.connector.SourceConnector;
import org.astraea.connector.SourceRecord;
import org.astraea.connector.SourceTask;

public class PerfSource extends SourceConnector {
@@ -253,10 +253,11 @@ protected void init(Configuration configuration, MetadataStorage storage) {
}

@Override
protected Collection<Record<byte[], byte[]>> take() {
protected Collection<SourceRecord> take() {
return specifyPartitions.stream()
.flatMap(tp -> recordGenerator.apply(tp).stream())
.collect(Collectors.toUnmodifiableList());
.map(r -> SourceRecord.builder().record(r).build())
.toList();
}
}
}
Original file line number Diff line number Diff line change
@@ -29,7 +29,6 @@
import org.astraea.common.Utils;
import org.astraea.common.connector.ConnectorClient;
import org.astraea.common.connector.ConnectorConfigs;
import org.astraea.common.producer.Record;
import org.astraea.it.Service;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -152,7 +151,7 @@ protected void close() {
}

@Override
protected Collection<Record<byte[], byte[]>> take() {
protected Collection<SourceRecord> take() {
return List.of();
}
}
Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@
import org.astraea.common.Utils;
import org.astraea.common.connector.ConnectorClient;
import org.astraea.common.connector.ConnectorConfigs;
import org.astraea.common.producer.Record;
import org.astraea.it.Service;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -139,7 +138,7 @@ protected void init(Configuration configuration, MetadataStorage storage) {
}

@Override
protected Collection<Record<byte[], byte[]>> take() {
protected Collection<SourceRecord> take() {
if (isDone) return List.of();
isDone = true;
return topics.stream()
Original file line number Diff line number Diff line change
@@ -31,7 +31,6 @@
import org.astraea.common.connector.ConnectorConfigs;
import org.astraea.common.consumer.Consumer;
import org.astraea.common.consumer.ConsumerConfigs;
import org.astraea.common.producer.Record;
import org.astraea.it.Service;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -148,7 +147,7 @@ protected void init(Configuration configuration, MetadataStorage storage) {
}

@Override
protected Collection<Record<byte[], byte[]>> take() throws InterruptedException {
protected Collection<SourceRecord> take() throws InterruptedException {
if (isDone) return List.of();
isDone = true;
return topics.stream()

0 comments on commit 668d5a4

Please sign in to comment.