Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for specifying ingest pipelines in Elasticsearch #744

Merged
merged 7 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions e2e/file_elasticsearch/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# e2e pipeline config: tails a JSON-lines file and ships each line to Elasticsearch.
# The SOME_* placeholders are overwritten at runtime by Config.Configure in the test
# (watching_dir, offsets_file, index_format, endpoints, username, password).
pipelines:
  file_elasticsearch:
    input:
      type: file
      persistence_mode: async
      watching_dir: SOME_DIR    # replaced with a per-test temp dir
      offsets_file: SOME_FILE   # replaced with a per-test offsets file
      offsets_op: reset
    output:
      type: elasticsearch
      endpoints:
        - http://localhost:9200   # replaced with the test endpoint (host port 19200)
      username: SOME_USERNAME
      password: SOME_PASSWORD
      index_format: SOME_INDEX    # replaced with a randomized index name
21 changes: 21 additions & 0 deletions e2e/file_elasticsearch/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# https://github.com/elastic/start-local/tree/main
# Single-node Elasticsearch for the file_elasticsearch e2e test.
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
    container_name: es-local-test
    ports:
      # Host port 19200 -> container port 9200 (Elasticsearch default).
      - "19200:9200"
    environment:
      - discovery.type=single-node
      - ELASTIC_PASSWORD=elastic
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=false
    healthcheck:
      # The check runs INSIDE the container, where Elasticsearch listens on its
      # default port 9200 — 19200 exists only on the host side of the mapping,
      # so probing :19200 here would always fail and the service would never
      # become healthy.
      test:
        [
          "CMD-SHELL",
          "curl --output /dev/null --silent --head --fail -u elastic:elastic http://elasticsearch:9200",
        ]
      interval: 10s
      timeout: 10s
      retries: 10
76 changes: 76 additions & 0 deletions e2e/file_elasticsearch/file_elasticsearch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package file_elasticsearch

import (
"fmt"
"math/rand"
"os"
"path"
"path/filepath"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/stretchr/testify/require"
)

// This test verifies that messages sent to Elasticsearch are correctly processed by the ingest pipeline
// and that each message is assigned a 'processed_at' field containing a timestamp.

// Config for file-elasticsearch plugin e2e test
type Config struct {
	Count    int    // number of messages written by Send and expected in the index by Validate
	Endpoint string // Elasticsearch base URL, e.g. http://localhost:19200
	Pipeline string // ingest pipeline ID; created in Configure via createIngestPipeline
	Username string // basic-auth username (empty string disables auth)
	Password string // basic-auth password (empty string disables auth)
	dir      string // temp dir watched by the file input plugin; set in Configure
	index    string // randomized index name ("my-index-<n>"); set in Configure
}

// Configure sets additional fields for input and output plugins: it points the
// file input at a fresh temp dir, targets a randomized index, and registers the
// ingest pipeline in Elasticsearch before the pipeline starts.
func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
	c.dir = t.TempDir()

	inputCfg := conf.Pipelines[pipelineName].Raw.Get("input")
	inputCfg.Set("watching_dir", c.dir)
	inputCfg.Set("filename_pattern", "messages.log")
	inputCfg.Set("offsets_file", filepath.Join(t.TempDir(), "offsets.yaml"))

	// Randomize the index so repeated runs don't collide with leftover data.
	c.index = fmt.Sprintf("my-index-%d", rand.Intn(1000))

	outputCfg := conf.Pipelines[pipelineName].Raw.Get("output")
	outputCfg.Set("index_format", c.index)
	outputCfg.Set("ingest_pipeline", c.Pipeline)
	outputCfg.Set("username", c.Username)
	outputCfg.Set("password", c.Password)
	outputCfg.Set("endpoints", []string{c.Endpoint})

	require.NoError(t, createIngestPipeline(c.Endpoint, c.Pipeline, c.Username, c.Password))
}

// Send creates file and writes messages: Count newline-delimited JSON lines
// into the watched directory so the file input picks them up.
func (c *Config) Send(t *testing.T) {
	f, err := os.Create(path.Join(c.dir, "messages.log"))
	require.NoError(t, err)
	defer func() { _ = f.Close() }()

	const line = "{\"message\":\"test\"}\n"
	for written := 0; written < c.Count; written++ {
		_, werr := f.WriteString(line)
		require.NoError(t, werr)
	}
}

// Validate waits for the message processing to complete, then checks that every
// indexed document carries the 'processed_at' field added by the ingest pipeline.
func (c *Config) Validate(t *testing.T) {
	require.NoError(t,
		waitUntilIndexReady(c.Endpoint, c.index, c.Username, c.Password, c.Count, 10, 250*time.Millisecond))

	docs, err := getDocumentsFromIndex(c.Endpoint, c.index, c.Username, c.Password)
	require.NoError(t, err)
	require.Len(t, docs, c.Count)

	for _, doc := range docs {
		if _, hasField := doc["processed_at"]; !hasField {
			t.Errorf("doc %v doesn't have processed_at field", doc)
		}
	}
}
162 changes: 162 additions & 0 deletions e2e/file_elasticsearch/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package file_elasticsearch

import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)

// createIngestPipeline registers a minimal ingest pipeline that stamps every
// document with a 'processed_at' field set to the ingest timestamp.
// Basic auth is attached only when both username and password are non-empty.
func createIngestPipeline(elasticURL, pipelineID, username, password string) error {
	const pipelineBody = `{"description":"test ingest pipeline","processors":[{"set":{"field":"processed_at","value":"{{_ingest.timestamp}}"}}]}`

	endpoint := fmt.Sprintf("%s/_ingest/pipeline/%s", elasticURL, pipelineID)
	req, err := http.NewRequest(http.MethodPut, endpoint, strings.NewReader(pipelineBody))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	if username != "" && password != "" {
		req.SetBasicAuth(username, password)
	}

	resp, err := (&http.Client{Timeout: time.Second}).Do(req)
	if err != nil {
		return fmt.Errorf("failed to make HTTP request: %w", err)
	}
	defer func() { _ = resp.Body.Close() }()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read body response: %w", err)
	}

	switch resp.StatusCode {
	case http.StatusOK, http.StatusCreated:
		return nil
	default:
		return fmt.Errorf("unexpected status: %d, body: %s", resp.StatusCode, string(respBody))
	}
}

// getDocumentsFromIndex runs a match_all search against the given index and
// returns the _source document of every hit.
// Basic auth is attached only when both username and password are non-empty.
func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]map[string]interface{}, error) {
	const query = `{"query":{"match_all":{}}}`

	searchURL := fmt.Sprintf("%s/%s/_search", elasticURL, indexName)
	req, err := http.NewRequest(http.MethodPost, searchURL, strings.NewReader(query))
	if err != nil {
		return nil, fmt.Errorf("failed to create HTTP request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	if username != "" && password != "" {
		req.SetBasicAuth(username, password)
	}

	resp, err := (&http.Client{Timeout: time.Second}).Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to make HTTP request: %w", err)
	}
	defer func() { _ = resp.Body.Close() }()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("error reading response: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %d, response: %s", resp.StatusCode, string(respBody))
	}

	// Only the hits' _source payloads are of interest; everything else in the
	// search response is ignored.
	var parsed struct {
		Hits struct {
			Hits []struct {
				Source map[string]interface{} `json:"_source"`
			} `json:"hits"`
		} `json:"hits"`
	}
	if err := json.Unmarshal(respBody, &parsed); err != nil {
		return nil, fmt.Errorf("failed to decode response: %w", err)
	}

	docs := make([]map[string]interface{}, 0, len(parsed.Hits.Hits))
	for _, hit := range parsed.Hits.Hits {
		docs = append(docs, hit.Source)
	}
	return docs, nil
}

// waitUntilIndexReady polls the index's _count endpoint until it reports at
// least minDocs documents, making up to retries attempts with delay between
// them.
//
// Transient conditions are retried instead of aborting the wait: transport
// errors (Elasticsearch may still be starting up and refuse connections) and
// 404/503 responses (index or cluster not ready yet). Any other unexpected
// status, or a malformed response, fails immediately. The previous behavior of
// returning on the first transport error defeated the purpose of a readiness
// wait. The request is also rebuilt per attempt rather than reused across
// client.Do calls.
func waitUntilIndexReady(elasticURL, indexName, username, password string, minDocs, retries int, delay time.Duration) error {
	client := &http.Client{Timeout: time.Second}
	countURL := fmt.Sprintf("%s/%s/_count", elasticURL, indexName)

	var lastErr error
	for i := 0; i < retries; i++ {
		if i > 0 {
			time.Sleep(delay)
		}

		ready, retryable, err := checkIndexCount(client, countURL, username, password, minDocs)
		if err != nil {
			if !retryable {
				return err
			}
			lastErr = err
			continue
		}
		if ready {
			return nil
		}
	}

	if lastErr != nil {
		return fmt.Errorf("index '%s' did not meet conditions after %d retries: %w", indexName, retries, lastErr)
	}
	return fmt.Errorf("index '%s' did not meet conditions after %d retries", indexName, retries)
}

// checkIndexCount performs a single _count request. It returns whether the
// index holds at least minDocs documents, and — on error — whether the caller
// should retry (transport errors) or give up (unexpected status / bad body).
func checkIndexCount(client *http.Client, url, username, password string, minDocs int) (ready, retryable bool, err error) {
	req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
	if err != nil {
		return false, false, fmt.Errorf("failed to create request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	if username != "" && password != "" {
		req.SetBasicAuth(username, password)
	}

	resp, err := client.Do(req)
	if err != nil {
		// Elasticsearch may not accept connections yet; let the caller retry.
		return false, true, fmt.Errorf("failed to make request: %w", err)
	}
	defer func() { _ = resp.Body.Close() }()

	// Index (404) or cluster (503) not ready yet: not an error, just not ready.
	if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusServiceUnavailable {
		return false, false, nil
	}
	if resp.StatusCode != http.StatusOK {
		return false, false, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return false, false, fmt.Errorf("failed to read response: %w", err)
	}

	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return false, false, fmt.Errorf("failed to decode response: %w", err)
	}

	count, isNumber := result["count"].(float64)
	if !isNumber {
		return false, false, fmt.Errorf("unexpected response structure")
	}
	return int(count) >= minDocs, false, nil
}
24 changes: 18 additions & 6 deletions e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ package e2e_test

import (
"context"
"log"
"strconv"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/e2e/file_clickhouse"
"github.com/ozontech/file.d/e2e/file_elasticsearch"
"github.com/ozontech/file.d/e2e/file_file"
"github.com/ozontech/file.d/e2e/http_file"
"github.com/ozontech/file.d/e2e/join_throttle"
"github.com/ozontech/file.d/e2e/kafka_auth"
"github.com/ozontech/file.d/e2e/kafka_file"
"github.com/ozontech/file.d/e2e/split_join"
"log"
"strconv"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
_ "github.com/ozontech/file.d/plugin/action/add_file_name"
_ "github.com/ozontech/file.d/plugin/action/add_host"
Expand Down Expand Up @@ -143,6 +144,17 @@ func TestE2EStabilityWorkCase(t *testing.T) {
e2eTest: &file_clickhouse.Config{},
cfgPath: "./file_clickhouse/config.yml",
},
{
name: "file_elasticsearch",
e2eTest: &file_elasticsearch.Config{
Count: 10,
Pipeline: "test-ingest-pipeline",
Endpoint: "http://localhost:19200",
Username: "elastic",
Password: "elastic",
},
cfgPath: "./file_elasticsearch/config.yml",
},
}

for num, test := range testsList {
Expand Down
6 changes: 6 additions & 0 deletions plugin/output/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,5 +151,11 @@ After a non-retryable write error, fall with a non-zero exit code or not

<br>

**`ingest_pipeline`** *`string`*

The name of the Elasticsearch ingest pipeline used to pre-process events before they are indexed.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
15 changes: 12 additions & 3 deletions plugin/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ type Config struct {
// >
// > After a non-retryable write error, fall with a non-zero exit code or not
Strict bool `json:"strict" default:"false"` // *

// > @3@4@5@6
// >
// > The name of the ingest pipeline to write events to.
IngestPipeline string `json:"ingest_pipeline"` // *
}

type KeepAliveConfig struct {
Expand Down Expand Up @@ -293,7 +298,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {

func (p *Plugin) prepareClient() {
config := &xhttp.ClientConfig{
Endpoints: prepareEndpoints(p.config.Endpoints),
Endpoints: prepareEndpoints(p.config.Endpoints, p.config.IngestPipeline),
ConnectionTimeout: p.config.ConnectionTimeout_ * 2,
AuthHeader: p.getAuthHeader(),
KeepAlive: &xhttp.ClientKeepAliveConfig{
Expand All @@ -317,13 +322,17 @@ func (p *Plugin) prepareClient() {
}
}

// prepareEndpoints normalizes the configured endpoints into bulk-API URLs:
// a trailing slash is stripped, "/_bulk?_source=false" is appended, and when
// ingestPipeline is non-empty it is added as the "pipeline" query parameter so
// Elasticsearch runs every bulk-indexed event through that pipeline.
func prepareEndpoints(endpoints []string, ingestPipeline string) []string {
	// The suffix is identical for every endpoint; build it once.
	suffix := "/_bulk?_source=false"
	if ingestPipeline != "" {
		suffix += "&pipeline=" + ingestPipeline
	}

	res := make([]string, 0, len(endpoints))
	for _, e := range endpoints {
		// Guard len(e) > 0: indexing e[len(e)-1] on an empty endpoint string
		// would panic with index out of range.
		if len(e) > 0 && e[len(e)-1] == '/' {
			e = e[:len(e)-1]
		}
		res = append(res, e+suffix)
	}
	return res
}
Expand Down
Loading