Skip to content

Commit

Permalink
PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
Crystal Lemire committed Dec 7, 2023
1 parent 7eb8519 commit f996947
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 43 deletions.
12 changes: 6 additions & 6 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ func New(
if daemonFlags.Liquidation.Enabled {
app.LiquidationsClient = liquidationclient.NewClient(logger)
go func() {
app.MonitorDaemon(app.LiquidationsClient, MaximumDaemonUnhealthyDuration)
app.RegisterDaemonWithHealthMonitor(app.LiquidationsClient, MaximumDaemonUnhealthyDuration)
if err := app.LiquidationsClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand Down Expand Up @@ -645,7 +645,7 @@ func New(
constants.StaticExchangeDetails,
&pricefeedclient.SubTaskRunnerImpl{},
)
app.MonitorDaemon(app.PriceFeedClient, MaximumDaemonUnhealthyDuration)
app.RegisterDaemonWithHealthMonitor(app.PriceFeedClient, MaximumDaemonUnhealthyDuration)
}

// Start Bridge Daemon.
Expand All @@ -655,7 +655,7 @@ func New(
// environments.
app.BridgeClient = bridgeclient.NewClient(logger)
go func() {
app.MonitorDaemon(app.BridgeClient, MaximumDaemonUnhealthyDuration)
app.RegisterDaemonWithHealthMonitor(app.BridgeClient, MaximumDaemonUnhealthyDuration)
if err := app.BridgeClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand Down Expand Up @@ -1230,9 +1230,9 @@ func New(
return app
}

// MonitorDaemon registers a daemon service with the update monitor. If the daemon does not register, the method will
// panic.
func (app *App) MonitorDaemon(
// RegisterDaemonWithHealthMonitor registers a daemon service with the update monitor, which will commence monitoring
// the health of the daemon. If the daemon does not register, the method will panic.
func (app *App) RegisterDaemonWithHealthMonitor(
healthCheckableDaemon daemontypes.HealthCheckable,
maximumAcceptableUpdateDelay time.Duration,
) {
Expand Down
6 changes: 3 additions & 3 deletions protocol/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,17 @@ func TestModuleBasics(t *testing.T) {
require.Equal(t, expectedFieldTypes, actualFieldTypes, "Module basics does not match expected")
}

func TestMonitorDaemon_Panics(t *testing.T) {
func TestRegisterDaemonWithHealthMonitor_Panics(t *testing.T) {
app := testapp.DefaultTestApp(nil)
hc := &mocks.HealthCheckable{}
hc.On("ServiceName").Return("test-service")
hc.On("HealthCheck").Return(nil)

app.MonitorDaemon(hc, 5*time.Minute)
app.RegisterDaemonWithHealthMonitor(hc, 5*time.Minute)
// The second registration should fail, causing a panic.
require.PanicsWithError(
t,
"service test-service already registered",
func() { app.MonitorDaemon(hc, 5*time.Minute) },
func() { app.RegisterDaemonWithHealthMonitor(hc, 5*time.Minute) },
)
}
12 changes: 4 additions & 8 deletions protocol/daemons/server/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,12 @@ func (s *Server) AddBridgeEvents(
response *api.AddBridgeEventsResponse,
err error,
) {
// Capture valid responses in metrics.
defer func() {
if err == nil {
s.reportValidResponse(types.PricefeedDaemonServiceName)
}
}()

s.reportValidResponse(types.BridgeDaemonServiceName)
if err = s.bridgeEventManager.AddBridgeEvents(req.BridgeEvents); err != nil {
return nil, err
}

// Capture valid responses in metrics.
s.reportValidResponse(types.BridgeDaemonServiceName)

return &api.AddBridgeEventsResponse{}, nil
}
11 changes: 4 additions & 7 deletions protocol/daemons/server/liquidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ func (s *Server) LiquidateSubaccounts(
response *api.LiquidateSubaccountsResponse,
err error,
) {
// Capture valid responses in metrics.
defer func() {
if err == nil {
s.reportValidResponse(types.PricefeedDaemonServiceName)
}
}()

telemetry.ModuleSetGauge(
metrics.LiquidationDaemon,
float32(len(req.SubaccountIds)),
Expand All @@ -49,5 +42,9 @@ func (s *Server) LiquidateSubaccounts(
)

s.liquidatableSubaccountIds.UpdateSubaccountIds(req.SubaccountIds)

// Capture valid responses in metrics.
s.reportValidResponse(types.LiquidationsDaemonServiceName)

return &api.LiquidateSubaccountsResponse{}, nil
}
12 changes: 5 additions & 7 deletions protocol/daemons/server/pricefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,6 @@ func (s *Server) UpdateMarketPrices(
response *api.UpdateMarketPricesResponse,
err error,
) {
// Capture valid responses in metrics.
defer func() {
if err == nil {
s.reportValidResponse(types.PricefeedDaemonServiceName)
}
}()

// Measure latency in ingesting and handling gRPC price update.
defer telemetry.ModuleMeasureSince(
metrics.PricefeedServer,
Expand All @@ -54,6 +47,8 @@ func (s *Server) UpdateMarketPrices(
metrics.Latency,
)

// This panic is an unexpected condition because we initialize the market price cache in app initialization before
// starting the server or daemons.
if s.marketToExchange == nil {
panic(
errorsmod.Wrapf(
Expand All @@ -71,6 +66,9 @@ func (s *Server) UpdateMarketPrices(

s.marketToExchange.UpdatePrices(req.MarketPriceUpdates)

// Capture valid responses in metrics.
s.reportValidResponse(types.PricefeedDaemonServiceName)

return &api.UpdateMarketPricesResponse{}, nil
}

Expand Down
71 changes: 60 additions & 11 deletions protocol/daemons/server/types/health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,36 @@ type timestampWithError struct {
err error
}

// Update updates the timeStampWithError to reflect the current error. If the timestamp is zero, this is the first
// update, so set the timestamp.
func (u *timestampWithError) Update(timestamp time.Time, err error) {
// If the timestamp is zero, this is the first update, so set the timestamp.
if u.timestamp.IsZero() {
u.timestamp = timestamp
}

u.err = err
}

// Reset resets the timestampWithError to its zero value, indicating that the service is healthy.
func (u *timestampWithError) Reset() {
u.timestamp = time.Time{}
u.err = nil
}

// IsZero returns true if the timestampWithError is zero, indicating that the service is healthy.
func (u *timestampWithError) IsZero() bool {
return u.timestamp.IsZero() && u.err == nil
}

// Timestamp returns the timestamp associated with the timestampWithError, which is the timestamp of the first error
// in the current error streak.
func (u *timestampWithError) Timestamp() time.Time {
return u.timestamp
}

// Error returns the error associated with the timestampWithError, which is the most recent error in the current error
// streak.
func (u *timestampWithError) Error() error {
return u.err
}
Expand All @@ -48,16 +57,31 @@ type healthCheckerMutableState struct {
// lock is used to synchronize access to mutable state fields.
lock sync.Mutex

// mostRecentSuccess is the timestamp of the most recent successful health check.
// Access to mostRecentSuccess is synchronized.
mostRecentSuccess time.Time
// lastSuccessTimestamp is the timestamp of the most recent successful health check.
// Access to lastSuccessTimestamp is synchronized.
lastSuccessTimestamp time.Time

// mostRecentFailureStreakError tracks the timestamp of the first error in the most recent streak of errors, as well
// as the most recent error. It is updated on every error and reset every time the service sees a healthy response.
// This field is used to determine how long the daemon has been unhealthy. If this timestamp is nil, then either
// the service has never been unhealthy, or the most recent error streak ended before it could trigger a callback.
// Access to mostRecentFailureStreakError is synchronized.
mostRecentFailureStreakError timestampWithError

// timer triggers a health check poll for a health-checkable service.
timer *time.Timer

// stopped indicates whether the health checker has been stopped. Additional health checks cannot be scheduled
// after the health checker has been stopped.
stopped bool
}

// newHealthCheckerMutableState creates a new health checker mutable state scheduled to trigger a poll after the
// initial poll delay.
func newHealthCheckerMutableState(initialPollDelay time.Duration, pollFunc func()) *healthCheckerMutableState {
return &healthCheckerMutableState{
timer: time.AfterFunc(initialPollDelay, pollFunc),
}
}

// ReportSuccess updates the health checker's mutable state to reflect a successful health check and schedules the next
Expand All @@ -66,7 +90,7 @@ func (u *healthCheckerMutableState) ReportSuccess(now time.Time) {
u.lock.Lock()
defer u.lock.Unlock()

u.mostRecentSuccess = now
u.lastSuccessTimestamp = now

// Whenever the service is healthy, reset the first failure in streak timestamp.
u.mostRecentFailureStreakError.Reset()
Expand All @@ -83,6 +107,35 @@ func (u *healthCheckerMutableState) ReportFailure(now time.Time, err error) time
return now.Sub(u.mostRecentFailureStreakError.Timestamp())
}

// SchedulePoll schedules the next poll for the health-checkable service. If the service is stopped, the next poll
// will not be scheduled. This method is synchronized.
func (u *healthCheckerMutableState) SchedulePoll(nextPollDelay time.Duration) {
u.lock.Lock()
defer u.lock.Unlock()

// Don't schedule a poll if the health checker has been stopped.
if u.stopped {
return
}

// Schedule the next poll.
u.timer.Reset(nextPollDelay)
}

// Stop stops the health checker. This method is synchronized.
func (u *healthCheckerMutableState) Stop() {
u.lock.Lock()
defer u.lock.Unlock()

// Don't stop the health checker if it has already been stopped.
if u.stopped {
return
}

u.timer.Stop()
u.stopped = true
}

// healthChecker encapsulates the logic for monitoring the health of a health-checkable service.
type healthChecker struct {
// mutableState is the mutable state of the health checker. Access to these fields is synchronized.
Expand All @@ -94,9 +147,6 @@ type healthChecker struct {
// pollFrequency is the frequency at which the health-checkable service is polled.
pollFrequency time.Duration

// timer triggers a health check poll for a health-checkable service.
timer *time.Timer

// maxAcceptableUnhealthyDuration is the maximum acceptable duration for a health-checkable service to
// remain unhealthy. If the service remains unhealthy for this duration, the monitor will execute the
// specified callback function.
Expand Down Expand Up @@ -134,12 +184,12 @@ func (hc *healthChecker) Poll() {
}

// Schedule next poll.
hc.timer.Reset(hc.pollFrequency)
hc.mutableState.SchedulePoll(hc.pollFrequency)
}

// Stop stops the health checker. This method is not synchronized, as the timer does not need synchronization.
func (hc *healthChecker) Stop() {
hc.timer.Stop()
hc.mutableState.Stop()
}

// StartNewHealthChecker creates and starts a new health checker for a health-checkable service.
Expand All @@ -159,11 +209,10 @@ func StartNewHealthChecker(
timeProvider: timeProvider,
maxAcceptableUnhealthyDuration: maximumAcceptableUnhealthyDuration,
logger: logger,
mutableState: &healthCheckerMutableState{},
}

// The first poll is scheduled after the startup grace period to allow the service to initialize.
checker.timer = time.AfterFunc(startupGracePeriod, checker.Poll)
checker.mutableState = newHealthCheckerMutableState(startupGracePeriod, checker.Poll)

return checker
}
5 changes: 4 additions & 1 deletion protocol/daemons/server/types/health_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
const (
// HealthCheckPollFrequency is the frequency at which the health-checkable service is polled.
HealthCheckPollFrequency = 5 * time.Second

// HealthMonitorLogModuleName is the module name used for logging within the health monitor.
HealthMonitorLogModuleName = "daemon-health-monitor"
)

// healthMonitorMutableState tracks all mutable state associated with the health monitor. This state is gathered into
Expand Down Expand Up @@ -132,7 +135,7 @@ func NewHealthMonitor(
) *HealthMonitor {
return &HealthMonitor{
mutableState: newHealthMonitorMutableState(),
logger: logger.With(cosmoslog.ModuleKey, "health-monitor"),
logger: logger.With(cosmoslog.ModuleKey, HealthMonitorLogModuleName),
startupGracePeriod: startupGracePeriod,
pollingFrequency: pollingFrequency,
}
Expand Down

0 comments on commit f996947

Please sign in to comment.