From 9a48642f789823a1d92248d6e6c8c3524c29dc7c Mon Sep 17 00:00:00 2001 From: Crystal Lemire Date: Wed, 8 Nov 2023 16:16:20 -0800 Subject: [PATCH] Checkpoint. --- protocol/daemons/server/server.go | 4 +- .../{update_monitor.go => health_monitor.go} | 73 ++++++++++--------- ...monitor_test.go => health_monitor_test.go} | 14 ++-- protocol/daemons/types/health_checkable.go | 11 +++ 4 files changed, 59 insertions(+), 43 deletions(-) rename protocol/daemons/server/types/{update_monitor.go => health_monitor.go} (62%) rename protocol/daemons/server/types/{update_monitor_test.go => health_monitor_test.go} (91%) diff --git a/protocol/daemons/server/server.go b/protocol/daemons/server/server.go index d146587731c..2735e23bbd4 100644 --- a/protocol/daemons/server/server.go +++ b/protocol/daemons/server/server.go @@ -26,7 +26,7 @@ type Server struct { fileHandler daemontypes.FileHandler socketAddress string - updateMonitor *types.UpdateMonitor + updateMonitor *types.HealthMonitor BridgeServer PriceFeedServer @@ -47,7 +47,7 @@ func NewServer( gsrv: grpcServer, fileHandler: fileHandler, socketAddress: socketAddress, - updateMonitor: types.NewUpdateFrequencyMonitor(types.DaemonStartupGracePeriod, logger), + updateMonitor: types.NewHealthMonitor(types.DaemonStartupGracePeriod, logger), } } diff --git a/protocol/daemons/server/types/update_monitor.go b/protocol/daemons/server/types/health_monitor.go similarity index 62% rename from protocol/daemons/server/types/update_monitor.go rename to protocol/daemons/server/types/health_monitor.go index 7c437ff0d7c..e0659cf5c9b 100644 --- a/protocol/daemons/server/types/update_monitor.go +++ b/protocol/daemons/server/types/health_monitor.go @@ -3,6 +3,7 @@ package types import ( "fmt" "github.com/cometbft/cometbft/libs/log" + "github.com/dydxprotocol/v4-chain/protocol/daemons/types" "sync" "time" ) @@ -12,11 +13,10 @@ type updateMetadata struct { updateFrequency time.Duration } -// UpdateMonitor monitors the update frequency of daemon services. 
If a daemon service does not respond within -// the maximum acceptable update delay set when the daemon is registered, the monitor will log an error and halt the -// protocol. This was judged to be the best solution for network performance because it prevents any validator from -// participating in the network at all if a daemon service is not responding. -type UpdateMonitor struct { +// HealthMonitor monitors the health of daemon services, which implement the HealthCheckable interface. If a +// registered health-checkable service sustains an unhealthy state for the maximum acceptable unhealthy duration, +// the monitor will execute a callback function. +type HealthMonitor struct { // serviceToUpdateMetadata maps daemon service names to their update metadata. serviceToUpdateMetadata map[string]updateMetadata // stopped indicates whether the monitor has been stopped. Additional daemon services cannot be registered @@ -28,44 +28,46 @@ type UpdateMonitor struct { // lock is used to synchronize access to the monitor. lock sync.Mutex - // These fields are initialized in NewUpdateFrequencyMonitor and are not modified after initialization. + // These fields are initialized in NewHealthMonitor and are not modified after initialization. logger log.Logger daemonStartupGracePeriod time.Duration } -// NewUpdateFrequencyMonitor creates a new update frequency monitor. -func NewUpdateFrequencyMonitor(daemonStartupGracePeriod time.Duration, logger log.Logger) *UpdateMonitor { - return &UpdateMonitor{ +// NewHealthMonitor creates a new health monitor. 
+func NewHealthMonitor(daemonStartupGracePeriod time.Duration, logger log.Logger) *HealthMonitor { + return &HealthMonitor{ serviceToUpdateMetadata: make(map[string]updateMetadata), logger: logger, daemonStartupGracePeriod: daemonStartupGracePeriod, } } -func (ufm *UpdateMonitor) DisableForTesting() { +func (ufm *HealthMonitor) DisableForTesting() { ufm.lock.Lock() defer ufm.lock.Unlock() ufm.disabled = true } -// RegisterDaemonServiceWithCallback registers a new daemon service with the update frequency monitor. If the daemon -// service fails to respond within the maximum acceptable update delay, the monitor will execute the callback function. -// This method is synchronized. The method returns an error if the daemon service was already registered or the +// RegisterHealthCheckableWithCallback registers a HealthCheckable with the health monitor. If the service +// stays unhealthy every time it is polled during the maximum acceptable unhealthy duration, the monitor will +// execute the callback function. +// This method is synchronized. The method returns an error if the service was already registered or the // monitor has already been stopped. 
-func (ufm *UpdateMonitor) RegisterDaemonServiceWithCallback( - service string, - maximumAcceptableUpdateDelay time.Duration, +func (ufm *HealthMonitor) RegisterHealthCheckableWithCallback( + hc types.HealthCheckable, + maximumAcceptableUnhealthyDuration time.Duration, callback func(), ) error { ufm.lock.Lock() defer ufm.lock.Unlock() - if maximumAcceptableUpdateDelay <= 0 { + if maximumAcceptableUnhealthyDuration <= 0 { return fmt.Errorf( - "registration failure for service %v: maximum acceptable update delay %v must be positive", - service, - maximumAcceptableUpdateDelay, + "health check registration failure for service %v: "+ + "maximum acceptable unhealthy duration %v must be positive", + hc.ServiceName(), + maximumAcceptableUnhealthyDuration, ) } @@ -78,16 +80,19 @@ func (ufm *UpdateMonitor) RegisterDaemonServiceWithCallback( // This could be a concern for short-running integration test cases, where the network // stops before all daemon services have been registered. if ufm.stopped { - return fmt.Errorf("registration failure for service %v: monitor has been stopped", service) + return fmt.Errorf( + "health check registration failure for service %v: monitor has been stopped", + hc.ServiceName(), + ) } - if _, ok := ufm.serviceToUpdateMetadata[service]; ok { - return fmt.Errorf("service %v already registered", service) + if _, ok := ufm.serviceToUpdateMetadata[hc.ServiceName()]; ok { + return fmt.Errorf("service %v already registered", hc.ServiceName()) } - ufm.serviceToUpdateMetadata[service] = updateMetadata{ - timer: time.AfterFunc(ufm.daemonStartupGracePeriod+maximumAcceptableUpdateDelay, callback), - updateFrequency: maximumAcceptableUpdateDelay, + ufm.serviceToUpdateMetadata[hc.ServiceName()] = updateMetadata{ + timer: time.AfterFunc(ufm.daemonStartupGracePeriod+maximumAcceptableUnhealthyDuration, callback), + updateFrequency: maximumAcceptableUnhealthyDuration, } return nil } @@ -114,21 +119,21 @@ func LogErrorServiceNotResponding(service string, logger 
log.Logger) func() { // RegisterDaemonService registers a new daemon service with the update frequency monitor. If the daemon service // fails to respond within the maximum acceptable update delay, the monitor will log an error. -// This method is synchronized. The method an error if the daemon service was already registered or the monitor has +// This method is synchronized. The method returns an error if the service was already registered or the monitor has // already been stopped. -func (ufm *UpdateMonitor) RegisterDaemonService( - service string, +func (ufm *HealthMonitor) RegisterDaemonService( + hc types.HealthCheckable, maximumAcceptableUpdateDelay time.Duration, ) error { - return ufm.RegisterDaemonServiceWithCallback( - service, + return ufm.RegisterHealthCheckableWithCallback( + hc, maximumAcceptableUpdateDelay, - LogErrorServiceNotResponding(service, ufm.logger), + LogErrorServiceNotResponding(hc.ServiceName(), ufm.logger), ) } // Stop stops the update frequency monitor. This method is synchronized. -func (ufm *UpdateMonitor) Stop() { +func (ufm *HealthMonitor) Stop() { ufm.lock.Lock() defer ufm.lock.Unlock() @@ -145,7 +150,7 @@ func (ufm *UpdateMonitor) Stop() { // RegisterValidResponse registers a valid response from the daemon service. This will reset the timer for the // daemon service. This method is synchronized. 
-func (ufm *UpdateMonitor) RegisterValidResponse(service string) error { +func (ufm *HealthMonitor) RegisterValidResponse(service string) error { ufm.lock.Lock() defer ufm.lock.Unlock() diff --git a/protocol/daemons/server/types/update_monitor_test.go b/protocol/daemons/server/types/health_monitor_test.go similarity index 91% rename from protocol/daemons/server/types/update_monitor_test.go rename to protocol/daemons/server/types/health_monitor_test.go index c9abf1094f3..e1e6dc11a06 100644 --- a/protocol/daemons/server/types/update_monitor_test.go +++ b/protocol/daemons/server/types/health_monitor_test.go @@ -14,9 +14,9 @@ var ( zeroDuration = 0 * time.Second ) -func createTestMonitor() (*types.UpdateMonitor, *mocks.Logger) { +func createTestMonitor() (*types.HealthMonitor, *mocks.Logger) { logger := &mocks.Logger{} - return types.NewUpdateFrequencyMonitor(zeroDuration, logger), logger + return types.NewHealthMonitor(zeroDuration, logger), logger } // The following tests may still intermittently fail on an overloaded system as they rely @@ -54,7 +54,7 @@ func TestRegisterDaemonServiceWithCallback_Success(t *testing.T) { callbackCalled := atomic.Bool{} ufm, _ := createTestMonitor() - err := ufm.RegisterDaemonServiceWithCallback("test-service", 200*time.Millisecond, func() { + err := ufm.RegisterHealthCheckableWithCallback("test-service", 200*time.Millisecond, func() { callbackCalled.Store(true) }) require.NoError(t, err) @@ -100,13 +100,13 @@ func TestRegisterDaemonServiceWithCallback_DoubleRegistrationFails(t *testing.T) ufm, _ := createTestMonitor() // First registration should succeed. - err := ufm.RegisterDaemonServiceWithCallback("test-service", 200*time.Millisecond, func() { + err := ufm.RegisterHealthCheckableWithCallback("test-service", 200*time.Millisecond, func() { callback1Called.Store(true) }) require.NoError(t, err) // Register the same daemon service again. This should fail, and 50ms update frequency should be ignored. 
- err = ufm.RegisterDaemonServiceWithCallback("test-service", 50*time.Millisecond, func() { + err = ufm.RegisterHealthCheckableWithCallback("test-service", 50*time.Millisecond, func() { callback2Called.Store(true) }) require.ErrorContains(t, err, "service already registered") @@ -146,7 +146,7 @@ func TestRegisterDaemonServiceWithCallback_RegistrationFailsAfterStop(t *testing callbackCalled := atomic.Bool{} // Registering a daemon service with a callback should fail after the monitor has been stopped. - err := ufm.RegisterDaemonServiceWithCallback("test-service", 50*time.Millisecond, func() { + err := ufm.RegisterHealthCheckableWithCallback("test-service", 50*time.Millisecond, func() { callbackCalled.Store(true) }) require.ErrorContains(t, err, "monitor has been stopped") @@ -169,7 +169,7 @@ func TestRegisterValidResponse_NegativeUpdateDelay(t *testing.T) { func TestRegisterValidResponseWithCallback_NegativeUpdateDelay(t *testing.T) { ufm, _ := createTestMonitor() - err := ufm.RegisterDaemonServiceWithCallback("test-service", -50*time.Millisecond, func() {}) + err := ufm.RegisterHealthCheckableWithCallback("test-service", -50*time.Millisecond, func() {}) require.ErrorContains(t, err, "update delay -50ms must be positive") } diff --git a/protocol/daemons/types/health_checkable.go b/protocol/daemons/types/health_checkable.go index da79405685e..9fc47b0f8ae 100644 --- a/protocol/daemons/types/health_checkable.go +++ b/protocol/daemons/types/health_checkable.go @@ -22,6 +22,8 @@ type HealthCheckable interface { ReportFailure(err error) // ReportSuccess records a successful update. ReportSuccess() + + ServiceName() string } // timestampWithError couples a timestamp and error to make it easier to update them in tandem. @@ -65,6 +67,9 @@ type timeBoundedHealthCheckable struct { // logger is the logger used to log errors. logger log.Logger + + // serviceName is the name of the service being monitored. This field is read-only and not synchronized. 
+ serviceName string } // NewTimeBoundedHealthCheckable creates a new HealthCheckable instance. @@ -76,12 +81,18 @@ func NewTimeBoundedHealthCheckable( hc := &timeBoundedHealthCheckable{ timeProvider: timeProvider, logger: logger, + serviceName: serviceName, } // Initialize the timeBoudnedHealthCheckable to an unhealthy state by reporting an error. hc.ReportFailure(fmt.Errorf("%v is initializing", serviceName)) return hc } +// ServiceName returns the name of the service being monitored. +func (hc *timeBoundedHealthCheckable) ServiceName() string { + return hc.serviceName +} + // ReportSuccess records a successful update. This method is thread-safe. func (h *timeBoundedHealthCheckable) ReportSuccess() { h.Lock()