diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go
index 82511c66dd..5617607218 100644
--- a/integration_tests/2pc_test.go
+++ b/integration_tests/2pc_test.go
@@ -2772,4 +2772,5 @@ func (s *testCommitterSuite) Test2PCCleanupLifecycleHooks() {
 
 func (s *testCommitterSuite) Test2PCUpdateLatestCommitInf() {
 	testUpdateLatestCommitInfo(s.Require(), s.store, "2pc")
+	testUpdateLatestCommitInfo(s.Require(), s.store, "pipelined")
 }
diff --git a/integration_tests/store_test.go b/integration_tests/store_test.go
index f46d2f8961..a2628f0fcd 100644
--- a/integration_tests/store_test.go
+++ b/integration_tests/store_test.go
@@ -198,14 +198,18 @@ func (s *testStoreSuite) TestFailBusyServerKV() {
 
 func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StoreProbe, mode string) {
 	doTxn := func() *util.CommitInfo {
-		txn, err := store.Begin()
+		var ops []tikv.TxnOption
+		if mode == "pipelined" {
+			ops = append(ops, tikv.WithPipelinedMemDB())
+		}
+		txn, err := store.Begin(ops...)
 		require.Nil(err)
 		switch mode {
 		case "async":
 			txn.SetEnableAsyncCommit(true)
 		case "1pc":
 			txn.SetEnable1PC(true)
-		case "2pc":
+		case "2pc", "pipelined":
 			// do nothing
 		default:
 			require.FailNow("unknown mode:" + mode)
@@ -217,10 +221,16 @@ func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StorePro
 		return txn.GetCommitter().GetCommitInfo()
 	}
 
+	txnSize := 8
+	mutationLen := 1
+	if mode == "pipelined" {
+		txnSize = 0
+		mutationLen = 0
+	}
 	commitInfo1 := doTxn()
 	require.Equal(commitInfo1, store.GetLastCommitInfo())
-	require.Equal(commitInfo1.MutationLen, 1)
-	require.Equal(commitInfo1.TxnSize, 8)
+	require.Equal(commitInfo1.MutationLen, mutationLen)
+	require.Equal(commitInfo1.TxnSize, txnSize)
 	require.Equal(commitInfo1.TxnType, mode)
 	commitInfo2 := doTxn()
 	lastInfo := store.GetLastCommitInfo()
@@ -229,13 +239,13 @@ func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StorePro
 	require.Greater(lastInfo.CommitTS, commitInfo1.CommitTS)
 	require.GreaterOrEqual(lastInfo.StartTS, commitInfo1.CommitTS)
 
-	errMsg := fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: 1, TxnSize: 8, Primary: [107 101 121]",
-		lastInfo.StartTS, mode, lastInfo.StartTS, lastInfo.CommitTS)
+	errMsg := fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: %d, TxnSize: %d, Primary: [107 101 121]",
+		lastInfo.StartTS, mode, lastInfo.StartTS, lastInfo.CommitTS, mutationLen, txnSize)
 	require.PanicsWithValue(errMsg, func() {
 		lastInfo.Verify(lastInfo.StartTS)
 	})
-	errMsg = fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: 1, TxnSize: 8, Primary: [107 101 121]",
-		lastInfo.CommitTS-1, mode, lastInfo.StartTS, lastInfo.CommitTS)
+	errMsg = fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: %d, TxnSize: %d, Primary: [107 101 121]",
+		lastInfo.CommitTS-1, mode, lastInfo.StartTS, lastInfo.CommitTS, mutationLen, txnSize)
 	require.PanicsWithValue(errMsg, func() {
 		lastInfo.Verify(lastInfo.CommitTS - 1)
 	})
diff --git a/internal/mockstore/mocktikv/rpc.go b/internal/mockstore/mocktikv/rpc.go
index 1e1ae53419..dbcda642b2 100644
--- a/internal/mockstore/mocktikv/rpc.go
+++ b/internal/mockstore/mocktikv/rpc.go
@@ -661,6 +661,40 @@ func (h kvHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.S
 	return resp
 }
 
+func (h kvHandler) handleKvFlush(req *kvrpcpb.FlushRequest) *kvrpcpb.FlushResponse {
+	regionID := req.Context.RegionId
+	prewriteReq := &kvrpcpb.PrewriteRequest{
+		Context:        req.Context,
+		Mutations:      req.Mutations,
+		PrimaryLock:    req.PrimaryKey,
+		StartVersion:   req.StartTs,
+		MinCommitTs:    req.MinCommitTs,
+		LockTtl:        req.LockTtl,
+		AssertionLevel: req.AssertionLevel,
+	}
+
+	h.cluster.handleDelay(prewriteReq.StartVersion, regionID)
+
+	for _, m := range req.Mutations {
+		if !h.checkKeyInRegion(m.Key) {
+			panic("KvPrewrite: key not in region")
+		}
+	}
+	errs := h.mvccStore.Prewrite(prewriteReq)
+	for i, e := range errs {
+		if e != nil {
+			if _, isLocked := errors.Cause(e).(*ErrLocked); !isLocked {
+				// Keep only one error if it's not a KeyIsLocked error.
+				errs = errs[i : i+1]
+				break
+			}
+		}
+	}
+	return &kvrpcpb.FlushResponse{
+		Errors: convertToKeyErrors(errs),
+	}
+}
+
 // Client is a client that sends RPC.
 // This is same with tikv.Client, define again for avoid circle import.
 type Client interface {
@@ -1070,6 +1104,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
 				Name:  "mvcc.num_rows",
 				Value: strconv.Itoa(len(scanResp.Pairs)),
 			}}}
+	case tikvrpc.CmdFlush:
+		r := req.Flush()
+		if err := session.checkRequest(reqCtx, r.Size()); err != nil {
+			resp.Resp = &kvrpcpb.PrewriteResponse{RegionError: err}
+			return resp, nil
+		}
+		resp.Resp = kvHandler{session}.handleKvFlush(r)
 	default:
 		return nil, errors.Errorf("unsupported this request type %v", req.Type)
 	}
diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go
index 4d5798af79..a9597e1d1c 100644
--- a/oracle/oracles/pd.go
+++ b/oracle/oracles/pd.go
@@ -235,9 +235,9 @@ func (o *pdOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, err
 
 type tsFuture struct {
 	tso.TSFuture
-	o          *pdOracle
-	txnScope   string
-	commitInfo *util.CommitInfo
+	o              *pdOracle
+	txnScope       string
+	lastCommitInfo *util.CommitInfo
 }
 
 // Wait implements the oracle.Future interface.
@@ -249,8 +249,8 @@ func (f *tsFuture) Wait() (uint64, error) {
 		return 0, errors.WithStack(err)
 	}
 	ts := oracle.ComposeTS(physical, logical)
-	if f.commitInfo != nil {
-		f.commitInfo.Verify(ts)
+	if f.lastCommitInfo != nil {
+		f.lastCommitInfo.Verify(ts)
 	}
 	f.o.setLastTS(ts, f.txnScope)
 	return ts, nil
diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go
index cacc377b81..4fbcefd5e4 100644
--- a/txnkv/transaction/2pc.go
+++ b/txnkv/transaction/2pc.go
@@ -1013,6 +1013,9 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action
 
 	// Already spawned a goroutine for async commit transaction.
 	if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
+		if !c.txn.IsPipelined() {
+			c.updateStoreCommitInfo()
+		}
 		secondaryBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
 		if c.store.IsClose() {
 			logutil.Logger(bo.GetCtx()).Warn("the store is closed",
@@ -1905,7 +1908,6 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
 		}
 	}
 	atomic.StoreUint64(&c.commitTS, commitTS)
-	c.updateStoreCommitInfo()
 
 	if c.store.GetOracle().IsExpired(c.startTS, MaxTxnTimeUse, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
 		err = errors.Errorf("session %d txn takes too much time, txnStartTS: %d, comm: %d",
@@ -1958,6 +1960,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
 	if c.isAsyncCommit() {
 		// For async commit protocol, the commit is considered success here.
 		c.txn.commitTS = c.commitTS
+		c.updateStoreCommitInfo()
 		logutil.Logger(ctx).Debug("2PC will use async commit protocol to commit this txn",
 			zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS),
 			zap.Uint64("sessionID", c.sessionID))
@@ -2383,18 +2386,24 @@ func (c *twoPhaseCommitter) updateStoreCommitInfo() {
 
 func (c *twoPhaseCommitter) getCommitInfo() *util.CommitInfo {
 	var txnType string
-	if c.isAsyncCommit() {
+	if c.txn.isPipelined {
+		txnType = "pipelined"
+	} else if c.isAsyncCommit() {
 		txnType = "async"
 	} else if c.isOnePC() {
 		txnType = "1pc"
 	} else {
 		txnType = "2pc"
 	}
+	var mutationLen int
+	if !c.txn.isPipelined {
+		mutationLen = c.mutations.Len()
+	}
 	return &util.CommitInfo{
 		TxnType:     txnType,
 		StartTS:     c.startTS,
 		CommitTS:    atomic.LoadUint64(&c.commitTS),
-		MutationLen: c.mutations.Len(),
+		MutationLen: mutationLen,
 		TxnSize:     c.txnSize,
 		Primary:     c.primaryKey,
 	}
diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go
index c745e538a5..bad13de021 100644
--- a/txnkv/transaction/pipelined_flush.go
+++ b/txnkv/transaction/pipelined_flush.go
@@ -338,6 +338,7 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {
 		zap.Uint64("startTS", c.startTS),
 		zap.Uint64("commitTS", commitTS),
 	)
+	c.updateStoreCommitInfo()
 	broadcastToAllStores(
 		c.txn,
 		c.store,