From c6151803e00b4ff8d6697c4953d9a46a8ae1efd9 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Fri, 2 Sep 2022 23:14:01 +0800 Subject: [PATCH 1/3] fix wrong backoffer for pd client Signed-off-by: iosmanthus --- .../org/tikv/common/AbstractGRPCClient.java | 8 ++++-- src/main/java/org/tikv/common/PDClient.java | 28 +++++++++++++++---- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index ac68552f7e2..6e2d9a7b5b8 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -193,13 +193,17 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin HealthCheckResponse resp = stub.check(req); return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING; } catch (Exception e) { - logger.warn("check health failed.", e); + logger.warn("check health failed, addr: {}, caused by: {}", addressStr, e.getMessage()); backOffer.doBackOff(BackOffFuncType.BoCheckHealth, e); } } } protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) { - return doCheckHealth(backOffer, addressStr, hostMapping); + try { + return doCheckHealth(backOffer, addressStr, hostMapping); + } catch (Exception e) { + return false; + } } } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 1568a78e0ef..43383bbda87 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -462,14 +462,19 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) { } return resp; } catch (Exception e) { - logger.warn("failed to get member from pd server.", e); + logger.warn( + "failed to get member from pd server from {}, caused by: {}", uri, e.getMessage()); backOffer.doBackOff(BackOffFuncType.BoPDRPC, e); } } } private GetMembersResponse getMembers(BackOffer backOffer, URI uri) { - return doGetMembers(backOffer, uri); + try { + return doGetMembers(backOffer, uri); + } catch (Exception e) { + return null; + } } // return whether the leader has changed to target address `leaderUrlStr`. @@ -524,13 +529,16 @@ synchronized boolean createFollowerClientWrapper( public void tryUpdateLeaderOrForwardFollower() { if (updateLeaderNotify.compareAndSet(false, true)) { try { - BackOffer backOffer = defaultBackOffer(); updateLeaderService.submit( () -> { try { - updateLeaderOrForwardFollower(backOffer); + updateLeaderOrForwardFollower(); + } catch (Exception e) { + logger.info("update leader or forward follower failed", e); + throw e; } finally { updateLeaderNotify.set(false); + logger.info("updating leader finish"); } }); } catch (RejectedExecutionException e) { @@ -540,11 +548,13 @@ public void tryUpdateLeaderOrForwardFollower() { } } - private synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { + private synchronized void updateLeaderOrForwardFollower() { + logger.warn("updating leader or forward follower"); if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) { return; } for (URI url : this.pdAddrs) { + BackOffer backOffer = this.probeBackOffer(); // since resp is null, we need update leader's address by walking through all pd server. GetMembersResponse resp = getMembers(backOffer, url); if (resp == null) { @@ -602,8 +612,9 @@ && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) { } public void tryUpdateLeader() { + logger.info("try update leader"); for (URI url : this.pdAddrs) { - BackOffer backOffer = defaultBackOffer(); + BackOffer backOffer = this.probeBackOffer(); // since resp is null, we need update leader's address by walking through all pd server. GetMembersResponse resp = getMembers(backOffer, url); if (resp == null) { @@ -856,4 +867,9 @@ public RequestKeyCodec getCodec() { private static BackOffer defaultBackOffer() { return ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); } + + private BackOffer probeBackOffer() { + int maxSleep = (int) getTimeout() * 2; + return ConcreteBackOffer.newCustomBackOff(maxSleep); + } } From 5ef714f4c2ec7a2722cb6f65207f7618f4e2e8a3 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Mon, 5 Sep 2022 22:38:29 +0800 Subject: [PATCH 2/3] add TsoBatchUsedUp region error handler Signed-off-by: iosmanthus --- .../common/operation/RegionErrorHandler.java | 7 +++++ .../org/tikv/common/util/BackOffFunction.java | 3 +- .../tikv/common/util/ConcreteBackOffer.java | 7 +++++ .../org/tikv/common/TsoBatchUsedUpTest.java | 29 +++++++++++++++++++ 4 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 src/test/java/org/tikv/common/TsoBatchUsedUpTest.java diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 7e7eff2b9dc..c30f7b6f6fc 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -31,6 +31,7 @@ import org.tikv.common.region.RegionManager; import org.tikv.common.region.TiRegion; import org.tikv.common.util.BackOffFunction; +import org.tikv.common.util.BackOffFunction.BackOffFuncType; import org.tikv.common.util.BackOffer; import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.Metapb; @@ -168,6 +169,12 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { regionManager.clearRegionCache(); throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString())); } + // The tso cache is used up in TiKV servers, we should backoff and wait its cache is renewed. + else if (error.getMessage().contains("TsoBatchUsedUp")) { + logger.warn(String.format("tso batch used up for region [%s]", recv.getRegion())); + backOffer.doBackOff(BackOffFuncType.BoTsoBatchUsedUp, new GrpcException(error.getMessage())); + return true; + } logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion())); // For other errors, we only drop cache here. diff --git a/src/main/java/org/tikv/common/util/BackOffFunction.java b/src/main/java/org/tikv/common/util/BackOffFunction.java index 3eb07a725a9..33ccde27dc1 100644 --- a/src/main/java/org/tikv/common/util/BackOffFunction.java +++ b/src/main/java/org/tikv/common/util/BackOffFunction.java @@ -81,6 +81,7 @@ public enum BackOffFuncType { BoServerBusy, BoTxnNotFound, BoCheckTimeout, - BoCheckHealth + BoCheckHealth, + BoTsoBatchUsedUp } } diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index cef280567c6..39b65474040 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -174,6 +174,13 @@ private BackOffFunction createBackOffFunc(BackOffFunction.BackOffFuncType funcTy case BoCheckHealth: backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter); break; + case BoTsoBatchUsedUp: + backOffFunction = + BackOffFunction.create( + TiConfiguration.getInt(TIKV_BO_REGION_MISS_BASE_IN_MS), + 500, + BackOffStrategy.NoJitter); + break; } return backOffFunction; } diff --git a/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java b/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java new file mode 100644 index 00000000000..b88ffe7d936 --- /dev/null +++ b/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java @@ -0,0 +1,29 @@ +package org.tikv.common; + +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.kvproto.Errorpb.Error; +import org.tikv.raw.RawKVClient; + +public class TsoBatchUsedUpTest extends MockThreeStoresTest { + RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testTsoBatchUsedUp() { + ByteString key = ByteString.copyFromUtf8("tso"); + servers.get(0).putError("tso", () -> Error.newBuilder().setMessage("TsoBatchUsedUp")); + try (RawKVClient client = createClient()) { + try { + client.put(key, ByteString.EMPTY); + Assert.fail(); + } catch (Exception ignore) { + } + pdServers.get(0).addGetRegionListener(request -> null); + // Will not clean region cache + Assert.assertNotNull(session.getRegionManager().getRegionByKey(key)); + } + } +} From d07e565589110df64f8a4956326f62294698d71e Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Mon, 5 Sep 2022 22:45:39 +0800 Subject: [PATCH 3/3] fix license header Signed-off-by: iosmanthus --- .../org/tikv/common/TsoBatchUsedUpTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java b/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java index b88ffe7d936..cda984f823f 100644 --- a/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java +++ b/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java @@ -1,3 +1,20 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.tikv.common; import com.google.protobuf.ByteString;