From 31a7a72930d7ce3141ae7c7421f38a5ba2fd0039 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Thu, 7 Nov 2024 14:22:32 +0100 Subject: [PATCH] Add ReadWriteLock for concurrent tablet modifications Each table now has a dedicated lock for its set of tablets. Multiple sets can be modified concurrently. Most importantly without copying the whole set. Should improve times, mostly in cases of large amount of tablets and reduce number of misses during tablet shuffling. --- .../com/datastax/driver/core/Metadata.java | 23 +-- .../com/datastax/driver/core/TabletMap.java | 135 ++++++++++-------- 2 files changed, 94 insertions(+), 64 deletions(-) diff --git a/driver-core/src/main/java/com/datastax/driver/core/Metadata.java b/driver-core/src/main/java/com/datastax/driver/core/Metadata.java index e84e407536a..c4cb08ba2b9 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Metadata.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Metadata.java @@ -36,13 +36,13 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.Set; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -916,8 +916,8 @@ public int getShardForTabletToken( UUID targetHostUuid = host.getHostId(); long tokenValue = (long) token.getValue(); TabletMap.KeyspaceTableNamePair key = new TabletMap.KeyspaceTableNamePair(keyspace, table); - NavigableSet targetTablets = tabletMap.getMapping().get(key); - if (targetTablets == null) { + TabletMap.TabletSet targetSet = tabletMap.getMapping().get(key); + if (targetSet == null) { logger.trace( "Could not determine shard for token {} on host {} because table {}.{} is not present in tablets " + "metadata. Returning -1.", @@ -927,13 +927,20 @@ public int getShardForTabletToken( table); return -1; } - TabletMap.Tablet row = targetTablets.ceiling(TabletMap.Tablet.malformedTablet(tokenValue)); - if (row != null && row.getFirstToken() < tokenValue) { - for (TabletMap.HostShardPair hostShardPair : row.getReplicas()) { - if (hostShardPair.getHost().equals(targetHostUuid)) { - return hostShardPair.getShard(); + Lock readLock = targetSet.lock.readLock(); + try { + readLock.lock(); + TabletMap.Tablet row = + targetSet.tablets.ceiling(TabletMap.Tablet.malformedTablet(tokenValue)); + if (row != null && row.getFirstToken() < tokenValue) { + for (TabletMap.HostShardPair hostShardPair : row.getReplicas()) { + if (hostShardPair.getHost().equals(targetHostUuid)) { + return hostShardPair.getShard(); + } } } + } finally { + readLock.unlock(); } logger.trace( "Could not find tablet corresponding to token {} on host {} for table {} in keyspace {}. Returning -1.", diff --git a/driver-core/src/main/java/com/datastax/driver/core/TabletMap.java b/driver-core/src/main/java/com/datastax/driver/core/TabletMap.java index 38d424b8c20..0920802c595 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/TabletMap.java +++ b/driver-core/src/main/java/com/datastax/driver/core/TabletMap.java @@ -14,6 +14,8 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,7 +27,9 @@ public class TabletMap { private static final Logger logger = LoggerFactory.getLogger(TabletMap.class); - private final ConcurrentMap> mapping; + // There are no additional locking mechanisms for the mapping field itself, however each TabletSet + // inside has its own ReadWriteLock that should be used when dealing with its internals. + private final ConcurrentMap mapping; private final Cluster.Manager cluster; @@ -34,7 +38,7 @@ public class TabletMap { private TypeCodec tabletPayloadCodec = null; public TabletMap( - Cluster.Manager cluster, ConcurrentMap> mapping) { + Cluster.Manager cluster, ConcurrentMap mapping) { this.cluster = cluster; this.mapping = mapping; } @@ -46,9 +50,9 @@ public static TabletMap emptyMap(Cluster.Manager cluster) { /** * Returns the mapping of tables to their tablets. * - * @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type. + * @return the Map keyed by (keyspace,table) pairs with {@link TabletSet} as value type. */ - public Map> getMapping() { + public Map getMapping() { return mapping; } @@ -68,28 +72,34 @@ public Set getReplicas(String keyspace, String table, long token) { return Collections.emptySet(); } - NavigableSet set = mapping.get(key); - if (set == null) { + TabletSet tabletSet = mapping.get(key); + if (tabletSet == null) { logger.trace( "There is no tablets for {}.{} in this mapping. Returning empty set.", keyspace, table); return Collections.emptySet(); } - Tablet row = mapping.get(key).ceiling(Tablet.malformedTablet(token)); - if (row == null || row.firstToken >= token) { - logger.trace( - "Could not find tablet for {}.{} that owns token {}. Returning empty set.", - keyspace, - table, - token); - return Collections.emptySet(); - } + Lock readLock = tabletSet.lock.readLock(); + try { + readLock.lock(); + Tablet row = mapping.get(key).tablets.ceiling(Tablet.malformedTablet(token)); + if (row == null || row.firstToken >= token) { + logger.trace( + "Could not find tablet for {}.{} that owns token {}. Returning empty set.", + keyspace, + table, + token); + return Collections.emptySet(); + } - HashSet uuidSet = new HashSet<>(); - for (HostShardPair hostShardPair : row.replicas) { - if (cluster.metadata.getHost(hostShardPair.getHost()) != null) - uuidSet.add(hostShardPair.getHost()); + HashSet uuidSet = new HashSet<>(); + for (HostShardPair hostShardPair : row.replicas) { + if (cluster.metadata.getHost(hostShardPair.getHost()) != null) + uuidSet.add(hostShardPair.getHost()); + } + return uuidSet; + } finally { + readLock.unlock(); } - return uuidSet; } /** @@ -121,46 +131,47 @@ void processTabletsRoutingV1Payload(String keyspace, String table, ByteBuffer pa HostShardPair hostShardPair = new HostShardPair(tuple.getUUID(0), tuple.getInt(1)); replicas.add(hostShardPair); } - - // Working on a copy to avoid concurrent modification of the same set - NavigableSet existingTablets = - new TreeSet<>(mapping.computeIfAbsent(ktPair, k -> new TreeSet<>())); - - // Single tablet token range is represented by (firstToken, lastToken] interval - // We need to do two sweeps: remove overlapping tablets by looking at lastToken of existing - // tablets - // and then by looking at firstToken of existing tablets. Currently, the tablets are sorted - // according - // to their lastTokens. - - // First sweep: remove all tablets whose lastToken is inside this interval - Iterator it = - existingTablets.headSet(Tablet.malformedTablet(lastToken), true).descendingIterator(); - while (it.hasNext()) { - Tablet tablet = it.next(); - if (tablet.lastToken <= firstToken) { - break; + Tablet newTablet = new Tablet(keyspace, null, table, firstToken, lastToken, replicas); + + TabletSet tabletSet = mapping.computeIfAbsent(ktPair, k -> new TabletSet()); + Lock writeLock = tabletSet.lock.writeLock(); + try { + writeLock.lock(); + NavigableSet currentTablets = tabletSet.tablets; + // Single tablet token range is represented by (firstToken, lastToken] interval + // We need to do two sweeps: remove overlapping tablets by looking at lastToken of existing + // tablets + // and then by looking at firstToken of existing tablets. Currently, the tablets are sorted + // according + // to their lastTokens. + + // First sweep: remove all tablets whose lastToken is inside this interval + Iterator it = currentTablets.headSet(newTablet, true).descendingIterator(); + while (it.hasNext()) { + Tablet tablet = it.next(); + if (tablet.lastToken <= firstToken) { + break; + } + it.remove(); } - it.remove(); - } - // Second sweep: remove all tablets whose firstToken is inside this tuple's (firstToken, - // lastToken] - // After the first sweep, this theoretically should remove at most 1 tablet - it = existingTablets.tailSet(Tablet.malformedTablet(lastToken), true).iterator(); - while (it.hasNext()) { - Tablet tablet = it.next(); - if (tablet.firstToken >= lastToken) { - break; + // Second sweep: remove all tablets whose firstToken is inside this tuple's (firstToken, + // lastToken] + // After the first sweep, this theoretically should remove at most 1 tablet + it = currentTablets.tailSet(newTablet, true).iterator(); + while (it.hasNext()) { + Tablet tablet = it.next(); + if (tablet.firstToken >= lastToken) { + break; + } + it.remove(); } - it.remove(); - } - // Add new (now) non-overlapping tablet - existingTablets.add(new Tablet(keyspace, null, table, firstToken, lastToken, replicas)); - - // Set the updated result in the main map - mapping.put(ktPair, existingTablets); + // Add new (now) non-overlapping tablet + currentTablets.add(newTablet); + } finally { + writeLock.unlock(); + } } public TupleType getPayloadOuterTuple() { @@ -258,6 +269,18 @@ public int hashCode() { } } + /** + * Set of tablets bundled with ReadWriteLock to allow concurrent modification for different sets. + */ + public static class TabletSet { + final NavigableSet tablets; + final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + public TabletSet() { + this.tablets = new TreeSet<>(); + } + } + /** * Represents a single tablet created from tablets-routing-v1 custom payload. Its {@code * compareTo} implementation intentionally relies solely on {@code lastToken} in order to allow