Skip to content

Commit

Permalink
[APP] Rewrite app by java 17 toList and instanceof (#1760)
Browse files Browse the repository at this point in the history
  • Loading branch information
Haser0305 authored May 18, 2023
1 parent c75bcb4 commit a446e69
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 12 deletions.
4 changes: 2 additions & 2 deletions app/src/main/java/org/astraea/app/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ static void execute(Map<String, Class<?>> mains, List<String> args) throws Throw
method.invoke(null, (Object) args.subList(1, args.size()).toArray(String[]::new));
} catch (InvocationTargetException targetException) {
// Print out ParameterException, don't throw.
if (targetException.getTargetException() instanceof ParameterException) {
System.out.println(targetException.getTargetException().getMessage());
if (targetException.getTargetException() instanceof ParameterException exception) {
System.out.println(exception.getMessage());
} else {
throw targetException.getTargetException();
}
Expand Down
16 changes: 6 additions & 10 deletions app/src/main/java/org/astraea/app/web/RecordHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.astraea.app.web;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

import java.time.Duration;
import java.util.Base64;
Expand Down Expand Up @@ -163,8 +162,7 @@ public CompletionStage<Response> get(Channel channel) {
// visible for testing
GetResponse get(Consumer<byte[], byte[]> consumer, int limit, Duration timeout) {
try {
return new GetResponse(
consumer, consumer.poll(timeout).stream().map(Record::new).collect(toList()));
return new GetResponse(consumer, consumer.poll(timeout).stream().map(Record::new).toList());
} catch (Exception e) {
consumer.close();
throw e;
Expand All @@ -190,9 +188,7 @@ public CompletionStage<Response> post(Channel channel) {
() -> {
try {
return producer.send(
records.stream()
.map(record -> createRecord(producer, record))
.collect(toList()));
records.stream().map(record -> createRecord(producer, record)).toList());
} finally {
if (producer.transactional()) {
producer.close();
Expand All @@ -214,7 +210,7 @@ public CompletionStage<Response> post(Channel channel) {
return Response.for404("missing result");
}))
.map(CompletionStage::toCompletableFuture)
.collect(toList())));
.toList()));

if (postRequest.async()) return CompletableFuture.completedFuture(Response.ACCEPT);
return CompletableFuture.completedFuture(
Expand Down Expand Up @@ -410,8 +406,8 @@ public String json() {
@Override
public void onComplete(Throwable error) {
try {
if (error == null && consumer instanceof SubscribedConsumer) {
((SubscribedConsumer<byte[], byte[]>) consumer).commitOffsets(Duration.ofSeconds(5));
if (error == null && consumer instanceof SubscribedConsumer subscribedConsumer) {
subscribedConsumer.commitOffsets(Duration.ofSeconds(5));
}
} finally {
consumer.close();
Expand All @@ -438,7 +434,7 @@ static class Record {
timestamp = record.timestamp();
serializedKeySize = record.serializedKeySize();
serializedValueSize = record.serializedValueSize();
headers = record.headers().stream().map(Header::new).collect(toList());
headers = record.headers().stream().map(Header::new).toList();
key = record.key();
value = record.value();
leaderEpoch = record.leaderEpoch().orElse(null);
Expand Down

0 comments on commit a446e69

Please sign in to comment.