Skip to content

Commit

Permalink
fix: DH-18395: Address performance problems with large numbers of tab…
Browse files Browse the repository at this point in the history
…le locations caused by serial unmanage (#6552)

* Refactor TableBackTableLocation infrastructure to support partitioning columns. Avoid unnecessary queuing.
* Refactor TableLocationSubscriptionBuffer to avoid the need to unmanage all buffered locations individually, as this is much more expensive than simply dropping all of them.
* Performance "unit test" for profiling the fix
  • Loading branch information
rcaudy authored Jan 13, 2025
1 parent b61abcf commit 75187f9
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ private void processPendingLocations(final boolean notifyListeners) {

try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
subscriptionBuffer.processPending()) {
if (locationUpdate == null) {
return;
}
removed = processRemovals(locationUpdate);
added = processAdditions(locationUpdate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,10 @@ private void initializeAvailableLocations() {
manage(locationBuffer);
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending()) {
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
if (locationUpdate != null) {
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
}
}
updateSourceRegistrar.addSource(locationChangePoller = new LocationChangePoller(locationBuffer));
} else {
Expand Down Expand Up @@ -235,16 +237,19 @@ private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer loca
protected void instrumentedRefresh() {
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending()) {
if (!locationProvider.getUpdateMode().removeAllowed()
&& !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) {
// This TLP doesn't support removed locations, we need to throw an exception.
final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream()
.map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new);
throw new TableLocationRemovedException("Source table does not support removed locations", keys);
}
if (locationUpdate != null) {
if (!locationProvider.getUpdateMode().removeAllowed()
&& !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) {
// This TLP doesn't support removed locations, we need to throw an exception.
final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream()
.map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new);
throw new TableLocationRemovedException(
"Source table does not support removed locations", keys);
}

maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
}
}

// This class previously had functionality to notify "location listeners", but it was never used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public abstract class PartitionedTableLocationKey implements ImmutableTableLocat

protected final Map<String, Comparable<?>> partitions;

private int cachedHashCode;
protected int cachedHashCode;

/**
* Construct a new PartitionedTableLocationKey for the supplied {@code partitions}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,16 @@
public class TableLocationSubscriptionBuffer extends ReferenceCountedLivenessNode
implements TableLocationProvider.Listener {

private static final Set<LiveSupplier<ImmutableTableLocationKey>> EMPTY_TABLE_LOCATION_KEYS =
Collections.emptySet();
private static final Map<ImmutableTableLocationKey, LiveSupplier<ImmutableTableLocationKey>> EMPTY_TABLE_LOCATION_KEYS =
Collections.emptyMap();

private final TableLocationProvider tableLocationProvider;

private boolean subscribed = false;

private final Object updateLock = new Object();

// These sets represent adds and removes from completed transactions.
private Set<LiveSupplier<ImmutableTableLocationKey>> pendingLocationsAdded = EMPTY_TABLE_LOCATION_KEYS;
private Set<LiveSupplier<ImmutableTableLocationKey>> pendingLocationsRemoved = EMPTY_TABLE_LOCATION_KEYS;

private LocationUpdate pendingUpdate = null;
private TableDataException pendingException = null;

public TableLocationSubscriptionBuffer(@NotNull final TableLocationProvider tableLocationProvider) {
Expand All @@ -42,28 +39,77 @@ public TableLocationSubscriptionBuffer(@NotNull final TableLocationProvider tabl
}

public final class LocationUpdate implements SafeCloseable {
private final Collection<LiveSupplier<ImmutableTableLocationKey>> pendingAddedLocationKeys;
private final Collection<LiveSupplier<ImmutableTableLocationKey>> pendingRemovedLocations;

public LocationUpdate(
@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> pendingAddedLocationKeys,
@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> pendingRemovedLocations) {
this.pendingAddedLocationKeys = pendingAddedLocationKeys;
this.pendingRemovedLocations = pendingRemovedLocations;
private final ReferenceCountedLivenessNode livenessNode = new ReferenceCountedLivenessNode(false) {};

// These sets represent adds and removes from completed transactions.
private Map<ImmutableTableLocationKey, LiveSupplier<ImmutableTableLocationKey>> added =
EMPTY_TABLE_LOCATION_KEYS;
private Map<ImmutableTableLocationKey, LiveSupplier<ImmutableTableLocationKey>> removed =
EMPTY_TABLE_LOCATION_KEYS;

private LocationUpdate() {
TableLocationSubscriptionBuffer.this.manage(livenessNode);
}

private void processAdd(@NotNull final LiveSupplier<ImmutableTableLocationKey> addedKeySupplier) {
final ImmutableTableLocationKey addedKey = addedKeySupplier.get();
// Need to verify that we don't have stacked adds (without intervening removes).
if (added.containsKey(addedKey)) {
throw new IllegalStateException("TableLocationKey " + addedKey
+ " was already added by a previous transaction.");
}
if (added == EMPTY_TABLE_LOCATION_KEYS) {
added = new HashMap<>();
}
livenessNode.manage(addedKeySupplier);
added.put(addedKey, addedKeySupplier);
}

private void processRemove(@NotNull final LiveSupplier<ImmutableTableLocationKey> removedKeySupplier) {
final ImmutableTableLocationKey removedKey = removedKeySupplier.get();
// If we have a pending add, it is being cancelled by this remove.
if (added.remove(removedKey) != null) {
return;
}
// Verify that we don't have stacked removes (without intervening adds).
if (removed.containsKey(removedKey)) {
throw new IllegalStateException("TableLocationKey " + removedKey
+ " was already removed and has not been replaced.");
}
if (removed == EMPTY_TABLE_LOCATION_KEYS) {
removed = new HashMap<>();
}
livenessNode.manage(removedKeySupplier);
removed.put(removedKey, removedKeySupplier);
}

private void processTransaction(
@Nullable Collection<LiveSupplier<ImmutableTableLocationKey>> addedKeySuppliers,
@Nullable Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeySuppliers) {
if (removedKeySuppliers != null) {
for (final LiveSupplier<ImmutableTableLocationKey> removedKeySupplier : removedKeySuppliers) {
processRemove(removedKeySupplier);
}
}
if (addedKeySuppliers != null) {
for (final LiveSupplier<ImmutableTableLocationKey> addedKeySupplier : addedKeySuppliers) {
processAdd(addedKeySupplier);
}
}
}

public Collection<LiveSupplier<ImmutableTableLocationKey>> getPendingAddedLocationKeys() {
return pendingAddedLocationKeys;
return added.values();
}

public Collection<LiveSupplier<ImmutableTableLocationKey>> getPendingRemovedLocationKeys() {
return pendingRemovedLocations;
return removed.values();
}

@Override
public void close() {
pendingAddedLocationKeys.forEach(TableLocationSubscriptionBuffer.this::unmanage);
pendingRemovedLocations.forEach(TableLocationSubscriptionBuffer.this::unmanage);
TableLocationSubscriptionBuffer.this.unmanage(livenessNode);
}
}

Expand All @@ -76,7 +122,6 @@ public void close() {
* @return The collection of pending location keys.
*/
public synchronized LocationUpdate processPending() {
// TODO: Should I change this to instead re-use the collection?
if (!subscribed) {
if (tableLocationProvider.supportsSubscriptions()) {
tableLocationProvider.subscribe(this);
Expand All @@ -90,23 +135,21 @@ public synchronized LocationUpdate processPending() {
}
subscribed = true;
}
final Collection<LiveSupplier<ImmutableTableLocationKey>> resultLocationKeys;
final Collection<LiveSupplier<ImmutableTableLocationKey>> resultLocationsRemoved;
final LocationUpdate resultUpdate;
final TableDataException resultException;
synchronized (updateLock) {
resultLocationKeys = pendingLocationsAdded;
pendingLocationsAdded = EMPTY_TABLE_LOCATION_KEYS;
resultLocationsRemoved = pendingLocationsRemoved;
pendingLocationsRemoved = EMPTY_TABLE_LOCATION_KEYS;
resultUpdate = pendingUpdate;
pendingUpdate = null;
resultException = pendingException;
pendingException = null;
}

if (resultException != null) {
throw new TableDataException("Processed pending exception", resultException);
try (final SafeCloseable ignored = resultUpdate) {
throw new TableDataException("Processed pending exception", resultException);
}
}

return new LocationUpdate(resultLocationKeys, resultLocationsRemoved);
return resultUpdate;
}

/**
Expand All @@ -119,92 +162,52 @@ public synchronized void reset() {
}
subscribed = false;
}
final LocationUpdate toClose;
synchronized (updateLock) {
pendingLocationsAdded.forEach(this::unmanage);
pendingLocationsRemoved.forEach(this::unmanage);
pendingLocationsAdded = EMPTY_TABLE_LOCATION_KEYS;
pendingLocationsRemoved = EMPTY_TABLE_LOCATION_KEYS;
toClose = pendingUpdate;
pendingUpdate = null;
pendingException = null;
}
if (toClose != null) {
toClose.close();
}
}

// ------------------------------------------------------------------------------------------------------------------
// TableLocationProvider.Listener implementation
// ------------------------------------------------------------------------------------------------------------------

private LocationUpdate ensurePendingUpdate() {
if (pendingUpdate == null) {
pendingUpdate = new LocationUpdate();
}
return pendingUpdate;
}

@Override
public void handleTableLocationKeyAdded(@NotNull final LiveSupplier<ImmutableTableLocationKey> tableLocationKey) {
public void handleTableLocationKeyAdded(@NotNull final LiveSupplier<ImmutableTableLocationKey> addedKeySupplier) {
synchronized (updateLock) {
// Need to verify that we don't have stacked adds (without intervening removes).
if (pendingLocationsAdded.contains(tableLocationKey)) {
throw new IllegalStateException("TableLocationKey " + tableLocationKey
+ " was already added by a previous transaction.");
}
if (pendingLocationsAdded == EMPTY_TABLE_LOCATION_KEYS) {
pendingLocationsAdded = new HashSet<>();
}
manage(tableLocationKey);
pendingLocationsAdded.add(tableLocationKey);
// noinspection resource
ensurePendingUpdate().processAdd(addedKeySupplier);
}
}

@Override
public void handleTableLocationKeyRemoved(@NotNull final LiveSupplier<ImmutableTableLocationKey> tableLocationKey) {
public void handleTableLocationKeyRemoved(
@NotNull final LiveSupplier<ImmutableTableLocationKey> removedKeySupplier) {
synchronized (updateLock) {
// If we have a pending add, it is being cancelled by this remove.
if (pendingLocationsAdded.remove(tableLocationKey)) {
return;
}
// Verify that we don't have stacked removes (without intervening adds).
if (pendingLocationsRemoved.contains(tableLocationKey)) {
throw new IllegalStateException("TableLocationKey " + tableLocationKey
+ " was already removed and has not been replaced.");
}
if (pendingLocationsRemoved == EMPTY_TABLE_LOCATION_KEYS) {
pendingLocationsRemoved = new HashSet<>();
}
manage(tableLocationKey);
pendingLocationsRemoved.add(tableLocationKey);
// noinspection resource
ensurePendingUpdate().processRemove(removedKeySupplier);
}
}

@Override
public void handleTableLocationKeysUpdate(
@Nullable Collection<LiveSupplier<ImmutableTableLocationKey>> addedKeys,
@Nullable Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeys) {
@Nullable Collection<LiveSupplier<ImmutableTableLocationKey>> addedKeySuppliers,
@Nullable Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeySuppliers) {
synchronized (updateLock) {
if (removedKeys != null) {
for (final LiveSupplier<ImmutableTableLocationKey> removedTableLocationKey : removedKeys) {
// If we have a pending add, it is being cancelled by this remove.
if (pendingLocationsAdded.remove(removedTableLocationKey)) {
continue;
}
// Verify that we don't have stacked removes.
if (pendingLocationsRemoved.contains(removedTableLocationKey)) {
throw new IllegalStateException("TableLocationKey " + removedTableLocationKey
+ " was already removed and has not been replaced.");
}
if (pendingLocationsRemoved == EMPTY_TABLE_LOCATION_KEYS) {
pendingLocationsRemoved = new HashSet<>();
}
manage(removedTableLocationKey);
pendingLocationsRemoved.add(removedTableLocationKey);
}
}
if (addedKeys != null) {
for (final LiveSupplier<ImmutableTableLocationKey> addedTableLocationKey : addedKeys) {
// Need to verify that we don't have stacked adds.
if (pendingLocationsAdded.contains(addedTableLocationKey)) {
throw new IllegalStateException("TableLocationKey " + addedTableLocationKey
+ " was already added by a previous transaction.");
}
if (pendingLocationsAdded == EMPTY_TABLE_LOCATION_KEYS) {
pendingLocationsAdded = new HashSet<>();
}
manage(addedTableLocationKey);
pendingLocationsAdded.add(addedTableLocationKey);
}
}
// noinspection resource
ensurePendingUpdate().processTransaction(addedKeySuppliers, removedKeySuppliers);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ public class FileTableLocationKey extends PartitionedTableLocationKey {
protected final File file;
private final int order;

private int cachedHashCode;

/**
* Construct a new FileTableLocationKey for the supplied {@code file} and {@code partitions}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public class URITableLocationKey extends PartitionedTableLocationKey {
protected final URI uri;
protected final int order;

private int cachedHashCode;

/**
* Construct a new URITableLocationKey for the supplied {@code uri} and {@code partitions}.
*
Expand Down
Loading

0 comments on commit 75187f9

Please sign in to comment.