Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-666] - Robust health check #715

Merged
merged 28 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9a9cc58
HealthCheck interface. Pricefeed daemon implementation of interface. …
Oct 11, 2023
3b00aaf
Checkpoint - optional refactor of subtaskrunner into client methods.
Oct 11, 2023
2250685
Back out subtaskrunner changes.
Oct 27, 2023
a032923
Test cases for daemon health.
Oct 27, 2023
49538d9
Channel cleanup.
Oct 27, 2023
656c256
Undo completeStartup and fix comment.
Oct 27, 2023
a1401da
Comments.
Oct 27, 2023
4f499ad
mega-nit: missing period.
Oct 27, 2023
d9f7108
remove daemon client. update internal health indicator to track error…
Nov 1, 2023
d1e2aee
Pricefeed updates with new health checkable shared implementation.
Nov 1, 2023
6c75e51
Updates to health-checkable - test cases, specific error messages.
Nov 2, 2023
3795643
Migrate pricefeed changes. Lint fix for health check tests.
Nov 2, 2023
cebb504
Remove accidental addition to this branch.
Nov 2, 2023
d73c727
Remove explicit toggle to unhealthy state. This will be caught after …
Nov 2, 2023
1e7f8a2
Comment cleanup.
Nov 2, 2023
393b53b
PR feedback.
Nov 2, 2023
0c48c23
Performance improvement - reduce test time from 100ms to sub-ms for h…
Nov 2, 2023
c325aec
Refactor test util method since it will be used for all daemons.
Nov 2, 2023
9db3da6
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
eee7e9c
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
a59c138
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
0d8d97b
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
2ef12a2
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
34dba18
Update protocol/daemons/types/health_checkable.go
clemire Nov 3, 2023
4d14a5b
Remove timestamp parameter from health check report methods.
Nov 3, 2023
7f4ae9b
Update comment for consistency.
Nov 3, 2023
a87f57f
Update protocol/daemons/types/health_checkable_test.go
clemire Nov 4, 2023
1ac3bd9
Update protocol/daemons/types/health_checkable_test.go
clemire Nov 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions protocol/daemons/liquidation/client/sub_task_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package client

import (
"context"
"github.com/cosmos/cosmos-sdk/telemetry"
"github.com/dydxprotocol/v4-chain/protocol/daemons/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"time"
)

// SubTaskRunner provides an interface that encapsulates the liquidations daemon logic to gather and report
// potentially liquidatable subaccount ids. This interface is used to mock the daemon logic in tests.
type SubTaskRunner interface {
// RunLiquidationDaemonTaskLoop runs the liquidation daemon task logic with the given flags, using the
// subaccount and clob query clients and the liquidation service client. It returns a non-nil error if
// the run fails.
RunLiquidationDaemonTaskLoop(
ctx context.Context,
liqFlags flags.LiquidationFlags,
subaccountQueryClient satypes.QueryClient,
clobQueryClient clobtypes.QueryClient,
liquidationServiceClient api.LiquidationServiceClient,
) error
}

// SubTaskRunnerImpl is the SubTaskRunner implementation defined in this file. It is an empty struct and
// carries no state of its own.
type SubTaskRunnerImpl struct{}

// Ensure SubTaskRunnerImpl implements the SubTaskRunner interface.
var _ SubTaskRunner = (*SubTaskRunnerImpl)(nil)

// RunLiquidationDaemonTaskLoop executes one pass of the liquidation daemon. It talks to the subaccount and
// clob gRPC query services to determine which subaccount ids are liquidatable, then reports that list to the
// liquidation daemon server. The first error encountered in any step aborts the pass and is returned as-is.
// The latency of the whole pass is recorded as a module telemetry measurement.
func (s *SubTaskRunnerImpl) RunLiquidationDaemonTaskLoop(
	ctx context.Context,
	liqFlags flags.LiquidationFlags,
	subaccountQueryClient satypes.QueryClient,
	clobQueryClient clobtypes.QueryClient,
	liquidationServiceClient api.LiquidationServiceClient,
) error {
	// Capture the start time up front so the deferred measurement covers the entire pass.
	startedAt := time.Now()
	defer telemetry.ModuleMeasureSince(
		metrics.LiquidationDaemon,
		startedAt,
		metrics.MainTaskLoop,
		metrics.Latency,
	)

	// Step 1: page through the query service to collect every subaccount.
	allSubaccounts, err := GetAllSubaccounts(ctx, subaccountQueryClient, liqFlags.SubaccountPageLimit)
	if err != nil {
		return err
	}

	// Step 2: determine which of the fetched subaccounts are liquidatable.
	liquidatableIds, err := GetLiquidatableSubaccountIds(ctx, clobQueryClient, liqFlags, allSubaccounts)
	if err != nil {
		return err
	}

	// Step 3: hand the liquidatable subaccount ids to the daemon server; its result is the result of the pass.
	return SendLiquidatableSubaccountIds(ctx, liquidationServiceClient, liquidatableIds)
}
clemire marked this conversation as resolved.
Show resolved Hide resolved
43 changes: 40 additions & 3 deletions protocol/daemons/pricefeed/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
sdkerrors "cosmossdk.io/errors"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -49,12 +50,46 @@ type Client struct {

// Ensure stop only executes one time.
stopDaemon sync.Once

// healthLock is used to synchronize access to the healthError field.
healthLock sync.Mutex

// healthError is used to track daemon health. For a healthy daemon, this field is nil. For an unhealthy daemon,
// this field is set to the error that caused the daemon to become unhealthy.
healthError error
}

// Ensure Client implements the HealthCheckable interface.
var _ daemontypes.HealthCheckable = (*Client)(nil)

// HealthCheck returns an error if the daemon is unhealthy, and nil if it is healthy.
// The daemon is considered unhealthy if it is not producing non-empty price updates. We expect a short period of time
// to pass on daemon startup where the daemon is unhealthy as the price cache warms up.
// When unhealthy, the returned error wraps the error stored in healthError, i.e. the reason the daemon became
// unhealthy. This method is synchronized by healthLock.
func (c *Client) HealthCheck(_ context.Context) error {
	c.healthLock.Lock()
	defer c.healthLock.Unlock()

	if c.healthError == nil {
		return nil
	}
	// NOTE: the "deamon" spelling is intentionally left untouched; tests match on this exact message.
	return sdkerrors.Wrap(c.healthError, "price deamon unhealthy")
}

// setHealth records the daemon's current health. Passing a nil err marks the daemon as healthy; passing a
// non-nil err marks it as unhealthy and stores err as the reason. Access to healthError is guarded by
// healthLock.
func (c *Client) setHealth(err error) {
	c.healthLock.Lock()
	c.healthError = err
	c.healthLock.Unlock()
}

func newClient() *Client {
client := &Client{
tickers: []*time.Ticker{},
stops: []chan bool{},
tickers: []*time.Ticker{},
stops: []chan bool{},
healthError: errors.New("daemon uninitialized"),
}

// Set the client's daemonStartup state to indicate that the daemon has not finished starting up.
Expand All @@ -66,7 +101,7 @@ func newClient() *Client {
// for any subtask kicked off by the client. The ticker and channel are tracked in order to properly clean up and send
// all needed stop signals when the daemon is stopped.
// Note: this method is not synchronized. It is expected to be called from the client's `StartNewClient` method before
// `client.CompleteStartup`.
// the daemonStartup waitgroup signals.
func (c *Client) newTickerWithStop(intervalMs int) (*time.Ticker, <-chan bool) {
ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
c.tickers = append(c.tickers, ticker)
Expand All @@ -91,6 +126,7 @@ func (c *Client) Stop() {
}

c.runningSubtasksWaitGroup.Wait()
c.setHealth(errors.New("daemon stopped"))
})
}

Expand Down Expand Up @@ -249,6 +285,7 @@ func (c *Client) start(ctx context.Context,

pricefeedClient := api.NewPriceFeedServiceClient(daemonConn)
subTaskRunner.StartPriceUpdater(
c,
ctx,
priceUpdaterTicker,
priceUpdaterStop,
Expand Down
89 changes: 89 additions & 0 deletions protocol/daemons/pricefeed/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type FakeSubTaskRunner struct {

// StartPriceUpdater replaces `client.StartPriceUpdater` and advances `UpdaterCallCount` by one.
func (f *FakeSubTaskRunner) StartPriceUpdater(
c *Client,
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand Down Expand Up @@ -247,6 +248,10 @@ func TestStart_InvalidConfig(t *testing.T) {
&faketaskRunner,
)

// Expect daemon is not healthy on startup. Daemon becomes healthy after the first successful market
// update.
require.ErrorContains(t, client.HealthCheck(grpc_util.Ctx), "daemon uninitialized")

if tc.expectedError == nil {
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -619,6 +624,7 @@ func TestPriceUpdater_Mixed(t *testing.T) {
},
"No exchange market prices, does not call `UpdateMarketPrices`": {
exchangeAndMarketPrices: []*client.ExchangeIdMarketPriceTimestamp{},
priceUpdateError: types.ErrEmptyMarketPriceUpdate,
},
"One market for one exchange": {
exchangeAndMarketPrices: []*client.ExchangeIdMarketPriceTimestamp{
Expand Down Expand Up @@ -721,6 +727,89 @@ func TestPriceUpdater_Mixed(t *testing.T) {
}
}

// singleTickTickerAndStop returns a ticker that already has exactly one tick pending, together with a stop
// channel that is signaled 100ms after this function is called. The pending tick must be consumed within
// that 100ms window for the ticker to be of any use to the caller.
func singleTickTickerAndStop() (*time.Ticker, chan bool) {
	const (
		// Long enough that the real ticker never fires during a normal unit test.
		realTickInterval = 10 * time.Minute
		// How long the ticker stays live before it is stopped and the stop channel is signaled.
		lifetime = 100 * time.Millisecond
	)

	// A buffered channel pre-loaded with a single tick stands in for the ticker's own channel.
	ticks := make(chan time.Time, 1)
	ticks <- time.Now()

	t := time.NewTicker(realTickInterval)
	t.C = ticks

	stopSignal := make(chan bool, 1)

	// Shut everything down once the lifetime has elapsed.
	go func() {
		time.Sleep(lifetime)
		t.Stop()
		stopSignal <- true
	}()

	return t, stopSignal
}

// TestHealthCheck_Mixed runs the price updater for a single tick against a mocked `UpdateMarketPrices` call
// and checks that the client's `HealthCheck` result matches the outcome of that call.
func TestHealthCheck_Mixed(t *testing.T) {
	type healthCase struct {
		updateMarketPricesError error
		expectedError           error
	}

	tests := map[string]healthCase{
		// Zero value: the update succeeds and no health error is expected.
		"No error - daemon healthy": {},
		"Error - daemon unhealthy": {
			updateMarketPricesError: fmt.Errorf("failed to update market prices"),
			expectedError: fmt.Errorf(
				"price deamon unhealthy: failed to run price updater task loop for price daemon: " +
					"failed to update market prices",
			),
		},
	}

	for name, tc := range tests {
		t.Run(name, func(t *testing.T) {
			// Setup.
			// Build the exchange-to-market-prices cache and populate it with a market-price update.
			prices, err := types.NewExchangeToMarketPrices([]types.ExchangeId{constants.ExchangeId1})
			require.NoError(t, err)
			prices.UpdatePrice(constants.ExchangeId1, constants.Market9_TimeT_Price1)

			// Mock `PriceFeedServiceClient` whose single `UpdateMarketPrices` call returns the
			// error configured for this test case (nil for the healthy case).
			priceFeedClient := generateMockQueryClient()
			priceFeedClient.
				On("UpdateMarketPrices", grpc_util.Ctx, mock.Anything).
				Return(nil, tc.updateMarketPricesError).
				Once()

			ticker, stop := singleTickTickerAndStop()
			daemonClient := newClient()

			// Act.
			// Run the price updater for a single tick. The client's health is expected to reflect
			// the result of the mocked update.
			subTaskRunnerImpl.StartPriceUpdater(
				daemonClient,
				grpc_util.Ctx,
				ticker,
				stop,
				prices,
				priceFeedClient,
				log.NewNopLogger(),
			)

			// Assert.
			healthErr := daemonClient.HealthCheck(grpc_util.Ctx)
			if tc.expectedError != nil {
				require.ErrorContains(t, healthErr, tc.expectedError.Error())
			} else {
				require.NoError(t, healthErr)
			}

			// Cleanup.
			close(stop)
		})
	}
}

// TestMarketUpdater_Mixed tests the `RunMarketParamUpdaterTaskLoop` function invokes the grpc
// query to the prices query client and that if the query succeeds, the config is updated.
func TestMarketUpdater_Mixed(t *testing.T) {
Expand Down
14 changes: 11 additions & 3 deletions protocol/daemons/pricefeed/client/sub_task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"cosmossdk.io/errors"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/constants"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_encoder"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_fetcher"
Expand Down Expand Up @@ -35,6 +36,7 @@ var _ SubTaskRunner = (*SubTaskRunnerImpl)(nil)
// SubTaskRunner is the interface for running pricefeed client task functions.
type SubTaskRunner interface {
StartPriceUpdater(
c *Client,
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand Down Expand Up @@ -76,6 +78,7 @@ type SubTaskRunner interface {
// StartPriceUpdater runs in the daemon's main goroutine and does not need access to the daemon's wait group
// to signal task completion.
func (s *SubTaskRunnerImpl) StartPriceUpdater(
c *Client,
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand All @@ -88,9 +91,11 @@ func (s *SubTaskRunnerImpl) StartPriceUpdater(
case <-ticker.C:
err := RunPriceUpdaterTaskLoop(ctx, exchangeToMarketPrices, priceFeedServiceClient, logger)
if err != nil {
panic(err)
logger.Error("Failed to run price updater task loop for price daemon", constants.ErrorLogKey, err)
}

c.setHealth(errors.Wrap(err, "failed to run price updater task loop for price daemon"))

case <-stop:
return
}
Expand Down Expand Up @@ -265,8 +270,10 @@ func RunPriceUpdaterTaskLoop(
metrics.Latency,
)

// On startup the length of request will likely be 0. However, sending a request of length 0
// is a fatal error.
// On startup the length of request will likely be 0. Even so, we return an error here because this
// is unexpected behavior once the daemon reaches a steady state. The daemon health check process should
// be robust enough to ignore temporarily unhealthy daemons.
// Sending a request of length 0, however, causes a panic.
// panic: rpc error: code = Unknown desc = Market price update has length of 0.
if len(request.MarketPriceUpdates) > 0 {
_, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request)
Comment on lines 274 to 283
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [274-306]

The RunPriceUpdaterTaskLoop function now returns an error of type types.ErrEmptyMarketPriceUpdate if the length of request.MarketPriceUpdates is 0. This is a good practice as it prevents sending empty updates which could cause a panic. However, it might be better to check this condition before starting the telemetry measurement on line 278 to avoid unnecessary computations for empty updates.

- if len(request.MarketPriceUpdates) > 0 {
- 	_, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request)
- 	if err != nil {
- 		logger.Error("Failed to run price updater task loop for price daemon", "error", err)
- 		telemetry.IncrCounter(
- 			1,
- 			metrics.PricefeedDaemon,
- 			metrics.PriceUpdaterTaskLoop,
- 			metrics.Error,
- 		)
- 		return err
- 	}
- } else {
- 	logger.Info("Price update had length of 0")
- 	telemetry.IncrCounter(
- 		1,
- 		metrics.PricefeedDaemon,
- 		metrics.PriceUpdaterZeroPrices,
- 		metrics.Count,
- 	)
- 	return types.ErrEmptyMarketPriceUpdate
- }
+ if len(request.MarketPriceUpdates) == 0 {
+ 	logger.Info("Price update had length of 0")
+ 	telemetry.IncrCounter(
+ 		1,
+ 		metrics.PricefeedDaemon,
+ 		metrics.PriceUpdaterZeroPrices,
+ 		metrics.Count,
+ 	)
+ 	return types.ErrEmptyMarketPriceUpdate
+ }
+ _, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request)
+ if err != nil {
+ 	logger.Error("Failed to run price updater task loop for price daemon", "error", err)
+ 	telemetry.IncrCounter(
+ 		1,
+ 		metrics.PricefeedDaemon,
+ 		metrics.PriceUpdaterTaskLoop,
+ 		metrics.Error,
+ 	)
+ 	return err
+ }

Expand All @@ -291,6 +298,7 @@ func RunPriceUpdaterTaskLoop(
metrics.PriceUpdaterZeroPrices,
metrics.Count,
)
return types.ErrEmptyMarketPriceUpdate
}

return nil
Expand Down
9 changes: 9 additions & 0 deletions protocol/daemons/pricefeed/client/types/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package types

import (
"errors"
)

var (
// ErrEmptyMarketPriceUpdate is returned by the price updater task loop when the market price update
// request it would send contains no price updates.
ErrEmptyMarketPriceUpdate = errors.New("Market price update has length of 0")
)
Loading
Loading