Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for specifying ingest pipelines in Elasticsearch #744

Merged
merged 7 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions e2e/file_elasticsearch/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# e2e pipeline config: file input -> elasticsearch output.
# The SOME_* values are placeholders; they are overwritten at test startup
# by Config.Configure (temp watching dir, offsets file, random index, creds).
pipelines:
  file_elasticsearch:
    input:
      type: file
      persistence_mode: async
      watching_dir: SOME_DIR    # replaced with a per-test temp dir
      offsets_file: SOME_FILE   # replaced with <tempdir>/offsets.yaml
      offsets_op: reset
    output:
      type: elasticsearch
      endpoints:
        - http://localhost:9200 # replaced with the test endpoint
      username: SOME_USERNAME
      password: SOME_PASSWORD
      index_format: SOME_INDEX  # replaced with a randomized index name
21 changes: 21 additions & 0 deletions e2e/file_elasticsearch/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# https://github.com/elastic/start-local/tree/main
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
    container_name: es-local-test
    ports:
      - "19200:9200" # host 19200 -> container 9200
    environment:
      - discovery.type=single-node
      - ELASTIC_PASSWORD=elastic
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=false
    healthcheck:
      # The healthcheck command executes inside the container, where
      # Elasticsearch listens on 9200. 19200 exists only on the host side
      # of the port mapping, so probing 19200 here would always fail.
      test:
        [
          "CMD-SHELL",
          "curl --output /dev/null --silent --head --fail -u elastic:elastic http://elasticsearch:9200",
        ]
      interval: 10s
      timeout: 10s
      retries: 10
76 changes: 76 additions & 0 deletions e2e/file_elasticsearch/file_elasticsearch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package file_elasticsearch

import (
"fmt"
"math/rand"
"os"
"path"
"path/filepath"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/stretchr/testify/require"
)

// This test verifies that messages sent to Elasticsearch are correctly processed by the ingest pipeline
// and that each message is assigned a 'processed_at' field containing a timestamp.

// Config for file-elasticsearch plugin e2e test
type Config struct {
	Count    int    // number of messages to write; also the expected document count
	Endpoint string // Elasticsearch base URL, e.g. http://localhost:19200
	Pipeline string // ingest pipeline ID; created in Configure and attached to the output
	Username string // basic-auth username (empty disables auth)
	Password string // basic-auth password (empty disables auth)
	dir      string // temp dir watched by the file input; set in Configure
	index    string // randomized index name; set in Configure
}

// Configure fills in the runtime-only plugin settings (temp directories,
// randomized index name, endpoint and credentials) and registers the ingest
// pipeline that the test expects Elasticsearch to apply.
func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
	c.dir = t.TempDir()
	offsetsPath := filepath.Join(t.TempDir(), "offsets.yaml")

	raw := conf.Pipelines[pipelineName].Raw

	in := raw.Get("input")
	in.Set("watching_dir", c.dir)
	in.Set("filename_pattern", "messages.log")
	in.Set("offsets_file", offsetsPath)

	// Randomize the index name so repeated runs don't collide.
	c.index = fmt.Sprintf("my-index-%d", rand.Intn(1000))

	out := raw.Get("output")
	out.Set("index_format", c.index)
	out.Set("pipeline", c.Pipeline)
	out.Set("username", c.Username)
	out.Set("password", c.Password)
	out.Set("endpoints", []string{c.Endpoint})

	require.NoError(t, createIngestPipeline(c.Endpoint, c.Pipeline, c.Username, c.Password))
}

// Send creates the watched log file inside the temp dir and appends
// Count single-line JSON messages to it.
func (c *Config) Send(t *testing.T) {
	f, err := os.Create(path.Join(c.dir, "messages.log"))
	require.NoError(t, err)
	defer func() { _ = f.Close() }()

	const line = "{\"message\":\"test\"}\n"
	for n := 0; n < c.Count; n++ {
		_, writeErr := f.WriteString(line)
		require.NoError(t, writeErr)
	}
}

// Validate polls Elasticsearch until the index contains every sent message,
// then verifies each returned document carries the processed_at field that
// the ingest pipeline is supposed to add.
func (c *Config) Validate(t *testing.T) {
	const (
		retries = 10
		delay   = 250 * time.Millisecond
	)

	require.NoError(t, waitUntilIndexReady(c.Endpoint, c.index, c.Username, c.Password, c.Count, retries, delay))

	docs, err := getDocumentsFromIndex(c.Endpoint, c.index, c.Username, c.Password)
	require.NoError(t, err)
	require.Len(t, docs, c.Count)

	for _, doc := range docs {
		if _, found := doc["processed_at"]; !found {
			t.Errorf("doc %v doesn't have processed_at field", doc)
		}
	}
}
177 changes: 177 additions & 0 deletions e2e/file_elasticsearch/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package file_elasticsearch

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)

// createIngestPipeline registers an ingest pipeline in Elasticsearch whose
// single "set" processor stamps every document with a processed_at timestamp.
// username/password are optional; when both are non-empty they are sent as
// HTTP basic auth. Returns an error on any transport failure or on a status
// other than 200/201.
func createIngestPipeline(elasticURL, pipelineID, username, password string) error {
	url := fmt.Sprintf("%s/_ingest/pipeline/%s", elasticURL, pipelineID)

	pipelineBody := map[string]interface{}{
		"description": "test ingest pipeline",
		"processors": []interface{}{
			map[string]interface{}{
				"set": map[string]interface{}{
					"field": "processed_at",
					"value": "{{_ingest.timestamp}}",
				},
			},
		},
	}

	body, err := json.Marshal(pipelineBody)
	if err != nil {
		return fmt.Errorf("failed to marshal body: %w", err)
	}

	req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	if username != "" && password != "" {
		req.SetBasicAuth(username, password)
	}

	client := &http.Client{Timeout: time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to make HTTP request: %w", err)
	}
	// defer guarantees the body is closed on every return path; the original
	// leaked the body (and its connection) when io.ReadAll returned an error.
	defer func() { _ = resp.Body.Close() }()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read body response: %w", err)
	}

	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("unexpected status: %d, body: %s", resp.StatusCode, string(respBody))
	}

	return nil
}

// getDocumentsFromIndex runs a match_all search against indexName and returns
// the _source object of every hit. username/password are optional basic-auth
// credentials. A missing "hits" object is an error; a "hits" object with no
// hit array yields an empty result, matching Elasticsearch's search response
// shape.
func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]map[string]interface{}, error) {
	url := fmt.Sprintf("%s/%s/_search", elasticURL, indexName)

	// The query is a fixed literal; no need to build []byte and convert back.
	req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(`{"query":{"match_all":{}}}`))
	if err != nil {
		return nil, fmt.Errorf("failed to create HTTP request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	if username != "" && password != "" {
		req.SetBasicAuth(username, password)
	}

	client := &http.Client{Timeout: time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to make HTTP request: %w", err)
	}
	// Close on every path; the original leaked the body on a ReadAll error.
	defer func() { _ = resp.Body.Close() }()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("error reading response: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %d, response: %s", resp.StatusCode, string(respBody))
	}

	var result map[string]interface{}
	if err := json.Unmarshal(respBody, &result); err != nil {
		return nil, fmt.Errorf("failed to decode response: %w", err)
	}

	hits, ok := result["hits"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("unexpected response structure")
	}

	var resultDocs []map[string]interface{}
	docs, _ := hits["hits"].([]interface{})
	for _, doc := range docs {
		mappedDoc, ok := doc.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("unexpected document structure")
		}
		source, ok := mappedDoc["_source"].(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("_source field has unexpected structure")
		}
		resultDocs = append(resultDocs, source)
	}

	return resultDocs, nil
}

// fetchDocCount performs one _count request against indexName.
// It returns (count, retry, err): retry is true when the index is not yet
// available (404/503) and the caller should poll again later. Using a helper
// lets defer close the response body on every path; the original inline code
// leaked the body when io.ReadAll failed.
func fetchDocCount(client *http.Client, elasticURL, indexName, username, password string) (int, bool, error) {
	url := fmt.Sprintf("%s/%s/_count", elasticURL, indexName)
	req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
	if err != nil {
		return 0, false, fmt.Errorf("failed to create request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	if username != "" && password != "" {
		req.SetBasicAuth(username, password)
	}

	resp, err := client.Do(req)
	if err != nil {
		return 0, false, fmt.Errorf("failed to make request: %w", err)
	}
	defer func() { _ = resp.Body.Close() }()

	// Index not created / cluster not ready yet: signal the caller to retry.
	if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusServiceUnavailable {
		return 0, true, nil
	}
	if resp.StatusCode != http.StatusOK {
		return 0, false, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, false, fmt.Errorf("failed to read response: %w", err)
	}

	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return 0, false, fmt.Errorf("failed to decode response: %w", err)
	}

	count, ok := result["count"].(float64)
	if !ok {
		return 0, false, fmt.Errorf("unexpected response structure")
	}
	return int(count), false, nil
}

// waitUntilIndexReady polls the index's document count until it reaches
// minDocs, sleeping delay between attempts and giving up after retries
// attempts. Transport failures and unexpected responses abort immediately;
// 404/503 statuses are treated as "not ready yet" and retried.
func waitUntilIndexReady(elasticURL, indexName, username, password string, minDocs, retries int, delay time.Duration) error {
	client := &http.Client{Timeout: time.Second}

	for i := 0; i < retries; i++ {
		count, retry, err := fetchDocCount(client, elasticURL, indexName, username, password)
		if err != nil {
			return err
		}
		if !retry && count >= minDocs {
			return nil
		}
		time.Sleep(delay)
	}

	return fmt.Errorf("index '%s' did not meet conditions after %d retries", indexName, retries)
}
24 changes: 18 additions & 6 deletions e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ package e2e_test

import (
"context"
"log"
"strconv"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/e2e/file_clickhouse"
"github.com/ozontech/file.d/e2e/file_elasticsearch"
"github.com/ozontech/file.d/e2e/file_file"
"github.com/ozontech/file.d/e2e/http_file"
"github.com/ozontech/file.d/e2e/join_throttle"
"github.com/ozontech/file.d/e2e/kafka_auth"
"github.com/ozontech/file.d/e2e/kafka_file"
"github.com/ozontech/file.d/e2e/split_join"
"log"
"strconv"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
_ "github.com/ozontech/file.d/plugin/action/add_file_name"
_ "github.com/ozontech/file.d/plugin/action/add_host"
Expand Down Expand Up @@ -143,6 +144,17 @@ func TestE2EStabilityWorkCase(t *testing.T) {
e2eTest: &file_clickhouse.Config{},
cfgPath: "./file_clickhouse/config.yml",
},
{
name: "file_elasticsearch",
e2eTest: &file_elasticsearch.Config{
Count: 10,
Pipeline: "test-ingest-pipeline",
Endpoint: "http://localhost:19200",
Username: "elastic",
Password: "elastic",
},
cfgPath: "./file_elasticsearch/config.yml",
},
}

for num, test := range testsList {
Expand Down
6 changes: 6 additions & 0 deletions plugin/output/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,5 +151,11 @@ After a non-retryable write error, fall with a non-zero exit code or not

<br>

**`pipeline`** *`string`*

The name of the ingest pipeline to write events to.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
15 changes: 12 additions & 3 deletions plugin/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ type Config struct {
// >
// > After a non-retryable write error, fall with a non-zero exit code or not
Strict bool `json:"strict" default:"false"` // *

// > @3@4@5@6
// >
// > The name of the ingest pipeline to write events to.
Pipeline string `json:"pipeline"` // *
kirillov6 marked this conversation as resolved.
Show resolved Hide resolved
}

type KeepAliveConfig struct {
Expand Down Expand Up @@ -293,7 +298,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {

func (p *Plugin) prepareClient() {
config := &xhttp.ClientConfig{
Endpoints: prepareEndpoints(p.config.Endpoints),
Endpoints: prepareEndpoints(p.config.Endpoints, p.config.Pipeline),
ConnectionTimeout: p.config.ConnectionTimeout_ * 2,
AuthHeader: p.getAuthHeader(),
KeepAlive: &xhttp.ClientKeepAliveConfig{
Expand All @@ -317,13 +322,17 @@ func (p *Plugin) prepareClient() {
}
}

// prepareEndpoints turns each Elasticsearch base URL into a _bulk endpoint,
// appending the ingest pipeline as a query parameter when one is configured.
// A single trailing slash is trimmed first; the length guard prevents the
// original out-of-range panic on an empty endpoint string.
func prepareEndpoints(endpoints []string, ingestPipeline string) []string {
	res := make([]string, 0, len(endpoints))
	for _, e := range endpoints {
		if len(e) > 0 && e[len(e)-1] == '/' {
			e = e[:len(e)-1]
		}
		e += "/_bulk?_source=false"
		if ingestPipeline != "" {
			e += "&pipeline=" + ingestPipeline
		}
		res = append(res, e)
	}
	return res
}
Expand Down
Loading
Loading