Skip to content

Commit

Permalink
multi event output filters and array split filter
Browse files Browse the repository at this point in the history
  • Loading branch information
f4t committed Dec 30, 2020
1 parent d461391 commit 677eeae
Show file tree
Hide file tree
Showing 17 changed files with 245 additions and 50 deletions.
24 changes: 19 additions & 5 deletions config/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var (
// TypeFilterConfig is interface of filter module
type TypeFilterConfig interface {
TypeCommonConfig
Event(context.Context, logevent.LogEvent) (logevent.LogEvent, bool)
Event(context.Context, logevent.LogEvent) ([]logevent.LogEvent, bool)
CommonFilter(context.Context, logevent.LogEvent) logevent.LogEvent
}

Expand All @@ -25,6 +25,7 @@ func (f *FilterConfig) IsConfigured() bool {
return len(f.AddTags) != 0 || len(f.AddFields) != 0 || len(f.RemoveTags) != 0 || len(f.RemoveFields) != 0
}

// CommonFilter applies common inline filters such as add/remove fields/tags
func (f *FilterConfig) CommonFilter(ctx context.Context, event logevent.LogEvent) logevent.LogEvent {

event.AddTag(f.AddTags...)
Expand Down Expand Up @@ -102,13 +103,26 @@ func (t *Config) startFilters() (err error) {
}
case event := <-t.chInFilter:
var ok bool
events := make([]logevent.LogEvent, 0)
events = append(events, event)
for _, filter := range filters {
event, ok = filter.Event(t.ctx, event)
if ok {
event = filter.CommonFilter(t.ctx, event)
eventsOut := make([]logevent.LogEvent, 0)
for _, event := range events {
var filteredEvents []logevent.LogEvent
filteredEvents, ok = filter.Event(t.ctx, event)
if ok {
for i, evt := range filteredEvents {
filteredEvents[i] = filter.CommonFilter(t.ctx, evt)
}
}
eventsOut = append(eventsOut, filteredEvents...)
}
events = eventsOut
}

for _, evt := range events {
t.chFilterOut <- evt
}
t.chFilterOut <- event
}
}
})
Expand Down
10 changes: 7 additions & 3 deletions filter/addfield/filteraddfield.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,14 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC
}

// Event the main filter event
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) {
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) {
eventsOut := make([]logevent.LogEvent, 0)
if _, ok := event.Extra[f.Key]; ok {
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}

event.SetValue(f.Key, event.Format(f.Value))
return event, true
eventsOut = append(eventsOut, event)
return eventsOut, true
}
24 changes: 16 additions & 8 deletions filter/cond/filtercond.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,34 +137,42 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC
}

// Event the main filter event
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) {
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) {
eventsOut := make([]logevent.LogEvent, 0)
if f.expression != nil {
ep := EventParameters{Event: &event}
ret, err := f.expression.Eval(&ep)
if err != nil {
goglog.Logger.Error(err)
event.AddTag(ErrorTag)
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}
if r, ok := ret.(bool); ok {
if r {
for _, filter := range f.filters {
event, ok = filter.Event(ctx, event)
eventsOut, ok = filter.Event(ctx, event)
if ok {
event = filter.CommonFilter(ctx, event)
for _, evt := range eventsOut {
evt = filter.CommonFilter(ctx, evt)
}
}
}
} else {
for _, filter := range f.elseFilters {
event, ok = filter.Event(ctx, event)
eventsOut, ok = filter.Event(ctx, event)
if ok {
event = filter.CommonFilter(ctx, event)
for _, evt := range eventsOut {
evt = filter.CommonFilter(ctx, evt)
}
}
}
}
return event, true
eventsOut = append(eventsOut, event)
return eventsOut, true
}
goglog.Logger.Warn("filter cond condition returns not a boolean, ignored")
}
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}
9 changes: 6 additions & 3 deletions filter/date/filterdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC
}

// Event the main filter event
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) {
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) {
eventsOut := make([]logevent.LogEvent, 0)
var (
timestamp time.Time
err error
Expand Down Expand Up @@ -95,15 +96,17 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge
if err != nil {
event.AddTag(ErrorTag)
goglog.Logger.Error(err)
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}
if f.Target == DefaultTarget {
event.Timestamp = timestamp.UTC()
} else {
event.SetValue(f.Target, timestamp.UTC())
}

return event, true
eventsOut = append(eventsOut, event)
return eventsOut, true
}

func convertFloat(value float64) (int64, int64) {
Expand Down
21 changes: 14 additions & 7 deletions filter/geoip2/filtergeoip2.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,19 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC
}

// Event the main filter event
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) {
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) {
eventsOut := make([]logevent.LogEvent, 0)
ipstr := event.GetString(f.IPField)
if ipstr == "" {
// Passthru if empty
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}
ip := net.ParseIP(ipstr)
if f.SkipPrivate && f.privateIP(ip) {
// Passthru
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}
var err error
var record *geoip2.City
Expand All @@ -107,17 +110,20 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge
goglog.Logger.Error(err)
}
event.AddTag(ErrorTag)
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}
f.cache.Add(ipstr, record)
}
if record == nil {
event.AddTag(ErrorTag)
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}
if record.Location.Latitude == 0 && record.Location.Longitude == 0 {
event.AddTag(ErrorTag)
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}

if f.FlatFormat {
Expand Down Expand Up @@ -172,7 +178,8 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge
event.SetValue(f.Key, m)
}

return event, true
eventsOut = append(eventsOut, event)
return eventsOut, true
}

func (f *FilterConfig) privateIP(ip net.IP) bool {
Expand Down
9 changes: 6 additions & 3 deletions filter/gonx/filtergonx.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,23 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC
}

// Event the main filter event
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) {
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) {
eventsOut := make([]logevent.LogEvent, 0)
message := event.GetString(f.Source)
reader := gonx.NewParserReader(strings.NewReader(message), f.parser)
entry, err := reader.Read()
if err != nil {
event.AddTag(ErrorTag)
goglog.Logger.Errorf("%s: %q", err, message)
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}

for _, field := range f.fields {
s, _ := entry.Field(field)
event.SetValue(field, s)
}

return event, true
eventsOut = append(eventsOut, event)
return eventsOut, true
}
9 changes: 6 additions & 3 deletions filter/grok/filtergrok.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC
}

// Event the main filter event
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) {
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) {
eventsOut := make([]logevent.LogEvent, 0)
message := event.GetString(f.Source)
found := false
for _, thisMatch := range f.Match {
Expand All @@ -105,8 +106,10 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge
if !found {
event.AddTag(ErrorTag)
goglog.Logger.Debugf("grok: no matches for %q", message)
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}

return event, true
eventsOut = append(eventsOut, event)
return eventsOut, true
}
9 changes: 6 additions & 3 deletions filter/json/filterjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC
}

// Event the main filter event
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) {
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) {
eventsOut := make([]logevent.LogEvent, 0)
var parsedMessage map[string]interface{}
if err := jsoniter.Unmarshal([]byte(event.GetString(f.Source)), &parsedMessage); err != nil {
event.AddTag(ErrorTag)
goglog.Logger.Error(err)
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}

if f.Appendkey != "" {
Expand All @@ -79,5 +81,6 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge
}
}

return event, true
eventsOut = append(eventsOut, event)
return eventsOut, true
}
6 changes: 4 additions & 2 deletions filter/mutate/filtermutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC
}

// Event the main filter event
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) {
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) {
eventsOut := make([]logevent.LogEvent, 0)
if f.Split[0] != "" {
event.SetValue(f.Split[0], strings.Split(event.GetString(f.Split[0]), f.Split[1]))
}
Expand All @@ -75,7 +76,8 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge
event.Remove(f.Rename[0])
}
// always return true here for configured filter
return event, true
eventsOut = append(eventsOut, event)
return eventsOut, true
}

func mergeField(event logevent.LogEvent, destinationName, source string) logevent.LogEvent {
Expand Down
9 changes: 6 additions & 3 deletions filter/ratelimit/filterratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,18 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC
}

// Event the main filter event
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) {
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) {
eventsOut := make([]logevent.LogEvent, 0)
if event.Extra == nil {
event.Extra = map[string]interface{}{}
}

if f.throttle == nil {
return event, false
eventsOut = append(eventsOut, event)
return eventsOut, false
}

<-f.throttle
return event, true
eventsOut = append(eventsOut, event)
return eventsOut, true
}
6 changes: 4 additions & 2 deletions filter/removefield/filterremovefield.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC
}

// Event the main filter event
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) {
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) ([]logevent.LogEvent, bool) {
eventsOut := make([]logevent.LogEvent, 0)
if event.Extra == nil {
event.Extra = map[string]interface{}{}
}
Expand All @@ -63,5 +64,6 @@ func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (loge
}

// TODO: remove unset field return false
return event, true
eventsOut = append(eventsOut, event)
return eventsOut, true
}
Loading

0 comments on commit 677eeae

Please sign in to comment.