From 3cd6cc686aaa1c6716d4fad6967713093182b6de Mon Sep 17 00:00:00 2001 From: husharp Date: Mon, 9 Oct 2023 10:49:54 +0800 Subject: [PATCH] fix Signed-off-by: husharp --- integration_tests/pd_api_test.go | 106 +++++++++++++++++++++---------- tikv/kv.go | 7 +- 2 files changed, 80 insertions(+), 33 deletions(-) diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index a05627f106..3ef9aef902 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -12,31 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -// NOTE: The code in this file is based on code from the -// TiDB project, licensed under the Apache License v 2.0 -// -// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/prewrite_test.go -// - -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package tikv_test import ( "context" "fmt" + "math" "strings" "sync/atomic" "testing" @@ -71,6 +52,7 @@ func (s *apiTestSuite) SetupTest() { pdClient, err := pd.NewClient(addrs, pd.SecurityOption{}) s.Require().Nil(err) rpcClient := tikv.NewRPCClient() + s.Require().NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) // Set PD HTTP client. store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs)) s.store = store @@ -85,6 +67,18 @@ func (s *apiTestSuite) storeAddr(id uint64) string { type storeSafeTsMockClient struct { tikv.Client requestCount int32 + kvSafeTS uint64 +} + +func newStoreSafeTsMockClient(client tikv.Client) storeSafeTsMockClient { + return storeSafeTsMockClient{ + Client: client, + kvSafeTS: 150, // Set a default value. + } +} + +func (c *storeSafeTsMockClient) SetKVSafeTS(ts uint64) { + c.kvSafeTS = ts } func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { @@ -92,9 +86,9 @@ func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, re return c.Client.SendRequest(ctx, addr, req, timeout) } atomic.AddInt32(&c.requestCount, 1) - resp := &tikvrpc.Response{} - resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 150} - return resp, nil + return &tikvrpc.Response{ + Resp: &kvrpcpb.StoreSafeTSResponse{SafeTs: c.kvSafeTS}, + }, nil } func (c *storeSafeTsMockClient) Close() error { @@ -110,9 +104,7 @@ func (s *apiTestSuite) TestGetClusterMinResolvedTS() { // Try to get the minimum resolved timestamp of the cluster from PD. require := s.Require() require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) - mockClient := storeSafeTsMockClient{ - Client: s.store.GetTiKVClient(), - } + mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient()) s.store.SetTiKVClient(&mockClient) var retryCount int for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 { @@ -141,6 +133,7 @@ func (s *apiTestSuite) TestGetClusterMinResolvedTS() { } require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) } func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { @@ -148,9 +141,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { // Try to get the minimum resolved timestamp of the cluster from PD. require := s.Require() require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) - mockClient := storeSafeTsMockClient{ - Client: s.store.GetTiKVClient(), - } + mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient()) s.store.SetTiKVClient(&mockClient) var retryCount int for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 { @@ -162,9 +153,6 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { } require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) - defer func() { - s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) - }() // Set DC label for store 1. dcLabel := "testDC" @@ -193,10 +181,64 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel)) + require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) +} + +func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { + util.EnableFailpoints() + require := s.Require() + mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient()) + s.store.SetTiKVClient(&mockClient) + + // Make sure the store's min resolved ts is not initialized. + mockClient.SetKVSafeTS(0) + // Try to get the minimum resolved timestamp of the cluster from TiKV. + require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`)) + var retryCount int + for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != math.MaxUint64 { + time.Sleep(100 * time.Millisecond) + if retryCount > 5 { + break + } + retryCount++ + } + // Make sure the store's min resolved ts is not initialized. + require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + + // Try to get the minimum resolved timestamp of the cluster from PD. + require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) + retryCount = 0 + for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 { + time.Sleep(100 * time.Millisecond) + if retryCount > 5 { + break + } + retryCount++ + } + // Make sure the store's min resolved ts is not regarded as MaxUint64. + require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + + // Fallback to KV Request when PD server not support get min resolved ts. + require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`)) + mockClient.SetKVSafeTS(150) + retryCount = 0 + for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 { + time.Sleep(100 * time.Millisecond) + if retryCount > 5 { + break + } + retryCount++ + } + // Make sure the minSafeTS can advance. + require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) } func (s *apiTestSuite) TearDownTest() { if s.store != nil { s.Require().Nil(s.store.Close()) } + s.Require().NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater")) } diff --git a/tikv/kv.go b/tikv/kv.go index 733887f6af..811a1896cd 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -528,6 +528,9 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { func (s *KVStore) safeTSUpdater() { defer s.wg.Done() t := time.NewTicker(time.Second * 2) + if _, e := util.EvalFailpoint("mockFastSafeTSUpdater"); e == nil { + t.Reset(time.Millisecond * 100) + } defer t.Stop() ctx, cancel := context.WithCancel(s.ctx) ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC) @@ -625,7 +628,9 @@ func (s *KVStore) setClusterMinSafeTSByPD(ctx context.Context) bool { } else if clusterMinSafeTS != 0 { // Update metrics. preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope) - if preClusterMinSafeTS > clusterMinSafeTS { + // If preClusterMinSafeTS is maxUint64, it means that the min safe ts has not been initialized. + // related to https://github.com/tikv/client-go/issues/991 + if preClusterMinSafeTS != math.MaxUint64 && preClusterMinSafeTS > clusterMinSafeTS { skipClusterSafeTSUpdateCounter.Inc() preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS) clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds())