Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(chdump): add OTLP logs ingester #567

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 4 additions & 72 deletions cmd/otelbench/chdump/chdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,24 @@
package chdump

import (
"archive/tar"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"strings"

"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"github.com/klauspost/compress/zstd"
)

// ReadOptions defines options for [Read].
type ReadOptions struct {
// ConsumeOptions defines options for [Consume].
type ConsumeOptions struct {
OnTraces func(*Spans) error
OnPoints func(*Points) error
OnExpHistograms func(*ExpHistograms) error
OnLogs func(*Logs) error
}

// ReadTar reads dump from given tar file.
func ReadTar(r io.Reader, opts ReadOptions) error {
tr := tar.NewReader(r)
iter := &tarIter{tr: tr}
return readDump(iter, opts)
}

// ReadDir reads dump from unpacked directory.
func ReadDir(dir string, opts ReadOptions) error {
entries, err := os.ReadDir(dir)
if err != nil {
return errors.Wrap(err, "read dir")
}
iter := &dirIter{dir: dir, entries: entries}
return readDump(iter, opts)
}

// readDump reads dump from given tar file.
func readDump(iter dumpIter, opts ReadOptions) error {
// Consume reads dump and calls callbacks.
func Consume(iter TableReader, opts ConsumeOptions) error {
for {
name, file, err := iter.Next()
if err != nil {
Expand Down Expand Up @@ -74,52 +52,6 @@ func readDump(iter dumpIter, opts ReadOptions) error {
}
}

type dumpIter interface {
Next() (string, io.ReadCloser, error)
}

type tarIter struct {
tr *tar.Reader
}

func (t *tarIter) Next() (string, io.ReadCloser, error) {
h, err := t.tr.Next()
if err != nil {
return "", nil, err
}
name := path.Clean(h.Name)
return name, io.NopCloser(t.tr), nil
}

type dirIter struct {
dir string
entries []fs.DirEntry
idx int
}

func (d *dirIter) Next() (string, io.ReadCloser, error) {
for {
if d.idx >= len(d.entries) {
return "", nil, io.EOF
}

entry := d.entries[d.idx]
if entry.IsDir() {
d.idx++
continue
}

f, err := os.Open(filepath.Join(d.dir, entry.Name()))
if err != nil {
return "", nil, err
}
name := filepath.Clean(entry.Name())

d.idx++
return name, f, nil
}
}

const protoVersion = 51902

func readBlock[
Expand Down
56 changes: 56 additions & 0 deletions cmd/otelbench/chdump/ingester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package chdump

import (
"context"

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"golang.org/x/sync/errgroup"
)

// IngestLogs loads logs from dump and sends them to the collector.
type IngestLogs struct {
// Workers is the number of workers to use.
//
// Defaults to 1.
Workers int
}

// Run ingests logs.
func (lg IngestLogs) Run(ctx context.Context, client plogotlp.GRPCClient, tr TableReader) error {
var (
workers = max(lg.Workers, 1)
batcnCh = make(chan plog.Logs, workers)
)
grp, grpCtx := errgroup.WithContext(ctx)
grp.Go(func() error {
defer close(batcnCh)
return Consume(tr, ConsumeOptions{
OnLogs: func(t *Logs) error {
ctx := grpCtx

batch := plog.NewLogs()
t.ToOTLP(batch)

select {
case <-ctx.Done():
return ctx.Err()
case batcnCh <- batch:
return nil
}
},
})
})
for range workers {
grp.Go(func() error {
ctx := grpCtx
for batch := range batcnCh {
if _, err := client.Export(ctx, plogotlp.NewExportRequestFromLogs(batch)); err != nil {
return err
}
}
return nil
})
}
return grp.Wait()
}
77 changes: 77 additions & 0 deletions cmd/otelbench/chdump/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"slices"

"github.com/ClickHouse/ch-go/proto"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/go-faster/oteldb/internal/otelstorage"
)
Expand Down Expand Up @@ -75,6 +77,81 @@ func (c *Logs) Reset() {
}
}

// ToOTLP appends data from [Logs] to given batch.
func (c *Logs) ToOTLP(batch plog.Logs) {
resMap := map[otelstorage.Hash]plog.ResourceLogs{}
resLogs := batch.ResourceLogs()
for i := range resLogs.Len() {
resLog := resLogs.At(i)
attrs := otelstorage.Attrs(resLog.Resource().Attributes())
resMap[attrs.Hash()] = resLog
}

getResLog := func(resourceAttrs otelstorage.Attrs) plog.ResourceLogs {
hash := resourceAttrs.Hash()

resLog, ok := resMap[hash]
if !ok {
resLog = resLogs.AppendEmpty()
resource := resLog.Resource()
resourceAttrs.AsMap().CopyTo(resource.Attributes())

resMap[hash] = resLog
}
return resLog
}
getScopeLog := func(resLog plog.ResourceLogs, scopeAttrs otelstorage.Attrs, scopeName, scopeVersion string) plog.ScopeLogs {
scopeLogs := resLog.ScopeLogs()
scopeAttrsHash := scopeAttrs.Hash()

for i := range scopeLogs.Len() {
scopeLog := scopeLogs.At(i)
scope := scopeLog.Scope()
if scope.Name() == scopeName &&
scope.Version() == scopeVersion &&
otelstorage.Attrs(scope.Attributes()).Hash() == scopeAttrsHash {
return scopeLog
}
}
scopeLog := scopeLogs.AppendEmpty()

scope := scopeLog.Scope()
scope.SetName(scopeName)
scope.SetVersion(scopeVersion)
scopeAttrs.AsMap().CopyTo(scope.Attributes())

return scopeLog
}

for row := range c.Body.Rows() {
timestamp := c.Timestamp.Row(row)
severityText := c.SeverityText.Row(row)
severityNumber := c.SeverityNumber.Row(row)
traceFlags := c.TraceFlags.Row(row)
traceID := c.TraceID.Row(row)
spanID := c.SpanID.Row(row)
body := c.Body.Row(row)
attributes := c.Attributes.Row(row)
resource := c.Resource.Row(row)
scope := c.Scope.Row(row)
scopeName := c.ScopeName.Row(row)
scopeVersion := c.ScopeVersion.Row(row)

resLog := getResLog(resource)
scopeLog := getScopeLog(resLog, scope, scopeName, scopeVersion)
record := scopeLog.LogRecords().AppendEmpty()

record.SetTimestamp(otelstorage.NewTimestampFromTime(timestamp))
record.SetSeverityText(severityText)
record.SetSeverityNumber(plog.SeverityNumber(severityNumber))
record.SetFlags(plog.LogRecordFlags(traceFlags))
record.SetTraceID(pcommon.TraceID(traceID))
record.SetSpanID(pcommon.SpanID(spanID))
record.Body().SetStr(body)
attributes.CopyTo(record.Attributes())
}
}

func (c *Logs) columns() iter.Seq[proto.ResultColumn] {
return func(yield func(proto.ResultColumn) bool) {
for _, col := range []proto.ResultColumn{
Expand Down
76 changes: 76 additions & 0 deletions cmd/otelbench/chdump/table_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package chdump

import (
"archive/tar"
"io"
"io/fs"
"os"
"path"
"path/filepath"

"github.com/go-faster/errors"
)

// ReadTar reads dump from given tar file.
func ReadTar(r io.Reader) (TableReader, error) {
tr := tar.NewReader(r)
iter := &tarReader{tr: tr}
return iter, nil
}

// ReadDir reads dump from unpacked directory.
func ReadDir(dir string) (TableReader, error) {
entries, err := os.ReadDir(dir)
if err != nil {
return nil, errors.Wrap(err, "read dir")
}
iter := &dirReader{dir: dir, entries: entries}
return iter, nil
}

// TableReader is an interface to read tables from dump.
type TableReader interface {
Next() (string, io.ReadCloser, error)
}

type tarReader struct {
tr *tar.Reader
}

func (t *tarReader) Next() (string, io.ReadCloser, error) {
h, err := t.tr.Next()
if err != nil {
return "", nil, err
}
name := path.Clean(h.Name)
return name, io.NopCloser(t.tr), nil
}

type dirReader struct {
dir string
entries []fs.DirEntry
idx int
}

func (d *dirReader) Next() (string, io.ReadCloser, error) {
for {
if d.idx >= len(d.entries) {
return "", nil, io.EOF
}

entry := d.entries[d.idx]
if entry.IsDir() {
d.idx++
continue
}

f, err := os.Open(filepath.Join(d.dir, entry.Name()))
if err != nil {
return "", nil, err
}
name := filepath.Clean(entry.Name())

d.idx++
return name, f, nil
}
}
Loading