Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-666] - Robust health check #715

Merged
merged 28 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9a9cc58
HealthCheck interface. Pricefeed daemon implementation of interface. …
Oct 11, 2023
3b00aaf
Checkpoint - optional refactor of subtaskrunner into client methods.
Oct 11, 2023
2250685
Back out subtaskrunner changes.
Oct 27, 2023
a032923
Test cases for daemon health.
Oct 27, 2023
49538d9
Channel cleanup.
Oct 27, 2023
656c256
Undo completeStartup and fix comment.
Oct 27, 2023
a1401da
Comments.
Oct 27, 2023
4f499ad
mega-nit: missing period.
Oct 27, 2023
d9f7108
remove daemon client. update internal health indicator to track error…
Nov 1, 2023
d1e2aee
Pricefeed updates with new health checkable shared implementation.
Nov 1, 2023
6c75e51
Updates to health-checkable - test cases, specific error messages.
Nov 2, 2023
3795643
Migrate pricefeed changes. Lint fix for health check tests.
Nov 2, 2023
cebb504
Remove accidental addition to this branch.
Nov 2, 2023
d73c727
Remove explicit toggle to unhealthy state. This will be caught after …
Nov 2, 2023
1e7f8a2
Comment cleanup.
Nov 2, 2023
393b53b
PR feedback.
Nov 2, 2023
0c48c23
Performance improvement - reduce test time from 100ms to sub-ms for h…
Nov 2, 2023
c325aec
Refactor test util method since it will be used for all daemons.
Nov 2, 2023
9db3da6
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
eee7e9c
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
a59c138
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
0d8d97b
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
2ef12a2
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
34dba18
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
4d14a5b
Remove timestamp parameter from health check reporth methods.
Nov 3, 2023
7f4ae9b
Update comment for consistency.
Nov 3, 2023
a87f57f
Update protocol/daemons/types/health_checkable_test.go
clemire Nov 4, 2023
1ac3bd9
Update protocol/daemons/types/health_checkable_test.go
clemire Nov 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion protocol/daemons/pricefeed/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
// Note: price fetchers manage their own subtasks by blocking on their completion on every subtask run.
// When the price fetcher is stopped, it will wait for all of its own subtasks to complete before returning.
type Client struct {
// include HealthCheckable to track the health of the daemon.
daemontypes.HealthCheckable

// daemonStartup tracks whether the daemon has finished startup. The daemon
// cannot be stopped until all persistent daemon subtasks have been launched within `Start`.
daemonStartup sync.WaitGroup
Expand All @@ -51,10 +54,17 @@ type Client struct {
stopDaemon sync.Once
}

// Ensure Client implements the HealthCheckable interface.
var _ daemontypes.HealthCheckable = (*Client)(nil)

func newClient() *Client {
client := &Client{
tickers: []*time.Ticker{},
stops: []chan bool{},
HealthCheckable: daemontypes.NewTimeBoundedHealthCheckable(
constants.PricefeedDaemonModuleName,
&libtime.TimeProviderImpl{},
),
}

// Set the client's daemonStartup state to indicate that the daemon has not finished starting up.
Expand All @@ -66,7 +76,7 @@ func newClient() *Client {
// for any subtask kicked off by the client. The ticker and channel are tracked in order to properly clean up and send
// all needed stop signals when the daemon is stopped.
// Note: this method is not synchronized. It is expected to be called from the client's `StartNewClient` method before
// `client.CompleteStartup`.
// the daemonStartup waitgroup signals.
func (c *Client) newTickerWithStop(intervalMs int) (*time.Ticker, <-chan bool) {
ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
c.tickers = append(c.tickers, ticker)
Expand Down Expand Up @@ -249,6 +259,7 @@ func (c *Client) start(ctx context.Context,

pricefeedClient := api.NewPriceFeedServiceClient(daemonConn)
subTaskRunner.StartPriceUpdater(
c,
ctx,
priceUpdaterTicker,
priceUpdaterStop,
Expand Down
71 changes: 71 additions & 0 deletions protocol/daemons/pricefeed/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
pricefeed_types "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/pricefeed"
daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
"github.com/dydxprotocol/v4-chain/protocol/testutil/appoptions"
daemontestutils "github.com/dydxprotocol/v4-chain/protocol/testutil/daemons"
grpc_util "github.com/dydxprotocol/v4-chain/protocol/testutil/grpc"
pricetypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
"google.golang.org/grpc"
Expand Down Expand Up @@ -51,6 +52,7 @@ type FakeSubTaskRunner struct {

// StartPriceUpdater replaces `client.StartPriceUpdater` and advances `UpdaterCallCount` by one.
func (f *FakeSubTaskRunner) StartPriceUpdater(
c *Client,
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand Down Expand Up @@ -247,6 +249,14 @@ func TestStart_InvalidConfig(t *testing.T) {
&faketaskRunner,
)

// Expect daemon is not healthy on startup. Daemon becomes healthy after the first successful market
// update.
require.ErrorContains(
t,
client.HealthCheck(),
"no successful update has occurred",
)

if tc.expectedError == nil {
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -619,6 +629,7 @@ func TestPriceUpdater_Mixed(t *testing.T) {
},
"No exchange market prices, does not call `UpdateMarketPrices`": {
exchangeAndMarketPrices: []*client.ExchangeIdMarketPriceTimestamp{},
priceUpdateError: types.ErrEmptyMarketPriceUpdate,
},
"One market for one exchange": {
exchangeAndMarketPrices: []*client.ExchangeIdMarketPriceTimestamp{
Expand Down Expand Up @@ -721,6 +732,66 @@ func TestPriceUpdater_Mixed(t *testing.T) {
}
}

func TestHealthCheck_Mixed(t *testing.T) {
tests := map[string]struct {
updateMarketPricesError error
expectedError error
}{
"No error - daemon healthy": {
updateMarketPricesError: nil,
expectedError: nil,
},
"Error - daemon unhealthy": {
updateMarketPricesError: fmt.Errorf("failed to update market prices"),
expectedError: fmt.Errorf(
"failed to run price updater task loop for price daemon: failed to update market prices",
),
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// Setup.
// Create `ExchangeIdMarketPriceTimestamp` and populate it with market-price updates.
etmp, err := types.NewExchangeToMarketPrices([]types.ExchangeId{constants.ExchangeId1})
require.NoError(t, err)
etmp.UpdatePrice(constants.ExchangeId1, constants.Market9_TimeT_Price1)

// Create a mock `PriceFeedServiceClient`.
mockPriceFeedClient := generateMockQueryClient()

// Mock the `UpdateMarketPrices` call to return an error if specified.
mockPriceFeedClient.On("UpdateMarketPrices", grpc_util.Ctx, mock.Anything).
Return(nil, tc.updateMarketPricesError).Once()

ticker, stop := daemontestutils.SingleTickTickerAndStop()
client := newClient()

// Act.
// Run the price updater for a single tick. Expect the daemon to toggle health state based on
// `UpdateMarketPrices` error response.
subTaskRunnerImpl.StartPriceUpdater(
client,
grpc_util.Ctx,
ticker,
stop,
etmp,
mockPriceFeedClient,
log.NewNopLogger(),
)

// Assert.
if tc.expectedError == nil {
require.NoError(t, client.HealthCheck())
} else {
require.ErrorContains(t, client.HealthCheck(), tc.expectedError.Error())
}

// Cleanup.
close(stop)
})
}
}

// TestMarketUpdater_Mixed tests the `RunMarketParamUpdaterTaskLoop` function invokes the grpc
// query to the prices query client and that if the query succeeds, the config is updated.
func TestMarketUpdater_Mixed(t *testing.T) {
Expand Down
20 changes: 16 additions & 4 deletions protocol/daemons/pricefeed/client/sub_task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"cosmossdk.io/errors"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/constants"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_encoder"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_fetcher"
Expand Down Expand Up @@ -35,6 +36,7 @@ var _ SubTaskRunner = (*SubTaskRunnerImpl)(nil)
// SubTaskRunner is the interface for running pricefeed client task functions.
type SubTaskRunner interface {
StartPriceUpdater(
c *Client,
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand Down Expand Up @@ -76,6 +78,7 @@ type SubTaskRunner interface {
// StartPriceUpdater runs in the daemon's main goroutine and does not need access to the daemon's wait group
// to signal task completion.
func (s *SubTaskRunnerImpl) StartPriceUpdater(
c *Client,
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand All @@ -87,8 +90,14 @@ func (s *SubTaskRunnerImpl) StartPriceUpdater(
select {
case <-ticker.C:
err := RunPriceUpdaterTaskLoop(ctx, exchangeToMarketPrices, priceFeedServiceClient, logger)
if err != nil {
panic(err)

if err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should something similar happen for the following subtasks too?

  • StartPriceEncoder
  • StartPriceFetcher
  • StartMarketParamUpdater

Why or why not?

Copy link
Contributor Author

@clemire clemire Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • StartPriceEncoder
  • StartPriceFetcher

No, because I think measuring the contents of the market update already captures more or less the same information, and is much simpler, than instrumenting these subtasks as well.

  • StartMarketParamUpdater

I think we made the decision that if a market update fails, we will continue to operate the protocol with the current market configuration on the daemon so that trading can continue for existing markets. (The reasonable assumption here IMO is that market param updates are failing not just for one validator, but across the network.) Marking unhealthy and panicking on a failure here could bring down the network instead of allowing degraded performance by continuing to support trading for existing markets. The better approach would probably be to continue the network and allow another governance vote to fix any bad parameters in the exchange_config_json.

One possibility to consider is that some daemons could fail and others succeed due to disagreeing app versions, but I think we should catch this by upgrading via governance to enforce compatibility if there is a backwards-incompatible change in daemon configuration.

Copy link
Contributor

@ttl33 ttl33 Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, makes sense.

For StartMarketParamUpdater do we have clear err log msgs to indicate this failed? Do we have dashboard + alerting on this?

// Record update success for the daemon health check.
c.ReportSuccess()
} else {
logger.Error("Failed to run price updater task loop for price daemon", constants.ErrorLogKey, err)
// Record update failure for the daemon health check.
c.ReportFailure(errors.Wrap(err, "failed to run price updater task loop for price daemon"))
}

case <-stop:
Expand Down Expand Up @@ -265,8 +274,10 @@ func RunPriceUpdaterTaskLoop(
metrics.Latency,
)

// On startup the length of request will likely be 0. However, sending a request of length 0
// is a fatal error.
// On startup the length of request will likely be 0. Even so, we return an error here because this
// is unexpected behavior once the daemon reaches a steady state. The daemon health check process should
// be robust enough to ignore temporarily unhealthy daemons.
// Sending a request of length 0, however, causes a panic.
// panic: rpc error: code = Unknown desc = Market price update has length of 0.
if len(request.MarketPriceUpdates) > 0 {
_, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request)
Comment on lines 274 to 283
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [274-306]

The RunPriceUpdaterTaskLoop function now returns an error of type types.ErrEmptyMarketPriceUpdate if the length of request.MarketPriceUpdates is 0. This is a good practice as it prevents sending empty updates which could cause a panic. However, it might be better to check this condition before starting the telemetry measurement on line 278 to avoid unnecessary computations for empty updates.

- if len(request.MarketPriceUpdates) > 0 {
- 	_, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request)
- 	if err != nil {
- 		logger.Error("Failed to run price updater task loop for price daemon", "error", err)
- 		telemetry.IncrCounter(
- 			1,
- 			metrics.PricefeedDaemon,
- 			metrics.PriceUpdaterTaskLoop,
- 			metrics.Error,
- 		)
- 		return err
- 	}
- } else {
- 	logger.Info("Price update had length of 0")
- 	telemetry.IncrCounter(
- 		1,
- 		metrics.PricefeedDaemon,
- 		metrics.PriceUpdaterZeroPrices,
- 		metrics.Count,
- 	)
- 	return types.ErrEmptyMarketPriceUpdate
- }
+ if len(request.MarketPriceUpdates) == 0 {
+ 	logger.Info("Price update had length of 0")
+ 	telemetry.IncrCounter(
+ 		1,
+ 		metrics.PricefeedDaemon,
+ 		metrics.PriceUpdaterZeroPrices,
+ 		metrics.Count,
+ 	)
+ 	return types.ErrEmptyMarketPriceUpdate
+ }
+ _, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request)
+ if err != nil {
+ 	logger.Error("Failed to run price updater task loop for price daemon", "error", err)
+ 	telemetry.IncrCounter(
+ 		1,
+ 		metrics.PricefeedDaemon,
+ 		metrics.PriceUpdaterTaskLoop,
+ 		metrics.Error,
+ 	)
+ 	return err
+ }

Expand All @@ -291,6 +302,7 @@ func RunPriceUpdaterTaskLoop(
metrics.PriceUpdaterZeroPrices,
metrics.Count,
)
return types.ErrEmptyMarketPriceUpdate
}

return nil
Expand Down
9 changes: 9 additions & 0 deletions protocol/daemons/pricefeed/client/types/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package types

import (
"errors"
)

var (
ErrEmptyMarketPriceUpdate = errors.New("Market price update has length of 0")
)
132 changes: 132 additions & 0 deletions protocol/daemons/types/health_checkable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package types

import (
"fmt"
libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time"
"sync"
"time"
)

const (
MaxAcceptableUpdateDelay = 5 * time.Minute
)

// HealthCheckable is a common interface for services that can be health checked.
//
// Instances of this type are thread-safe.
type HealthCheckable interface {
clemire marked this conversation as resolved.
Show resolved Hide resolved
// HealthCheck returns an error if a service is unhealthy. If the service is healthy, this method returns nil.
HealthCheck() (err error)
// ReportFailure records a failed update.
ReportFailure(err error)
// ReportSuccess records a successful update.
ReportSuccess()
}

// timestampWithError couples a timestamp and error to make it easier to update them in tandem.
type timestampWithError struct {
timestamp time.Time
err error
}

func (u *timestampWithError) Update(timestamp time.Time, err error) {
u.timestamp = timestamp
u.err = err
}

func (u *timestampWithError) Timestamp() time.Time {
return u.timestamp
}

func (u *timestampWithError) Error() error {
return u.err
}

// timeBoundedHealthCheckable implements the HealthCheckable interface by tracking the timestamps of the last successful
// and failed updates.
// If any of the following occurs, then the service should be considered unhealthy:
// - no update has occurred
// - the most recent update failed, or
// - the daemon has not seen a successful update within `MaxAcceptableUpdateDelay`.
//
// This object is thread-safe.
type timeBoundedHealthCheckable struct {
clemire marked this conversation as resolved.
Show resolved Hide resolved
sync.Mutex

// lastSuccessfulUpdate is the timestamp of the last successful update.
lastSuccessfulUpdate time.Time
// lastFailedUpdate is the timestamp, error pair of the last failed update.
lastFailedUpdate timestampWithError

// timeProvider is the time provider used to determine the current time. This is used for timestamping
// creation and checking for update staleness during HealthCheck.
timeProvider libtime.TimeProvider
clemire marked this conversation as resolved.
Show resolved Hide resolved
}

// NewTimeBoundedHealthCheckable creates a new HealthCheckable instance.
func NewTimeBoundedHealthCheckable(serviceName string, timeProvider libtime.TimeProvider) HealthCheckable {
hc := &timeBoundedHealthCheckable{
timeProvider: timeProvider,
}
// Initialize the timeBoudnedHealthCheckable to an unhealthy state by reporting an error.
hc.ReportFailure(fmt.Errorf("%v is initializing", serviceName))
return hc
}

// ReportSuccess records a successful update. This method is thread-safe.
func (h *timeBoundedHealthCheckable) ReportSuccess() {
h.Lock()
defer h.Unlock()

h.lastSuccessfulUpdate = h.timeProvider.Now()
}

// ReportFailure records a failed update. This method is thread-safe.
func (h *timeBoundedHealthCheckable) ReportFailure(err error) {
h.Lock()
defer h.Unlock()
h.lastFailedUpdate.Update(h.timeProvider.Now(), err)
}

// HealthCheck returns an error if a service is unhealthy.
// The service is unhealthy if any of the following are true:
// - no successful update has occurred
// - the most recent update failed, or
// - the daemon has not seen a successful update in at least 5 minutes,
// Note: since the timeBoundedHealthCheckable is not exposed and can only be created via
// NewTimeBoundedHealthCheckable, we expect that the lastFailedUpdate is never a zero value.
// This method is synchronized.
func (h *timeBoundedHealthCheckable) HealthCheck() error {
h.Lock()
defer h.Unlock()

if h.lastSuccessfulUpdate.IsZero() {
return fmt.Errorf(
"no successful update has occurred; last failed update occurred at %v with error '%w'",
h.lastFailedUpdate.Timestamp(),
h.lastFailedUpdate.Error(),
)
}

if h.lastFailedUpdate.Timestamp().After(h.lastSuccessfulUpdate) {
return fmt.Errorf(
"last update failed at %v with error: '%w', most recent successful update occurred at %v",
h.lastFailedUpdate.Timestamp(),
h.lastFailedUpdate.Error(),
h.lastSuccessfulUpdate,
)
}

// If the last successful update was more than 5 minutes ago, report the specific error.
if h.timeProvider.Now().Sub(h.lastSuccessfulUpdate) > MaxAcceptableUpdateDelay {
return fmt.Errorf(
"last successful update occurred at %v, which is more than %v ago. Last failure occurred at %v with error '%w'",
h.lastSuccessfulUpdate,
MaxAcceptableUpdateDelay,
h.lastFailedUpdate.Timestamp(),
h.lastFailedUpdate.Error(),
)
}

return nil
}
Loading
Loading