Skip to content

Commit

Permalink
Limit partitionDataSizeSinceLastRebalance to 0
Browse files Browse the repository at this point in the history
Because we estimate the partitionDataSize based on
partitionRowCount and total data processed, it is
possible that the estimated partitionDataSize is
slightly less than it was estimated at the last
rebalance cycle. That happens when, for a given partition,
the row count hasn't increased but the overall data
processed has increased. Therefore, we need to
make sure that the estimated partitionDataSize
since last rebalance is never less than 0.
Otherwise, it will affect the ordering of the
minTaskBuckets priority queue.
  • Loading branch information
gaurav8297 authored and sopel39 committed Dec 6, 2023
1 parent 77cb0d6 commit b5d978d
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,15 @@ private void calculatePartitionDataSize(long dataProcessed)
}

for (int partition = 0; partition < partitionCount; partition++) {
partitionDataSize[partition] = (partitionRowCount.get(partition) * dataProcessed) / totalPartitionRowCount;
// Because we estimate the partitionDataSize based on partitionRowCount and total data processed, it is possible
// that the estimated partitionDataSize is slightly less than it was estimated at the last rebalance cycle.
// That happens when, for a given partition, the row count hasn't increased but the overall data processed
// has increased. Therefore, we need to make sure that the estimated partitionDataSize is
// at least partitionDataSizeAtLastRebalance. Otherwise, it will affect the ordering of the minTaskBuckets
// priority queue.
partitionDataSize[partition] = max(
(partitionRowCount.get(partition) * dataProcessed) / totalPartitionRowCount,
partitionDataSize[partition]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,115 @@ public void testNoWriterScalingWhenOnlyBufferSizeLimitIsExceeded()
});
}

/**
 * Drives a four-buffer {@link LocalExchange} with pages built for two different single values
 * (0 and 1) and, after every batch of pages, checks each of the four sources with
 * {@code assertSource}.
 *
 * <p>The session lowers SKEWED_PARTITION_MIN_DATA_PROCESSED_REBALANCE_THRESHOLD to "20kB" so that
 * the small batches written here are enough to move the expected values from one batch to the next.
 * The expected values passed to {@code assertSource} are order-sensitive: they reflect the state
 * accumulated over all batches written so far, so the batches must be written in exactly this order.
 *
 * @param partitioningHandle partitioning handle supplied by the "scalingPartitionHandles" data provider
 */
@Test(dataProvider = "scalingPartitionHandles")
public void testScalingWithTwoDifferentPartitions(PartitioningHandle partitioningHandle)
{
    LocalExchange scalingExchange = new LocalExchange(
            nodePartitioningManager,
            testSessionBuilder()
                    .setSystemProperty(SKEWED_PARTITION_MIN_DATA_PROCESSED_REBALANCE_THRESHOLD, "20kB")
                    .setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "256MB")
                    .build(),
            4,
            partitioningHandle,
            ImmutableList.of(0),
            TYPES,
            Optional.empty(),
            DataSize.ofBytes(retainedSizeOfPages(2)),
            TYPE_OPERATORS,
            DataSize.of(10, KILOBYTE),
            TOTAL_MEMORY_USED);

    run(scalingExchange, exchange -> {
        // Freshly created exchange: four buffers, nothing buffered yet.
        assertThat(exchange.getBufferCount()).isEqualTo(4);
        assertExchangeTotalBufferedBytes(exchange, 0);

        // A single sink feeds the whole exchange; the factory is closed once the sink exists.
        LocalExchangeSinkFactory factory = exchange.createSinkFactory();
        factory.noMoreSinkFactories();
        LocalExchangeSink writerSink = factory.createSink();
        assertSinkCanWrite(writerSink);
        factory.close();

        // Acquire the four sources one at a time, checking each right after it is obtained.
        LocalExchangeSource writerA = exchange.getNextSource();
        assertSource(writerA, 0);

        LocalExchangeSource writerB = exchange.getNextSource();
        assertSource(writerB, 0);

        LocalExchangeSource writerC = exchange.getNextSource();
        assertSource(writerC, 0);

        LocalExchangeSource writerD = exchange.getNextSource();
        assertSource(writerD, 0);

        // Initial batch: two pages for value 0 followed by two pages for value 1.
        for (int page = 0; page < 2; page++) {
            writerSink.addPage(createSingleValuePage(0, 1000));
        }
        for (int page = 0; page < 2; page++) {
            writerSink.addPage(createSingleValuePage(1, 2));
        }

        // Two partitions are assigned to two different writers
        assertSource(writerA, 2);
        assertSource(writerB, 0);
        assertSource(writerC, 0);
        assertSource(writerD, 2);

        for (int page = 0; page < 4; page++) {
            writerSink.addPage(createSingleValuePage(0, 1000));
        }

        // partition 0 is assigned to writer B after scaling.
        assertSource(writerA, 2);
        assertSource(writerB, 2);
        assertSource(writerC, 0);
        assertSource(writerD, 4);

        for (int page = 0; page < 4; page++) {
            writerSink.addPage(createSingleValuePage(0, 1000));
        }

        // partition 0 is assigned to writer A after scaling.
        assertSource(writerA, 3);
        assertSource(writerB, 4);
        assertSource(writerC, 0);
        assertSource(writerD, 5);

        for (int page = 0; page < 4; page++) {
            writerSink.addPage(createSingleValuePage(0, 1000));
        }

        // partition 0 is assigned to writer C after scaling.
        assertSource(writerA, 4);
        assertSource(writerB, 5);
        assertSource(writerC, 1);
        assertSource(writerD, 6);

        // From here on only pages for value 1 are written, built with a second argument of 10000.
        for (int page = 0; page < 4; page++) {
            writerSink.addPage(createSingleValuePage(1, 10000));
        }

        // partition 1 is assigned to writer B after scaling.
        assertSource(writerA, 5);
        assertSource(writerB, 8);
        assertSource(writerC, 1);
        assertSource(writerD, 6);

        for (int page = 0; page < 4; page++) {
            writerSink.addPage(createSingleValuePage(1, 10000));
        }

        // partition 1 is assigned to writer C and D after scaling.
        assertSource(writerA, 6);
        assertSource(writerB, 9);
        assertSource(writerC, 2);
        assertSource(writerD, 7);
    });
}

@Test
public void testScaledWriterRoundRobinExchangerWhenTotalMemoryUsedIsGreaterThanLimit()
{
Expand Down

0 comments on commit b5d978d

Please sign in to comment.