Skip to content

Commit

Permalink
Merge pull request #1403 Traces and logs for read messages within tra…
Browse files Browse the repository at this point in the history
…nsaction
  • Loading branch information
rekby authored Aug 9, 2024
2 parents e51dc72 + df037b5 commit 4dc77d6
Show file tree
Hide file tree
Showing 7 changed files with 768 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Added traces and logs for read messages from topic within transaction
* Changed result type of `query.Session.NodeID()` from `int64` to `uint32` for compatibility with table session and discovery
* Removed experimental method `query.Result.Err()`
* Added the finishing reading the grpc stream on `query.Result.Close()` call
Expand Down
150 changes: 113 additions & 37 deletions internal/topic/topicreaderinternal/stream_reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,21 @@ func (r *topicStreamReaderImpl) PopBatchTx(
ctx context.Context,
tx tx.Transaction,
opts ReadMessageBatchOptions,
) (*topicreadercommon.PublicBatch, error) {
) (_ *topicreadercommon.PublicBatch, resErr error) {
traceCtx := ctx
onDone := trace.TopicOnReaderStreamPopBatchTx(
r.cfg.Trace,
&traceCtx,
r.readerID,
r.readConnectionID,
tx.SessionID(),
tx.ID(),
)
ctx = traceCtx
defer func() {
onDone(resErr)
}()

batch, err := r.ReadMessageBatch(ctx, opts)
if err != nil {
return nil, err
Expand All @@ -209,49 +223,45 @@ func (r *topicStreamReaderImpl) commitWithTransaction(
tx tx.Transaction,
batch *topicreadercommon.PublicBatch,
) error {
commitRange := topicreadercommon.GetCommitRange(batch)
req := r.createUpdateOffsetRequest(ctx, batch, tx)

updateOffesetInTransactionErr := retry.Retry(ctx, func(ctx context.Context) (err error) {
err = r.topicClient.UpdateOffsetsInTransaction(ctx, &rawtopic.UpdateOffsetsInTransactionRequest{
OperationParams: rawydb.NewRawOperationParamsFromProto(operation.Params(ctx, 0, 0, operation.ModeSync)),
Tx: rawtopic.UpdateOffsetsInTransactionRequest_TransactionIdentity{
ID: tx.ID(),
Session: tx.SessionID(),
},
Topics: []rawtopic.UpdateOffsetsInTransactionRequest_TopicOffsets{
{
Path: batch.Topic(),
Partitions: []rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets{
{
PartitionID: batch.PartitionID(),
PartitionOffsets: []rawtopiccommon.OffsetRange{
{
Start: commitRange.CommitOffsetStart,
End: commitRange.CommitOffsetEnd,
},
},
},
},
},
},
Consumer: r.cfg.Consumer,
})
traceCtx := ctx
onDone := trace.TopicOnReaderUpdateOffsetsInTransaction(
r.cfg.Trace,
&traceCtx,
r.readerID,
r.readConnectionID,
tx.SessionID(),
tx.ID(),
)
defer func() {
onDone(err)
}()

ctx = traceCtx
err = r.topicClient.UpdateOffsetsInTransaction(ctx, req)

return err
})
if updateOffesetInTransactionErr == nil {
tx.OnCompleted(func(transactionResult error) {
//nolint:godox
// TODO: trace
if transactionResult == nil {
topicreadercommon.BatchGetPartitionSession(batch).SetCommittedOffsetForward(commitRange.CommitOffsetEnd)
} else {
_ = r.CloseWithError(xcontext.ValueOnly(ctx), xerrors.WithStackTrace(xerrors.RetryableError(
fmt.Errorf("ydb: failed batch commit because transaction doesn't committed: %w", updateOffesetInTransactionErr),
)))
}
})
r.addOnTransactionCompletedHandler(ctx, tx, batch, updateOffesetInTransactionErr)
} else {
_ = retry.Retry(ctx, func(ctx context.Context) (err error) {
traceCtx := ctx
onDone := trace.TopicOnReaderTransactionRollback(
r.cfg.Trace,
&traceCtx,
r.readerID,
r.readConnectionID,
tx.SessionID(),
tx.ID(),
)
ctx = traceCtx
defer func() {
onDone(err)
}()

return tx.Rollback(ctx)
})

Expand All @@ -265,6 +275,72 @@ func (r *topicStreamReaderImpl) commitWithTransaction(
return nil
}

func (r *topicStreamReaderImpl) addOnTransactionCompletedHandler(
ctx context.Context,
tx tx.Transaction,
batch *topicreadercommon.PublicBatch,
updateOffesetInTransactionErr error,
) {
commitRange := topicreadercommon.GetCommitRange(batch)
tx.OnCompleted(func(transactionResult error) {
traceCtx := ctx
onDone := trace.TopicOnReaderTransactionCompleted(
r.cfg.Trace,
&traceCtx,
r.readerID,
r.readConnectionID,
tx.SessionID(),
tx.ID(),
transactionResult,
)
defer onDone()

ctx = traceCtx
//nolint:godox
// TODO: trace
if transactionResult == nil {
topicreadercommon.BatchGetPartitionSession(batch).SetCommittedOffsetForward(commitRange.CommitOffsetEnd)
} else {
_ = r.CloseWithError(xcontext.ValueOnly(ctx), xerrors.WithStackTrace(xerrors.RetryableError(
fmt.Errorf("ydb: failed batch commit because transaction doesn't committed: %w", updateOffesetInTransactionErr),
)))
}
})
}

func (r *topicStreamReaderImpl) createUpdateOffsetRequest(
ctx context.Context,
batch *topicreadercommon.PublicBatch,
tx tx.Transaction,
) *rawtopic.UpdateOffsetsInTransactionRequest {
commitRange := topicreadercommon.GetCommitRange(batch)

return &rawtopic.UpdateOffsetsInTransactionRequest{
OperationParams: rawydb.NewRawOperationParamsFromProto(operation.Params(ctx, 0, 0, operation.ModeSync)),
Tx: rawtopic.UpdateOffsetsInTransactionRequest_TransactionIdentity{
ID: tx.ID(),
Session: tx.SessionID(),
},
Topics: []rawtopic.UpdateOffsetsInTransactionRequest_TopicOffsets{
{
Path: batch.Topic(),
Partitions: []rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets{
{
PartitionID: batch.PartitionID(),
PartitionOffsets: []rawtopiccommon.OffsetRange{
{
Start: commitRange.CommitOffsetStart,
End: commitRange.CommitOffsetEnd,
},
},
},
},
},
},
Consumer: r.cfg.Consumer,
}
}

func (r *topicStreamReaderImpl) ReadMessageBatch(
ctx context.Context,
opts ReadMessageBatchOptions,
Expand Down
196 changes: 196 additions & 0 deletions log/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,202 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
)
}

t.OnReaderPopBatchTx = func(
startInfo trace.TopicReaderPopBatchTxStartInfo,
) func(trace.TopicReaderPopBatchTxDoneInfo) {
if d.Details()&trace.TopicReaderCustomerEvents == 0 {
return nil
}

start := time.Now()
ctx := with(*startInfo.Context, TRACE, "ydb", "topic", "reader", "customer", "popbatchtx")
l.Log(WithLevel(ctx, TRACE), "starting pop batch tx",
Int64("reader_id", startInfo.ReaderID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
)

return func(doneInfo trace.TopicReaderPopBatchTxDoneInfo) {
if doneInfo.Error == nil {
l.Log(
WithLevel(ctx, DEBUG), "pop batch done",
Int64("reader_id", startInfo.ReaderID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
Int("messaged_count", doneInfo.MessagesCount),
Int64("start_offset", doneInfo.StartOffset),
Int64("end_offset", doneInfo.EndOffset),
latencyField(start),
versionField(),
)
} else {
l.Log(
WithLevel(ctx, WARN), "pop batch failed",
Int64("reader_id", startInfo.ReaderID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
Error(doneInfo.Error),
latencyField(start),
versionField(),
)
}
}
}

t.OnReaderStreamPopBatchTx = func(
startInfo trace.TopicReaderStreamPopBatchTxStartInfo,
) func(
trace.TopicReaderStreamPopBatchTxDoneInfo,
) {
if d.Details()&trace.TopicReaderTransactionEvents == 0 {
return nil
}

start := time.Now()
ctx := with(*startInfo.Context, TRACE, "ydb", "topic", "reader", "transaction", "popbatchtx_on_stream")
l.Log(WithLevel(ctx, TRACE), "starting pop batch tx",
Int64("reader_id", startInfo.ReaderID),
String("reader_connection_id", startInfo.ReaderConnectionID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
versionField(),
)

return func(doneInfo trace.TopicReaderStreamPopBatchTxDoneInfo) {
if doneInfo.Error == nil {
l.Log(
WithLevel(ctx, DEBUG), "pop batch on stream done",
Int64("reader_id", startInfo.ReaderID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
latencyField(start),
versionField(),
)
} else {
l.Log(
WithLevel(ctx, WARN), "pop batch on stream failed",
Int64("reader_id", startInfo.ReaderID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
Error(doneInfo.Error),
latencyField(start),
versionField(),
)
}
}
}

t.OnReaderUpdateOffsetsInTransaction = func(
startInfo trace.TopicReaderOnUpdateOffsetsInTransactionStartInfo,
) func(
trace.TopicReaderOnUpdateOffsetsInTransactionDoneInfo,
) {
if d.Details()&trace.TopicReaderTransactionEvents == 0 {
return nil
}

start := time.Now()
ctx := with(*startInfo.Context, TRACE, "ydb", "topic", "reader", "transaction", "update_offsets")
l.Log(WithLevel(ctx, TRACE), "starting update offsets in transaction",
Int64("reader_id", startInfo.ReaderID),
String("reader_connection_id", startInfo.ReaderConnectionID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
versionField(),
)

return func(doneInfo trace.TopicReaderOnUpdateOffsetsInTransactionDoneInfo) {
if doneInfo.Error == nil {
l.Log(
WithLevel(ctx, DEBUG), "pop batch on stream done",
Int64("reader_id", startInfo.ReaderID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
latencyField(start),
versionField(),
)
} else {
l.Log(
WithLevel(ctx, WARN), "pop batch on stream failed",
Int64("reader_id", startInfo.ReaderID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
Error(doneInfo.Error),
latencyField(start),
versionField(),
)
}
}
}

t.OnReaderTransactionRollback = func(
startInfo trace.TopicReaderTransactionRollbackStartInfo,
) func(
trace.TopicReaderTransactionRollbackDoneInfo,
) {
if d.Details()&trace.TopicReaderTransactionEvents == 0 {
return nil
}

start := time.Now()
ctx := with(*startInfo.Context, TRACE, "ydb", "topic", "reader", "transaction", "update_offsets")
l.Log(WithLevel(ctx, TRACE), "starting update offsets in transaction",
Int64("reader_id", startInfo.ReaderID),
String("reader_connection_id", startInfo.ReaderConnectionID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
versionField(),
)

return func(doneInfo trace.TopicReaderTransactionRollbackDoneInfo) {
if doneInfo.RollbackError == nil {
l.Log(
WithLevel(ctx, DEBUG), "pop batch on stream done",
Int64("reader_id", startInfo.ReaderID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
latencyField(start),
versionField(),
)
} else {
l.Log(
WithLevel(ctx, WARN), "pop batch on stream failed",
Int64("reader_id", startInfo.ReaderID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
Error(doneInfo.RollbackError),
latencyField(start),
versionField(),
)
}
}
}

t.OnReaderTransactionCompleted = func(
startInfo trace.TopicReaderTransactionCompletedStartInfo,
) func(
trace.TopicReaderTransactionCompletedDoneInfo,
) {
if d.Details()&trace.TopicReaderTransactionEvents == 0 {
return nil
}

// expected as very short in memory operation without errors, no need log start separately
start := time.Now()

return func(doneInfo trace.TopicReaderTransactionCompletedDoneInfo) {
ctx := with(*startInfo.Context, TRACE, "ydb", "topic", "reader", "transaction", "update_offsets")
l.Log(WithLevel(ctx, TRACE), "starting update offsets in transaction",
Int64("reader_id", startInfo.ReaderID),
String("reader_connection_id", startInfo.ReaderConnectionID),
String("transaction_session_id", startInfo.TransactionSessionID),
String("transaction_id", startInfo.TransactionID),
latencyField(start),
versionField(),
)
}
}

///
/// Topic writer
///
Expand Down
Loading

0 comments on commit 4dc77d6

Please sign in to comment.