Skip to content

Commit

Permalink
Introducing a pluggable error handler (#13)
Browse files Browse the repository at this point in the history
* Introducing a pluggable error handler

* Allowing the user to provide a FatalErrorHandler with streamingclient.Config

Co-authored-by: Chrusty <>
  • Loading branch information
chrusty authored Nov 25, 2021
1 parent 3c991e8 commit c8e79f1
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 8 deletions.
23 changes: 17 additions & 6 deletions pkg/streaming-client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ const (

// Config defines parameters for the client:
type Config struct {
LogLevel logrus.Level // Logging level (default is "info")
SDKKey string // SDK key (copied from the UI), in the format "{namedCache}/environmentID/APIKey"
ServerAddress string // FeatureHub API endpoint
WaitForData bool // New() will block until some data has arrived
client interfaces.Client
LogLevel logrus.Level // Logging level (default is "info")
SDKKey string // SDK key (copied from the UI), in the format "{namedCache}/environmentID/APIKey"
ServerAddress string // FeatureHub API endpoint
WaitForData bool // New() will block until some data has arrived
client interfaces.Client // A FeatureHub client implementation
fatalErrorHandler *ErrorFunc // A user-provided handler func for fatal asynchronous errors
}

// NewConfig returns a configured Config:
Expand All @@ -43,7 +44,11 @@ func (c *Config) Connect() (*Config, error) {
}

// Start the client:
client.Start()
if c.fatalErrorHandler != nil {
client.WithFatalErrorHandler(*c.fatalErrorHandler).Start()
} else {
client.Start()
}

c.client = client
return c, nil
Expand Down Expand Up @@ -95,6 +100,12 @@ func (c *Config) WithContext(context *models.Context) *ClientWithContext {
}
}

// WithFatalErrorHandler configures an error handler which will be called for asynchronous fatal errors:
func (c *Config) WithFatalErrorHandler(fatalErrorFunc ErrorFunc) *Config {
c.fatalErrorHandler = &fatalErrorFunc
return c
}

// WithLogLevel adds a logLevel to the config:
func (c *Config) WithLogLevel(logLevel logrus.Level) *Config {
c.LogLevel = logLevel
Expand Down
18 changes: 18 additions & 0 deletions pkg/streaming-client/streaming_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ import (
"github.com/sirupsen/logrus"
)

// ErrorFunc is called when asynchronous errors are encountered:
type ErrorFunc func(error, string, map[string]interface{})

// StreamingClient implements the client interface by by subscribing to server-side events:
type StreamingClient struct {
analyticsCollectors []interfaces.AnalyticsCollector
analyticsMutex sync.Mutex
apiClient *eventsource.Stream
config *Config
fatalErrorHandler ErrorFunc
features map[string]*models.FeatureState
featuresMutex sync.Mutex
featuresURL string
Expand Down Expand Up @@ -60,6 +64,9 @@ func NewStreamingClient(config *Config) (*StreamingClient, error) {
notifiers: make(notifiers),
}

// Use the default fatalErrorFunc to handle fatal errors:
client.WithFatalErrorHandler(client.fatalErrorFunc)

// Report that we're starting:
logger.WithField("server_address", client.config.ServerAddress).Info("Subscribing to FeatureHub server")

Expand All @@ -81,6 +88,11 @@ func NewStreamingClient(config *Config) (*StreamingClient, error) {
return client, nil
}

// FatalErrorFunc is called when an unrecoverable asynchronous error is encountered:
func (c *StreamingClient) fatalErrorFunc(err error, message string, details map[string]interface{}) {
c.logger.WithError(err).WithFields(details).Fatal(message)
}

// ReadinessListener defines a callback function which will be triggered once the client has received data for the first time:
func (c *StreamingClient) ReadinessListener(callbackFunc func()) {
c.readinessListener = callbackFunc
Expand Down Expand Up @@ -110,6 +122,12 @@ func (c *StreamingClient) WithContext(context *models.Context) *ClientWithContex
}
}

// WithFatalErrorHandler configures an error handler which will be called for asynchronous fatal errors:
func (c *StreamingClient) WithFatalErrorHandler(fatalErrorFunc ErrorFunc) *StreamingClient {
c.fatalErrorHandler = fatalErrorFunc
return c
}

// isReady triggers various notifications that the client is ready to serve data:
func (c *StreamingClient) isReady() {

Expand Down
13 changes: 11 additions & 2 deletions pkg/streaming-client/streaming_client_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ func (c *StreamingClient) handleEvents() {

// Failures (from the FeatureHub server):
case models.FHFailure:
c.logger.WithError(&errors.ErrFromAPI{}).WithField("event", event.Event()).WithField("message", event.Data()).Fatal("Failure from FeatureHub server")
details := map[string]interface{}{
"event": event.Event(),
"message": event.Data(),
}
c.fatalErrorHandler(&errors.ErrFromAPI{}, "Failure from FeatureHub server", details)

// One specific feature (replaces the previous version):
case models.FHFeature:
Expand All @@ -64,7 +68,12 @@ func (c *StreamingClient) handleSSEError(event eventsource.Event) {
if c.hasData {
c.logger.WithError(&errors.ErrFromAPI{}).WithField("event", event.Event()).WithField("message", event.Data()).Error("Error from API client")
} else {
c.logger.WithError(&errors.ErrFromAPI{}).WithField("event", event.Event()).WithField("message", event.Data()).Fatal("Error from API client")
// Use the fatailErrorFunc for this one:
details := map[string]interface{}{
"event": event.Event(),
"message": event.Data(),
}
c.fatalErrorFunc(&errors.ErrFromAPI{}, "Error from API client", details)
}
}

Expand Down

0 comments on commit c8e79f1

Please sign in to comment.