From 677eeae09d1974fbdb8cd39bd20a1ce6963ab5bb Mon Sep 17 00:00:00 2001 From: f4t Date: Wed, 30 Dec 2020 11:18:23 +0800 Subject: [PATCH] multi event output filters and array split filter --- config/filter.go | 24 ++++++-- filter/addfield/filteraddfield.go | 10 +++- filter/cond/filtercond.go | 24 +++++--- filter/date/filterdate.go | 9 ++- filter/geoip2/filtergeoip2.go | 21 ++++--- filter/gonx/filtergonx.go | 9 ++- filter/grok/filtergrok.go | 9 ++- filter/json/filterjson.go | 9 ++- filter/mutate/filtermutate.go | 6 +- filter/ratelimit/filterratelimit.go | 9 ++- filter/removefield/filterremovefield.go | 6 +- filter/split/filtersplit.go | 77 +++++++++++++++++++++++++ filter/split/filtersplit_test.go | 56 ++++++++++++++++++ filter/typeconv/filtertypeconv.go | 6 +- filter/urlparam/filterurlparam.go | 9 ++- filter/useragent/filteruseragent.go | 9 ++- modloader/modloader.go | 2 + 17 files changed, 245 insertions(+), 50 deletions(-) create mode 100644 filter/split/filtersplit.go create mode 100644 filter/split/filtersplit_test.go diff --git a/config/filter.go b/config/filter.go index 17cf9c87..a0556ff0 100644 --- a/config/filter.go +++ b/config/filter.go @@ -16,7 +16,7 @@ var ( // TypeFilterConfig is interface of filter module type TypeFilterConfig interface { TypeCommonConfig - Event(context.Context, logevent.LogEvent) (logevent.LogEvent, bool) + Event(context.Context, logevent.LogEvent) ([]logevent.LogEvent, bool) CommonFilter(context.Context, logevent.LogEvent) logevent.LogEvent } @@ -25,6 +25,7 @@ func (f *FilterConfig) IsConfigured() bool { return len(f.AddTags) != 0 || len(f.AddFields) != 0 || len(f.RemoveTags) != 0 || len(f.RemoveFields) != 0 } +// CommonFilter applies common inline filters such as add/remove fields/tags func (f *FilterConfig) CommonFilter(ctx context.Context, event logevent.LogEvent) logevent.LogEvent { event.AddTag(f.AddTags...) @@ -102,13 +103,26 @@ func (t *Config) startFilters() (err error) { } case event := <-t.chInFilter: var ok bool + events := make([]logevent.LogEvent, 0) + events = append(events, event) for _, filter := range filters { - event, ok = filter.Event(t.ctx, event) - if ok { - event = filter.CommonFilter(t.ctx, event) + eventsOut := make([]logevent.LogEvent, 0) + for _, event := range events { + var filteredEvents []logevent.LogEvent + filteredEvents, ok = filter.Event(t.ctx, event) + if ok { + for i, evt := range filteredEvents { + filteredEvents[i] = filter.CommonFilter(t.ctx, evt) + } + } + eventsOut = append(eventsOut, filteredEvents...) } + events = eventsOut + } + + for _, evt := range events { + t.chFilterOut <- evt } - t.chFilterOut <- event } } }) diff --git a/filter/addfield/filteraddfield.go b/filter/addfield/filteraddfield.go index d6a96a34..c51628cd 100644 --- a/filter/addfield/filteraddfield.go +++ b/filter/addfield/filteraddfield.go @@ -39,10 +39,14 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) if _, ok := event.Extra[f.Key]; ok { - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } + event.SetValue(f.Key, event.Format(f.Value)) - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } diff --git a/filter/cond/filtercond.go b/filter/cond/filtercond.go index 33cd249a..2f1fe49c 100644 --- a/filter/cond/filtercond.go +++ b/filter/cond/filtercond.go @@ -137,34 +137,42 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) if f.expression != nil { ep := EventParameters{Event: &event} ret, err := f.expression.Eval(&ep) if err != nil { goglog.Logger.Error(err) event.AddTag(ErrorTag) - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } if r, ok := ret.(bool); ok { if r { for _, filter := range f.filters { - event, ok = filter.Event(ctx, event) + eventsOut, ok = filter.Event(ctx, event) if ok { - event = filter.CommonFilter(ctx, event) + for _, evt := range eventsOut { + evt = filter.CommonFilter(ctx, evt) + } } } } else { for _, filter := range f.elseFilters { - event, ok = filter.Event(ctx, event) + eventsOut, ok = filter.Event(ctx, event) if ok { - event = filter.CommonFilter(ctx, event) + for _, evt := range eventsOut { + evt = filter.CommonFilter(ctx, evt) + } } } } - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } goglog.Logger.Warn("filter cond condition returns not a boolean, ignored") } - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } diff --git a/filter/date/filterdate.go b/filter/date/filterdate.go index 217e5c69..8afac8f0 100644 --- a/filter/date/filterdate.go +++ b/filter/date/filterdate.go @@ -65,7 +65,8 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) var ( timestamp time.Time err error @@ -95,7 +96,8 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge if err != nil { event.AddTag(ErrorTag) goglog.Logger.Error(err) - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } if f.Target == DefaultTarget { event.Timestamp = timestamp.UTC() @@ -103,7 +105,8 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge event.SetValue(f.Target, timestamp.UTC()) } - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } func convertFloat(value float64) (int64, int64) { diff --git a/filter/geoip2/filtergeoip2.go b/filter/geoip2/filtergeoip2.go index 4a25845e..70ba205e 100644 --- a/filter/geoip2/filtergeoip2.go +++ b/filter/geoip2/filtergeoip2.go @@ -84,16 +84,19 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) ipstr := event.GetString(f.IPField) if ipstr == "" { // Passthru if empty - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } ip := net.ParseIP(ipstr) if f.SkipPrivate && f.privateIP(ip) { // Passthru - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } var err error var record *geoip2.City @@ -107,17 +110,20 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge goglog.Logger.Error(err) } event.AddTag(ErrorTag) - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } f.cache.Add(ipstr, record) } if record == nil { event.AddTag(ErrorTag) - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } if record.Location.Latitude == 0 && record.Location.Longitude == 0 { event.AddTag(ErrorTag) - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } if f.FlatFormat { @@ -172,7 +178,8 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge event.SetValue(f.Key, m) } - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } func (f *FilterConfig) privateIP(ip net.IP) bool { diff --git a/filter/gonx/filtergonx.go b/filter/gonx/filtergonx.go index 1edff55f..c58f87f1 100644 --- a/filter/gonx/filtergonx.go +++ b/filter/gonx/filtergonx.go @@ -72,14 +72,16 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) message := event.GetString(f.Source) reader := gonx.NewParserReader(strings.NewReader(message), f.parser) entry, err := reader.Read() if err != nil { event.AddTag(ErrorTag) goglog.Logger.Errorf("%s: %q", err, message) - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } for _, field := range f.fields { @@ -87,5 +89,6 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge event.SetValue(field, s) } - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } diff --git a/filter/grok/filtergrok.go b/filter/grok/filtergrok.go index 4ccb3ebf..291155a6 100644 --- a/filter/grok/filtergrok.go +++ b/filter/grok/filtergrok.go @@ -78,7 +78,8 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) message := event.GetString(f.Source) found := false for _, thisMatch := range f.Match { @@ -105,8 +106,10 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge if !found { event.AddTag(ErrorTag) goglog.Logger.Debugf("grok: no matches for %q", message) - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } diff --git a/filter/json/filterjson.go b/filter/json/filterjson.go index 313e4602..46b17981 100644 --- a/filter/json/filterjson.go +++ b/filter/json/filterjson.go @@ -49,12 +49,14 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) var parsedMessage map[string]interface{} if err := jsoniter.Unmarshal([]byte(event.GetString(f.Source)), &parsedMessage); err != nil { event.AddTag(ErrorTag) goglog.Logger.Error(err) - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } if f.Appendkey != "" { @@ -79,5 +81,6 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge } } - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } diff --git a/filter/mutate/filtermutate.go b/filter/mutate/filtermutate.go index 6ca87f72..a676f76c 100644 --- a/filter/mutate/filtermutate.go +++ b/filter/mutate/filtermutate.go @@ -59,7 +59,8 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) if f.Split[0] != "" { event.SetValue(f.Split[0], strings.Split(event.GetString(f.Split[0]), f.Split[1])) } @@ -75,7 +76,8 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge event.Remove(f.Rename[0]) } // always return true here for configured filter - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } func mergeField(event logevent.LogEvent, destinationName, source string) logevent.LogEvent { diff --git a/filter/ratelimit/filterratelimit.go b/filter/ratelimit/filterratelimit.go index 1eb5d0f7..78b585e1 100644 --- a/filter/ratelimit/filterratelimit.go +++ b/filter/ratelimit/filterratelimit.go @@ -70,15 +70,18 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) if event.Extra == nil { event.Extra = map[string]interface{}{} } if f.throttle == nil { - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } <-f.throttle - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } diff --git a/filter/removefield/filterremovefield.go b/filter/removefield/filterremovefield.go index 0a5cf99e..1a9cec84 100644 --- a/filter/removefield/filterremovefield.go +++ b/filter/removefield/filterremovefield.go @@ -49,7 +49,8 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) if event.Extra == nil { event.Extra = map[string]interface{}{} } @@ -63,5 +64,6 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge } // TODO: remove unset field return false - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } diff --git a/filter/split/filtersplit.go b/filter/split/filtersplit.go new file mode 100644 index 00000000..ab8e2f34 --- /dev/null +++ b/filter/split/filtersplit.go @@ -0,0 +1,77 @@ +package filteraddfield + +import ( + "context" + + "github.com/tsaikd/gogstash/config" + "github.com/tsaikd/gogstash/config/logevent" +) + +// ModuleName is the name used in config file +const ModuleName = "split" + +// FilterConfig holds the configuration json fields and internal objects +type FilterConfig struct { + config.FilterConfig + Source string `json:"split_source"` + // Value string `json:"value"` +} + +// DefaultFilterConfig returns an FilterConfig struct with default values +func DefaultFilterConfig() FilterConfig { + return FilterConfig{ + FilterConfig: config.FilterConfig{ + CommonConfig: config.CommonConfig{ + Type: ModuleName, + }, + }, + } +} + +// InitHandler initialize the filter plugin +func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterConfig, error) { + conf := DefaultFilterConfig() + if err := config.ReflectConfig(raw, &conf); err != nil { + return nil, err + } + + return &conf, nil +} + +func CloneEvent(event logevent.LogEvent) logevent.LogEvent { + evt := logevent.LogEvent{ + Timestamp: event.Timestamp, + Tags: event.Tags, + Message: event.Message, + Extra: make(map[string]interface{}), + } + + // Copy from the original map to the target map + for key, value := range event.Extra { + evt.Extra[key] = value + } + + return evt +} + +// Event the main filter event +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + + eventsOut := make([]logevent.LogEvent, 0) + if _, ok := event.Extra[f.Source]; !ok { + eventsOut = append(eventsOut, event) + event.AddTag("gogstash_filter_split_error") + return eventsOut, false + } + + splitItems, _ := event.GetValue(f.Source) + event.Remove(f.Source) + + for _, elem := range splitItems.([]interface{}) { + evt := CloneEvent(event) + evt.SetValue(f.Source, elem) + eventsOut = append(eventsOut, evt) + } + + return eventsOut, true +} diff --git a/filter/split/filtersplit_test.go b/filter/split/filtersplit_test.go new file mode 100644 index 00000000..b437577a --- /dev/null +++ b/filter/split/filtersplit_test.go @@ -0,0 +1,56 @@ +package filteraddfield + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tsaikd/gogstash/config" + "github.com/tsaikd/gogstash/config/goglog" + "github.com/tsaikd/gogstash/config/logevent" +) + +func init() { + goglog.Logger.SetLevel(logrus.DebugLevel) + config.RegistFilterHandler(ModuleName, InitHandler) +} + +func Test_filter_add_field_module(t *testing.T) { + assert := assert.New(t) + assert.NotNil(assert) + require := require.New(t) + require.NotNil(require) + + ctx := context.Background() + conf, err := config.LoadFromYAML([]byte(strings.TrimSpace(` +debugch: true +filter: + - type: add_field + key: foo + value: bar + `))) + require.NoError(err) + require.NoError(conf.Start(ctx)) + + timestamp := time.Now() + expectedEvent := logevent.LogEvent{ + Timestamp: timestamp, + Message: "filter test message", + Extra: map[string]interface{}{ + "foo": "bar", + }, + } + + conf.TestInputEvent(logevent.LogEvent{ + Timestamp: timestamp, + Message: "filter test message", + }) + + if event, err := conf.TestGetOutputEvent(300 * time.Millisecond); assert.NoError(err) { + require.Equal(expectedEvent, event) + } +} diff --git a/filter/typeconv/filtertypeconv.go b/filter/typeconv/filtertypeconv.go index c3ee5181..f3d6d505 100644 --- a/filter/typeconv/filtertypeconv.go +++ b/filter/typeconv/filtertypeconv.go @@ -63,7 +63,8 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) for _, field := range f.Fields { if value, ok := event.GetValue(field); ok { switch f.ConvType { @@ -142,5 +143,6 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge } // TODO: no converts return false - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } diff --git a/filter/urlparam/filterurlparam.go b/filter/urlparam/filterurlparam.go index 9e461647..961bc090 100644 --- a/filter/urlparam/filterurlparam.go +++ b/filter/urlparam/filterurlparam.go @@ -85,7 +85,8 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) message := event.GetString(f.Source) var ( u *url.URL @@ -101,7 +102,8 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge } if err != nil { goglog.Logger.Errorf("parse param failed, %s, %v", message, err) - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } //url decode @@ -135,5 +137,6 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge } } } - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } diff --git a/filter/useragent/filteruseragent.go b/filter/useragent/filteruseragent.go index 090b20d5..d0993cb2 100644 --- a/filter/useragent/filteruseragent.go +++ b/filter/useragent/filteruseragent.go @@ -108,10 +108,12 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) { + eventsOut := make([]logevent.LogEvent, 0) ua := event.GetString(f.Source) if ua == "" { - return event, false + eventsOut = append(eventsOut, event) + return eventsOut, false } var client *uaparser.Client // single-thread here @@ -148,5 +150,6 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge event.SetValue(f.fields.Patch, client.UserAgent.Patch) } } - return event, true + eventsOut = append(eventsOut, event) + return eventsOut, true } diff --git a/modloader/modloader.go b/modloader/modloader.go index b528f21e..3584cf28 100644 --- a/modloader/modloader.go +++ b/modloader/modloader.go @@ -13,6 +13,7 @@ import ( filtermutate "github.com/tsaikd/gogstash/filter/mutate" filterratelimit "github.com/tsaikd/gogstash/filter/ratelimit" filterremovefield "github.com/tsaikd/gogstash/filter/removefield" + filtersplit "github.com/tsaikd/gogstash/filter/split" filtertypeconv "github.com/tsaikd/gogstash/filter/typeconv" filterurlparam "github.com/tsaikd/gogstash/filter/urlparam" filteruseragent "github.com/tsaikd/gogstash/filter/useragent" @@ -70,6 +71,7 @@ func init() { config.RegistFilterHandler(filtertypeconv.ModuleName, filtertypeconv.InitHandler) config.RegistFilterHandler(filteruseragent.ModuleName, filteruseragent.InitHandler) config.RegistFilterHandler(filterurlparam.ModuleName, filterurlparam.InitHandler) + config.RegistFilterHandler(filtersplit.ModuleName, filtersplit.InitHandler) config.RegistOutputHandler(outputamqp.ModuleName, outputamqp.InitHandler) config.RegistOutputHandler(outputcond.ModuleName, outputcond.InitHandler)