Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DH-18395: Address performance problems with large numbers of table locations caused by serial unmanage #6552

Merged
merged 3 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ private void processPendingLocations(final boolean notifyListeners) {

try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
subscriptionBuffer.processPending()) {
if (locationUpdate == null) {
return;
}
removed = processRemovals(locationUpdate);
added = processAdditions(locationUpdate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,10 @@ private void initializeAvailableLocations() {
manage(locationBuffer);
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending()) {
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
if (locationUpdate != null) {
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
}
}
updateSourceRegistrar.addSource(locationChangePoller = new LocationChangePoller(locationBuffer));
} else {
Expand Down Expand Up @@ -235,16 +237,19 @@ private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer loca
protected void instrumentedRefresh() {
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending()) {
if (!locationProvider.getUpdateMode().removeAllowed()
&& !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) {
// This TLP doesn't support removed locations, we need to throw an exception.
final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream()
.map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new);
throw new TableLocationRemovedException("Source table does not support removed locations", keys);
}
if (locationUpdate != null) {
if (!locationProvider.getUpdateMode().removeAllowed()
&& !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) {
// This TLP doesn't support removed locations, we need to throw an exception.
final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream()
.map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new);
throw new TableLocationRemovedException(
"Source table does not support removed locations", keys);
}

maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
}
}

// This class previously had functionality to notify "location listeners", but it was never used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public abstract class PartitionedTableLocationKey implements ImmutableTableLocat

protected final Map<String, Comparable<?>> partitions;

private int cachedHashCode;
protected int cachedHashCode;

/**
* Construct a new PartitionedTableLocationKey for the supplied {@code partitions}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,16 @@
public class TableLocationSubscriptionBuffer extends ReferenceCountedLivenessNode
implements TableLocationProvider.Listener {

private static final Set<LiveSupplier<ImmutableTableLocationKey>> EMPTY_TABLE_LOCATION_KEYS =
Collections.emptySet();
private static final Map<ImmutableTableLocationKey, LiveSupplier<ImmutableTableLocationKey>> EMPTY_TABLE_LOCATION_KEYS =
Collections.emptyMap();

private final TableLocationProvider tableLocationProvider;

private boolean subscribed = false;

private final Object updateLock = new Object();

// These sets represent adds and removes from completed transactions.
private Set<LiveSupplier<ImmutableTableLocationKey>> pendingLocationsAdded = EMPTY_TABLE_LOCATION_KEYS;
private Set<LiveSupplier<ImmutableTableLocationKey>> pendingLocationsRemoved = EMPTY_TABLE_LOCATION_KEYS;

private LocationUpdate pendingUpdate = null;
private TableDataException pendingException = null;

public TableLocationSubscriptionBuffer(@NotNull final TableLocationProvider tableLocationProvider) {
Expand All @@ -42,28 +39,77 @@ public TableLocationSubscriptionBuffer(@NotNull final TableLocationProvider tabl
}

public final class LocationUpdate implements SafeCloseable {
private final Collection<LiveSupplier<ImmutableTableLocationKey>> pendingAddedLocationKeys;
private final Collection<LiveSupplier<ImmutableTableLocationKey>> pendingRemovedLocations;

public LocationUpdate(
@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> pendingAddedLocationKeys,
@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> pendingRemovedLocations) {
this.pendingAddedLocationKeys = pendingAddedLocationKeys;
this.pendingRemovedLocations = pendingRemovedLocations;
private final ReferenceCountedLivenessNode livenessNode = new ReferenceCountedLivenessNode(false) {};

// These maps, keyed by location key, represent adds and removes from completed transactions.
private Map<ImmutableTableLocationKey, LiveSupplier<ImmutableTableLocationKey>> added =
EMPTY_TABLE_LOCATION_KEYS;
private Map<ImmutableTableLocationKey, LiveSupplier<ImmutableTableLocationKey>> removed =
EMPTY_TABLE_LOCATION_KEYS;

private LocationUpdate() {
TableLocationSubscriptionBuffer.this.manage(livenessNode);
}

private void processAdd(@NotNull final LiveSupplier<ImmutableTableLocationKey> addedKeySupplier) {
final ImmutableTableLocationKey addedKey = addedKeySupplier.get();
// Need to verify that we don't have stacked adds (without intervening removes).
[Review annotation, not part of the code: rcaudy marked the conversation on this line as resolved.]
if (added.containsKey(addedKey)) {
throw new IllegalStateException("TableLocationKey " + addedKey
+ " was already added by a previous transaction.");
}
if (added == EMPTY_TABLE_LOCATION_KEYS) {
added = new HashMap<>();
}
livenessNode.manage(addedKeySupplier);
added.put(addedKey, addedKeySupplier);
}

private void processRemove(@NotNull final LiveSupplier<ImmutableTableLocationKey> removedKeySupplier) {
final ImmutableTableLocationKey removedKey = removedKeySupplier.get();
// If we have a pending add, it is being cancelled by this remove.
if (added.remove(removedKey) != null) {
return;
}
// Verify that we don't have stacked removes (without intervening adds).
if (removed.containsKey(removedKey)) {
throw new IllegalStateException("TableLocationKey " + removedKey
+ " was already removed and has not been replaced.");
}
if (removed == EMPTY_TABLE_LOCATION_KEYS) {
removed = new HashMap<>();
}
livenessNode.manage(removedKeySupplier);
removed.put(removedKey, removedKeySupplier);
}

private void processTransaction(
@Nullable Collection<LiveSupplier<ImmutableTableLocationKey>> addedKeySuppliers,
@Nullable Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeySuppliers) {
if (removedKeySuppliers != null) {
for (final LiveSupplier<ImmutableTableLocationKey> removedKeySupplier : removedKeySuppliers) {
processRemove(removedKeySupplier);
}
}
if (addedKeySuppliers != null) {
for (final LiveSupplier<ImmutableTableLocationKey> addedKeySupplier : addedKeySuppliers) {
processAdd(addedKeySupplier);
}
}
}

public Collection<LiveSupplier<ImmutableTableLocationKey>> getPendingAddedLocationKeys() {
return pendingAddedLocationKeys;
return added.values();
}

public Collection<LiveSupplier<ImmutableTableLocationKey>> getPendingRemovedLocationKeys() {
return pendingRemovedLocations;
return removed.values();
}

@Override
public void close() {
pendingAddedLocationKeys.forEach(TableLocationSubscriptionBuffer.this::unmanage);
pendingRemovedLocations.forEach(TableLocationSubscriptionBuffer.this::unmanage);
TableLocationSubscriptionBuffer.this.unmanage(livenessNode);
}
}

Expand All @@ -76,7 +122,6 @@ public void close() {
* @return The pending {@link LocationUpdate}, or {@code null} if no update is pending.
*/
public synchronized LocationUpdate processPending() {
// TODO: Should I change this to instead re-use the collection?
if (!subscribed) {
if (tableLocationProvider.supportsSubscriptions()) {
tableLocationProvider.subscribe(this);
Expand All @@ -90,23 +135,21 @@ public synchronized LocationUpdate processPending() {
}
subscribed = true;
}
final Collection<LiveSupplier<ImmutableTableLocationKey>> resultLocationKeys;
final Collection<LiveSupplier<ImmutableTableLocationKey>> resultLocationsRemoved;
final LocationUpdate resultUpdate;
final TableDataException resultException;
synchronized (updateLock) {
resultLocationKeys = pendingLocationsAdded;
pendingLocationsAdded = EMPTY_TABLE_LOCATION_KEYS;
resultLocationsRemoved = pendingLocationsRemoved;
pendingLocationsRemoved = EMPTY_TABLE_LOCATION_KEYS;
resultUpdate = pendingUpdate;
pendingUpdate = null;
resultException = pendingException;
pendingException = null;
}

if (resultException != null) {
throw new TableDataException("Processed pending exception", resultException);
try (final SafeCloseable ignored = resultUpdate) {
throw new TableDataException("Processed pending exception", resultException);
}
}

return new LocationUpdate(resultLocationKeys, resultLocationsRemoved);
return resultUpdate;
}

/**
Expand All @@ -119,92 +162,52 @@ public synchronized void reset() {
}
subscribed = false;
}
final LocationUpdate toClose;
synchronized (updateLock) {
pendingLocationsAdded.forEach(this::unmanage);
pendingLocationsRemoved.forEach(this::unmanage);
pendingLocationsAdded = EMPTY_TABLE_LOCATION_KEYS;
pendingLocationsRemoved = EMPTY_TABLE_LOCATION_KEYS;
toClose = pendingUpdate;
pendingUpdate = null;
pendingException = null;
}
if (toClose != null) {
toClose.close();
}
}

// ------------------------------------------------------------------------------------------------------------------
// TableLocationProvider.Listener implementation
// ------------------------------------------------------------------------------------------------------------------

private LocationUpdate ensurePendingUpdate() {
if (pendingUpdate == null) {
pendingUpdate = new LocationUpdate();
}
return pendingUpdate;
}

@Override
public void handleTableLocationKeyAdded(@NotNull final LiveSupplier<ImmutableTableLocationKey> tableLocationKey) {
public void handleTableLocationKeyAdded(@NotNull final LiveSupplier<ImmutableTableLocationKey> addedKeySupplier) {
synchronized (updateLock) {
// Need to verify that we don't have stacked adds (without intervening removes).
if (pendingLocationsAdded.contains(tableLocationKey)) {
throw new IllegalStateException("TableLocationKey " + tableLocationKey
+ " was already added by a previous transaction.");
}
if (pendingLocationsAdded == EMPTY_TABLE_LOCATION_KEYS) {
pendingLocationsAdded = new HashSet<>();
}
manage(tableLocationKey);
pendingLocationsAdded.add(tableLocationKey);
// noinspection resource
ensurePendingUpdate().processAdd(addedKeySupplier);
}
}

@Override
public void handleTableLocationKeyRemoved(@NotNull final LiveSupplier<ImmutableTableLocationKey> tableLocationKey) {
public void handleTableLocationKeyRemoved(
@NotNull final LiveSupplier<ImmutableTableLocationKey> removedKeySupplier) {
synchronized (updateLock) {
// If we have a pending add, it is being cancelled by this remove.
if (pendingLocationsAdded.remove(tableLocationKey)) {
return;
}
// Verify that we don't have stacked removes (without intervening adds).
if (pendingLocationsRemoved.contains(tableLocationKey)) {
throw new IllegalStateException("TableLocationKey " + tableLocationKey
+ " was already removed and has not been replaced.");
}
if (pendingLocationsRemoved == EMPTY_TABLE_LOCATION_KEYS) {
pendingLocationsRemoved = new HashSet<>();
}
manage(tableLocationKey);
pendingLocationsRemoved.add(tableLocationKey);
// noinspection resource
ensurePendingUpdate().processRemove(removedKeySupplier);
}
}

@Override
public void handleTableLocationKeysUpdate(
@Nullable Collection<LiveSupplier<ImmutableTableLocationKey>> addedKeys,
@Nullable Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeys) {
@Nullable Collection<LiveSupplier<ImmutableTableLocationKey>> addedKeySuppliers,
@Nullable Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeySuppliers) {
synchronized (updateLock) {
if (removedKeys != null) {
for (final LiveSupplier<ImmutableTableLocationKey> removedTableLocationKey : removedKeys) {
// If we have a pending add, it is being cancelled by this remove.
if (pendingLocationsAdded.remove(removedTableLocationKey)) {
continue;
}
// Verify that we don't have stacked removes.
if (pendingLocationsRemoved.contains(removedTableLocationKey)) {
throw new IllegalStateException("TableLocationKey " + removedTableLocationKey
+ " was already removed and has not been replaced.");
}
if (pendingLocationsRemoved == EMPTY_TABLE_LOCATION_KEYS) {
pendingLocationsRemoved = new HashSet<>();
}
manage(removedTableLocationKey);
pendingLocationsRemoved.add(removedTableLocationKey);
}
}
if (addedKeys != null) {
for (final LiveSupplier<ImmutableTableLocationKey> addedTableLocationKey : addedKeys) {
// Need to verify that we don't have stacked adds.
if (pendingLocationsAdded.contains(addedTableLocationKey)) {
throw new IllegalStateException("TableLocationKey " + addedTableLocationKey
+ " was already added by a previous transaction.");
}
if (pendingLocationsAdded == EMPTY_TABLE_LOCATION_KEYS) {
pendingLocationsAdded = new HashSet<>();
}
manage(addedTableLocationKey);
pendingLocationsAdded.add(addedTableLocationKey);
}
}
// noinspection resource
ensurePendingUpdate().processTransaction(addedKeySuppliers, removedKeySuppliers);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ public class FileTableLocationKey extends PartitionedTableLocationKey {
protected final File file;
private final int order;

private int cachedHashCode;

/**
* Construct a new FileTableLocationKey for the supplied {@code file} and {@code partitions}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public class URITableLocationKey extends PartitionedTableLocationKey {
protected final URI uri;
protected final int order;

private int cachedHashCode;

/**
* Construct a new URITableLocationKey for the supplied {@code uri} and {@code partitions}.
*
Expand Down
Loading
Loading