-
Notifications
You must be signed in to change notification settings - Fork 117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CORE-666] - Robust health check #715
Changes from 22 commits
9a9cc58
3b00aaf
2250685
a032923
49538d9
656c256
a1401da
4f499ad
d9f7108
d1e2aee
6c75e51
3795643
cebb504
d73c727
1e7f8a2
393b53b
0c48c23
c325aec
9db3da6
eee7e9c
a59c138
0d8d97b
2ef12a2
34dba18
4d14a5b
7f4ae9b
a87f57f
1ac3bd9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package client | |
|
||
import ( | ||
"context" | ||
"cosmossdk.io/errors" | ||
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/constants" | ||
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_encoder" | ||
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_fetcher" | ||
|
@@ -35,6 +36,7 @@ var _ SubTaskRunner = (*SubTaskRunnerImpl)(nil) | |
// SubTaskRunner is the interface for running pricefeed client task functions. | ||
type SubTaskRunner interface { | ||
StartPriceUpdater( | ||
c *Client, | ||
ctx context.Context, | ||
ticker *time.Ticker, | ||
stop <-chan bool, | ||
|
@@ -76,6 +78,7 @@ type SubTaskRunner interface { | |
// StartPriceUpdater runs in the daemon's main goroutine and does not need access to the daemon's wait group | ||
// to signal task completion. | ||
func (s *SubTaskRunnerImpl) StartPriceUpdater( | ||
c *Client, | ||
ctx context.Context, | ||
ticker *time.Ticker, | ||
stop <-chan bool, | ||
|
@@ -87,8 +90,17 @@ func (s *SubTaskRunnerImpl) StartPriceUpdater( | |
select { | ||
case <-ticker.C: | ||
err := RunPriceUpdaterTaskLoop(ctx, exchangeToMarketPrices, priceFeedServiceClient, logger) | ||
if err != nil { | ||
panic(err) | ||
|
||
if err == nil { | ||
// Record update success for the daemon health check. | ||
c.ReportSuccess(time.Now()) | ||
} else { | ||
logger.Error("Failed to run price updater task loop for price daemon", constants.ErrorLogKey, err) | ||
// Record update failure for the daemon health check. | ||
c.ReportFailure( | ||
time.Now(), | ||
errors.Wrap(err, "failed to run price updater task loop for price daemon"), | ||
) | ||
} | ||
|
||
case <-stop: | ||
|
@@ -265,8 +277,10 @@ func RunPriceUpdaterTaskLoop( | |
metrics.Latency, | ||
) | ||
|
||
// On startup the length of request will likely be 0. However, sending a request of length 0 | ||
// is a fatal error. | ||
// On startup the length of request will likely be 0. Even so, we return an error here because this | ||
// is unexpected behavior once the daemon reaches a steady state. The daemon health check process should | ||
// be robust enough to ignore temporarily unhealthy daemons. | ||
// Sending a request of length 0, however, causes a panic. | ||
// panic: rpc error: code = Unknown desc = Market price update has length of 0. | ||
if len(request.MarketPriceUpdates) > 0 { | ||
_, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request) | ||
Comment on lines
274
to
283
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The - if len(request.MarketPriceUpdates) > 0 {
- _, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request)
- if err != nil {
- logger.Error("Failed to run price updater task loop for price daemon", "error", err)
- telemetry.IncrCounter(
- 1,
- metrics.PricefeedDaemon,
- metrics.PriceUpdaterTaskLoop,
- metrics.Error,
- )
- return err
- }
- } else {
- logger.Info("Price update had length of 0")
- telemetry.IncrCounter(
- 1,
- metrics.PricefeedDaemon,
- metrics.PriceUpdaterZeroPrices,
- metrics.Count,
- )
- return types.ErrEmptyMarketPriceUpdate
- }
+ if len(request.MarketPriceUpdates) == 0 {
+ logger.Info("Price update had length of 0")
+ telemetry.IncrCounter(
+ 1,
+ metrics.PricefeedDaemon,
+ metrics.PriceUpdaterZeroPrices,
+ metrics.Count,
+ )
+ return types.ErrEmptyMarketPriceUpdate
+ }
+ _, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request)
+ if err != nil {
+ logger.Error("Failed to run price updater task loop for price daemon", "error", err)
+ telemetry.IncrCounter(
+ 1,
+ metrics.PricefeedDaemon,
+ metrics.PriceUpdaterTaskLoop,
+ metrics.Error,
+ )
+ return err
+ } |
||
|
@@ -291,6 +305,7 @@ func RunPriceUpdaterTaskLoop( | |
metrics.PriceUpdaterZeroPrices, | ||
metrics.Count, | ||
) | ||
return types.ErrEmptyMarketPriceUpdate | ||
} | ||
|
||
return nil | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
package types | ||
|
||
import ( | ||
"errors" | ||
) | ||
|
||
var ( | ||
ErrEmptyMarketPriceUpdate = errors.New("Market price update has length of 0") | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
package types | ||
|
||
import ( | ||
"fmt" | ||
libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time" | ||
"sync" | ||
"time" | ||
) | ||
|
||
const ( | ||
MaxAcceptableUpdateDelay = 5 * time.Minute | ||
) | ||
|
||
// HealthCheckable is a common interface for services that can be health checked. | ||
// | ||
// Instances of this type are thread-safe. | ||
type HealthCheckable interface { | ||
clemire marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// HealthCheck returns an error if a service is unhealthy. If the service is healthy, this method returns nil. | ||
HealthCheck() (err error) | ||
// ReportFailure records a failed update. | ||
ReportFailure(timestamp time.Time, err error) | ||
// ReportSuccess records a successful update. | ||
ReportSuccess(timestamp time.Time) | ||
} | ||
|
||
// timestampWithError couples a timestamp and error to make it easier to update them in tandem. | ||
type timestampWithError struct { | ||
timestamp time.Time | ||
err error | ||
} | ||
|
||
func (u *timestampWithError) Update(timestamp time.Time, err error) { | ||
u.timestamp = timestamp | ||
u.err = err | ||
} | ||
|
||
func (u *timestampWithError) Timestamp() time.Time { | ||
return u.timestamp | ||
} | ||
|
||
func (u *timestampWithError) Error() error { | ||
return u.err | ||
} | ||
|
||
// timeBoundedHealthCheckable implements the HealthCheckable interface by tracking the timestamps of the last successful | ||
// failed updates. | ||
clemire marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// If any of the following occurs, then the service should be considered unhealthy: | ||
// - no update has occurred | ||
// - the most recent update failed, or | ||
// - the daemon has not seen a successful update within `MaxAcceptableUpdateDelay`. | ||
// | ||
// This object is thread-safe. | ||
type timeBoundedHealthCheckable struct { | ||
clemire marked this conversation as resolved.
Show resolved
Hide resolved
|
||
sync.Mutex | ||
|
||
// lastSuccessfulUpdate is the timestamp of the last successful update. | ||
lastSuccessfulUpdate time.Time | ||
// lastFailedUpdate is the timestamp, error pair of the last failed update. After the HealthCheckableImpl is | ||
// initialized, this should never be a zero value. | ||
clemire marked this conversation as resolved.
Show resolved
Hide resolved
|
||
lastFailedUpdate timestampWithError | ||
|
||
// timeProvider is the time provider used to determine the current time. This is used for timestamping | ||
// creation and checking for update staleness during HealthCheck. | ||
timeProvider libtime.TimeProvider | ||
clemire marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// NewTimeBoundedHealthCheckable creates a new HealthCheckable instance. | ||
func NewTimeBoundedHealthCheckable(serviceName string, timeProvider libtime.TimeProvider) HealthCheckable { | ||
hc := &timeBoundedHealthCheckable{ | ||
timeProvider: timeProvider, | ||
} | ||
// Initialize the timeBoudnedHealthCheckable to an unhealthy state by reporting an error. | ||
hc.ReportFailure(timeProvider.Now(), fmt.Errorf("%v is initializing", serviceName)) | ||
return hc | ||
} | ||
|
||
// ReportSuccess records a successful update. This method is thread-safe. | ||
func (h *timeBoundedHealthCheckable) ReportSuccess(timestamp time.Time) { | ||
h.Lock() | ||
defer h.Unlock() | ||
|
||
h.lastSuccessfulUpdate = timestamp | ||
} | ||
|
||
// ReportFailure records a failed update. This method is thread-safe. | ||
func (h *timeBoundedHealthCheckable) ReportFailure(timestamp time.Time, err error) { | ||
h.Lock() | ||
defer h.Unlock() | ||
h.lastFailedUpdate.Update(timestamp, err) | ||
} | ||
|
||
// HealthCheck returns an error if a service is unhealthy. | ||
// The service is unhealthy if any of the following are true: | ||
// - no successful update has occurred | ||
// - the most recent update failed, or | ||
// - the daemon has not seen a successful update in at least 5 minutes, | ||
// Note: since the timeBoundedHealthCheckable is not exposed and can only be created via | ||
// NewTimeBoundedHealthCheckable, we expect that the lastFailedUpdate is never a zero value. | ||
// This method is synchronized. | ||
func (h *timeBoundedHealthCheckable) HealthCheck() error { | ||
h.Lock() | ||
defer h.Unlock() | ||
|
||
if h.lastSuccessfulUpdate.IsZero() { | ||
return fmt.Errorf( | ||
"no successful update has occurred; last failed update occurred at %v with error '%w'", | ||
h.lastFailedUpdate.Timestamp(), | ||
h.lastFailedUpdate.Error(), | ||
) | ||
} | ||
|
||
if h.lastFailedUpdate.Timestamp().After(h.lastSuccessfulUpdate) { | ||
return fmt.Errorf( | ||
"last update failed at %v with error: '%w', most recent successful update occurred at %v", | ||
h.lastFailedUpdate.Timestamp(), | ||
h.lastFailedUpdate.Error(), | ||
h.lastSuccessfulUpdate, | ||
) | ||
} | ||
|
||
// If the last successful update was more than 5 minutes ago, report the specific error. | ||
if h.timeProvider.Now().Sub(h.lastSuccessfulUpdate) > MaxAcceptableUpdateDelay { | ||
return fmt.Errorf( | ||
"last successful update occurred at %v, which is more than %v ago. Last failure occurred at %v with error '%w'", | ||
h.lastSuccessfulUpdate, | ||
MaxAcceptableUpdateDelay, | ||
h.lastFailedUpdate.Timestamp(), | ||
h.lastFailedUpdate.Error(), | ||
) | ||
} | ||
|
||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should something similar happen for the following subtasks too?
StartPriceEncoder
StartPriceFetcher
StartMarketParamUpdater
Why or why not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, because I think measuring the contents of the market update already captures more or less the same information, and is much simpler, than instrumenting these subtasks as well.
I think we made the decision that if a market update fails, we will continue to operate the protocol with the current market configuration on the daemon so that trading can continue for existing markets. (The reasonable assumption here IMO is that market param updates are failing not just for one validator, but across the network.) Marking unhealthy and panicking on a failure here could bring down the network instead of allowing degraded performance by continuing to support trading for existing markets. The better approach would probably be to continue the network and allow another governance vote to fix any bad parameters in the exchange_config_json.
One possibility to consider is that some daemons could fail and others succeed due to disagreeing app versions, but I think we should catch this by upgrading via governance to enforce compatibility if there is a backwards-incompatible change in daemon configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, makes sense.
For
StartMarketParamUpdater
do we have clear err log msgs to indicate this failed? Do we have dashboard + alerting on this?