Skip to content

Commit

Permalink
Input/backend followup (#480)
Browse files Browse the repository at this point in the history
* fix validator

* Make new stuff better, fix little bug in json logging middleware, etc

* Version bump

* Update good test, scrap junk sink test for now
  • Loading branch information
jakthom authored Mar 3, 2023
1 parent d910ed1 commit 39a2615
Show file tree
Hide file tree
Showing 32 changed files with 255 additions and 258 deletions.
2 changes: 1 addition & 1 deletion .VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.13.0
v0.13.1
20 changes: 16 additions & 4 deletions cmd/buz/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ func (a *App) initializeInputs() {
&snowplow.SnowplowInput{},
}
for _, i := range inputs {
i.Initialize(a.engine, &a.manifold, a.config, a.collectorMeta)
err := i.Initialize(a.engine, &a.manifold, a.config, a.collectorMeta)
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize input")
}
}
}

Expand All @@ -197,7 +200,10 @@ func (a *App) serverlessMode() {
if err != nil {
log.Fatal().Err(err)
}
a.manifold.Shutdown()
err = a.manifold.Shutdown()
if err != nil {
log.Error().Err(err).Msg("manifold failed to shut down safely")
}
}

func (a *App) standardMode() {
Expand All @@ -220,10 +226,16 @@ func (a *App) standardMode() {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
a.manifold.Shutdown()
err := a.manifold.Shutdown()
if err != nil {
log.Error().Err(err).Msg("manifold failed to shut down safely")
}
log.Fatal().Stack().Err(err).Msg("server forced to shutdown")
}
a.manifold.Shutdown()
err := a.manifold.Shutdown()
if err != nil {
log.Error().Err(err).Msg("manifold failed to shut down safely")
}
tele.Sis(a.collectorMeta)
}

Expand Down
4 changes: 2 additions & 2 deletions deploy/terraform/aws/app_runner/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ variable "buz_domain" {
}

variable "buz_version" {
description = "The version of Buz to run. \n\nExample: v0.11.15"
description = "The version of Buz to run. \n\nExample: v0.13.1"
type = string
default = "v0.13.0"
default = "v0.13.1"
}

variable "buz_service_container_concurrency" {
Expand Down
4 changes: 2 additions & 2 deletions deploy/terraform/aws/lambda/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ variable "buz_domain" {
}

variable "buz_version" {
description = "The version of Buz to run. \n\nExample: v0.13.0"
description = "The version of Buz to run. \n\nExample: v0.13.1"
type = string
default = "v0.13.0"
default = "v0.13.1"
}

variable "buz_lambda_memory_limit" {
Expand Down
4 changes: 2 additions & 2 deletions deploy/terraform/gcp/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ variable "buz_domain" {
}

variable "buz_version" {
description = "The version of Buz to run. \n\nExample: v0.13.0"
description = "The version of Buz to run. \n\nExample: v0.13.1"
type = string
default = "v0.13.0"
default = "v0.13.1"
}

variable "buz_mode" {
Expand Down
81 changes: 80 additions & 1 deletion examples/devel/buz/simple.conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ app:
name: buz-bootstrap
env: development
port: 8080
trackerDomain: bootstrap.silverton.io
trackerDomain: bootstrap.buz.dev
enableConfigRoute: true

middleware:
Expand Down Expand Up @@ -91,6 +91,85 @@ sinks:
- name: easyfeedback
type: stdout
deliveryRequired: true
- name: blackhole
type: blackhole
deliveryRequired: true
# - name: local
# type: file
# deliveryRequired: true
# - name: web
# type: http
# deliveryRequired: true
# url: https://SOMEURL/squawk
# - name: pg1
# type: postgres
# deliveryRequired: true
# hosts:
# - 127.0.0.1
# port: 5432
# database: buz
# user: buz
# password: buz
# - name: easyfeedback
# type: stdout
# deliveryRequired: true
# - name: productanalytics
# type: amplitude
# deliveryRequired: true
# region: standard
# apiKey: SOMEAPIKEY
# - name: msql1
# type: mysql
# deliveryRequired: true
# hosts:
# - 127.0.0.1
# port: 3306
# database: buz
# user: buz
# password: buz
# - name: mongo
# type: mongodb
# deliveryRequired: true
# hosts:
# - 127.0.0.1
# port: 27017
# database: buz
# user: buz
# password: buz
# - name: elastic
# type: elasticsearch
# deliveryRequired: true
# hosts:
# - "http://127.0.0.1:9200"
# user: elastic
# password: elastic
# - name: broker
# type: nats
# deliveryRequired: true
# hosts:
# - 127.0.0.1
# user: buz
# password: buz
# - name: kafka
# type: kafka
# deliveryRequired: true
# brokers:
# - 127.0.0.1:9092
# - name: redpanda
# type: redpanda
# deliveryRequired: true
# brokers:
# - 127.0.0.1:9092
# - name: pubsub
# type: pubsub
# deliveryRequired: true
# project: YOURPROJECT
# - name: kinesis
# type: kinesis
# deliveryRequired: true
# - name: firehose
# type: kinesis-firehose
# deliveryRequired: true

squawkBox:
enabled: true
Expand Down
2 changes: 1 addition & 1 deletion examples/quickstart/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ x-dependency:
services:
buz:
container_name: buz
image: ghcr.io/silverton-io/buz:v0.13.0
image: ghcr.io/silverton-io/buz:v0.13.1
volumes:
- type: bind
source: ./buz/quickstart.conf.yml
Expand Down
46 changes: 18 additions & 28 deletions pkg/annotator/annotator.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Silverton Data, Inc.
// Copyright (c) 2023 Silverton Data, Inc.
// You may use, distribute, and modify this code under the terms of the Apache-2.0 license, a copy of
// which may be found at https://github.com/silverton-io/buz/blob/main/LICENSE

Expand Down Expand Up @@ -35,37 +35,27 @@ func Annotate(envelopes []envelope.Envelope, registry *registry.Registry) []enve
// NOTE - this has the potential to be confusing in the case that
// schema-level validation is disabled.
// Payload validation is still executed in that case but the outcome is disregarded.
schemaExists, validationError, schemaContents := validator.GetSchemaContents(envelope.EventMeta.Schema, registry)
if !schemaExists {
// If schema doesn't exist
// consider the payload invalid as could have disabled validation if wanted
valid := false
isValid, validationError, schemaContents := validator.ValidatePayload(envelope.EventMeta.Schema, envelope.Payload, registry)
m := getMetadataFromSchema(schemaContents)
if m.Namespace != "" {
envelope.EventMeta.Namespace = m.Namespace
}
envelope.EventMeta.Vendor = m.Vendor
envelope.EventMeta.Version = m.Version
envelope.EventMeta.Format = m.Format
if m.DisableValidation {
// If schema-level validation is disabled
// consider the payload valid.
valid := true
envelope.Validation.IsValid = &valid
envelope.Validation.Error = &validationError
envelope.EventMeta.DisableValidation = m.DisableValidation
} else {
m := getMetadataFromSchema(schemaContents)
if m.Namespace != "" {
envelope.EventMeta.Namespace = m.Namespace
}
envelope.EventMeta.Vendor = m.Vendor
envelope.EventMeta.Version = m.Version
envelope.EventMeta.Format = m.Format
if m.DisableValidation {
// If schema-level validation is disabled
// consider the payload valid.
valid := true
envelope.Validation.IsValid = &valid
envelope.EventMeta.DisableValidation = m.DisableValidation
} else {
isValid, validationError := validator.ValidatePayload(schemaContents, envelope.Payload)
envelope.Validation.IsValid = &isValid
if !isValid {
// Annotate the envelope with associated validation errors
envelope.Validation.Error = &validationError
}
envelope.Validation.IsValid = &isValid
if !isValid {
// Annotate the envelope with associated validation errors
envelope.Validation.Error = &validationError
}
}

e = append(e, envelope)
}
return e
Expand Down
1 change: 0 additions & 1 deletion pkg/backend/amplitude/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ func (s *Sink) Initialize(conf config.Sink) error {
return err
}
s.endpoint, s.apiKey = *endpoint, conf.ApiKey
s.StartWorker()
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/backend/backendutils/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"

"github.com/google/uuid"
"github.com/rs/zerolog/log"
"github.com/silverton-io/buz/pkg/config"
"github.com/silverton-io/buz/pkg/envelope"
)
Expand Down Expand Up @@ -35,7 +36,10 @@ func StartSinkWorker(input <-chan []envelope.Envelope, shutdown <-chan int, sink
select {
case envelopes := <-input:
ctx := context.Background()
sink.Dequeue(ctx, envelopes)
err := sink.Dequeue(ctx, envelopes)
if err != nil {
log.Error().Err(err).Interface("metadata", sink.Metadata()).Msg("could not dequeue")
}
case <-shutdown:
return
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/backend/blackhole/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (s *Sink) Metadata() backendutils.SinkMetadata {

func (s *Sink) Initialize(conf config.Sink) error {
id := uuid.New()
s.id, s.name, s.sinkType = &id, conf.Name, conf.Type
s.id, s.sinkType, s.name = &id, conf.Type, conf.Name
s.deliveryRequired, s.fanout = conf.DeliveryRequired, conf.Fanout
return nil
}
Expand All @@ -47,7 +47,10 @@ func (s *Sink) Enqueue(envelopes []envelope.Envelope) error {
log.Debug().Interface("metadata", s.Metadata()).Msg("enqueueing envelopes")
// This is a blackhole. It does nothing but dequeue
ctx := context.Background()
s.Dequeue(ctx, envelopes)
err := s.Dequeue(ctx, envelopes)
if err != nil {
log.Error().Err(err).Interface("metadata", s.Metadata()).Msg("could not dequeue")
}
return nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/backend/elasticsearch/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func (s *Sink) Initialize(conf config.Sink) error {
s.client, s.defaultEventsIndex = es, constants.BUZ_EVENTS
s.input = make(chan []envelope.Envelope, 10000)
s.shutdown = make(chan int, 1)
s.StartWorker()
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/backend/file/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/silverton-io/buz/pkg/backend/backendutils"
"github.com/silverton-io/buz/pkg/config"
"github.com/silverton-io/buz/pkg/constants"
"github.com/silverton-io/buz/pkg/envelope"
)

Expand All @@ -22,7 +23,7 @@ type Sink struct {
name string
deliveryRequired bool
fanout bool
outputFile string
defaultFile string
input chan []envelope.Envelope
shutdown chan int
}
Expand All @@ -43,8 +44,7 @@ func (s *Sink) Initialize(conf config.Sink) error {
s.deliveryRequired, s.fanout = conf.DeliveryRequired, conf.Fanout
s.input = make(chan []envelope.Envelope, 10000)
s.shutdown = make(chan int, 1)
s.outputFile = "buz_events.json"
s.StartWorker()
s.defaultFile = constants.BUZ_EVENTS + ".json"
return nil
}

Expand Down Expand Up @@ -84,7 +84,7 @@ func (s *Sink) Enqueue(envelopes []envelope.Envelope) error {

func (s *Sink) Dequeue(ctx context.Context, envelopes []envelope.Envelope) error {
log.Debug().Interface("metadata", s.Metadata()).Msg("dequeueing envelopes")
err := s.batchPublish(ctx, s.outputFile, envelopes)
err := s.batchPublish(ctx, s.defaultFile, envelopes)
return err
}

Expand Down
1 change: 0 additions & 1 deletion pkg/backend/http/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (s *Sink) Initialize(conf config.Sink) error {
s.url = *url
s.input = make(chan []envelope.Envelope, 10000)
s.shutdown = make(chan int, 1)
s.StartWorker()
return nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/backend/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func (s *Sink) Initialize(conf config.Sink) error {
}
s.input = make(chan []envelope.Envelope, 10000)
s.shutdown = make(chan int, 1)
s.StartWorker()
return nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/backend/kinesis/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func (s *Sink) Initialize(conf config.Sink) error {
s.client, s.defaultStream = client, constants.BUZ_EVENTS
s.input = make(chan []envelope.Envelope, 10000)
s.shutdown = make(chan int, 1)
s.StartWorker()
return err
}

Expand Down
1 change: 0 additions & 1 deletion pkg/backend/kinesisFirehose/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func (s *Sink) Initialize(conf config.Sink) error {
s.client, s.defaultStream = client, constants.BUZ_EVENTS
s.input = make(chan []envelope.Envelope, 10000)
s.shutdown = make(chan int, 1)
s.StartWorker()
return err
}

Expand Down
1 change: 0 additions & 1 deletion pkg/backend/mongodb/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func (s *Sink) Initialize(conf config.Sink) error {
s.defaultEventsCollection = vCollection
s.input = make(chan []envelope.Envelope, 10000)
s.shutdown = make(chan int, 1)
s.StartWorker()
return nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/backend/mysqldb/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func (s *Sink) Initialize(conf config.Sink) error {
return ensureErr
}
}
s.StartWorker()
return nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/backend/nats/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func (s *Sink) Initialize(conf config.Sink) error {
s.defaultEventsSubject = constants.BUZ_EVENTS
s.input = make(chan []envelope.Envelope, 10000)
s.shutdown = make(chan int, 1)
s.StartWorker()
return nil
}

Expand Down
Loading

0 comments on commit 39a2615

Please sign in to comment.