From 11cfac07390cd26e3ea43ba9b4d1cb200ed3e020 Mon Sep 17 00:00:00 2001 From: Jiaming Lu Date: Wed, 11 Jan 2023 13:28:16 +0800 Subject: [PATCH] Implementing reverseScan Signed-off-by: Jiaming Lu --- src/main/java/org/tikv/common/KVClient.java | 32 ++++++++-- src/main/java/org/tikv/common/Snapshot.java | 38 ++++++++++- .../iterator/ConcreteScanIterator.java | 17 +++-- .../operation/iterator/RawScanIterator.java | 5 +- .../operation/iterator/ScanIterator.java | 5 +- .../tikv/common/region/RegionStoreClient.java | 23 ++++--- src/main/java/org/tikv/raw/RawKVClient.java | 2 +- src/main/java/org/tikv/txn/KVClient.java | 64 ++++++++++++++++--- 8 files changed, 149 insertions(+), 37 deletions(-) diff --git a/src/main/java/org/tikv/common/KVClient.java b/src/main/java/org/tikv/common/KVClient.java index 6ae3a909771..c3ef9db9b94 100644 --- a/src/main/java/org/tikv/common/KVClient.java +++ b/src/main/java/org/tikv/common/KVClient.java @@ -101,7 +101,25 @@ public List batchGet(BackOffer backOffer, List keys, long ve */ public List scan(ByteString startKey, ByteString endKey, long version) throws GrpcException { - Iterator iterator = scanIterator(conf, clientBuilder, startKey, endKey, version); + return scan(startKey, endKey, version, false); + } + + /** + * Scan key-value pairs from TiKV in range [startKey, endKey) or if reversely, [endKey, startKey) + * + * @param startKey start key, inclusive + * @param endKey end key, exclusive + * @param reverse whether to scan reversely + * @return list of key-value pairs in range + */ + public List scan(ByteString startKey, ByteString endKey, long version, boolean reverse) + throws GrpcException { + Iterator iterator; + if (reverse) { + iterator = scanIterator(conf, clientBuilder, endKey, startKey, version, reverse); + } else { + iterator = scanIterator(conf, clientBuilder, startKey, endKey, version, reverse); + } List result = new ArrayList<>(); iterator.forEachRemaining(result::add); return result; @@ -115,7 +133,7 @@ public List 
scan(ByteString startKey, ByteString endKey, long version) * @return list of key-value pairs in range */ public List scan(ByteString startKey, long version, int limit) throws GrpcException { - Iterator iterator = scanIterator(conf, clientBuilder, startKey, version, limit); + Iterator iterator = scanIterator(conf, clientBuilder, startKey, version, limit, false); List result = new ArrayList<>(); iterator.forEachRemaining(result::add); return result; @@ -183,8 +201,9 @@ private Iterator scanIterator( RegionStoreClientBuilder builder, ByteString startKey, ByteString endKey, - long version) { - return new ConcreteScanIterator(conf, builder, startKey, endKey, version); + long version, + boolean reverse) { + return new ConcreteScanIterator(conf, builder, startKey, endKey, version, reverse); } private Iterator scanIterator( @@ -192,7 +211,8 @@ private Iterator scanIterator( RegionStoreClientBuilder builder, ByteString startKey, long version, - int limit) { - return new ConcreteScanIterator(conf, builder, startKey, version, limit); + int limit, + boolean reverse) { + return new ConcreteScanIterator(conf, builder, startKey, version, limit, reverse); } } diff --git a/src/main/java/org/tikv/common/Snapshot.java b/src/main/java/org/tikv/common/Snapshot.java index 7012bc749ec..2c0cb2ac840 100644 --- a/src/main/java/org/tikv/common/Snapshot.java +++ b/src/main/java/org/tikv/common/Snapshot.java @@ -157,7 +157,24 @@ public Iterator scan(ByteString startKey) { session.getRegionStoreClientBuilder(), startKey, timestamp.getVersion(), - Integer.MAX_VALUE); + Integer.MAX_VALUE, + false); + } + + /** + * scan all keys before startKey, inclusive + * + * @param startKey start of keys + * @return iterator of kvPair + */ + public Iterator reverseScan(ByteString startKey) { + return new ConcreteScanIterator( + session.getConf(), + session.getRegionStoreClientBuilder(), + startKey, + timestamp.getVersion(), + Integer.MAX_VALUE, + true); } /** @@ -173,7 +190,24 @@ public Iterator 
scanPrefix(ByteString prefix) { session.getRegionStoreClientBuilder(), prefix, nextPrefix, - timestamp.getVersion()); + timestamp.getVersion(), + false); + } + /** + * scan all keys with prefix, reversely + * + * @param prefix prefix of keys + * @return iterator of kvPair + */ + public Iterator reverseScanPrefix(ByteString prefix) { + ByteString nextPrefix = Key.toRawKey(prefix).nextPrefix().toByteString(); + return new ConcreteScanIterator( + session.getConf(), + session.getRegionStoreClientBuilder(), + nextPrefix, + prefix, + timestamp.getVersion(), + true); } public TiConfiguration getConf() { diff --git a/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java index 72422736e76..0e5a0896ddb 100644 --- a/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java @@ -44,9 +44,10 @@ public ConcreteScanIterator( RegionStoreClientBuilder builder, ByteString startKey, long version, - int limit) { + int limit, + boolean reverse) { // Passing endKey as ByteString.EMPTY means that endKey is +INF by default, - this(conf, builder, startKey, ByteString.EMPTY, version, limit); + this(conf, builder, startKey, ByteString.EMPTY, version, limit, reverse); } public ConcreteScanIterator( @@ -54,9 +55,10 @@ public ConcreteScanIterator( RegionStoreClientBuilder builder, ByteString startKey, ByteString endKey, - long version) { + long version, + boolean reverse) { // Passing endKey as ByteString.EMPTY means that endKey is +INF by default, - this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE); + this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE, reverse); } private ConcreteScanIterator( @@ -65,8 +67,9 @@ private ConcreteScanIterator( ByteString startKey, ByteString endKey, long version, - int limit) { - super(conf, builder, startKey, endKey, limit, false); + int 
limit, + boolean reverse) { + super(conf, builder, startKey, endKey, limit, false, reverse); this.version = version; } @@ -76,7 +79,7 @@ TiRegion loadCurrentRegionToCache() throws GrpcException { try (RegionStoreClient client = builder.build(startKey)) { client.setTimeout(conf.getScanTimeout()); BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff(); - currentCache = client.scan(backOffer, startKey, version); + currentCache = client.scan(backOffer, startKey, version, reverse); // If we get region before scan, we will use region from cache which // may have wrong end key. This may miss some regions that split from old region. // Client will get the newest region during scan. So we need to diff --git a/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java index 08a64aaf87f..3333b37a5cf 100644 --- a/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java @@ -39,8 +39,9 @@ public RawScanIterator( ByteString endKey, int limit, boolean keyOnly, + boolean reverse, BackOffer scanBackOffer) { - super(conf, builder, startKey, endKey, limit, keyOnly); + super(conf, builder, startKey, endKey, limit, keyOnly, reverse); this.scanBackOffer = scanBackOffer; } @@ -56,7 +57,7 @@ TiRegion loadCurrentRegionToCache() throws GrpcException { currentCache = null; } else { try { - currentCache = client.rawScan(backOffer, startKey, limit, keyOnly); + currentCache = client.rawScan(backOffer, startKey, limit, keyOnly, reverse); // Client will get the newest region during scan. So we need to // update region after scan. 
region = client.getRegion(); diff --git a/src/main/java/org/tikv/common/operation/iterator/ScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/ScanIterator.java index 69fd0217fd5..24a46990fd1 100644 --- a/src/main/java/org/tikv/common/operation/iterator/ScanIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/ScanIterator.java @@ -43,6 +43,7 @@ public abstract class ScanIterator implements Iterator { protected Key endKey; protected boolean hasEndKey; protected boolean processingLastBatch = false; + protected boolean reverse = false; ScanIterator( TiConfiguration conf, @@ -50,7 +51,8 @@ public abstract class ScanIterator implements Iterator { ByteString startKey, ByteString endKey, int limit, - boolean keyOnly) { + boolean keyOnly, + boolean reverse) { this.startKey = requireNonNull(startKey, "start key is null"); this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null")); this.hasEndKey = !endKey.isEmpty(); @@ -58,6 +60,7 @@ public abstract class ScanIterator implements Iterator { this.keyOnly = keyOnly; this.conf = conf; this.builder = builder; + this.reverse = reverse; } /** diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 22607b2bdb1..d35050fc78b 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -70,8 +70,7 @@ import org.tikv.common.util.HistogramUtils; import org.tikv.common.util.Pair; import org.tikv.common.util.RangeSplitter; -import org.tikv.kvproto.Coprocessor; -import org.tikv.kvproto.Errorpb; +import org.tikv.kvproto.*; import org.tikv.kvproto.Kvrpcpb.BatchGetRequest; import org.tikv.kvproto.Kvrpcpb.BatchGetResponse; import org.tikv.kvproto.Kvrpcpb.CommitRequest; @@ -109,8 +108,6 @@ import org.tikv.kvproto.Kvrpcpb.SplitRegionResponse; import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatRequest; import 
org.tikv.kvproto.Kvrpcpb.TxnHeartBeatResponse; -import org.tikv.kvproto.Metapb; -import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvFutureStub; import org.tikv.txn.AbstractLockResolverClient; @@ -336,7 +333,7 @@ private List handleBatchGetResponse( } public List scan( - BackOffer backOffer, ByteString startKey, long version, boolean keyOnly) { + BackOffer backOffer, ByteString startKey, long version, boolean keyOnly, boolean reverse) { boolean forWrite = false; while (true) { Supplier request = @@ -348,6 +345,7 @@ public List scan( .setStartKey(codec.encodeKey(startKey)) .setVersion(version) .setKeyOnly(keyOnly) + .setReverse(reverse) .setLimit(getConf().getScanBatchSize()) .build(); @@ -417,6 +415,11 @@ public List scan(BackOffer backOffer, ByteString startKey, long version) return scan(backOffer, startKey, version, false); } + public List scan( + BackOffer backOffer, ByteString startKey, long version, boolean reverse) { + return scan(backOffer, startKey, version, false, reverse); + } + /** * Prewrite batch keys * @@ -1238,9 +1241,11 @@ private void handleRawBatchDelete(RawBatchDeleteResponse resp) { * @param backOffer BackOffer * @param key startKey * @param keyOnly true if value of KvPair is not needed + * @param reverse * @return KvPair list */ - public List rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) { + public List rawScan( + BackOffer backOffer, ByteString key, int limit, boolean keyOnly, boolean reverse) { Long clusterId = pdClient.getClusterId(); Histogram.Timer requestTimer = GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan", clusterId.toString()).startTimer(); @@ -1254,6 +1259,7 @@ public List rawScan(BackOffer backOffer, ByteString key, int limit, bool .setEndKey(range.second) .setKeyOnly(keyOnly) .setLimit(limit) + .setReverse(reverse) .build(); }; @@ -1271,8 +1277,9 @@ public List rawScan(BackOffer backOffer, ByteString key, int limit, bool } } - 
public List rawScan(BackOffer backOffer, ByteString key, boolean keyOnly) { - return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly); + public List rawScan( + BackOffer backOffer, ByteString key, boolean keyOnly, boolean reverse) { + return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly, reverse); } private List rawScanHelper(RawScanResponse resp) { diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 3fa292c9d80..e82c27b50e5 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -1011,7 +1011,7 @@ private Iterator rawScanIterator( if (limit > MAX_RAW_SCAN_LIMIT) { throw ERR_MAX_SCAN_LIMIT_EXCEEDED; } - return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, backOffer); + return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, false, backOffer); } /** diff --git a/src/main/java/org/tikv/txn/KVClient.java b/src/main/java/org/tikv/txn/KVClient.java index e8c83c54463..02ed24de191 100644 --- a/src/main/java/org/tikv/txn/KVClient.java +++ b/src/main/java/org/tikv/txn/KVClient.java @@ -114,8 +114,37 @@ public List batchGet(BackOffer backOffer, List keys, */ public List scan(ByteString startKey, ByteString endKey, long version) throws GrpcException { + return scan(startKey, endKey, version, false); + } + /** + * Scan key-value pairs from TiKV reversely in range (startKey, endKey] + * + * @param startKey start key, inclusive + * @param endKey end key, exclusive + * @return list of key-value pairs in range + */ + public List reverseScan(ByteString startKey, ByteString endKey, long version) + throws GrpcException { + return scan(endKey, startKey, version, true); + } + + public List scan( + ByteString startKey, ByteString endKey, long version, boolean reverse) throws GrpcException { + Iterator iterator; + if (reverse) { + iterator = scanIterator(conf, clientBuilder, endKey, startKey, version, 
reverse); + } else { + iterator = scanIterator(conf, clientBuilder, startKey, endKey, version, reverse); + } + List result = new ArrayList<>(); + iterator.forEachRemaining(result::add); + return result; + } + + public List scan(ByteString startKey, long version, int limit, boolean reverse) + throws GrpcException { Iterator iterator = - scanIterator(conf, clientBuilder, startKey, endKey, version); + scanIterator(conf, clientBuilder, startKey, version, limit, reverse); List result = new ArrayList<>(); iterator.forEachRemaining(result::add); return result; @@ -130,14 +159,27 @@ public List scan(ByteString startKey, ByteString endKey, long ve */ public List scan(ByteString startKey, long version, int limit) throws GrpcException { - Iterator iterator = scanIterator(conf, clientBuilder, startKey, version, limit); - List result = new ArrayList<>(); - iterator.forEachRemaining(result::add); - return result; + return scan(startKey, version, limit, false); + } + + /** + * Scan key-value pairs reversely from TiKV in range ('', endKey], maximum to `limit` pairs + * + * @param endKey start key, inclusive + * @param limit limit of kv pairs + * @return list of key-value pairs in range + */ + public List reverseScan(ByteString endKey, long version, int limit) + throws GrpcException { + return scan(endKey, version, limit, true); } public List scan(ByteString startKey, long version) throws GrpcException { - return scan(startKey, version, Integer.MAX_VALUE); + return scan(startKey, version, Integer.MAX_VALUE, false); + } + + public List reverseScan(ByteString endKey, long version) throws GrpcException { + return scan(endKey, version, Integer.MAX_VALUE, true); } public synchronized void ingest(List> list) throws GrpcException { @@ -264,8 +306,9 @@ private Iterator scanIterator( RegionStoreClientBuilder builder, ByteString startKey, ByteString endKey, - long version) { - return new ConcreteScanIterator(conf, builder, startKey, endKey, version); + long version, + boolean reverse) { + 
return new ConcreteScanIterator(conf, builder, startKey, endKey, version, reverse); } private Iterator scanIterator( @@ -273,8 +316,9 @@ private Iterator scanIterator( RegionStoreClientBuilder builder, ByteString startKey, long version, - int limit) { - return new ConcreteScanIterator(conf, builder, startKey, version, limit); + int limit, + boolean reverse) { + return new ConcreteScanIterator(conf, builder, startKey, version, limit, reverse); } private void doIngest(TiRegion region, List> sortedList)