Skip to content

Commit

Permalink
Add feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
george pogosyan committed Jan 24, 2025
1 parent 2920715 commit 49ad72a
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 12 deletions.
1 change: 1 addition & 0 deletions e2e/file_es/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pipelines:
endpoints:
- http://localhost:9200
fatal_on_failed_insert: true
split_enabled: true
strict: false
index_format: index_name
retry: 1
Expand Down
13 changes: 8 additions & 5 deletions e2e/file_es/file_es.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"

Expand All @@ -35,10 +36,12 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))
}

const (
n = 10
successEvent = `{"field_a":"AAAA","field_b":"BBBB"}`
failEvent = `{"field_a":"AAAA","field_b":"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"}`
const n = 10

var (
successEvent = `{"field_a":"AAA","field_b":"BBB"}`
// see ES config: http.max_content_length=128b
failEvent = fmt.Sprintf(`{"s":"%s"}`, strings.Repeat("#", 128))
)

func (c *Config) Send(t *testing.T) {
Expand All @@ -56,7 +59,7 @@ func (c *Config) Send(t *testing.T) {
err = addEvent(file, failEvent)
require.NoError(t, err)

for i := 0; i < 2*n-1; i++ {
for i := 0; i < 2*n; i++ {
err = addEvent(file, successEvent)
require.NoError(t, err)
}
Expand Down
6 changes: 6 additions & 0 deletions plugin/output/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ After an insert error, fall with a non-zero exit code or not

<br>

**`split_enabled`** *`bool`* *`default=false`*

Enables recursive splitting of big batches that the server rejects because of their size.

<br>

**`retention`** *`cfg.Duration`* *`default=1s`*

Retention interval (milliseconds) between retries of failed inserts.
Expand Down
36 changes: 29 additions & 7 deletions plugin/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ type Plugin struct {
batcher *pipeline.RetriableBatcher
avgEventSize int

begin []int

time string
headerPrefix string
cancel context.CancelFunc
Expand Down Expand Up @@ -173,6 +171,11 @@ type Config struct {
// > **Experimental feature**
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *

// > @3@4@5@6
// >
// > Enable split big batches
SplitEnabled bool `json:"split_enabled" default:"false"` // *

// > @3@4@5@6
// >
// > Retention milliseconds for retry to DB.
Expand Down Expand Up @@ -202,6 +205,7 @@ type KeepAliveConfig struct {

// data is per-worker scratch state reused across batches so the
// backing storage is allocated once and recycled (see Plugin.out).
type data struct {
	// outBuf accumulates the serialized NDJSON bulk-request body
	// for the whole batch.
	outBuf []byte
	// begin holds the start offset of each event inside outBuf,
	// plus a final sentinel offset (len(outBuf)); it lets the batch
	// be split into sub-ranges without re-serializing events.
	begin []int
}

func init() {
Expand All @@ -223,7 +227,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.registerMetrics(params.MetricCtl)
p.mu = &sync.Mutex{}
p.headerPrefix = `{"` + p.config.BatchOpType + `":{"_index":"`
p.begin = make([]int, 0, p.config.BatchSize_+1)

if len(p.config.IndexValues) == 0 {
p.config.IndexValues = append(p.config.IndexValues, "@time")
Expand Down Expand Up @@ -341,6 +344,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
begin: make([]int, 0, p.config.BatchSize_+1),
}
}

Expand All @@ -351,16 +355,24 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
}

eventsCount := 0
p.begin = p.begin[:0]
data.begin = data.begin[:0]
data.outBuf = data.outBuf[:0]
batch.ForEach(func(event *pipeline.Event) {
eventsCount++
p.begin = append(p.begin, len(data.outBuf))
data.begin = append(data.begin, len(data.outBuf))
data.outBuf = p.appendEvent(data.outBuf, event)
})
p.begin = append(p.begin, len(data.outBuf))
data.begin = append(data.begin, len(data.outBuf))

var statusCode int
var err error

if p.config.SplitEnabled {
statusCode, err = p.saveOrSplit(0, eventsCount, data.begin, data.outBuf)
} else {
statusCode, err = p.save(data.outBuf)
}

statusCode, err := p.saveOrSplit(0, eventsCount, p.begin, data.outBuf)
if err != nil {
p.sendErrorMetric.WithLabelValues(strconv.Itoa(statusCode)).Inc()
switch statusCode {
Expand All @@ -381,6 +393,16 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
return nil
}

// save submits the serialized bulk body to Elasticsearch as a single
// POST request and returns the HTTP status code along with any
// transport or insert error reported by reportESErrors.
func (p *Plugin) save(data []byte) (int, error) {
	statusCode, err := p.client.DoTimeout(
		http.MethodPost,
		NDJSONContentType,
		data,
		p.config.ConnectionTimeout_,
		p.reportESErrors,
	)
	return statusCode, err
}

func (p *Plugin) saveOrSplit(left int, right int, begin []int, data []byte) (int, error) {
if left == right {
return http.StatusOK, nil
Expand Down

0 comments on commit 49ad72a

Please sign in to comment.