fix golangci lint (#965)
Signed-off-by: 3pointer <[email protected]>
3pointer authored Sep 18, 2023
1 parent 9c163cc commit 884157d
Showing 15 changed files with 121 additions and 97 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -47,5 +47,5 @@ jobs:
- name: Lint
uses: golangci/[email protected]
with:
version: v1.47.3
version: v1.52.2

4 changes: 2 additions & 2 deletions internal/locate/region_request.go
@@ -490,6 +490,7 @@ func (state *accessByKnownProxy) onNoLeader(selector *replicaSelector) {

// tryNewProxy is the state where we try to find a node from followers as proxy.
type tryNewProxy struct {
//nolint:unused
stateBase
leaderIdx AccessIndex
}
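
The `//nolint:unused` directive above silences golangci-lint's `unused` checker for the embedded `stateBase` only, since the directive scopes to the declaration it annotates. As a hedged illustration (type and field names below are hypothetical, not from this repository):

```go
package example

type baseState struct{}

// widgetState embeds a helper type that is only exercised indirectly, so the
// `unused` checker would otherwise report it; the nolint directive limits the
// suppression to that one field, as in the hunk above.
type widgetState struct {
	//nolint:unused
	baseState
	leaderIdx int
}
```
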
@@ -668,9 +669,8 @@ func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
// 4. The leader peer should be retried again using snapshot read.
if state.isStaleRead && state.option.leaderOnly {
return leader.isExhausted(2)
} else {
return leader.isExhausted(1)
}
return leader.isExhausted(1)
}

func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
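
The `IsLeaderExhausted` hunk above is the standard "drop the redundant else after return" cleanup; linters such as revive's `indent-error-flow` report an `else` whose `if` branch already returns (which linter fired here is an assumption). A minimal before/after sketch with hypothetical names:

```go
package example

// exhaustedBefore: the else is redundant because the if branch returns.
func exhaustedBefore(staleRead, leaderOnly bool, attempts int) bool {
	if staleRead && leaderOnly {
		return attempts >= 2
	} else { // typically flagged: "if block ends with a return statement"
		return attempts >= 1
	}
}

// exhaustedAfter: drop the else and outdent the fallback, as the hunk does.
func exhaustedAfter(staleRead, leaderOnly bool, attempts int) bool {
	if staleRead && leaderOnly {
		return attempts >= 2
	}
	return attempts >= 1
}
```
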
21 changes: 14 additions & 7 deletions internal/locate/region_request3_test.go
@@ -939,7 +939,8 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg
req.EnableStaleRead()
req.ReplicaReadType = kv.ReplicaReadFollower

ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, err := s.regionRequestSender.SendReq(bo, req, regionLoc.Region, time.Second)
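
Several test hunks in this commit replace `ctx, _ := context.WithTimeout(...)` with a captured `cancel` that is deferred. Discarding the cancel function is what go vet's `lostcancel` analyzer reports (presumably surfaced here through golangci-lint's `govet` integration), since the context's resources are otherwise held until the deadline fires. A self-contained sketch of the fixed pattern:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// ctx, _ := context.WithTimeout(...) discards cancel and keeps the
	// context's timer alive until the deadline; capture and defer it instead.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	select {
	case <-ctx.Done():
		fmt.Println("deadline exceeded:", ctx.Err())
	case <-time.After(time.Millisecond):
		fmt.Println("work finished before the deadline")
	}
}
```
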
@@ -1084,7 +1085,8 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Leader() {
var ops []StoreSelectorOption
ops = append(ops, WithMatchLabels(leaderLabel))

ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
@@ -1119,18 +1121,22 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
},
}
var followerID *uint64
var (
followerID uint64
findFollower bool
)
for _, storeID := range s.storeIDs {
if storeID != leaderStore.storeID {
followerID = &storeID
findFollower = true
followerID = storeID
break
}
}
s.NotNil(followerID)
s.True(findFollower)
followerLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(*followerID, 10),
Value: strconv.FormatUint(followerID, 10),
},
}
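
The `followerID` change above replaces a pointer to the `range` variable with a plain value plus a found flag. Taking `&storeID` aliases the single loop variable that (before Go 1.22) is reused across iterations, which linters report as implicit memory aliasing (for example gosec's G601 or `exportloopref`; the exact rule triggered here is an assumption). A sketch of the hazard and the fix, with made-up data:

```go
package main

import "fmt"

func main() {
	storeIDs := []uint64{1, 2, 3}
	leaderID := uint64(2)

	// Flagged pattern: &storeID points at the loop variable itself, which is
	// shared across iterations before Go 1.22, so linters treat the alias as
	// a hazard even when an immediate break keeps it safe in practice.
	//
	//   var followerID *uint64
	//   for _, storeID := range storeIDs {
	//       if storeID != leaderID {
	//           followerID = &storeID
	//           break
	//       }
	//   }

	// Fix used in the hunk above: copy the value and track success with a bool.
	var (
		followerID   uint64
		findFollower bool
	)
	for _, storeID := range storeIDs {
		if storeID != leaderID {
			followerID = storeID
			findFollower = true
			break
		}
	}
	fmt.Println(findFollower, followerID) // true 1
}
```
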

@@ -1181,7 +1187,8 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
ops = append(ops, WithMatchLabels(followerLabel))
}

ctx, _ := context.WithTimeout(context.Background(), 10000*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Second)
defer cancel()
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
8 changes: 3 additions & 5 deletions internal/locate/region_request_state_test.go
@@ -237,17 +237,14 @@ func (s *testRegionCacheStaleReadSuite) extractResp(resp *tikvrpc.Response) (uin
storeID, err := strconv.Atoi(resps[0])
s.Nil(err)
successReadType, err := strconv.Atoi(resps[2])
s.Nil(err)
return uint64(storeID), resps[1], SuccessReadType(successReadType)
}

func (s *testRegionCacheStaleReadSuite) setUnavailableStore(id uint64) {
s.injection.unavailableStoreIDs[id] = struct{}{}
}

func (s *testRegionCacheStaleReadSuite) setTimeout(id uint64) {
s.injection.timeoutStoreIDs[id] = struct{}{}
}

func TestRegionCacheStaleRead(t *testing.T) {
originReloadRegionInterval := atomic.LoadInt64(&reloadRegionInterval)
originBoTiKVServerBusy := retry.BoTiKVServerBusy
@@ -545,7 +542,8 @@ func testStaleReadLeader(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCas
}

func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zone string) {
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
leaderZone := zone == "z1"
var available bool
if leaderZone {
16 changes: 8 additions & 8 deletions internal/mockstore/mocktikv/cluster.go
@@ -53,14 +53,14 @@ var _ cluster.Cluster = &Cluster{}

// Cluster simulates a TiKV cluster. It focuses on management and the change of
// meta data. A Cluster mainly includes following 3 kinds of meta data:
// 1) Region: A Region is a fragment of TiKV's data whose range is [start, end).
// The data of a Region is duplicated to multiple Peers and distributed in
// multiple Stores.
// 2) Peer: A Peer is a replica of a Region's data. All peers of a Region form
// a group, each group elects a Leader to provide services.
// 3) Store: A Store is a storage/service node. Try to think it as a TiKV server
// process. Only the store with request's Region's leader Peer could respond
// to client's request.
// 1. Region: A Region is a fragment of TiKV's data whose range is [start, end).
// The data of a Region is duplicated to multiple Peers and distributed in
// multiple Stores.
// 2. Peer: A Peer is a replica of a Region's data. All peers of a Region form
// a group, each group elects a Leader to provide services.
// 3. Store: A Store is a storage/service node. Try to think it as a TiKV server
// process. Only the store with request's Region's leader Peer could respond
// to client's request.
type Cluster struct {
sync.RWMutex
id uint64
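
This Cluster doc-comment hunk, like the later ones in kv/key.go, tikv/gc.go, tikvrpc/interceptor, txnkv/txnlock and util/codec, reflects the doc-comment canonicalization that gofmt applies since Go 1.19: numbered list items are written as "1." with indentation, and indented lines set off by a blank `//` line render as preformatted blocks. A hedged sketch of the shape gofmt settles on (the type and content below are invented, and exact spacing may differ):

```go
package example

// Widget bundles three kinds of metadata:
//  1. Region: a fragment of data whose range is [start, end).
//  2. Peer: one replica of a Region's data.
//  3. Store: the node that can serve a Region's leader Peer.
//
// Example keys, written as an indented block so they render preformatted:
//
//	rowkey1
//	rowkey1_column1
//	rowkey1_column2
type Widget struct{}
```
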
2 changes: 1 addition & 1 deletion internal/mockstore/mocktikv/cluster_manipulate.go
@@ -98,7 +98,7 @@ func BootstrapWithMultiZones(cluster *Cluster, n, m int) (storeIDs, peerIDs []ui
},
{
Key: "zone",
Value: fmt.Sprintf(zone),
Value: zone,
},
}
cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID), labels...)
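
Replacing `fmt.Sprintf(zone)` with plain `zone` in the hunk above removes a no-op formatting call; static checks such as staticcheck warn when a dynamic string is passed as a format with no further arguments (the exact rule that fired is an assumption). Minimal sketch:

```go
package main

import "fmt"

func main() {
	zone := "z1"

	bad := fmt.Sprintf(zone) // no verbs: just copies the string, and trips printf-style checks
	good := zone             // preferred, as in the hunk above

	fmt.Println(bad == good) // true
}
```
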
8 changes: 4 additions & 4 deletions kv/key.go
@@ -51,10 +51,10 @@ func NextKey(k []byte) []byte {
//
// Assume there are keys like:
//
// rowkey1
// rowkey1_column1
// rowkey1_column2
// rowKey2
// rowkey1
// rowkey1_column1
// rowkey1_column2
// rowKey2
//
// If we seek 'rowkey1' NextKey, we will get 'rowkey1_column1'.
// If we seek 'rowkey1' PrefixNextKey, we will get 'rowkey2'.
4 changes: 3 additions & 1 deletion tikv/gc.go
@@ -40,7 +40,9 @@ const GCScanLockLimit = txnlock.ResolvedCacheSize / 2

// GC does garbage collection (GC) of the TiKV cluster.
// GC deletes MVCC records whose timestamp is lower than the given `safepoint`. We must guarantee
// that all transactions started before this timestamp had committed. We can keep an active
//
// that all transactions started before this timestamp had committed. We can keep an active
//
// transaction list in application to decide which is the minimal start timestamp of them.
//
// For each key, the last mutation record (unless it's a deletion) before `safepoint` is retained.
12 changes: 6 additions & 6 deletions tikv/kv.go
@@ -105,7 +105,7 @@ type KVStore struct {
client Client
}
pdClient pd.Client
pdHttpClient *util.PDHTTPClient
pdHTTPClient *util.PDHTTPClient
regionCache *locate.RegionCache
lockResolver *txnlock.LockResolver
txnLatches *latch.LatchesScheduler
@@ -171,7 +171,7 @@ type Option func(*KVStore)
// WithPDHTTPClient set the PD HTTP client with the given address and TLS config.
func WithPDHTTPClient(tlsConf *tls.Config, pdaddrs []string) Option {
return func(o *KVStore) {
o.pdHttpClient = util.NewPDHTTPClient(tlsConf, pdaddrs)
o.pdHTTPClient = util.NewPDHTTPClient(tlsConf, pdaddrs)
}
}

@@ -327,8 +327,8 @@ func (s *KVStore) Close() error {

s.oracle.Close()
s.pdClient.Close()
if s.pdHttpClient != nil {
s.pdHttpClient.Close()
if s.pdHTTPClient != nil {
s.pdHTTPClient.Close()
}
s.lockResolver.Close()

@@ -618,8 +618,8 @@ var (
func (s *KVStore) setClusterMinSafeTSByPD(ctx context.Context) bool {
isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope
// Try to get the minimum resolved timestamp of the cluster from PD.
if s.pdHttpClient != nil && isGlobal {
clusterMinSafeTS, err := s.pdHttpClient.GetClusterMinResolvedTS(ctx)
if s.pdHTTPClient != nil && isGlobal {
clusterMinSafeTS, err := s.pdHTTPClient.GetClusterMinResolvedTS(ctx)
if err != nil {
logutil.BgLogger().Debug("get cluster-level min resolved timestamp from PD failed", zap.Error(err))
} else if clusterMinSafeTS != 0 {
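
The tikv/kv.go hunks rename `pdHttpClient` to `pdHTTPClient`: Go naming checks (stylecheck/revive initialism rules; which one triggered is an assumption) expect acronyms such as HTTP, URL and ID to keep consistent casing. A small hypothetical illustration:

```go
package example

// pdHTTPClient, baseURL and storeID follow the initialism convention;
// pdHttpClient, baseUrl and storeId would be flagged.
type pdHTTPClient struct {
	baseURL string
	storeID uint64
}

func newPDHTTPClient(baseURL string, storeID uint64) *pdHTTPClient {
	return &pdHTTPClient{baseURL: baseURL, storeID: storeID}
}
```
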
1 change: 1 addition & 0 deletions tikv/test_probe.go
@@ -115,6 +115,7 @@ func (s StoreProbe) GCResolveLockPhase(ctx context.Context, safepoint uint64, co
return s.resolveLocks(ctx, safepoint, concurrency)
}

// ScanLocks scan locks in a range with given start key/end key.
func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxVersion uint64) ([]*txnlock.Lock, error) {
bo := NewGcResolveLockMaxBackoffer(ctx)
const limit = 1024
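
The new `// ScanLocks ...` line satisfies the convention that exported identifiers carry a doc comment beginning with their own name (golint's legacy check, continued by revive's `exported` rule). A hypothetical illustration of the convention:

```go
package example

// Probe wraps test-only helpers. The comment starts with the exported name,
// which is what the exported-comment rule asks for.
type Probe struct{}

// ScanRange reports how many items fall inside [start, end); the name and
// behavior are made up purely to show the comment convention.
func (p Probe) ScanRange(start, end int) int {
	if end < start {
		return 0
	}
	return end - start
}
```
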
76 changes: 42 additions & 34 deletions tikvrpc/interceptor/interceptor.go
@@ -32,38 +32,43 @@ import (
//
// We can implement an RPCInterceptor like this:
// ```
// func LogInterceptor(next InterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// log.Println("before")
// resp, err := next(target, req)
// log.Println("after")
// return resp, err
// }
// }
//
// func LogInterceptor(next InterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// log.Println("before")
// resp, err := next(target, req)
// log.Println("after")
// return resp, err
// }
// }
//
// txn.SetRPCInterceptor(LogInterceptor)
// ```
//
// Or you want to inject some dependent modules:
// ```
// func GetLogInterceptor(lg *log.Logger) RPCInterceptor {
// return func(next RPCInterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// lg.Println("before")
// resp, err := next(target, req)
// lg.Println("after")
// return resp, err
// }
// }
// }
//
// func GetLogInterceptor(lg *log.Logger) RPCInterceptor {
// return func(next RPCInterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// lg.Println("before")
// resp, err := next(target, req)
// lg.Println("after")
// return resp, err
// }
// }
// }
//
// txn.SetRPCInterceptor(GetLogInterceptor())
// ```
//
// NOTE: Interceptor calls may not correspond one-to-one with the underlying gRPC requests.
// This is because there may be some exceptions, such as: request batched, no
// valid connection etc. If you have questions about the execution location of
// RPCInterceptor, please refer to:
// tikv/kv.go#NewKVStore()
// internal/client/client_interceptor.go#SendRequest.
//
// tikv/kv.go#NewKVStore()
// internal/client/client_interceptor.go#SendRequest.
type RPCInterceptor func(next RPCInterceptorFunc) RPCInterceptorFunc

// RPCInterceptorFunc is a callable function used to initiate a request to TiKV.
@@ -77,20 +82,23 @@ type RPCInterceptorFunc func(target string, req *tikvrpc.Request) (*tikvrpc.Resp
//
// We can use RPCInterceptorChain like this:
// ```
// func Interceptor1(next InterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// fmt.Println("begin-interceptor-1")
// defer fmt.Println("end-interceptor-1")
// return next(target, req)
// }
// }
// func Interceptor2(next InterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// fmt.Println("begin-interceptor-2")
// defer fmt.Println("end-interceptor-2")
// return next(target, req)
// }
// }
//
// func Interceptor1(next InterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// fmt.Println("begin-interceptor-1")
// defer fmt.Println("end-interceptor-1")
// return next(target, req)
// }
// }
//
// func Interceptor2(next InterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// fmt.Println("begin-interceptor-2")
// defer fmt.Println("end-interceptor-2")
// return next(target, req)
// }
// }
//
// txn.SetRPCInterceptor(NewRPCInterceptorChain().Link(Interceptor1).Link(Interceptor2).Build())
// ```
//
32 changes: 17 additions & 15 deletions txnkv/txnlock/lock_resolver.go
@@ -136,13 +136,15 @@ func (s TxnStatus) Action() kvrpcpb.Action { return s.action }

// StatusCacheable checks whether the transaction status is certain.True will be
// returned if its status is certain:
// If transaction is already committed, the result could be cached.
// Otherwise:
// If l.LockType is pessimistic lock type:
// - if its primary lock is pessimistic too, the check txn status result should not be cached.
// - if its primary lock is prewrite lock type, the check txn status could be cached.
// If l.lockType is prewrite lock type:
// - always cache the check txn status result.
//
// If transaction is already committed, the result could be cached.
// Otherwise:
// If l.LockType is pessimistic lock type:
// - if its primary lock is pessimistic too, the check txn status result should not be cached.
// - if its primary lock is prewrite lock type, the check txn status could be cached.
// If l.lockType is prewrite lock type:
// - always cache the check txn status result.
//
// For prewrite locks, their primary keys should ALWAYS be the correct one and will NOT change.
func (s TxnStatus) StatusCacheable() bool {
if s.IsCommitted() {
@@ -361,14 +363,14 @@ func (lr *LockResolver) ResolveLocksWithOpts(bo *retry.Backoffer, opts ResolveLo
}

// ResolveLocks tries to resolve Locks. The resolving process is in 3 steps:
// 1) Use the `lockTTL` to pick up all expired locks. Only locks that are too
// old are considered orphan locks and will be handled later. If all locks
// are expired then all locks will be resolved so the returned `ok` will be
// true, otherwise caller should sleep a while before retry.
// 2) For each lock, query the primary key to get txn(which left the lock)'s
// commit status.
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
// 1. Use the `lockTTL` to pick up all expired locks. Only locks that are too
// old are considered orphan locks and will be handled later. If all locks
// are expired then all locks will be resolved so the returned `ok` will be
// true, otherwise caller should sleep a while before retry.
// 2. For each lock, query the primary key to get txn(which left the lock)'s
// commit status.
// 3. Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
opts := ResolveLocksOptions{
CallerStartTS: callerStartTS,
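
The reflowed `StatusCacheable` comment earlier in this file spells out when a check-txn-status result may be cached; the decision table is compact enough to sketch directly (types simplified and hypothetical, not the library's real API):

```go
package example

type lockKind int

const (
	lockPessimistic lockKind = iota
	lockPrewrite
)

// statusCacheable mirrors the rules in the StatusCacheable doc comment:
// committed results are always cacheable; prewrite locks are cacheable;
// a pessimistic lock is cacheable only when its primary is a prewrite lock.
func statusCacheable(committed bool, lock, primary lockKind) bool {
	if committed {
		return true
	}
	if lock == lockPrewrite {
		return true
	}
	return primary == lockPrewrite
}
```
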
18 changes: 11 additions & 7 deletions util/codec/bytes.go
@@ -53,14 +53,18 @@ var (

// EncodeBytes guarantees the encoded value is in ascending order for comparison,
// encoding with the following rule:
// [group1][marker1]...[groupN][markerN]
// group is 8 bytes slice which is padding with 0.
// marker is `0xFF - padding 0 count`
//
// [group1][marker1]...[groupN][markerN]
// group is 8 bytes slice which is padding with 0.
// marker is `0xFF - padding 0 count`
//
// For example:
// [] -> [0, 0, 0, 0, 0, 0, 0, 0, 247]
// [1, 2, 3] -> [1, 2, 3, 0, 0, 0, 0, 0, 250]
// [1, 2, 3, 0] -> [1, 2, 3, 0, 0, 0, 0, 0, 251]
// [1, 2, 3, 4, 5, 6, 7, 8] -> [1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247]
//
// [] -> [0, 0, 0, 0, 0, 0, 0, 0, 247]
// [1, 2, 3] -> [1, 2, 3, 0, 0, 0, 0, 0, 250]
// [1, 2, 3, 0] -> [1, 2, 3, 0, 0, 0, 0, 0, 251]
// [1, 2, 3, 4, 5, 6, 7, 8] -> [1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247]
//
// Refer: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format
func EncodeBytes(b []byte, data []byte) []byte {
// Allocate more space to avoid unnecessary slice growing.
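
The `EncodeBytes` comment above fully specifies the memcomparable group encoding: 8-byte groups padded with zeros, each followed by a marker of `0xFF` minus the pad count. A standalone sketch reconstructed from that comment (not the library's actual implementation), reproducing the documented examples:

```go
package main

import "fmt"

// encodeMemComparable follows the rule documented above: split data into
// 8-byte groups, pad the final group with zeros, and append a marker byte
// equal to 0xFF minus the number of padding bytes (0xFF for a full group).
func encodeMemComparable(data []byte) []byte {
	const groupSize = 8
	out := make([]byte, 0, (len(data)/groupSize+1)*(groupSize+1))
	for i := 0; ; i += groupSize {
		rest := data[i:]
		if len(rest) >= groupSize {
			out = append(out, rest[:groupSize]...)
			out = append(out, 0xFF) // no padding in a full group
			continue
		}
		pad := groupSize - len(rest)
		out = append(out, rest...)
		out = append(out, make([]byte, pad)...) // pad with 0
		out = append(out, byte(0xFF-pad))       // marker
		return out
	}
}

func main() {
	fmt.Println(encodeMemComparable([]byte{}))                       // [0 0 0 0 0 0 0 0 247]
	fmt.Println(encodeMemComparable([]byte{1, 2, 3}))                // [1 2 3 0 0 0 0 0 250]
	fmt.Println(encodeMemComparable([]byte{1, 2, 3, 4, 5, 6, 7, 8})) // [1 2 3 4 5 6 7 8 255 0 0 0 0 0 0 0 0 247]
}
```
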