Skip to content

Commit

Permalink
address comment & add test for pipelined txn
Browse files Browse the repository at this point in the history
Signed-off-by: you06 <[email protected]>
  • Loading branch information
you06 committed Jan 28, 2025
1 parent bc4949d commit 974a320
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 16 deletions.
1 change: 1 addition & 0 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2772,4 +2772,5 @@ func (s *testCommitterSuite) Test2PCCleanupLifecycleHooks() {

func (s *testCommitterSuite) Test2PCUpdateLatestCommitInf() {
testUpdateLatestCommitInfo(s.Require(), s.store, "2pc")
testUpdateLatestCommitInfo(s.Require(), s.store, "pipelined")
}
26 changes: 18 additions & 8 deletions integration_tests/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,18 @@ func (s *testStoreSuite) TestFailBusyServerKV() {

func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StoreProbe, mode string) {
doTxn := func() *util.CommitInfo {
txn, err := store.Begin()
var ops []tikv.TxnOption
if mode == "pipelined" {
ops = append(ops, tikv.WithPipelinedMemDB())
}
txn, err := store.Begin(ops...)
require.Nil(err)
switch mode {
case "async":
txn.SetEnableAsyncCommit(true)
case "1pc":
txn.SetEnable1PC(true)
case "2pc":
case "2pc", "pipelined":
// do nothing
default:
require.FailNow("unknown mode:" + mode)
Expand All @@ -217,10 +221,16 @@ func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StorePro
return txn.GetCommitter().GetCommitInfo()
}

txnSize := 8
mutationLen := 1
if mode == "pipelined" {
txnSize = 0
mutationLen = 0
}
commitInfo1 := doTxn()
require.Equal(commitInfo1, store.GetLastCommitInfo())
require.Equal(commitInfo1.MutationLen, 1)
require.Equal(commitInfo1.TxnSize, 8)
require.Equal(commitInfo1.MutationLen, mutationLen)
require.Equal(commitInfo1.TxnSize, txnSize)
require.Equal(commitInfo1.TxnType, mode)
commitInfo2 := doTxn()
lastInfo := store.GetLastCommitInfo()
Expand All @@ -229,13 +239,13 @@ func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StorePro
require.Greater(lastInfo.CommitTS, commitInfo1.CommitTS)
require.GreaterOrEqual(lastInfo.StartTS, commitInfo1.CommitTS)

errMsg := fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: 1, TxnSize: 8, Primary: [107 101 121]",
lastInfo.StartTS, mode, lastInfo.StartTS, lastInfo.CommitTS)
errMsg := fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: %d, TxnSize: %d, Primary: [107 101 121]",
lastInfo.StartTS, mode, lastInfo.StartTS, lastInfo.CommitTS, mutationLen, txnSize)
require.PanicsWithValue(errMsg, func() {
lastInfo.Verify(lastInfo.StartTS)
})
errMsg = fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: 1, TxnSize: 8, Primary: [107 101 121]",
lastInfo.CommitTS-1, mode, lastInfo.StartTS, lastInfo.CommitTS)
errMsg = fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: %d, TxnSize: %d, Primary: [107 101 121]",
lastInfo.CommitTS-1, mode, lastInfo.StartTS, lastInfo.CommitTS, mutationLen, txnSize)
require.PanicsWithValue(errMsg, func() {
lastInfo.Verify(lastInfo.CommitTS - 1)
})
Expand Down
41 changes: 41 additions & 0 deletions internal/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,40 @@ func (h kvHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.S
return resp
}

func (h kvHandler) handleKvFlush(req *kvrpcpb.FlushRequest) *kvrpcpb.FlushResponse {
regionID := req.Context.RegionId
prewriteReq := &kvrpcpb.PrewriteRequest{
Context: req.Context,
Mutations: req.Mutations,
PrimaryLock: req.PrimaryKey,
StartVersion: req.StartTs,
MinCommitTs: req.MinCommitTs,
LockTtl: req.LockTtl,
AssertionLevel: req.AssertionLevel,
}

h.cluster.handleDelay(prewriteReq.StartVersion, regionID)

for _, m := range req.Mutations {
if !h.checkKeyInRegion(m.Key) {
panic("KvPrewrite: key not in region")
}
}
errs := h.mvccStore.Prewrite(prewriteReq)
for i, e := range errs {
if e != nil {
if _, isLocked := errors.Cause(e).(*ErrLocked); !isLocked {
// Keep only one error if it's not a KeyIsLocked error.
errs = errs[i : i+1]
break
}
}
}
return &kvrpcpb.FlushResponse{
Errors: convertToKeyErrors(errs),
}
}

// Client is a client that sends RPC.
// This is same with tikv.Client, define again for avoid circle import.
type Client interface {
Expand Down Expand Up @@ -1070,6 +1104,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
Name: "mvcc.num_rows",
Value: strconv.Itoa(len(scanResp.Pairs)),
}}}
case tikvrpc.CmdFlush:
r := req.Flush()
if err := session.checkRequest(reqCtx, r.Size()); err != nil {
resp.Resp = &kvrpcpb.PrewriteResponse{RegionError: err}
return resp, nil
}
resp.Resp = kvHandler{session}.handleKvFlush(r)
default:
return nil, errors.Errorf("unsupported this request type %v", req.Type)
}
Expand Down
10 changes: 5 additions & 5 deletions oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,9 @@ func (o *pdOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, err

type tsFuture struct {
tso.TSFuture
o *pdOracle
txnScope string
commitInfo *util.CommitInfo
o *pdOracle
txnScope string
lastCommitInfo *util.CommitInfo
}

// Wait implements the oracle.Future interface.
Expand All @@ -249,8 +249,8 @@ func (f *tsFuture) Wait() (uint64, error) {
return 0, errors.WithStack(err)
}
ts := oracle.ComposeTS(physical, logical)
if f.commitInfo != nil {
f.commitInfo.Verify(ts)
if f.lastCommitInfo != nil {
f.lastCommitInfo.Verify(ts)
}
f.o.setLastTS(ts, f.txnScope)
return ts, nil
Expand Down
15 changes: 12 additions & 3 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,9 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action

// Already spawned a goroutine for async commit transaction.
if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
if !c.txn.IsPipelined() {
c.updateStoreCommitInfo()
}
secondaryBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
if c.store.IsClose() {
logutil.Logger(bo.GetCtx()).Warn("the store is closed",
Expand Down Expand Up @@ -1905,7 +1908,6 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
}
}
atomic.StoreUint64(&c.commitTS, commitTS)
c.updateStoreCommitInfo()

if c.store.GetOracle().IsExpired(c.startTS, MaxTxnTimeUse, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
err = errors.Errorf("session %d txn takes too much time, txnStartTS: %d, comm: %d",
Expand Down Expand Up @@ -1958,6 +1960,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
if c.isAsyncCommit() {
// For async commit protocol, the commit is considered success here.
c.txn.commitTS = c.commitTS
c.updateStoreCommitInfo()
logutil.Logger(ctx).Debug("2PC will use async commit protocol to commit this txn",
zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS),
zap.Uint64("sessionID", c.sessionID))
Expand Down Expand Up @@ -2383,18 +2386,24 @@ func (c *twoPhaseCommitter) updateStoreCommitInfo() {

func (c *twoPhaseCommitter) getCommitInfo() *util.CommitInfo {
var txnType string
if c.isAsyncCommit() {
if c.txn.isPipelined {
txnType = "pipelined"
} else if c.isAsyncCommit() {
txnType = "async"
} else if c.isOnePC() {
txnType = "1pc"
} else {
txnType = "2pc"
}
var mutationLen int
if !c.txn.isPipelined {
mutationLen = c.mutations.Len()
}
return &util.CommitInfo{
TxnType: txnType,
StartTS: c.startTS,
CommitTS: atomic.LoadUint64(&c.commitTS),
MutationLen: c.mutations.Len(),
MutationLen: mutationLen,
TxnSize: c.txnSize,
Primary: c.primaryKey,
}
Expand Down
1 change: 1 addition & 0 deletions txnkv/transaction/pipelined_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", commitTS),
)
c.updateStoreCommitInfo()
broadcastToAllStores(
c.txn,
c.store,
Expand Down

0 comments on commit 974a320

Please sign in to comment.