Skip to content

Commit

Permalink
link transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-laterman committed Jan 31, 2024
1 parent eeb92c1 commit 16d1fb6
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 3 deletions.
1 change: 1 addition & 0 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Bulk interface {
Search(ctx context.Context, index string, body []byte, opts ...Opt) (*es.ResultT, error)
HasTracer() bool
StartTransaction(name, transactionType string) *apm.Transaction
StartTransactionOptions(name, transactionType string, opts apm.TransactionOptions) *apm.Transaction

// Multi Operation API's run in the bulk engine
MCreate(ctx context.Context, ops []MultiOp, opts ...Opt) ([]BulkIndexerResponseItem, error)
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/bulk/opBulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,7 @@ func (b *Bulker) HasTracer() bool {
func (b *Bulker) StartTransaction(name, transactionType string) *apm.Transaction {
return b.tracer.StartTransaction(name, transactionType)
}

func (b *Bulker) StartTransactionOptions(name, transactionType string, opts apm.TransactionOptions) *apm.Transaction {
return b.tracer.StartTransactionOptions(name, transactionType, opts)
}
13 changes: 10 additions & 3 deletions internal/pkg/policy/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ LOOP:

m.dispatchPending(iCtx)
endTrans(trans)
case hits := <-s.Output():
case hits := <-s.Output(): // TODO would be nice to attach transaction IDs to hits, but would likely need a bigger refactor.
if m.bulker.HasTracer() {
trans = m.bulker.StartTransaction("output policies", "policy_monitor")
iCtx = apm.ContextWithTransaction(ctx, trans)
Expand Down Expand Up @@ -282,8 +282,12 @@ func (m *monitorT) loadPolicies(ctx context.Context) error {
span, ctx := apm.StartSpan(ctx, "Load policies", "load")
defer span.End()

if m.bulker.HasTracer() { // TODO link bulker transaction with the policy_monitor one
trans := m.bulker.StartTransaction("Load policies", "bulker")
if m.bulker.HasTracer() {
tctx := span.TraceContext()
trans := m.bulker.StartTransactionOptions("Load policies", "bulker", apm.TransactionOptions{Links: []apm.SpanLink{{
Trace: tctx.Trace,
Span: tctx.Span,
}}})
ctx = apm.ContextWithTransaction(ctx, trans)
defer trans.End()
}
Expand All @@ -305,6 +309,9 @@ func (m *monitorT) loadPolicies(ctx context.Context) error {
}

func (m *monitorT) processPolicies(ctx context.Context, policies []model.Policy) error {
span, ctx := apm.StartSpan(ctx, "process policies", "process")
defer span.End()

if len(policies) == 0 {
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/testing/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,8 @@ func (m *MockBulk) StartTransaction(name, transactionType string) *apm.Transacti
return nil
}

func (m *MockBulk) StartTransactionOptions(name, transactionType string, opts apm.TransactionOptions) *apm.Transaction {
return nil
}

var _ bulk.Bulk = (*MockBulk)(nil)

0 comments on commit 16d1fb6

Please sign in to comment.