diff --git a/e2e/file_elasticsearch/config.yml b/e2e/file_elasticsearch/config.yml
new file mode 100644
index 000000000..3a9a35894
--- /dev/null
+++ b/e2e/file_elasticsearch/config.yml
@@ -0,0 +1,15 @@
+pipelines:
+ file_elasticsearch:
+ input:
+ type: file
+ persistence_mode: async
+ watching_dir: SOME_DIR
+ offsets_file: SOME_FILE
+ offsets_op: reset
+ output:
+ type: elasticsearch
+ endpoints:
+ - http://localhost:9200
+ username: SOME_USERNAME
+ password: SOME_PASSWORD
+ index_format: SOME_INDEX
\ No newline at end of file
diff --git a/e2e/file_elasticsearch/docker-compose.yml b/e2e/file_elasticsearch/docker-compose.yml
new file mode 100644
index 000000000..3e983a3ee
--- /dev/null
+++ b/e2e/file_elasticsearch/docker-compose.yml
@@ -0,0 +1,21 @@
+# https://github.com/elastic/start-local/tree/main
+services:
+ elasticsearch:
+ image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
+ container_name: es-local-test
+ ports:
+ - "19200:9200"
+ environment:
+ - discovery.type=single-node
+ - ELASTIC_PASSWORD=elastic
+ - xpack.security.enabled=true
+ - xpack.security.http.ssl.enabled=false
+ healthcheck:
+ test:
+ [
+ "CMD-SHELL",
+          "curl --output /dev/null --silent --head --fail -u elastic:elastic http://localhost:9200",
+ ]
+ interval: 10s
+ timeout: 10s
+ retries: 10
diff --git a/e2e/file_elasticsearch/file_elasticsearch.go b/e2e/file_elasticsearch/file_elasticsearch.go
new file mode 100644
index 000000000..7ada448dd
--- /dev/null
+++ b/e2e/file_elasticsearch/file_elasticsearch.go
@@ -0,0 +1,76 @@
+package file_elasticsearch
+
+import (
+ "fmt"
+ "math/rand"
+ "os"
+ "path"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/ozontech/file.d/cfg"
+ "github.com/stretchr/testify/require"
+)
+
+// This test verifies that messages sent to Elasticsearch are correctly processed by the ingest pipeline
+// and that each message is assigned a 'processed_at' field containing a timestamp.
+
+// Config for file-elasticsearch plugin e2e test
+type Config struct {
+ Count int
+ Endpoint string
+ Pipeline string
+ Username string
+ Password string
+ dir string
+ index string
+}
+
+// Configure sets additional fields for input and output plugins
+func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
+ c.dir = t.TempDir()
+ offsetsDir := t.TempDir()
+
+ input := conf.Pipelines[pipelineName].Raw.Get("input")
+ input.Set("watching_dir", c.dir)
+ input.Set("filename_pattern", "messages.log")
+ input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))
+
+ output := conf.Pipelines[pipelineName].Raw.Get("output")
+	c.index = fmt.Sprintf("my-index-%d-%d", time.Now().UnixNano(), rand.Intn(1000))
+ output.Set("index_format", c.index)
+ output.Set("ingest_pipeline", c.Pipeline)
+ output.Set("username", c.Username)
+ output.Set("password", c.Password)
+ output.Set("endpoints", []string{c.Endpoint})
+
+ err := createIngestPipeline(c.Endpoint, c.Pipeline, c.Username, c.Password)
+ require.NoError(t, err)
+}
+
+// Send creates file and writes messages
+func (c *Config) Send(t *testing.T) {
+ file, err := os.Create(path.Join(c.dir, "messages.log"))
+ require.NoError(t, err)
+ defer func() { _ = file.Close() }()
+
+ for i := 0; i < c.Count; i++ {
+ _, err = file.WriteString("{\"message\":\"test\"}\n")
+ require.NoError(t, err)
+ }
+}
+
+// Validate waits for the message processing to complete
+func (c *Config) Validate(t *testing.T) {
+ err := waitUntilIndexReady(c.Endpoint, c.index, c.Username, c.Password, c.Count, 10, 250*time.Millisecond)
+ require.NoError(t, err)
+ docs, err := getDocumentsFromIndex(c.Endpoint, c.index, c.Username, c.Password)
+ require.NoError(t, err)
+ require.Len(t, docs, c.Count)
+ for _, doc := range docs {
+ if _, ok := doc["processed_at"]; !ok {
+ t.Errorf("doc %v doesn't have processed_at field", doc)
+ }
+ }
+}
diff --git a/e2e/file_elasticsearch/helpers.go b/e2e/file_elasticsearch/helpers.go
new file mode 100644
index 000000000..42269eeba
--- /dev/null
+++ b/e2e/file_elasticsearch/helpers.go
@@ -0,0 +1,162 @@
+package file_elasticsearch
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "time"
+)
+
+func createIngestPipeline(elasticURL, pipelineID, username, password string) error {
+ url := fmt.Sprintf("%s/_ingest/pipeline/%s", elasticURL, pipelineID)
+
+ pipelineBody := `{"description":"test ingest pipeline","processors":[{"set":{"field":"processed_at","value":"{{_ingest.timestamp}}"}}]}`
+
+ req, err := http.NewRequest(http.MethodPut, url, strings.NewReader(pipelineBody))
+ if err != nil {
+ return fmt.Errorf("failed to create request: %w", err)
+ }
+
+ req.Header.Set("Content-Type", "application/json")
+ if username != "" && password != "" {
+ req.SetBasicAuth(username, password)
+ }
+
+ client := &http.Client{Timeout: time.Second}
+ resp, err := client.Do(req)
+ if err != nil {
+ return fmt.Errorf("failed to make HTTP request: %w", err)
+ }
+ defer func() { _ = resp.Body.Close() }()
+
+ respBody, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return fmt.Errorf("failed to read body response: %w", err)
+ }
+
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
+ return fmt.Errorf("unexpected status: %d, body: %s", resp.StatusCode, string(respBody))
+ }
+
+ return nil
+}
+
+func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]map[string]interface{}, error) {
+ url := fmt.Sprintf("%s/%s/_search", elasticURL, indexName)
+
+	body := `{"query":{"match_all":{}},"size":10000}`
+
+ req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(body))
+ if err != nil {
+ return nil, fmt.Errorf("failed to create HTTP request: %w", err)
+ }
+
+ req.Header.Set("Content-Type", "application/json")
+ if username != "" && password != "" {
+ req.SetBasicAuth(username, password)
+ }
+
+ client := &http.Client{Timeout: time.Second}
+ resp, err := client.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("failed to make HTTP request: %w", err)
+ }
+ defer func() { _ = resp.Body.Close() }()
+
+ respBody, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, fmt.Errorf("error reading response: %w", err)
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("unexpected status: %d, response: %s", resp.StatusCode, string(respBody))
+ }
+
+ type searchResponse struct {
+ Hits struct {
+ Hits []struct {
+ Source map[string]interface{} `json:"_source"`
+ } `json:"hits"`
+ } `json:"hits"`
+ }
+
+ var result searchResponse
+ if err := json.Unmarshal(respBody, &result); err != nil {
+ return nil, fmt.Errorf("failed to decode response: %w", err)
+ }
+
+ resultDocs := make([]map[string]interface{}, 0, len(result.Hits.Hits))
+
+ for _, hit := range result.Hits.Hits {
+ resultDocs = append(resultDocs, hit.Source)
+ }
+
+ return resultDocs, nil
+}
+
+func waitUntilIndexReady(elasticURL, indexName, username, password string, minDocs, retries int, delay time.Duration) error {
+ client := &http.Client{
+ Timeout: time.Second,
+ }
+
+ url := fmt.Sprintf("%s/%s/_count", elasticURL, indexName)
+ req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
+ if err != nil {
+ return fmt.Errorf("failed to create request: %w", err)
+ }
+
+ req.Header.Set("Content-Type", "application/json")
+ if username != "" && password != "" {
+ req.SetBasicAuth(username, password)
+ }
+
+ for i := 0; i < retries; i++ {
+ ok, err := func() (bool, error) {
+ resp, err := client.Do(req)
+ if err != nil {
+ return false, fmt.Errorf("failed to make request: %w", err)
+ }
+ defer func() { _ = resp.Body.Close() }()
+
+ if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusServiceUnavailable {
+ return false, nil
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ return false, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+ }
+
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return false, fmt.Errorf("failed to read response: %w", err)
+ }
+
+ var result map[string]interface{}
+ if err := json.Unmarshal(body, &result); err != nil {
+ return false, fmt.Errorf("failed to decode response: %w", err)
+ }
+
+ if count, ok := result["count"].(float64); ok {
+ if int(count) >= minDocs {
+ return true, nil
+ }
+ } else {
+ return false, fmt.Errorf("unexpected response structure")
+ }
+
+ return false, nil
+ }()
+
+ if err != nil {
+ return err
+ }
+ if ok {
+ return nil
+ }
+ time.Sleep(delay)
+ }
+
+ return fmt.Errorf("index '%s' did not meet conditions after %d retries", indexName, retries)
+}
diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go
index 84f28a7db..39be266a1 100644
--- a/e2e/start_work_test.go
+++ b/e2e/start_work_test.go
@@ -4,19 +4,20 @@ package e2e_test
 import (
 	"context"
 	"log"
 	"strconv"
 	"testing"
 	"time"

 	"github.com/ozontech/file.d/cfg"
 	"github.com/ozontech/file.d/e2e/file_clickhouse"
+	"github.com/ozontech/file.d/e2e/file_elasticsearch"
 	"github.com/ozontech/file.d/e2e/file_file"
 	"github.com/ozontech/file.d/e2e/http_file"
 	"github.com/ozontech/file.d/e2e/join_throttle"
 	"github.com/ozontech/file.d/e2e/kafka_auth"
 	"github.com/ozontech/file.d/e2e/kafka_file"
 	"github.com/ozontech/file.d/e2e/split_join"
"github.com/ozontech/file.d/fd"
_ "github.com/ozontech/file.d/plugin/action/add_file_name"
_ "github.com/ozontech/file.d/plugin/action/add_host"
@@ -143,6 +144,17 @@ func TestE2EStabilityWorkCase(t *testing.T) {
e2eTest: &file_clickhouse.Config{},
cfgPath: "./file_clickhouse/config.yml",
},
+ {
+ name: "file_elasticsearch",
+ e2eTest: &file_elasticsearch.Config{
+ Count: 10,
+ Pipeline: "test-ingest-pipeline",
+ Endpoint: "http://localhost:19200",
+ Username: "elastic",
+ Password: "elastic",
+ },
+ cfgPath: "./file_elasticsearch/config.yml",
+ },
}
for num, test := range testsList {
diff --git a/plugin/output/elasticsearch/README.md b/plugin/output/elasticsearch/README.md
index 49805ebfb..43f3d0aa2 100755
--- a/plugin/output/elasticsearch/README.md
+++ b/plugin/output/elasticsearch/README.md
@@ -151,5 +151,11 @@ After a non-retryable write error, fall with a non-zero exit code or not
+**`ingest_pipeline`** *`string`*
+
+The name of the ingest pipeline used to pre-process events before indexing.
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go
index edd308bad..81fda26e4 100644
--- a/plugin/output/elasticsearch/elasticsearch.go
+++ b/plugin/output/elasticsearch/elasticsearch.go
@@ -186,6 +186,11 @@ type Config struct {
// >
// > After a non-retryable write error, fall with a non-zero exit code or not
Strict bool `json:"strict" default:"false"` // *
+
+ // > @3@4@5@6
+ // >
+	// > The name of the ingest pipeline used to pre-process events before indexing.
+ IngestPipeline string `json:"ingest_pipeline"` // *
}
type KeepAliveConfig struct {
@@ -293,7 +298,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
func (p *Plugin) prepareClient() {
config := &xhttp.ClientConfig{
- Endpoints: prepareEndpoints(p.config.Endpoints),
+ Endpoints: prepareEndpoints(p.config.Endpoints, p.config.IngestPipeline),
ConnectionTimeout: p.config.ConnectionTimeout_ * 2,
AuthHeader: p.getAuthHeader(),
KeepAlive: &xhttp.ClientKeepAliveConfig{
@@ -317,13 +322,17 @@ func (p *Plugin) prepareClient() {
}
}
-func prepareEndpoints(endpoints []string) []string {
+func prepareEndpoints(endpoints []string, ingestPipeline string) []string {
res := make([]string, 0, len(endpoints))
for _, e := range endpoints {
if e[len(e)-1] == '/' {
e = e[:len(e)-1]
}
- res = append(res, e+"/_bulk?_source=false")
+ e += "/_bulk?_source=false"
+ if ingestPipeline != "" {
+ e += "&pipeline=" + ingestPipeline
+ }
+ res = append(res, e)
}
return res
}
diff --git a/plugin/output/elasticsearch/elasticsearch_test.go b/plugin/output/elasticsearch/elasticsearch_test.go
index e28ac4eb1..b3f6f30c8 100644
--- a/plugin/output/elasticsearch/elasticsearch_test.go
+++ b/plugin/output/elasticsearch/elasticsearch_test.go
@@ -81,23 +81,47 @@ func TestAppendEventWithCreateOpType(t *testing.T) {
}
func TestPrepareEndpoints(t *testing.T) {
- in := []string{
- "http://endpoint_1:9000",
- "http://endpoint_2:9000/",
- "https://endpoint_3:9000",
- "https://endpoint_4:9000/",
+ testCases := []struct {
+ in []string
+ want []string
+ pipeline string
+ }{
+ {
+ in: []string{
+ "http://endpoint_1:9000",
+ "http://endpoint_2:9000/",
+ "https://endpoint_3:9000",
+ "https://endpoint_4:9000/",
+ },
+ want: []string{
+ "http://endpoint_1:9000/_bulk?_source=false",
+ "http://endpoint_2:9000/_bulk?_source=false",
+ "https://endpoint_3:9000/_bulk?_source=false",
+ "https://endpoint_4:9000/_bulk?_source=false",
+ },
+ },
+ {
+ in: []string{
+ "http://endpoint_1:9000",
+ "http://endpoint_2:9000/",
+ "https://endpoint_3:9000",
+ "https://endpoint_4:9000/",
+ },
+ want: []string{
+ "http://endpoint_1:9000/_bulk?_source=false&pipeline=my_pipeline_1",
+ "http://endpoint_2:9000/_bulk?_source=false&pipeline=my_pipeline_1",
+ "https://endpoint_3:9000/_bulk?_source=false&pipeline=my_pipeline_1",
+ "https://endpoint_4:9000/_bulk?_source=false&pipeline=my_pipeline_1",
+ },
+ pipeline: "my_pipeline_1",
+ },
}
- want := []string{
- "http://endpoint_1:9000/_bulk?_source=false",
- "http://endpoint_2:9000/_bulk?_source=false",
- "https://endpoint_3:9000/_bulk?_source=false",
- "https://endpoint_4:9000/_bulk?_source=false",
- }
-
- got := prepareEndpoints(in)
- require.Len(t, got, len(want))
- for i := range got {
- assert.Equal(t, want[i], got[i])
+ for _, tc := range testCases {
+ got := prepareEndpoints(tc.in, tc.pipeline)
+ require.Len(t, got, len(tc.want))
+ for i := range got {
+ assert.Equal(t, tc.want[i], got[i])
+ }
}
}