Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-666] - Robust health check #715

Merged
merged 28 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9a9cc58
HealthCheck interface. Pricefeed daemon implementation of interface. …
Oct 11, 2023
3b00aaf
Checkpoint - optional refactor of subtaskrunner into client methods.
Oct 11, 2023
2250685
Back out subtaskrunner changes.
Oct 27, 2023
a032923
Test cases for daemon health.
Oct 27, 2023
49538d9
Channel cleanup.
Oct 27, 2023
656c256
Undo completeStartup and fix comment.
Oct 27, 2023
a1401da
Comments.
Oct 27, 2023
4f499ad
mega-nit: missing period.
Oct 27, 2023
d9f7108
remove daemon client. update internal health indicator to track error…
Nov 1, 2023
d1e2aee
Pricefeed updates with new health checkable shared implementation.
Nov 1, 2023
6c75e51
Updates to health-checkable - test cases, specific error messages.
Nov 2, 2023
3795643
Migrate pricefeed changes. Lint fix for health check tests.
Nov 2, 2023
cebb504
Remove accidental addition to this branch.
Nov 2, 2023
d73c727
Remove explicit toggle to unhealthy state. This will be caught after …
Nov 2, 2023
1e7f8a2
Comment cleanup.
Nov 2, 2023
393b53b
PR feedback.
Nov 2, 2023
0c48c23
Performance improvement - reduce test time from 100ms to sub-ms for h…
Nov 2, 2023
c325aec
Refactor test util method since it will be used for all daemons.
Nov 2, 2023
9db3da6
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
eee7e9c
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
a59c138
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
0d8d97b
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
2ef12a2
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
34dba18
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
4d14a5b
Remove timestamp parameter from health check reporth methods.
Nov 3, 2023
7f4ae9b
Update comment for consistency.
Nov 3, 2023
a87f57f
Update protocol/daemons/types/health_checkable_test.go
clemire Nov 4, 2023
1ac3bd9
Update protocol/daemons/types/health_checkable_test.go
clemire Nov 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions protocol/daemons/liquidation/client/sub_task_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package client

import (
"context"
"github.com/cosmos/cosmos-sdk/telemetry"
"github.com/dydxprotocol/v4-chain/protocol/daemons/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"time"
)

// SubTaskRunner provides an interface that encapsulates the liquidations daemon logic to gather and report
// potentially liquidatable subaccount ids. This interface is used to mock the daemon logic in tests.
type SubTaskRunner interface {
RunLiquidationDaemonTaskLoop(
ctx context.Context,
liqFlags flags.LiquidationFlags,
subaccountQueryClient satypes.QueryClient,
clobQueryClient clobtypes.QueryClient,
liquidationServiceClient api.LiquidationServiceClient,
) error
}

type SubTaskRunnerImpl struct{}

// Ensure SubTaskRunnerImpl implements the SubTaskRunner interface.
var _ SubTaskRunner = (*SubTaskRunnerImpl)(nil)

// RunLiquidationDaemonTaskLoop contains the logic to communicate with various gRPC services
// to find the liquidatable subaccount ids.
func (s *SubTaskRunnerImpl) RunLiquidationDaemonTaskLoop(
ctx context.Context,
liqFlags flags.LiquidationFlags,
subaccountQueryClient satypes.QueryClient,
clobQueryClient clobtypes.QueryClient,
liquidationServiceClient api.LiquidationServiceClient,
) error {
defer telemetry.ModuleMeasureSince(
metrics.LiquidationDaemon,
time.Now(),
metrics.MainTaskLoop,
metrics.Latency,
)

// 1. Fetch all subaccounts from query service.
subaccounts, err := GetAllSubaccounts(
ctx,
subaccountQueryClient,
liqFlags.SubaccountPageLimit,
)
if err != nil {
return err
}

// 2. Check collateralization statuses of subaccounts with at least one open position.
liquidatableSubaccountIds, err := GetLiquidatableSubaccountIds(
ctx,
clobQueryClient,
liqFlags,
subaccounts,
)
if err != nil {
return err
}

// 3. Send the list of liquidatable subaccount ids to the daemon server.
err = SendLiquidatableSubaccountIds(
ctx,
liquidationServiceClient,
liquidatableSubaccountIds,
)
if err != nil {
return err
}

return nil
}
clemire marked this conversation as resolved.
Show resolved Hide resolved
16 changes: 15 additions & 1 deletion protocol/daemons/pricefeed/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,14 @@ type Client struct {

// Ensure stop only executes one time.
stopDaemon sync.Once

// include HealthCheckableImpl to track the health of the daemon.
daemontypes.HealthCheckableImpl
clemire marked this conversation as resolved.
Show resolved Hide resolved
}

// Ensure Client implements the HealthCheckable interface.
var _ daemontypes.HealthCheckable = (*Client)(nil)

func newClient() *Client {
client := &Client{
tickers: []*time.Ticker{},
Expand All @@ -59,14 +65,15 @@ func newClient() *Client {

// Set the client's daemonStartup state to indicate that the daemon has not finished starting up.
client.daemonStartup.Add(1)
client.InitializeHealthStatus(constants.PricefeedDaemonModuleName, &libtime.TimeProviderImpl{})
return client
}

// newTickerWithStop creates a new ticker and a channel for iteratively looping through a subtask with a stop signal
// for any subtask kicked off by the client. The ticker and channel are tracked in order to properly clean up and send
// all needed stop signals when the daemon is stopped.
// Note: this method is not synchronized. It is expected to be called from the client's `StartNewClient` method before
// `client.CompleteStartup`.
// the daemonStartup waitgroup signals.
func (c *Client) newTickerWithStop(intervalMs int) (*time.Ticker, <-chan bool) {
ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
c.tickers = append(c.tickers, ticker)
Expand All @@ -91,6 +98,12 @@ func (c *Client) Stop() {
}

c.runningSubtasksWaitGroup.Wait()

// When the daemon stops, toggle it into an unhealthy state.
c.RecordUpdateFailure(
&libtime.TimeProviderImpl{},
fmt.Errorf("%v stopped", constants.PricefeedDaemonModuleName),
)
})
}

Expand Down Expand Up @@ -249,6 +262,7 @@ func (c *Client) start(ctx context.Context,

pricefeedClient := api.NewPriceFeedServiceClient(daemonConn)
subTaskRunner.StartPriceUpdater(
c,
ctx,
priceUpdaterTicker,
priceUpdaterStop,
Expand Down
94 changes: 94 additions & 0 deletions protocol/daemons/pricefeed/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
daemonserver "github.com/dydxprotocol/v4-chain/protocol/daemons/server"
pricefeed_types "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/pricefeed"
daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time"
"github.com/dydxprotocol/v4-chain/protocol/testutil/appoptions"
grpc_util "github.com/dydxprotocol/v4-chain/protocol/testutil/grpc"
pricetypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
Expand Down Expand Up @@ -51,6 +52,7 @@ type FakeSubTaskRunner struct {

// StartPriceUpdater replaces `client.StartPriceUpdater` and advances `UpdaterCallCount` by one.
func (f *FakeSubTaskRunner) StartPriceUpdater(
c *Client,
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand Down Expand Up @@ -247,6 +249,14 @@ func TestStart_InvalidConfig(t *testing.T) {
&faketaskRunner,
)

// Expect daemon is not healthy on startup. Daemon becomes healthy after the first successful market
// update.
require.ErrorContains(
t,
client.HealthCheck(&libtime.TimeProviderImpl{}),
"pricefeed-daemon is initializing",
)

if tc.expectedError == nil {
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -619,6 +629,7 @@ func TestPriceUpdater_Mixed(t *testing.T) {
},
"No exchange market prices, does not call `UpdateMarketPrices`": {
exchangeAndMarketPrices: []*client.ExchangeIdMarketPriceTimestamp{},
priceUpdateError: types.ErrEmptyMarketPriceUpdate,
},
"One market for one exchange": {
exchangeAndMarketPrices: []*client.ExchangeIdMarketPriceTimestamp{
Expand Down Expand Up @@ -721,6 +732,89 @@ func TestPriceUpdater_Mixed(t *testing.T) {
}
}

// singleTickTickerAndStop creates a ticker that ticks once before the stop channel is signaled. In order for this
// ticker to be effective, it needs to be consumed within 100ms.
func singleTickTickerAndStop() (*time.Ticker, chan bool) {
// Create a ticker with a duration long enough that we do not expect to see a tick within the timeframe
// of a normal unit test.
ticker := time.NewTicker(10 * time.Minute)
// Override the ticker's channel with a new channel we can insert into directly, and add a single tick.
newChan := make(chan time.Time, 1)
newChan <- time.Now()
ticker.C = newChan

stop := make(chan bool, 1)

// After 100ms, stop the ticker and signal the stop channel.
go func() {
time.Sleep(100 * time.Millisecond)
ticker.Stop()
stop <- true
}()

return ticker, stop
}

func TestHealthCheck_Mixed(t *testing.T) {
tests := map[string]struct {
updateMarketPricesError error
expectedError error
}{
"No error - daemon healthy": {
updateMarketPricesError: nil,
expectedError: nil,
},
"Error - daemon unhealthy": {
updateMarketPricesError: fmt.Errorf("failed to update market prices"),
expectedError: fmt.Errorf(
"failed to run price updater task loop for price daemon: failed to update market prices",
),
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// Setup.
// Create `ExchangeIdMarketPriceTimestamp` and populate it with market-price updates.
etmp, err := types.NewExchangeToMarketPrices([]types.ExchangeId{constants.ExchangeId1})
require.NoError(t, err)
etmp.UpdatePrice(constants.ExchangeId1, constants.Market9_TimeT_Price1)

// Create a mock `PriceFeedServiceClient`.
mockPriceFeedClient := generateMockQueryClient()

// Mock the `UpdateMarketPrices` call to return an error if specified.
mockPriceFeedClient.On("UpdateMarketPrices", grpc_util.Ctx, mock.Anything).
Return(nil, tc.updateMarketPricesError).Once()

ticker, stop := singleTickTickerAndStop()
client := newClient()

// Act.
// Run the price updater for a single tick with a successful update. Expect the daemon to toggle to a
// healthy state.
subTaskRunnerImpl.StartPriceUpdater(
client,
grpc_util.Ctx,
ticker,
stop,
etmp,
mockPriceFeedClient,
log.NewNopLogger(),
)
// Assert.
timeProvider := &libtime.TimeProviderImpl{}
if tc.expectedError == nil {
require.NoError(t, client.HealthCheck(timeProvider))
} else {
require.ErrorContains(t, client.HealthCheck(timeProvider), tc.expectedError.Error())
}

// Cleanup.
close(stop)
})
}
}

// TestMarketUpdater_Mixed tests the `RunMarketParamUpdaterTaskLoop` function invokes the grpc
// query to the prices query client and that if the query succeeds, the config is updated.
func TestMarketUpdater_Mixed(t *testing.T) {
Expand Down
24 changes: 20 additions & 4 deletions protocol/daemons/pricefeed/client/sub_task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package client

import (
"context"
"cosmossdk.io/errors"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/constants"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_encoder"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_fetcher"
daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
libtime "github.com/dydxprotocol/v4-chain/protocol/lib/time"
pricetypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
"net/http"
"time"
Expand Down Expand Up @@ -35,6 +37,7 @@ var _ SubTaskRunner = (*SubTaskRunnerImpl)(nil)
// SubTaskRunner is the interface for running pricefeed client task functions.
type SubTaskRunner interface {
StartPriceUpdater(
c *Client,
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand Down Expand Up @@ -76,6 +79,7 @@ type SubTaskRunner interface {
// StartPriceUpdater runs in the daemon's main goroutine and does not need access to the daemon's wait group
// to signal task completion.
func (s *SubTaskRunnerImpl) StartPriceUpdater(
c *Client,
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand All @@ -87,8 +91,17 @@ func (s *SubTaskRunnerImpl) StartPriceUpdater(
select {
case <-ticker.C:
err := RunPriceUpdaterTaskLoop(ctx, exchangeToMarketPrices, priceFeedServiceClient, logger)
if err != nil {
panic(err)

if err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should something similar happen for the following subtasks too?

  • StartPriceEncoder
  • StartPriceFetcher
  • StartMarketParamUpdater

Why or why not?

Copy link
Contributor Author

@clemire clemire Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • StartPriceEncoder
  • StartPriceFetcher

No, because I think measuring the contents of the market update already captures more or less the same information, and is much simpler, than instrumenting these subtasks as well.

  • StartMarketParamUpdater

I think we made the decision that if a market update fails, we will continue to operate the protocol with the current market configuration on the daemon so that trading can continue for existing markets. (The reasonable assumption here IMO is that market param updates are failing not just for one validator, but across the network.) Marking unhealthy and panicking on a failure here could bring down the network instead of allowing degraded performance by continuing to support trading for existing markets. The better approach would probably be to continue the network and allow another governance vote to fix any bad parameters in the exchange_config_json.

One possibility to consider is that some daemons could fail and others succeed due to disagreeing app versions, but I think we should catch this by upgrading via governance to enforce compatibility if there is a backwards-incompatible change in daemon configuration.

Copy link
Contributor

@ttl33 ttl33 Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, makes sense.

For StartMarketParamUpdater do we have clear err log msgs to indicate this failed? Do we have dashboard + alerting on this?

// Record update success for the daemon health check.
c.RecordUpdateSuccess(&libtime.TimeProviderImpl{})
clemire marked this conversation as resolved.
Show resolved Hide resolved
} else {
logger.Error("Failed to run price updater task loop for price daemon", constants.ErrorLogKey, err)
// Record update failure for the daemon health check.
c.RecordUpdateFailure(
&libtime.TimeProviderImpl{},
errors.Wrap(err, "failed to run price updater task loop for price daemon"),
)
}

case <-stop:
Expand Down Expand Up @@ -265,8 +278,10 @@ func RunPriceUpdaterTaskLoop(
metrics.Latency,
)

// On startup the length of request will likely be 0. However, sending a request of length 0
// is a fatal error.
// On startup the length of request will likely be 0. Even so, we return an error here because this
// is unexpected behavior once the daemon reaches a steady state. The daemon health check process should
// be robust enough to ignore temporarily unhealthy daemons.
// Sending a request of length 0, however, causes a panic.
// panic: rpc error: code = Unknown desc = Market price update has length of 0.
if len(request.MarketPriceUpdates) > 0 {
_, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request)
Comment on lines 274 to 283
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [274-306]

The RunPriceUpdaterTaskLoop function now returns an error of type types.ErrEmptyMarketPriceUpdate if the length of request.MarketPriceUpdates is 0. This is a good practice as it prevents sending empty updates which could cause a panic. However, it might be better to check this condition before starting the telemetry measurement on line 278 to avoid unnecessary computations for empty updates.

- if len(request.MarketPriceUpdates) > 0 {
- 	_, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request)
- 	if err != nil {
- 		logger.Error("Failed to run price updater task loop for price daemon", "error", err)
- 		telemetry.IncrCounter(
- 			1,
- 			metrics.PricefeedDaemon,
- 			metrics.PriceUpdaterTaskLoop,
- 			metrics.Error,
- 		)
- 		return err
- 	}
- } else {
- 	logger.Info("Price update had length of 0")
- 	telemetry.IncrCounter(
- 		1,
- 		metrics.PricefeedDaemon,
- 		metrics.PriceUpdaterZeroPrices,
- 		metrics.Count,
- 	)
- 	return types.ErrEmptyMarketPriceUpdate
- }
+ if len(request.MarketPriceUpdates) == 0 {
+ 	logger.Info("Price update had length of 0")
+ 	telemetry.IncrCounter(
+ 		1,
+ 		metrics.PricefeedDaemon,
+ 		metrics.PriceUpdaterZeroPrices,
+ 		metrics.Count,
+ 	)
+ 	return types.ErrEmptyMarketPriceUpdate
+ }
+ _, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request)
+ if err != nil {
+ 	logger.Error("Failed to run price updater task loop for price daemon", "error", err)
+ 	telemetry.IncrCounter(
+ 		1,
+ 		metrics.PricefeedDaemon,
+ 		metrics.PriceUpdaterTaskLoop,
+ 		metrics.Error,
+ 	)
+ 	return err
+ }

Expand All @@ -291,6 +306,7 @@ func RunPriceUpdaterTaskLoop(
metrics.PriceUpdaterZeroPrices,
metrics.Count,
)
return types.ErrEmptyMarketPriceUpdate
}

return nil
Expand Down
9 changes: 9 additions & 0 deletions protocol/daemons/pricefeed/client/types/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package types

import (
"errors"
)

var (
ErrEmptyMarketPriceUpdate = errors.New("Market price update has length of 0")
)
Loading
Loading