Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CONSUMER] rewrite consumer.Record by java 17 record #1698

Merged
merged 2 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import org.astraea.common.ByteUtils;
import org.astraea.common.Header;
Expand Down Expand Up @@ -65,60 +62,20 @@ private static Record<byte[], byte[]> readRecord(InputStream inputStream) {
Utils.packException(() -> RecordOuterClass.Record.parseDelimitedFrom(inputStream));
// inputStream reaches EOF
if (outerRecord == null) return null;

return new Record<>() {
@Override
public String topic() {
return outerRecord.getTopic();
}

@Override
public List<Header> headers() {
return outerRecord.getHeadersList().stream()
.map(header -> new Header(header.getKey(), header.getValue().toByteArray()))
.collect(Collectors.toUnmodifiableList());
}

@Override
public byte[] key() {
return outerRecord.getKey().toByteArray();
}

@Override
public byte[] value() {
return outerRecord.getValue().toByteArray();
}

@Override
public long offset() {
return outerRecord.getOffset();
}

@Override
public long timestamp() {
return outerRecord.getTimestamp();
}

@Override
public int partition() {
return outerRecord.getPartition();
}

@Override
public int serializedKeySize() {
return outerRecord.getKey().size();
}

@Override
public int serializedValueSize() {
return outerRecord.getValue().size();
}

@Override
public Optional<Integer> leaderEpoch() {
return Optional.empty();
}
};
return Record.builder()
.topic(outerRecord.getTopic())
.headers(
outerRecord.getHeadersList().stream()
.map(header -> new Header(header.getKey(), header.getValue().toByteArray()))
.toList())
.key(outerRecord.getKey().toByteArray())
.value(outerRecord.getValue().toByteArray())
.offset(outerRecord.getOffset())
.timestamp(outerRecord.getTimestamp())
.partition(outerRecord.getPartition())
.serializedKeySize(outerRecord.getKey().size())
.serializedValueSize(outerRecord.getValue().size())
.build();
}

private InputStream fs;
Expand Down
64 changes: 12 additions & 52 deletions common/src/main/java/org/astraea/common/consumer/Builder.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -238,56 +237,17 @@ public String clientId() {
}

private static <Key, Value> Record<Key, Value> toRecord(ConsumerRecord<Key, Value> record) {
return new Record<>() {
@Override
public String topic() {
return record.topic();
}

@Override
public List<Header> headers() {
return Header.of(record.headers());
}

@Override
public Key key() {
return record.key();
}

@Override
public Value value() {
return record.value();
}

@Override
public long offset() {
return record.offset();
}

@Override
public long timestamp() {
return record.timestamp();
}

@Override
public int partition() {
return record.partition();
}

@Override
public int serializedKeySize() {
return record.serializedKeySize();
}

@Override
public int serializedValueSize() {
return record.serializedValueSize();
}

@Override
public Optional<Integer> leaderEpoch() {
return record.leaderEpoch();
}
};
return Record.builder()
.topic(record.topic())
.headers(Header.of(record.headers()))
.key(record.key())
.value(record.value())
.offset(record.offset())
.timestamp(record.timestamp())
.partition(record.partition())
.serializedKeySize(record.serializedKeySize())
.serializedValueSize(record.serializedValueSize())
.leaderEpoch(record.leaderEpoch())
.build();
}
}
137 changes: 106 additions & 31 deletions common/src/main/java/org/astraea/common/consumer/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,126 @@
package org.astraea.common.consumer;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.astraea.common.Header;
import org.astraea.common.admin.TopicPartition;

public interface Record<Key, Value> {
public record Record<Key, Value>(
String topic,
List<Header> headers,
Key key,
Value value,
// The position of this record in the corresponding Kafka partition.
long offset,
// timestamp of record
long timestamp,
// expected partition, or null if you don't care for it.
int partition,
// The size of the serialized, uncompressed key in bytes. If key is null, the returned size is
// -1.
int serializedKeySize,
// The size of the serialized, uncompressed value in bytes. If value is null, the returned size
// is -1.
int serializedValueSize,
// the leader epoch or empty for legacy record formats
Optional<Integer> leaderEpoch) {

default TopicPartition topicPartition() {
public static <Key, Value> RecordBuilder<Key, Value> builder() {
return new RecordBuilder<>();
}

public TopicPartition topicPartition() {
return TopicPartition.of(topic(), partition());
}

String topic();
public static class RecordBuilder<Key, Value> {

private Object key;
private Object value;
private long offset;
private String topic;
private int partition;
private long timestamp;
private List<Header> headers = List.of();

private int serializedKeySize;
private int serializedValueSize;

private Optional<Integer> leaderEpoch = Optional.empty();

private RecordBuilder() {}

List<Header> headers();
@SuppressWarnings("unchecked")
public <NewKey> RecordBuilder<NewKey, Value> key(NewKey key) {
this.key = key;
return (RecordBuilder<NewKey, Value>) this;
}

Key key();
@SuppressWarnings("unchecked")
public <NewValue> RecordBuilder<Key, NewValue> value(NewValue value) {
this.value = value;
return (RecordBuilder<Key, NewValue>) this;
}

Value value();
public RecordBuilder<Key, Value> topicPartition(TopicPartition topicPartition) {
topic(topicPartition.topic());
return partition(topicPartition.partition());
}

/** The position of this record in the corresponding Kafka partition. */
long offset();
/**
* @return timestamp of record
*/
long timestamp();
public RecordBuilder<Key, Value> topic(String topic) {
this.topic = Objects.requireNonNull(topic);
return this;
}

/**
* @return expected partition, or null if you don't care for it.
*/
int partition();
public RecordBuilder<Key, Value> partition(int partition) {
if (partition >= 0) this.partition = partition;
return this;
}

/**
* The size of the serialized, uncompressed key in bytes. If key is null, the returned size is -1.
*/
int serializedKeySize();
public RecordBuilder<Key, Value> timestamp(long timestamp) {
this.timestamp = timestamp;
return this;
}

/**
* The size of the serialized, uncompressed value in bytes. If value is null, the returned size is
* -1.
*/
int serializedValueSize();
public RecordBuilder<Key, Value> headers(List<Header> headers) {
this.headers = headers;
return this;
}

/**
* Get the leader epoch for the record if available
*
* @return the leader epoch or empty for legacy record formats
*/
Optional<Integer> leaderEpoch();
public RecordBuilder<Key, Value> serializedKeySize(int serializedKeySize) {
this.serializedKeySize = serializedKeySize;
return this;
}

public RecordBuilder<Key, Value> serializedValueSize(int serializedValueSize) {
this.serializedValueSize = serializedValueSize;
return this;
}

public RecordBuilder<Key, Value> leaderEpoch(Optional<Integer> leaderEpoch) {
this.leaderEpoch = leaderEpoch;
return this;
}

public RecordBuilder<Key, Value> offset(long offset) {
this.offset = offset;
return this;
}

@SuppressWarnings("unchecked")
public Record<Key, Value> build() {
return new Record<>(
Objects.requireNonNull(RecordBuilder.this.topic),
Objects.requireNonNull(RecordBuilder.this.headers),
(Key) RecordBuilder.this.key,
(Value) RecordBuilder.this.value,
RecordBuilder.this.offset,
RecordBuilder.this.timestamp,
RecordBuilder.this.partition,
RecordBuilder.this.serializedKeySize,
RecordBuilder.this.serializedValueSize,
Objects.requireNonNull(RecordBuilder.this.leaderEpoch));
}
}
}
Loading