diff --git a/pom.xml b/pom.xml
index c771f5f4171..1b98009f300 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,7 +57,7 @@
3.5.1
1.2.17
1.7.16
- 1.38.0
+ 1.48.0
2.0.34.Final
2.8.9
1.6.6
@@ -75,12 +75,12 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
- <version>3.16.1</version>
+ <version>3.19.6</version>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
- <version>3.16.1</version>
+ <version>3.19.6</version>
<groupId>io.perfmark</groupId>
@@ -232,6 +232,11 @@
<version>3.9</version>
<scope>compile</scope>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.15</version>
+ </dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
diff --git a/shell.nix b/shell.nix
deleted file mode 100644
index 20ff724d272..00000000000
--- a/shell.nix
+++ /dev/null
@@ -1,14 +0,0 @@
-{ pkgs ? import <nixpkgs> {} }:
-
-(
- pkgs.buildFHSUserEnv {
- name = "client-java-shell";
- targetPkgs = pkgs: with pkgs;[ git maven openjdk8 ];
- runScript = ''
- env \
- GIT_SSL_CAINFO=/etc/ssl/certs/ca-certificates.crt \
- JAVA_HOME=${pkgs.openjdk8}/lib/openjdk \
- bash
- '';
- }
-).env
diff --git a/src/main/java/io/grpc/internal/ClientCallImpl.java b/src/main/java/io/grpc/internal/ClientCallImpl.java
index 5b8d2f6ba25..75b769d6f80 100644
--- a/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -24,6 +24,7 @@
import static io.grpc.Status.DEADLINE_EXCEEDED;
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
+import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import static java.lang.Math.max;
@@ -33,6 +34,7 @@
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
+import io.grpc.ClientStreamTracer;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
@@ -166,6 +168,7 @@ static void prepareHeaders(
DecompressorRegistry decompressorRegistry,
Compressor compressor,
boolean fullStreamDecompression) {
+ headers.discardAll(CONTENT_LENGTH_KEY);
headers.discardAll(MESSAGE_ENCODING_KEY);
if (compressor != Codec.Identity.NONE) {
headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
@@ -260,10 +263,13 @@ public void runInContext() {
effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
} else {
+ ClientStreamTracer[] tracers =
+ GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
stream =
new FailingClientStream(
DEADLINE_EXCEEDED.withDescription(
- "ClientCall started after deadline exceeded: " + effectiveDeadline));
+ "ClientCall started after deadline exceeded: " + effectiveDeadline),
+ tracers);
}
if (callExecutorIsDirect) {
@@ -363,12 +369,14 @@ private static void logIfContextNarrowedTimeout(
StringBuilder builder =
new StringBuilder(
String.format(
- "Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
+ Locale.US,
+ "Call timeout set to '%d' ns, due to context deadline.",
+ effectiveTimeout));
if (callDeadline == null) {
builder.append(" Explicit call timeout was not set.");
} else {
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
- builder.append(String.format(" Explicit call timeout was '%d' ns.", callTimeout));
+ builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
}
log.fine(builder.toString());
@@ -562,6 +570,9 @@ public void setMessageCompression(boolean enabled) {
@Override
public boolean isReady() {
+ if (halfCloseCalled) {
+ return false;
+ }
return stream.isReady();
}
@@ -711,11 +722,6 @@ private void runInternal() {
}
}
- @Override
- public void closed(Status status, Metadata trailers) {
- closed(status, RpcProgress.PROCESSED, trailers);
- }
-
@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
PerfMark.startTask("ClientStreamListener.closed", tag);
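
The isReady() change above makes a half-closed call report itself as not ready instead of consulting the transport. A minimal standalone model of that guard (illustrative only; CallReadinessModel and its supplier are hypothetical, not gRPC classes):

import java.util.function.BooleanSupplier;

// Illustrative model of the guard added above: once halfClose() has been
// requested, no further messages may be sent, so isReady() short-circuits
// to false rather than consulting the transport's flow-control state.
final class CallReadinessModel {
  private volatile boolean halfCloseCalled;
  private final BooleanSupplier transportIsReady; // stands in for stream.isReady()

  CallReadinessModel(BooleanSupplier transportIsReady) {
    this.transportIsReady = transportIsReady;
  }

  void halfClose() {
    halfCloseCalled = true;
  }

  boolean isReady() {
    if (halfCloseCalled) {
      return false;
    }
    return transportIsReady.getAsBoolean();
  }
}
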
diff --git a/src/main/java/io/netty/buffer/PoolArena.java b/src/main/java/io/netty/buffer/PoolArena.java
index 66ac2ac85d4..fc6cbf258ae 100644
--- a/src/main/java/io/netty/buffer/PoolArena.java
+++ b/src/main/java/io/netty/buffer/PoolArena.java
@@ -57,7 +57,6 @@ enum SizeClass {
final int numSmallSubpagePools;
final int directMemoryCacheAlignment;
- final int directMemoryCacheAlignmentMask;
private final PoolSubpage[] smallSubpagePools;
private final PoolChunkList q050;
@@ -97,7 +96,6 @@ protected PoolArena(
super(pageSize, pageShifts, chunkSize, cacheAlignment);
this.parent = parent;
directMemoryCacheAlignment = cacheAlignment;
- directMemoryCacheAlignmentMask = cacheAlignment - 1;
numSmallSubpagePools = nSubpages;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
@@ -183,9 +181,9 @@ private void tcacheAllocateSmall(
return;
}
- /**
- * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and {@link
- * PoolChunk#free(long)} may modify the doubly linked list as well.
+ /*
+ * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
+ * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
final PoolSubpage head = smallSubpagePools[sizeIdx];
final boolean needsNormalAllocation;
@@ -193,7 +191,13 @@ private void tcacheAllocateSmall(
final PoolSubpage s = head.next;
needsNormalAllocation = s == head;
if (!needsNormalAllocation) {
- assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
+ assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx)
+ : "doNotDestroy="
+ + s.doNotDestroy
+ + ", elemSize="
+ + s.elemSize
+ + ", sizeIdx="
+ + sizeIdx;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
@@ -221,7 +225,7 @@ private void tcacheAllocateNormal(
}
}
- // Method must be called inside synchronized(this) { ... } block
+ // Method must be called inside synchronized(this) { ... } block
private void allocateNormal(
PooledByteBuf buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache)
@@ -272,7 +276,7 @@ void free(
}
}
- private SizeClass sizeClass(long handle) {
+ private static SizeClass sizeClass(long handle) {
return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;
}
@@ -499,6 +503,25 @@ public long numActiveBytes() {
return max(0, val);
}
+ /**
+ * Returns the number of bytes that are currently pinned to buffer instances by the arena. The
+ * pinned memory is not accessible for use by any other allocation until the buffers using it
+ * have all been released.
+ */
+ public long numPinnedBytes() {
+ long val =
+ activeBytesHuge
+ .value(); // Huge chunks are exact-sized for the buffers they were allocated to.
+ synchronized (this) {
+ for (int i = 0; i < chunkListMetrics.size(); i++) {
+ for (PoolChunkMetric m : chunkListMetrics.get(i)) {
+ val += ((PoolChunk<?>) m).pinnedBytes();
+ }
+ }
+ }
+ return max(0, val);
+ }
+
protected abstract PoolChunk newChunk(
int pageSize, int maxPageIdx, int pageShifts, int chunkSize);
@@ -588,13 +611,8 @@ private void destroyPoolChunkLists(PoolChunkList... chunkLists) {
static final class HeapArena extends PoolArena {
- HeapArena(
- PooledByteBufAllocator parent,
- int pageSize,
- int pageShifts,
- int chunkSize,
- int directMemoryCacheAlignment) {
- super(parent, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
+ HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts, int chunkSize) {
+ super(parent, pageSize, pageShifts, chunkSize, 0);
}
private static byte[] newByteArray(int size) {
@@ -610,12 +628,12 @@ boolean isDirect() {
protected PoolChunk newChunk(
int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
return new PoolChunk(
- this, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
+ this, null, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx);
}
@Override
protected PoolChunk newUnpooledChunk(int capacity) {
- return new PoolChunk(this, newByteArray(capacity), capacity, 0);
+ return new PoolChunk(this, null, newByteArray(capacity), capacity);
}
@Override
@@ -656,40 +674,33 @@ boolean isDirect() {
return true;
}
- // mark as package-private, only for unit test
- int offsetCacheLine(ByteBuffer memory) {
- // We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...)
- // will
- // throw an NPE.
- int remainder =
- HAS_UNSAFE
- ? (int)
- (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask)
- : 0;
-
- // offset = alignment - address & (alignment - 1)
- return directMemoryCacheAlignment - remainder;
- }
-
@Override
protected PoolChunk newChunk(
int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
if (directMemoryCacheAlignment == 0) {
+ ByteBuffer memory = allocateDirect(chunkSize);
return new PoolChunk(
- this, allocateDirect(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
+ this, memory, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
}
- final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment);
+
+ final ByteBuffer base = allocateDirect(chunkSize + directMemoryCacheAlignment);
+ final ByteBuffer memory =
+ PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
return new PoolChunk(
- this, memory, pageSize, pageShifts, chunkSize, maxPageIdx, offsetCacheLine(memory));
+ this, base, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
}
@Override
protected PoolChunk newUnpooledChunk(int capacity) {
if (directMemoryCacheAlignment == 0) {
- return new PoolChunk(this, allocateDirect(capacity), capacity, 0);
+ ByteBuffer memory = allocateDirect(capacity);
+ return new PoolChunk(this, memory, memory, capacity);
}
- final ByteBuffer memory = allocateDirect(capacity + directMemoryCacheAlignment);
- return new PoolChunk(this, memory, capacity, offsetCacheLine(memory));
+
+ final ByteBuffer base = allocateDirect(capacity + directMemoryCacheAlignment);
+ final ByteBuffer memory =
+ PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
+ return new PoolChunk(this, base, memory, capacity);
}
private static ByteBuffer allocateDirect(int capacity) {
@@ -701,9 +712,9 @@ private static ByteBuffer allocateDirect(int capacity) {
@Override
protected void destroyChunk(PoolChunk chunk) {
if (PlatformDependent.useDirectBufferNoCleaner()) {
- PlatformDependent.freeDirectNoCleaner(chunk.memory);
+ PlatformDependent.freeDirectNoCleaner((ByteBuffer) chunk.base);
} else {
- PlatformDependent.freeDirectBuffer(chunk.memory);
+ PlatformDependent.freeDirectBuffer((ByteBuffer) chunk.base);
}
}
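
DirectArena now keeps two references per chunk: the over-allocated base buffer, which is what must eventually be freed, and an aligned memory view produced by PlatformDependent.alignDirectBuffer that the chunk actually hands out. A minimal sketch of the same base/aligned-view split using plain NIO (Java 9+ ByteBuffer.alignedSlice; the class and sizes are illustrative):

import java.nio.ByteBuffer;

public final class AlignedChunkSketch {
  public static void main(String[] args) {
    int chunkSize = 16 * 1024; // a multiple of the alignment, so the math below holds
    int alignment = 64;        // e.g. cache-line alignment

    // Over-allocate by 'alignment' so an aligned window of at least chunkSize bytes fits.
    // 'base' is kept around because it is the buffer that must eventually be freed.
    ByteBuffer base = ByteBuffer.allocateDirect(chunkSize + alignment);

    // Aligned view starting at the first address that is a multiple of 'alignment'.
    ByteBuffer memory = base.alignedSlice(alignment);

    System.out.println("aligned capacity: " + memory.capacity()); // >= chunkSize
  }
}
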
diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java
index ac68552f7e2..6e2d9a7b5b8 100644
--- a/src/main/java/org/tikv/common/AbstractGRPCClient.java
+++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java
@@ -193,13 +193,17 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin
HealthCheckResponse resp = stub.check(req);
return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
} catch (Exception e) {
- logger.warn("check health failed.", e);
+ logger.warn("check health failed, addr: {}, caused by: {}", addressStr, e.getMessage());
backOffer.doBackOff(BackOffFuncType.BoCheckHealth, e);
}
}
}
protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
- return doCheckHealth(backOffer, addressStr, hostMapping);
+ try {
+ return doCheckHealth(backOffer, addressStr, hostMapping);
+ } catch (Exception e) {
+ return false;
+ }
}
}
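
checkHealth now turns an exhausted backoff into a plain false instead of letting the exception escape. A hedged sketch of that probe-with-backoff pattern (HealthProbe, its retry budget, and the sleep schedule are illustrative assumptions, not the project's BackOffer API):

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class HealthProbe {
  // Retries the probe only when it throws; a clean answer (healthy or not) ends the loop.
  static boolean probeWithBackoff(BooleanSupplier probe, int maxAttempts) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        return probe.getAsBoolean();
      } catch (RuntimeException e) {
        // probe failed; back off and try again, mirroring doCheckHealth
      }
      try {
        TimeUnit.MILLISECONDS.sleep(100L * (attempt + 1));
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    throw new IllegalStateException("health probe retries exhausted");
  }

  // The wrapper corresponding to checkHealth: an exhausted retry budget simply means "unhealthy".
  static boolean checkHealth(BooleanSupplier probe) {
    try {
      return probeWithBackoff(probe, 3);
    } catch (Exception e) {
      return false;
    }
  }
}
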
diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java
index 1568a78e0ef..43383bbda87 100644
--- a/src/main/java/org/tikv/common/PDClient.java
+++ b/src/main/java/org/tikv/common/PDClient.java
@@ -462,14 +462,19 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) {
}
return resp;
} catch (Exception e) {
- logger.warn("failed to get member from pd server.", e);
+ logger.warn(
+ "failed to get member from pd server from {}, caused by: {}", uri, e.getMessage());
backOffer.doBackOff(BackOffFuncType.BoPDRPC, e);
}
}
}
private GetMembersResponse getMembers(BackOffer backOffer, URI uri) {
- return doGetMembers(backOffer, uri);
+ try {
+ return doGetMembers(backOffer, uri);
+ } catch (Exception e) {
+ return null;
+ }
}
// return whether the leader has changed to target address `leaderUrlStr`.
@@ -524,13 +529,16 @@ synchronized boolean createFollowerClientWrapper(
public void tryUpdateLeaderOrForwardFollower() {
if (updateLeaderNotify.compareAndSet(false, true)) {
try {
- BackOffer backOffer = defaultBackOffer();
updateLeaderService.submit(
() -> {
try {
- updateLeaderOrForwardFollower(backOffer);
+ updateLeaderOrForwardFollower();
+ } catch (Exception e) {
+ logger.info("update leader or forward follower failed", e);
+ throw e;
} finally {
updateLeaderNotify.set(false);
+ logger.info("updating leader finish");
}
});
} catch (RejectedExecutionException e) {
@@ -540,11 +548,13 @@ public void tryUpdateLeaderOrForwardFollower() {
}
}
- private synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) {
+ private synchronized void updateLeaderOrForwardFollower() {
+ logger.warn("updating leader or forward follower");
if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) {
return;
}
for (URI url : this.pdAddrs) {
+ BackOffer backOffer = this.probeBackOffer();
// since resp is null, we need update leader's address by walking through all pd server.
GetMembersResponse resp = getMembers(backOffer, url);
if (resp == null) {
@@ -602,8 +612,9 @@ && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) {
}
public void tryUpdateLeader() {
+ logger.info("try update leader");
for (URI url : this.pdAddrs) {
- BackOffer backOffer = defaultBackOffer();
+ BackOffer backOffer = this.probeBackOffer();
// since resp is null, we need update leader's address by walking through all pd server.
GetMembersResponse resp = getMembers(backOffer, url);
if (resp == null) {
@@ -856,4 +867,9 @@ public RequestKeyCodec getCodec() {
private static BackOffer defaultBackOffer() {
return ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
}
+
+ private BackOffer probeBackOffer() {
+ int maxSleep = (int) getTimeout() * 2;
+ return ConcreteBackOffer.newCustomBackOff(maxSleep);
+ }
}
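
tryUpdateLeaderOrForwardFollower allows at most one refresh in flight: a compareAndSet flag admits a single submitter, and the worker releases the flag in a finally block whether the refresh succeeds or fails. A condensed sketch of that single-flight pattern (class and field names are illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

final class SingleFlightRefresh {
  private final AtomicBoolean refreshInFlight = new AtomicBoolean(false);
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  void tryRefresh(Runnable refresh) {
    // Only the caller that flips the flag gets to schedule work; everyone else returns.
    if (!refreshInFlight.compareAndSet(false, true)) {
      return;
    }
    try {
      executor.submit(
          () -> {
            try {
              refresh.run();
            } finally {
              refreshInFlight.set(false); // always release, even if the refresh failed
            }
          });
    } catch (RejectedExecutionException e) {
      refreshInFlight.set(false); // submission never ran, so release immediately
    }
  }
}
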
diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java
index 7e7eff2b9dc..c30f7b6f6fc 100644
--- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java
+++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java
@@ -31,6 +31,7 @@
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
+import org.tikv.common.util.BackOffFunction.BackOffFuncType;
import org.tikv.common.util.BackOffer;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Metapb;
@@ -168,6 +169,12 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
regionManager.clearRegionCache();
throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString()));
}
+ // The TSO cache is used up in TiKV servers; we should back off and wait until the cache is renewed.
+ else if (error.getMessage().contains("TsoBatchUsedUp")) {
+ logger.warn(String.format("tso batch used up for region [%s]", recv.getRegion()));
+ backOffer.doBackOff(BackOffFuncType.BoTsoBatchUsedUp, new GrpcException(error.getMessage()));
+ return true;
+ }
logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion()));
// For other errors, we only drop cache here.
diff --git a/src/main/java/org/tikv/common/util/BackOffFunction.java b/src/main/java/org/tikv/common/util/BackOffFunction.java
index 3eb07a725a9..33ccde27dc1 100644
--- a/src/main/java/org/tikv/common/util/BackOffFunction.java
+++ b/src/main/java/org/tikv/common/util/BackOffFunction.java
@@ -81,6 +81,7 @@ public enum BackOffFuncType {
BoServerBusy,
BoTxnNotFound,
BoCheckTimeout,
- BoCheckHealth
+ BoCheckHealth,
+ BoTsoBatchUsedUp
}
}
diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java
index cef280567c6..39b65474040 100644
--- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java
+++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java
@@ -174,6 +174,13 @@ private BackOffFunction createBackOffFunc(BackOffFunction.BackOffFuncType funcTy
case BoCheckHealth:
backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter);
break;
+ case BoTsoBatchUsedUp:
+ backOffFunction =
+ BackOffFunction.create(
+ TiConfiguration.getInt(TIKV_BO_REGION_MISS_BASE_IN_MS),
+ 500,
+ BackOffStrategy.NoJitter);
+ break;
}
return backOffFunction;
}
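
BoTsoBatchUsedUp backs off with a NoJitter strategy, a base taken from TIKV_BO_REGION_MISS_BASE_IN_MS, and a 500 ms cap. Assuming the usual doubling-with-cap scheme (a sketch, not the project's BackOffFunction implementation), the per-attempt sleep would look like:

final class NoJitterBackoffSketch {
  // Exponential backoff without randomization: base * 2^attempt, clamped to the cap.
  static long sleepMs(long baseMs, long capMs, int attempt) {
    long grown = baseMs << Math.min(attempt, 30); // bound the shift to avoid overflow
    return Math.min(grown, capMs);
  }

  public static void main(String[] args) {
    // e.g. base 100 ms, cap 500 ms -> 100, 200, 400, 500, 500
    for (int attempt = 0; attempt < 5; attempt++) {
      System.out.println(sleepMs(100, 500, attempt));
    }
  }
}
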
diff --git a/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java b/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java
new file mode 100644
index 00000000000..cda984f823f
--- /dev/null
+++ b/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2022 TiKV Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.tikv.common;
+
+import com.google.protobuf.ByteString;
+import org.junit.Assert;
+import org.junit.Test;
+import org.tikv.kvproto.Errorpb.Error;
+import org.tikv.raw.RawKVClient;
+
+public class TsoBatchUsedUpTest extends MockThreeStoresTest {
+ RawKVClient createClient() {
+ return session.createRawClient();
+ }
+
+ @Test
+ public void testTsoBatchUsedUp() {
+ ByteString key = ByteString.copyFromUtf8("tso");
+ servers.get(0).putError("tso", () -> Error.newBuilder().setMessage("TsoBatchUsedUp"));
+ try (RawKVClient client = createClient()) {
+ try {
+ client.put(key, ByteString.EMPTY);
+ Assert.fail();
+ } catch (Exception ignore) {
+ }
+ pdServers.get(0).addGetRegionListener(request -> null);
+ // A TsoBatchUsedUp error should not clear the region cache
+ Assert.assertNotNull(session.getRegionManager().getRegionByKey(key));
+ }
+ }
+}