Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Crystal Lemire committed Nov 9, 2023
1 parent d6be653 commit 9a48642
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 43 deletions.
4 changes: 2 additions & 2 deletions protocol/daemons/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Server struct {
fileHandler daemontypes.FileHandler
socketAddress string

updateMonitor *types.UpdateMonitor
updateMonitor *types.HealthMonitor

BridgeServer
PriceFeedServer
Expand All @@ -47,7 +47,7 @@ func NewServer(
gsrv: grpcServer,
fileHandler: fileHandler,
socketAddress: socketAddress,
updateMonitor: types.NewUpdateFrequencyMonitor(types.DaemonStartupGracePeriod, logger),
updateMonitor: types.NewHealthMonitor(types.DaemonStartupGracePeriod, logger),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"fmt"
"github.com/cometbft/cometbft/libs/log"
"github.com/dydxprotocol/v4-chain/protocol/daemons/types"
"sync"
"time"
)
Expand All @@ -12,11 +13,10 @@ type updateMetadata struct {
updateFrequency time.Duration
}

// UpdateMonitor monitors the update frequency of daemon services. If a daemon service does not respond within
// the maximum acceptable update delay set when the daemon is registered, the monitor will log an error and halt the
// protocol. This was judged to be the best solution for network performance because it prevents any validator from
// participating in the network at all if a daemon service is not responding.
type UpdateMonitor struct {
// HealthMonitor monitors the health of daemon services, which implement the HealthCheckable interface. If a
// registered health-checkable service sustains an unhealthy state for the maximum acceptable unhealthy duration,
// the monitor will execute a callback function.
type HealthMonitor struct {
// serviceToUpdateMetadata maps daemon service names to their update metadata.
serviceToUpdateMetadata map[string]updateMetadata
// stopped indicates whether the monitor has been stopped. Additional daemon services cannot be registered
Expand All @@ -28,44 +28,46 @@ type UpdateMonitor struct {
// lock is used to synchronize access to the monitor.
lock sync.Mutex

// These fields are initialized in NewUpdateFrequencyMonitor and are not modified after initialization.
// These fields are initialized in NewHealthMonitor and are not modified after initialization.
logger log.Logger
daemonStartupGracePeriod time.Duration
}

// NewUpdateFrequencyMonitor creates a new update frequency monitor.
func NewUpdateFrequencyMonitor(daemonStartupGracePeriod time.Duration, logger log.Logger) *UpdateMonitor {
return &UpdateMonitor{
// NewHealthMonitor creates a new health monitor.
func NewHealthMonitor(daemonStartupGracePeriod time.Duration, logger log.Logger) *HealthMonitor {
return &HealthMonitor{
serviceToUpdateMetadata: make(map[string]updateMetadata),
logger: logger,
daemonStartupGracePeriod: daemonStartupGracePeriod,
}
}

func (ufm *UpdateMonitor) DisableForTesting() {
func (ufm *HealthMonitor) DisableForTesting() {
ufm.lock.Lock()
defer ufm.lock.Unlock()

ufm.disabled = true
}

// RegisterDaemonServiceWithCallback registers a new daemon service with the update frequency monitor. If the daemon
// service fails to respond within the maximum acceptable update delay, the monitor will execute the callback function.
// This method is synchronized. The method returns an error if the daemon service was already registered or the
// RegisterHealthCheckableWithCallback registers a HealthCheckable with the health monitor. If the service
// stays unhealthy every time it is polled during the maximum acceptable unhealthy duration, the monitor will
// execute the callback function.
// This method is synchronized. The method returns an error if the service was already registered or the
// monitor has already been stopped.
func (ufm *UpdateMonitor) RegisterDaemonServiceWithCallback(
service string,
maximumAcceptableUpdateDelay time.Duration,
func (ufm *HealthMonitor) RegisterHealthCheckableWithCallback(
hc types.HealthCheckable,
maximumAcceptableUnhealthyDuration time.Duration,
callback func(),
) error {
ufm.lock.Lock()
defer ufm.lock.Unlock()

if maximumAcceptableUpdateDelay <= 0 {
if maximumAcceptableUnhealthyDuration <= 0 {
return fmt.Errorf(
"registration failure for service %v: maximum acceptable update delay %v must be positive",
service,
maximumAcceptableUpdateDelay,
"health check registration failure for service %v: "+
"maximum acceptable unhealthy duration %v must be positive",
hc.ServiceName(),
maximumAcceptableUnhealthyDuration,
)
}

Expand All @@ -78,16 +80,19 @@ func (ufm *UpdateMonitor) RegisterDaemonServiceWithCallback(
// This could be a concern for short-running integration test cases, where the network
// stops before all daemon services have been registered.
if ufm.stopped {
return fmt.Errorf("registration failure for service %v: monitor has been stopped", service)
return fmt.Errorf(
"health check registration failure for service %v: monitor has been stopped",
hc.ServiceName(),
)
}

if _, ok := ufm.serviceToUpdateMetadata[service]; ok {
return fmt.Errorf("service %v already registered", service)
if _, ok := ufm.serviceToUpdateMetadata[hc.ServiceName()]; ok {
return fmt.Errorf("service %v already registered", hc.ServiceName())
}

ufm.serviceToUpdateMetadata[service] = updateMetadata{
timer: time.AfterFunc(ufm.daemonStartupGracePeriod+maximumAcceptableUpdateDelay, callback),
updateFrequency: maximumAcceptableUpdateDelay,
ufm.serviceToUpdateMetadata[hc.ServiceName()] = updateMetadata{
timer: time.AfterFunc(ufm.daemonStartupGracePeriod+maximumAcceptableUnhealthyDuration, callback),
updateFrequency: maximumAcceptableUnhealthyDuration,
}
return nil
}
Expand All @@ -114,21 +119,21 @@ func LogErrorServiceNotResponding(service string, logger log.Logger) func() {

// RegisterDaemonService registers a new daemon service with the update frequency monitor. If the daemon service
// fails to respond within the maximum acceptable update delay, the monitor will log an error.
// This method is synchronized. The method an error if the daemon service was already registered or the monitor has
// This method is synchronized. The method an error if the service was already registered or the monitor has
// already been stopped.
func (ufm *UpdateMonitor) RegisterDaemonService(
service string,
func (ufm *HealthMonitor) RegisterDaemonService(
hc types.HealthCheckable,
maximumAcceptableUpdateDelay time.Duration,
) error {
return ufm.RegisterDaemonServiceWithCallback(
service,
return ufm.RegisterHealthCheckableWithCallback(
hc,
maximumAcceptableUpdateDelay,
LogErrorServiceNotResponding(service, ufm.logger),
LogErrorServiceNotResponding(hc.ServiceName(), ufm.logger),
)
}

// Stop stops the update frequency monitor. This method is synchronized.
func (ufm *UpdateMonitor) Stop() {
func (ufm *HealthMonitor) Stop() {
ufm.lock.Lock()
defer ufm.lock.Unlock()

Expand All @@ -145,7 +150,7 @@ func (ufm *UpdateMonitor) Stop() {

// RegisterValidResponse registers a valid response from the daemon service. This will reset the timer for the
// daemon service. This method is synchronized.
func (ufm *UpdateMonitor) RegisterValidResponse(service string) error {
func (ufm *HealthMonitor) RegisterValidResponse(service string) error {
ufm.lock.Lock()
defer ufm.lock.Unlock()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ var (
zeroDuration = 0 * time.Second
)

func createTestMonitor() (*types.UpdateMonitor, *mocks.Logger) {
func createTestMonitor() (*types.HealthMonitor, *mocks.Logger) {
logger := &mocks.Logger{}
return types.NewUpdateFrequencyMonitor(zeroDuration, logger), logger
return types.NewHealthMonitor(zeroDuration, logger), logger
}

// The following tests may still intermittently fail on an overloaded system as they rely
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestRegisterDaemonServiceWithCallback_Success(t *testing.T) {
callbackCalled := atomic.Bool{}

ufm, _ := createTestMonitor()
err := ufm.RegisterDaemonServiceWithCallback("test-service", 200*time.Millisecond, func() {
err := ufm.RegisterHealthCheckableWithCallback("test-service", 200*time.Millisecond, func() {
callbackCalled.Store(true)
})
require.NoError(t, err)
Expand Down Expand Up @@ -100,13 +100,13 @@ func TestRegisterDaemonServiceWithCallback_DoubleRegistrationFails(t *testing.T)

ufm, _ := createTestMonitor()
// First registration should succeed.
err := ufm.RegisterDaemonServiceWithCallback("test-service", 200*time.Millisecond, func() {
err := ufm.RegisterHealthCheckableWithCallback("test-service", 200*time.Millisecond, func() {
callback1Called.Store(true)
})
require.NoError(t, err)

// Register the same daemon service again. This should fail, and 50ms update frequency should be ignored.
err = ufm.RegisterDaemonServiceWithCallback("test-service", 50*time.Millisecond, func() {
err = ufm.RegisterHealthCheckableWithCallback("test-service", 50*time.Millisecond, func() {
callback2Called.Store(true)
})
require.ErrorContains(t, err, "service already registered")
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestRegisterDaemonServiceWithCallback_RegistrationFailsAfterStop(t *testing
callbackCalled := atomic.Bool{}

// Registering a daemon service with a callback should fail after the monitor has been stopped.
err := ufm.RegisterDaemonServiceWithCallback("test-service", 50*time.Millisecond, func() {
err := ufm.RegisterHealthCheckableWithCallback("test-service", 50*time.Millisecond, func() {
callbackCalled.Store(true)
})
require.ErrorContains(t, err, "monitor has been stopped")
Expand All @@ -169,7 +169,7 @@ func TestRegisterValidResponse_NegativeUpdateDelay(t *testing.T) {

func TestRegisterValidResponseWithCallback_NegativeUpdateDelay(t *testing.T) {
ufm, _ := createTestMonitor()
err := ufm.RegisterDaemonServiceWithCallback("test-service", -50*time.Millisecond, func() {})
err := ufm.RegisterHealthCheckableWithCallback("test-service", -50*time.Millisecond, func() {})
require.ErrorContains(t, err, "update delay -50ms must be positive")
}

Expand Down
11 changes: 11 additions & 0 deletions protocol/daemons/types/health_checkable.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type HealthCheckable interface {
ReportFailure(err error)
// ReportSuccess records a successful update.
ReportSuccess()

ServiceName() string
}

// timestampWithError couples a timestamp and error to make it easier to update them in tandem.
Expand Down Expand Up @@ -65,6 +67,9 @@ type timeBoundedHealthCheckable struct {

// logger is the logger used to log errors.
logger log.Logger

// serviceName is the name of the service being monitored. This field is read-only and not synchronized.
serviceName string
}

// NewTimeBoundedHealthCheckable creates a new HealthCheckable instance.
Expand All @@ -76,12 +81,18 @@ func NewTimeBoundedHealthCheckable(
hc := &timeBoundedHealthCheckable{
timeProvider: timeProvider,
logger: logger,
serviceName: serviceName,
}
// Initialize the timeBoudnedHealthCheckable to an unhealthy state by reporting an error.
hc.ReportFailure(fmt.Errorf("%v is initializing", serviceName))
return hc
}

// ServiceName returns the name of the service being monitored.
func (hc *timeBoundedHealthCheckable) ServiceName() string {
return hc.serviceName
}

// ReportSuccess records a successful update. This method is thread-safe.
func (h *timeBoundedHealthCheckable) ReportSuccess() {
h.Lock()
Expand Down

0 comments on commit 9a48642

Please sign in to comment.