Skip to content

Commit

Permalink
KAFKA-10867: Improved task idling (apache#9840)
Browse files Browse the repository at this point in the history
Use the new ConsumerRecords.metadata() API to implement
improved task idling as described in KIP-695

Reviewers: Guozhang Wang <[email protected]>
  • Loading branch information
vvcephei authored Jan 28, 2021
1 parent 3daa348 commit 4d28391
Show file tree
Hide file tree
Showing 19 changed files with 790 additions and 364 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
files="(KStreamImpl|KTableImpl).java"/>

<suppress checks="CyclomaticComplexity"
files="(StreamsPartitionAssignor|StreamThread|TaskManager).java"/>
files="(StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>

<suppress checks="StaticVariableName"
files="StreamsMetricsImpl.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,16 @@
@SuppressWarnings("deprecation")
public class StreamsConfig extends AbstractConfig {

private final static Logger log = LoggerFactory.getLogger(StreamsConfig.class);
private static final Logger log = LoggerFactory.getLogger(StreamsConfig.class);

private static final ConfigDef CONFIG;

private final boolean eosEnabled;
private final static long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
private final static long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
private static final long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;

public final static int DUMMY_THREAD_INDEX = 1;
public static final int DUMMY_THREAD_INDEX = 1;
public static final long MAX_TASK_IDLE_MS_DISABLED = -1;

/**
* Prefix used to provide default topic configs to be applied when creating internal topics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ private StreamTask createActiveTask(final TaskId taskId,
time,
stateManager,
recordCollector,
context
context,
logContext
);

log.trace("Created task {} with assigned partitions {}", taskId, inputPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.Iterator;
import java.util.HashSet;
import java.util.function.Function;

/**
Expand All @@ -53,14 +58,18 @@
*/
public class PartitionGroup {

private final Logger logger;
private final Map<TopicPartition, RecordQueue> partitionQueues;
private final Sensor enforcedProcessingSensor;
private final long maxTaskIdleMs;
private final Sensor recordLatenessSensor;
private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;

private long streamTime;
private int totalBuffered;
private boolean allBuffered;

private final Map<TopicPartition, Long> fetchedLags = new HashMap<>();
private final Map<TopicPartition, Long> idlePartitionDeadlines = new HashMap<>();

static class RecordInfo {
RecordQueue queue;
Expand All @@ -78,15 +87,144 @@ RecordQueue queue() {
}
}

PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
PartitionGroup(final LogContext logContext,
final Map<TopicPartition, RecordQueue> partitionQueues,
final Sensor recordLatenessSensor,
final Sensor enforcedProcessingSensor,
final long maxTaskIdleMs) {
this.logger = logContext.logger(PartitionGroup.class);
nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
this.partitionQueues = partitionQueues;
this.enforcedProcessingSensor = enforcedProcessingSensor;
this.maxTaskIdleMs = maxTaskIdleMs;
this.recordLatenessSensor = recordLatenessSensor;
totalBuffered = 0;
allBuffered = false;
streamTime = RecordQueue.UNKNOWN;
}

/**
 * Remembers the lag reported in the fetched metadata for the given partition.
 * Metadata that carries no lag ({@code null}) is ignored, leaving any previously stored lag in place.
 *
 * @param partition the partition the metadata belongs to
 * @param metadata  the fetched metadata whose lag should be stored
 */
public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
    final Long reportedLag = metadata.lag();
    if (reportedLag == null) {
        // nothing to record for this partition
        return;
    }
    logger.trace("added fetched lag {}: {}", partition, reportedLag);
    fetchedLags.put(partition, reportedLag);
}

/**
 * Decides whether this group may be processed now.
 *
 * <p>If {@code maxTaskIdleMs} equals {@link StreamsConfig#MAX_TASK_IDLE_MS_DISABLED}, the answer is
 * always {@code true}. Otherwise every partition is inspected in turn:
 * <ul>
 *   <li>a partition with locally buffered records is ready;</li>
 *   <li>an empty partition with no fetched lag, or with a fetched lag greater than zero, makes the
 *       whole group not ready;</li>
 *   <li>an empty partition with a fetched lag of zero is given an idle deadline of
 *       {@code wallClockTime + maxTaskIdleMs} (set the first time it is seen in this state); the group
 *       is not ready until that deadline has passed.</li>
 * </ul>
 * When at least one partition has passed its deadline and at least one other partition has buffered
 * records, processing is enforced: the enforced-processing sensor is recorded and an INFO message is
 * logged.
 *
 * <p>Side effects: updates {@code idlePartitionDeadlines} and may record {@code enforcedProcessingSensor}.
 *
 * @param wallClockTime the current wall-clock time in milliseconds
 * @return {@code true} if processing may proceed, {@code false} if the caller should keep waiting
 */
public boolean readyToProcess(final long wallClockTime) {
    if (logger.isTraceEnabled()) {
        for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
            logger.trace(
                "buffered/lag {}: {}/{}",
                entry.getKey(),
                entry.getValue().size(),
                fetchedLags.get(entry.getKey())
            );
        }
    }
    // Log-level strategy:
    //  TRACE for messages that don't wait for fetches
    //  TRACE when we waited for a fetch and decided to wait some more, as configured
    //  TRACE when we are ready for processing and didn't have to enforce processing
    //  INFO  when we enforce processing, since this has to wait for fetches AND may result in disorder

    if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
        // Only build the diagnostic sets when they will actually be logged.
        if (logger.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
            final Set<TopicPartition> bufferedPartitions = new HashSet<>();
            final Set<TopicPartition> emptyPartitions = new HashSet<>();
            for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
                if (entry.getValue().isEmpty()) {
                    emptyPartitions.add(entry.getKey());
                } else {
                    bufferedPartitions.add(entry.getKey());
                }
            }
            logger.trace("Ready for processing because max.task.idle.ms is disabled." +
                             "\n\tThere may be out-of-order processing for this task as a result." +
                             "\n\tBuffered partitions: {}" +
                             "\n\tNon-buffered partitions: {}",
                         bufferedPartitions,
                         emptyPartitions);
        }
        return true;
    }

    // Partitions that have records buffered locally.
    final Set<TopicPartition> queued = new HashSet<>();
    // Partitions we stopped waiting for, mapped to the deadline that expired; allocated lazily.
    Map<TopicPartition, Long> enforced = null;

    for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
        final TopicPartition partition = entry.getKey();
        final RecordQueue queue = entry.getValue();

        final Long nullableFetchedLag = fetchedLags.get(partition);

        if (!queue.isEmpty()) {
            // this partition is ready for processing
            idlePartitionDeadlines.remove(partition);
            queued.add(partition);
        } else if (nullableFetchedLag == null) {
            // must wait to fetch metadata for the partition
            idlePartitionDeadlines.remove(partition);
            logger.trace("Waiting to fetch data for {}", partition);
            return false;
        } else if (nullableFetchedLag > 0L) {
            // must wait to poll the data we know to be on the broker
            idlePartitionDeadlines.remove(partition);
            logger.trace(
                "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
                partition,
                nullableFetchedLag
            );
            return false;
        } else {
            // The partition is known to have zero lag. Wait for maxTaskIdleMs to see if more data shows up.
            // One alternative would be to start the idling timer at the time the lag metadata was received,
            // i.e. as of the freshness of our knowledge about the zero lag, instead of when we happen to run
            // this method. Realistically it's probably a small difference, and using wall clock time seems
            // more intuitive for users, since the log message will be as of wallClockTime.

            // Saturate rather than overflow: with a very large max.task.idle.ms (e.g. Long.MAX_VALUE)
            // the plain sum wraps to a negative value, which would make the deadline appear to have
            // already passed and enforce processing immediately.
            final long uncappedDeadline = wallClockTime + maxTaskIdleMs;
            final boolean overflowed = wallClockTime >= 0L && maxTaskIdleMs >= 0L && uncappedDeadline < 0L;
            final long newDeadline = overflowed ? Long.MAX_VALUE : uncappedDeadline;

            // Keep the deadline from the first time we saw this partition idle, if there is one.
            idlePartitionDeadlines.putIfAbsent(partition, newDeadline);
            final long deadline = idlePartitionDeadlines.get(partition);
            if (wallClockTime < deadline) {
                logger.trace(
                    "Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).",
                    partition,
                    wallClockTime,
                    maxTaskIdleMs,
                    deadline
                );
                return false;
            } else {
                // this partition is ready for processing due to the task idling deadline passing
                if (enforced == null) {
                    enforced = new HashMap<>();
                }
                enforced.put(partition, deadline);
            }
        }
    }
    if (enforced == null) {
        // Every empty partition either returned false above or was added to `enforced`,
        // so reaching here with no enforced partitions means no partition was empty.
        logger.trace("All partitions were buffered locally, so this task is ready for processing.");
        return true;
    } else if (queued.isEmpty()) {
        logger.trace("No partitions were buffered locally, so this task is not ready for processing.");
        return false;
    } else {
        enforcedProcessingSensor.record(1.0d, wallClockTime);
        logger.info("Continuing to process although some partition timestamps were not buffered locally." +
                        "\n\tThere may be out-of-order processing for this task as a result." +
                        "\n\tPartitions with local data: {}." +
                        "\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}." +
                        "\n\tConfigured max.task.idle.ms: {}." +
                        "\n\tCurrent wall-clock time: {}.",
                    queued,
                    enforced,
                    maxTaskIdleMs,
                    wallClockTime);
        return true;
    }
}

// visible for testing
long partitionTimestamp(final TopicPartition partition) {
final RecordQueue queue = partitionQueues.get(partition);
Expand Down Expand Up @@ -239,7 +377,7 @@ int numBuffered() {
return totalBuffered;
}

boolean allPartitionsBuffered() {
/**
 * @return the current value of the {@code allBuffered} flag maintained by this group
 */
boolean allPartitionsBufferedLocally() {
return allBuffered;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
Expand Down Expand Up @@ -286,6 +287,11 @@ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRe
throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + partition);
}

/**
 * Not supported by this task: always throws.
 *
 * @param partition ignored
 * @param metadata  ignored
 * @throws IllegalStateException always, with a message naming this task's id
 */
@Override
public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
    final String message = "Attempted to update metadata for standby task " + id();
    throw new IllegalStateException(message);
}

InternalProcessorContext processorContext() {
return processorContext;
}
Expand Down
Loading

0 comments on commit 4d28391

Please sign in to comment.