Skip to content

Commit

Permalink
Merge pull request #99 from nokia/split-events
Browse files Browse the repository at this point in the history
outputs: allow splitting a list of events
  • Loading branch information
karimra authored Apr 17, 2023
2 parents 978378d + cee33da commit 523bd90
Show file tree
Hide file tree
Showing 13 changed files with 210 additions and 117 deletions.
3 changes: 3 additions & 0 deletions docs/user_guide/outputs/file_output.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ outputs:
# which will set the target to the value configured under `subscription.$subscription-name.target` if any,
# otherwise it will set it to the target name stripped of the port number (if present)
target-template:
# boolean, valid only if format is `event`.
# if true, arrays of events are split and marshaled as JSON objects instead of an array of dicts.
split-events: false
# string, a GoTemplate that is executed using the received gNMI message as input.
# the template execution is the last step before the data is written to the file,
# First the received message is formatted according to the `format` field above, then the `event-processors` are applied if any
Expand Down
3 changes: 3 additions & 0 deletions docs/user_guide/outputs/jetstream_output.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ outputs:
# which will set the target to the value configured under `subscription.$subscription-name.target` if any,
# otherwise it will set it to the target name stripped of the port number (if present)
target-template:
# boolean, valid only if format is `event`.
# if true, arrays of events are split and marshaled as JSON objects instead of an array of dicts.
split-events: false
# string, a GoTemplate that is executed using the received gNMI message as input.
# the template execution is the last step before the data is written to the file.
# First the received message is formatted according to the `format` field above, then the `event-processors` are applied if any
Expand Down
3 changes: 3 additions & 0 deletions docs/user_guide/outputs/kafka_output.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ outputs:
# which will set the target to the value configured under `subscription.$subscription-name.target` if any,
# otherwise it will set it to the target name stripped of the port number (if present)
target-template:
# boolean, valid only if format is `event`.
# if true, arrays of events are split and marshaled as JSON objects instead of an array of dicts.
split-events: false
# string, a GoTemplate that is executed using the received gNMI message as input.
# the template execution is the last step before the data is written to the file,
# First the received message is formatted according to the `format` field above, then the `event-processors` are applied if any
Expand Down
3 changes: 3 additions & 0 deletions docs/user_guide/outputs/nats_output.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ outputs:
# which will set the target to the value configured under `subscription.$subscription-name.target` if any,
# otherwise it will set it to the target name stripped of the port number (if present)
target-template:
# boolean, valid only if format is `event`.
# if true, arrays of events are split and marshaled as JSON objects instead of an array of dicts.
split-events: false
# string, a GoTemplate that is executed using the received gNMI message as input.
# the template execution is the last step before the data is written to the file,
# First the received message is formatted according to the `format` field above, then the `event-processors` are applied if any
Expand Down
3 changes: 3 additions & 0 deletions docs/user_guide/outputs/tcp_output.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ outputs:
# which will set the target to the value configured under `subscription.$subscription-name.target` if any,
# otherwise it will set it to the target name stripped of the port number (if present)
target-template:
# boolean, valid only if format is `event`.
# if true, arrays of events are split and marshaled as JSON objects instead of an array of dicts.
split-events: false
# boolean, if true the message timestamp is changed to current time
override-timestamps: false
# string, a delimiter to be sent after each message.
Expand Down
3 changes: 3 additions & 0 deletions docs/user_guide/outputs/udp_output.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ outputs:
# which will set the target to the value configured under `subscription.$subscription-name.target` if any,
# otherwise it will set it to the target name stripped of the port number (if present)
target-template:
# boolean, valid only if format is `event`.
# if true, arrays of events are split and marshaled as JSON objects instead of an array of dicts.
split-events: false
# boolean, if true the message timestamp is changed to current time
override-timestamps: false
# time duration to wait before re-dial in case there is a failure
Expand Down
38 changes: 20 additions & 18 deletions outputs/file/file_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Config struct {
Multiline bool `mapstructure:"multiline,omitempty"`
Indent string `mapstructure:"indent,omitempty"`
Separator string `mapstructure:"separator,omitempty"`
SplitEvents bool `mapstructure:"split-events,omitempty"`
OverrideTimestamps bool `mapstructure:"override-timestamps,omitempty"`
AddTarget string `mapstructure:"add-target,omitempty"`
TargetTemplate string `mapstructure:"target-template,omitempty"`
Expand Down Expand Up @@ -228,39 +229,40 @@ func (f *File) Write(ctx context.Context, rsp proto.Message, meta outputs.Meta)
if err != nil {
f.logger.Printf("failed to add target to the response: %v", err)
}

b, err := f.mo.Marshal(rsp, meta, f.evps...)
bb, err := outputs.Marshal(rsp, meta, f.mo, f.Cfg.SplitEvents, f.evps...)
if err != nil {
if f.Cfg.Debug {
f.logger.Printf("failed marshaling proto msg: %v", err)
}
numberOfFailWriteMsgs.WithLabelValues(f.file.Name(), "marshal_error").Inc()
return
}
if len(b) == 0 {
if len(bb) == 0 {
return
}
if f.msgTpl != nil {
b, err = outputs.ExecTemplate(b, f.msgTpl)
for _, b := range bb {
if f.msgTpl != nil {
b, err = outputs.ExecTemplate(b, f.msgTpl)
if err != nil {
if f.Cfg.Debug {
log.Printf("failed to execute template: %v", err)
}
numberOfFailWriteMsgs.WithLabelValues(f.file.Name(), "template_error").Inc()
continue
}
}

n, err := f.file.Write(append(b, []byte(f.Cfg.Separator)...))
if err != nil {
if f.Cfg.Debug {
log.Printf("failed to execute template: %v", err)
f.logger.Printf("failed to write to file '%s': %v", f.file.Name(), err)
}
numberOfFailWriteMsgs.WithLabelValues(f.file.Name(), "template_error").Inc()
numberOfFailWriteMsgs.WithLabelValues(f.file.Name(), "write_error").Inc()
return
}
numberOfWrittenBytes.WithLabelValues(f.file.Name()).Add(float64(n))
numberOfWrittenMsgs.WithLabelValues(f.file.Name()).Inc()
}

n, err := f.file.Write(append(b, []byte(f.Cfg.Separator)...))
if err != nil {
if f.Cfg.Debug {
f.logger.Printf("failed to write to file '%s': %v", f.file.Name(), err)
}
numberOfFailWriteMsgs.WithLabelValues(f.file.Name(), "write_error").Inc()
return
}
numberOfWrittenBytes.WithLabelValues(f.file.Name()).Add(float64(n))
numberOfWrittenMsgs.WithLabelValues(f.file.Name()).Inc()
}

func (f *File) WriteEvent(ctx context.Context, ev *formatters.EventMsg) {}
Expand Down
71 changes: 37 additions & 34 deletions outputs/kafka_output/kafka_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type config struct {
AddTarget string `mapstructure:"add-target,omitempty"`
TargetTemplate string `mapstructure:"target-template,omitempty"`
MsgTemplate string `mapstructure:"msg-template,omitempty"`
SplitEvents bool `mapstructure:"split-events,omitempty"`
NumWorkers int `mapstructure:"num-workers,omitempty"`
Debug bool `mapstructure:"debug,omitempty"`
BufferSize int `mapstructure:"buffer-size,omitempty"`
Expand Down Expand Up @@ -308,7 +309,7 @@ CRPROD:
if err != nil {
k.logger.Printf("failed to add target to the response: %v", err)
}
b, err := k.mo.Marshal(pmsg, m.GetMeta(), k.evps...)
bb, err := outputs.Marshal(pmsg, m.GetMeta(), k.mo, k.Cfg.SplitEvents, k.evps...)
if err != nil {
if k.Cfg.Debug {
k.logger.Printf("%s failed marshaling proto msg: %v", workerLogPrefix, err)
Expand All @@ -318,47 +319,49 @@ CRPROD:
}
continue
}
if len(b) == 0 {
if len(bb) == 0 {
continue
}
if k.msgTpl != nil {
b, err = outputs.ExecTemplate(b, k.msgTpl)
if err != nil {
if k.Cfg.Debug {
log.Printf("failed to execute template: %v", err)
for _, b := range bb {
if k.msgTpl != nil {
b, err = outputs.ExecTemplate(b, k.msgTpl)
if err != nil {
if k.Cfg.Debug {
log.Printf("failed to execute template: %v", err)
}
kafkaNumberOfFailSendMsgs.WithLabelValues(config.ClientID, "template_error").Inc()
continue
}
kafkaNumberOfFailSendMsgs.WithLabelValues(config.ClientID, "template_error").Inc()
return
}
}

msg := &sarama.ProducerMessage{
Topic: k.Cfg.Topic,
Value: sarama.ByteEncoder(b),
}
if k.Cfg.InsertKey {
msg.Key = sarama.ByteEncoder(k.partitionKey(m.GetMeta()))
}
var start time.Time
if k.Cfg.EnableMetrics {
start = time.Now()
}
_, _, err = producer.SendMessage(msg)
if err != nil {
if k.Cfg.Debug {
k.logger.Printf("%s failed to send a kafka msg to topic '%s': %v", workerLogPrefix, k.Cfg.Topic, err)
msg := &sarama.ProducerMessage{
Topic: k.Cfg.Topic,
Value: sarama.ByteEncoder(b),
}
if k.Cfg.InsertKey {
msg.Key = sarama.ByteEncoder(k.partitionKey(m.GetMeta()))
}
var start time.Time
if k.Cfg.EnableMetrics {
kafkaNumberOfFailSendMsgs.WithLabelValues(config.ClientID, "send_error").Inc()
start = time.Now()
}
_, _, err = producer.SendMessage(msg)
if err != nil {
if k.Cfg.Debug {
k.logger.Printf("%s failed to send a kafka msg to topic '%s': %v", workerLogPrefix, k.Cfg.Topic, err)
}
if k.Cfg.EnableMetrics {
kafkaNumberOfFailSendMsgs.WithLabelValues(config.ClientID, "send_error").Inc()
}
producer.Close()
time.Sleep(k.Cfg.RecoveryWaitTime)
goto CRPROD
}
if k.Cfg.EnableMetrics {
kafkaSendDuration.WithLabelValues(config.ClientID).Set(float64(time.Since(start).Nanoseconds()))
kafkaNumberOfSentMsgs.WithLabelValues(config.ClientID).Inc()
kafkaNumberOfSentBytes.WithLabelValues(config.ClientID).Add(float64(len(b)))
}
producer.Close()
time.Sleep(k.Cfg.RecoveryWaitTime)
goto CRPROD
}
if k.Cfg.EnableMetrics {
kafkaSendDuration.WithLabelValues(config.ClientID).Set(float64(time.Since(start).Nanoseconds()))
kafkaNumberOfSentMsgs.WithLabelValues(config.ClientID).Inc()
kafkaNumberOfSentBytes.WithLabelValues(config.ClientID).Add(float64(len(b)))
}
}
}
Expand Down
69 changes: 36 additions & 33 deletions outputs/nats_outputs/jetstream/jetstream_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type config struct {
ConnectTimeWait time.Duration `mapstructure:"connect-time-wait,omitempty" json:"connect-time-wait,omitempty"`
TLS *types.TLSConfig `mapstructure:"tls,omitempty" json:"tls,omitempty"`
Format string `mapstructure:"format,omitempty" json:"format,omitempty"`
SplitEvents bool `mapstructure:"split-events,omitempty"`
AddTarget string `mapstructure:"add-target,omitempty" json:"add-target,omitempty"`
TargetTemplate string `mapstructure:"target-template,omitempty" json:"target-template,omitempty"`
MsgTemplate string `mapstructure:"msg-template,omitempty" json:"msg-template,omitempty"`
Expand Down Expand Up @@ -397,7 +398,7 @@ CRCONN:
}
}
for _, r := range rs {
b, err := n.mo.Marshal(r, m.GetMeta(), n.evps...)
bb, err := outputs.Marshal(r, m.GetMeta(), n.mo, n.Cfg.SplitEvents, n.evps...)
if err != nil {
if n.Cfg.Debug {
n.logger.Printf("%s failed marshaling proto msg: %v", workerLogPrefix, err)
Expand All @@ -407,50 +408,52 @@ CRCONN:
}
continue
}
if len(b) == 0 {
if len(bb) == 0 {
continue
}
if n.msgTpl != nil {
b, err = outputs.ExecTemplate(b, n.msgTpl)
for _, b := range bb {
if n.msgTpl != nil {
b, err = outputs.ExecTemplate(b, n.msgTpl)
if err != nil {
if n.Cfg.Debug {
log.Printf("failed to execute template: %v", err)
}
jetStreamNumberOfFailSendMsgs.WithLabelValues(cfg.Name, "template_error").Inc()
continue
}
}

subject, err = n.subjectName(r, m.GetMeta())
if err != nil {
if n.Cfg.Debug {
log.Printf("failed to execute template: %v", err)
n.logger.Printf("%s failed to get subject name: %v", workerLogPrefix, err)
}
if n.Cfg.EnableMetrics {
jetStreamNumberOfFailSendMsgs.WithLabelValues(cfg.Name, "subject_name_error").Inc()
}
jetStreamNumberOfFailSendMsgs.WithLabelValues(cfg.Name, "template_error").Inc()
continue
}
}

subject, err = n.subjectName(r, m.GetMeta())
if err != nil {
if n.Cfg.Debug {
n.logger.Printf("%s failed to get subject name: %v", workerLogPrefix, err)
}
var start time.Time
if n.Cfg.EnableMetrics {
jetStreamNumberOfFailSendMsgs.WithLabelValues(cfg.Name, "subject_name_error").Inc()
start = time.Now()
}
continue
}
var start time.Time
if n.Cfg.EnableMetrics {
start = time.Now()
}
_, err = js.Publish(subject, b)
if err != nil {
if n.Cfg.Debug {
n.logger.Printf("%s failed to write to subject '%s': %v", workerLogPrefix, subject, err)
_, err = js.Publish(subject, b)
if err != nil {
if n.Cfg.Debug {
n.logger.Printf("%s failed to write to subject '%s': %v", workerLogPrefix, subject, err)
}
if n.Cfg.EnableMetrics {
jetStreamNumberOfFailSendMsgs.WithLabelValues(cfg.Name, "publish_error").Inc()
}
natsConn.Close()
time.Sleep(cfg.ConnectTimeWait)
goto CRCONN
}
if n.Cfg.EnableMetrics {
jetStreamNumberOfFailSendMsgs.WithLabelValues(cfg.Name, "publish_error").Inc()
jetStreamSendDuration.WithLabelValues(cfg.Name).Set(float64(time.Since(start).Nanoseconds()))
jetStreamNumberOfSentMsgs.WithLabelValues(cfg.Name, subject).Inc()
jetStreamNumberOfSentBytes.WithLabelValues(cfg.Name, subject).Add(float64(len(b)))
}
natsConn.Close()
time.Sleep(cfg.ConnectTimeWait)
goto CRCONN
}
if n.Cfg.EnableMetrics {
jetStreamSendDuration.WithLabelValues(cfg.Name).Set(float64(time.Since(start).Nanoseconds()))
jetStreamNumberOfSentMsgs.WithLabelValues(cfg.Name, subject).Inc()
jetStreamNumberOfSentBytes.WithLabelValues(cfg.Name, subject).Add(float64(len(b)))
}
}
}
Expand Down
Loading

0 comments on commit 523bd90

Please sign in to comment.