Skip to content

Commit

Permalink
Update liquidations daemon to report success after every paginated gRPC query.
Browse files Browse the repository at this point in the history

Update error for a more specific test.

PR comments.

Revert liquidations health reporting to occur only at task loop boundary.

Checkpoint.

checkpoint - implementation, moved methods to app. need to migrate / add testing.

Update test app.

Checkpoint for tests.

Finish test cases.

App unit test + cleanup.

Cleanup, rename.

cleanup.
  • Loading branch information
Crystal Lemire committed Nov 21, 2023
1 parent 2ebe335 commit 42bc296
Show file tree
Hide file tree
Showing 17 changed files with 738 additions and 525 deletions.
53 changes: 43 additions & 10 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"encoding/json"
"errors"
"io"
"math"
"math/big"
"net/http"
"os"
"path/filepath"
"runtime/debug"
"sync"
"time"

autocliv1 "cosmossdk.io/api/cosmos/autocli/v1"
reflectionv1 "cosmossdk.io/api/cosmos/reflection/v1"
Expand Down Expand Up @@ -183,6 +183,10 @@ import (
var (
// DefaultNodeHome is the default home directory for the application daemon.
DefaultNodeHome string

// MaximumDaemonUnhealthyDuration is the maximum amount of time that a daemon can be unhealthy before the
// application panics. It is the duration handed to MonitorDaemon for each daemon client that the app
// registers for health monitoring.
MaximumDaemonUnhealthyDuration = 5 * time.Minute
)

var (
Expand Down Expand Up @@ -290,6 +294,8 @@ type App struct {
PriceFeedClient *pricefeedclient.Client
LiquidationsClient *liquidationclient.Client
BridgeClient *bridgeclient.Client

HealthMonitor *daemonservertypes.HealthMonitor
}

// assertAppPreconditions assert invariants required for an application to start.
Expand Down Expand Up @@ -589,6 +595,11 @@ func New(
bridgeEventManager := bridgedaemontypes.NewBridgeEventManager(timeProvider)
app.Server.WithBridgeEventManager(bridgeEventManager)

app.HealthMonitor = daemonservertypes.NewHealthMonitor(
daemonservertypes.DaemonStartupGracePeriod,
daemonservertypes.HealthCheckPollFrequency,
app.Logger(),
)
// Create a closure for starting daemons and daemon server. Daemon services are delayed until after the gRPC
// service is started because daemons depend on the gRPC service being available. If a node is initialized
// with a genesis time in the future, then the gRPC service will not be available until the genesis time, the
Expand All @@ -600,9 +611,6 @@ func New(

// Start liquidations client for sending potentially liquidatable subaccounts to the application.
if daemonFlags.Liquidation.Enabled {
app.Server.ExpectLiquidationsDaemon(
daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Liquidation.LoopDelayMs),
)
app.LiquidationsClient = liquidationclient.NewClient(logger)
go func() {
if err := app.LiquidationsClient.Start(
Expand All @@ -615,13 +623,13 @@ func New(
); err != nil {
panic(err)
}
app.MonitorDaemon(app.LiquidationsClient, MaximumDaemonUnhealthyDuration)
}()
}

// Non-validating full-nodes have no need to run the price daemon.
if !appFlags.NonValidatingFullNode && daemonFlags.Price.Enabled {
exchangeQueryConfig := constants.StaticExchangeQueryConfig
app.Server.ExpectPricefeedDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Price.LoopDelayMs))
// Start pricefeed client for sending prices for the pricefeed server to consume. These prices
// are retrieved via third-party APIs like Binance and then are encoded in-memory and
// periodically sent via gRPC to a shared socket with the server.
Expand All @@ -637,16 +645,16 @@ func New(
constants.StaticExchangeDetails,
&pricefeedclient.SubTaskRunnerImpl{},
)
app.MonitorDaemon(app.PriceFeedClient, MaximumDaemonUnhealthyDuration)
}

// Start Bridge Daemon.
// Non-validating full-nodes have no need to run the bridge daemon.
if !appFlags.NonValidatingFullNode && daemonFlags.Bridge.Enabled {
// TODO(CORE-582): Re-enable bridge daemon registration once the bridge daemon is fixed in local / CI
// environments.
// app.Server.ExpectBridgeDaemon(daemonservertypes.MaximumAcceptableUpdateDelay(daemonFlags.Bridge.LoopDelayMs))
app.BridgeClient = bridgeclient.NewClient(logger)
go func() {
app.BridgeClient = bridgeclient.NewClient(logger)
if err := app.BridgeClient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand All @@ -657,6 +665,7 @@ func New(
); err != nil {
panic(err)
}
app.MonitorDaemon(app.BridgeClient, MaximumDaemonUnhealthyDuration)
}()
}

Expand All @@ -676,9 +685,8 @@ func New(
}
}()
// Don't panic if metrics daemon loops are delayed. Use maximum value.
app.Server.ExpectMetricsDaemon(
daemonservertypes.MaximumAcceptableUpdateDelay(math.MaxUint32),
)
// TODO(CORE-666): Refactor metrics daemon and track health here
// app.MonitorDaemon(app.MetricsDaemon, MaximumDaemonUnhealthyDuration)
metricsclient.Start(
// The client will use `context.Background` so that it can have a different context from
// the main application.
Expand Down Expand Up @@ -1222,6 +1230,31 @@ func New(
return app
}

// MonitorDaemon registers healthCheckableDaemon with the app's HealthMonitor, forwarding
// maximumAcceptableUpdateDelay to HealthMonitor.RegisterService unchanged.
//
// If registration returns an error, the error is logged together with the daemon's service name and the
// requested duration, and the method then panics with that same error.
func (app *App) MonitorDaemon(
	healthCheckableDaemon daemontypes.HealthCheckable,
	maximumAcceptableUpdateDelay time.Duration,
) {
	registrationErr := app.HealthMonitor.RegisterService(healthCheckableDaemon, maximumAcceptableUpdateDelay)
	if registrationErr == nil {
		return
	}

	// Registration failed. Record which service and which duration were involved before panicking, so the
	// failure can be diagnosed from the logs.
	app.Logger().Error(
		"Failed to register daemon service with update monitor",
		"error", registrationErr,
		"service", healthCheckableDaemon.ServiceName(),
		"maximumAcceptableUpdateDelay", maximumAcceptableUpdateDelay,
	)
	panic(registrationErr)
}

// DisableHealthMonitorForTesting disables the app's health monitor by delegating to
// HealthMonitor.DisableForTesting. As the name indicates, it is intended to be called from test code only.
func (app *App) DisableHealthMonitorForTesting() {
app.HealthMonitor.DisableForTesting()
}

// hydrateMemStores hydrates the memStores used for caching state.
func (app *App) hydrateMemStores() {
// Create an `uncachedCtx` where the underlying MultiStore is the `rootMultiStore`.
Expand Down
13 changes: 13 additions & 0 deletions protocol/app/app_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package app_test

import (
"github.com/dydxprotocol/v4-chain/protocol/mocks"
"gopkg.in/typ.v4/slices"
"reflect"
"strings"
"testing"
"time"

delaymsgmodule "github.com/dydxprotocol/v4-chain/protocol/x/delaymsg"

Expand Down Expand Up @@ -222,3 +224,14 @@ func TestModuleBasics(t *testing.T) {
actualFieldTypes := getMapFieldsAndTypes(reflect.ValueOf(basic_manager.ModuleBasics))
require.Equal(t, expectedFieldTypes, actualFieldTypes, "Module basics does not match expected")
}

// TestMonitorDaemon_Panics checks that MonitorDaemon panics when registration with the health monitor
// fails. The failure is triggered by registering the same mocked daemon twice.
func TestMonitorDaemon_Panics(t *testing.T) {
	const maxUnhealthyDuration = 5 * time.Minute

	testApp := testapp.DefaultTestApp(nil)

	// A mocked daemon that always reports healthy under a fixed service name.
	healthCheckable := &mocks.HealthCheckable{}
	healthCheckable.On("ServiceName").Return("test-service")
	healthCheckable.On("HealthCheck").Return(nil)

	// The first registration is expected to succeed.
	testApp.MonitorDaemon(healthCheckable, maxUnhealthyDuration)

	// The second registration should fail, causing a panic.
	require.Panics(t, func() {
		testApp.MonitorDaemon(healthCheckable, maxUnhealthyDuration)
	})
}
12 changes: 11 additions & 1 deletion protocol/daemons/pricefeed/client/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package client_test
import (
"fmt"
"github.com/cometbft/cometbft/libs/log"
"github.com/dydxprotocol/v4-chain/protocol/app"
appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client"
Expand Down Expand Up @@ -219,6 +220,7 @@ type PriceDaemonIntegrationTestSuite struct {
exchangeServer *pricefeed.ExchangeServer
daemonServer *daemonserver.Server
exchangePriceCache *pricefeedserver_types.MarketToExchangePrices
healthMonitor *servertypes.HealthMonitor

pricesMockQueryServer *mocks.QueryServer
pricesGrpcServer *grpc.Server
Expand Down Expand Up @@ -278,7 +280,13 @@ func (s *PriceDaemonIntegrationTestSuite) SetupTest() {
&daemontypes.FileHandlerImpl{},
s.daemonFlags.Shared.SocketAddress,
)
s.daemonServer.ExpectPricefeedDaemon(servertypes.MaximumAcceptableUpdateDelay(s.daemonFlags.Price.LoopDelayMs))

s.healthMonitor = servertypes.NewHealthMonitor(
servertypes.DaemonStartupGracePeriod,
servertypes.HealthCheckPollFrequency,
log.TestingLogger(),
)

s.exchangePriceCache = pricefeedserver_types.NewMarketToExchangePrices(pricefeed_types.MaxPriceAge)
s.daemonServer.WithPriceFeedMarketToExchangePrices(s.exchangePriceCache)

Expand Down Expand Up @@ -329,6 +337,8 @@ func (s *PriceDaemonIntegrationTestSuite) startClient() {
testExchangeToQueryDetails,
&client.SubTaskRunnerImpl{},
)
err := s.healthMonitor.RegisterService(s.pricefeedDaemon, app.MaximumDaemonUnhealthyDuration)
s.Require().NoError(err)
}

// expectPricesWithTimeout waits for the exchange price cache to contain the expected prices, with a timeout.
Expand Down
11 changes: 0 additions & 11 deletions protocol/daemons/server/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package server

import (
"context"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
"time"

"github.com/dydxprotocol/v4-chain/protocol/daemons/bridge/api"
bdtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/bridge"
)
Expand All @@ -23,14 +20,6 @@ func (server *Server) WithBridgeEventManager(
return server
}

// ExpectBridgeDaemon registers the bridge daemon with the server under types.BridgeDaemonServiceName.
// Registration is required in order to ensure that the daemon service is called at least once during
// every maximumAcceptableUpdateDelay duration. It will cause the protocol to panic if the daemon does
// not respond within maximumAcceptableUpdateDelay duration.
func (server *Server) ExpectBridgeDaemon(maximumAcceptableUpdateDelay time.Duration) {
server.registerDaemon(types.BridgeDaemonServiceName, maximumAcceptableUpdateDelay)
}

// AddBridgeEvents stores any bridge events recognized by the daemon
// in a go-routine safe slice.
func (s *Server) AddBridgeEvents(
Expand Down
17 changes: 0 additions & 17 deletions protocol/daemons/server/liquidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package server

import (
"context"
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
"time"

"github.com/cosmos/cosmos-sdk/telemetry"
"github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api"
liquidationtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/liquidations"
Expand All @@ -26,14 +23,6 @@ func (server *Server) WithLiquidatableSubaccountIds(
return server
}

// ExpectLiquidationsDaemon registers the liquidations daemon with the server under
// types.LiquidationsDaemonServiceName. Registration is required in order to ensure that the daemon
// service is called at least once during every maximumAcceptableUpdateDelay duration. It will cause the
// protocol to panic if the daemon does not respond within maximumAcceptableUpdateDelay duration.
func (server *Server) ExpectLiquidationsDaemon(maximumAcceptableUpdateDelay time.Duration) {
server.registerDaemon(types.LiquidationsDaemonServiceName, maximumAcceptableUpdateDelay)
}

// LiquidateSubaccounts stores the list of potentially liquidatable subaccount ids
// in a go-routine safe slice.
func (s *Server) LiquidateSubaccounts(
Expand All @@ -47,12 +36,6 @@ func (s *Server) LiquidateSubaccounts(
metrics.Received,
metrics.Count,
)
// If the daemon is unable to report a response, there is either an error in the registration of
// this daemon, or another one. In either case, the protocol should panic.
if err := s.reportResponse(types.LiquidationsDaemonServiceName); err != nil {
s.logger.Error("Failed to report liquidations response to update monitor", "error", err)
}

s.liquidatableSubaccountIds.UpdateSubaccountIds(req.SubaccountIds)
return &api.LiquidateSubaccountsResponse{}, nil
}
15 changes: 0 additions & 15 deletions protocol/daemons/server/metrics.go

This file was deleted.

15 changes: 0 additions & 15 deletions protocol/daemons/server/pricefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package server
import (
"context"
errorsmod "cosmossdk.io/errors"
servertypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
"time"

gometrics "github.com/armon/go-metrics"
Expand Down Expand Up @@ -31,14 +30,6 @@ func (server *Server) WithPriceFeedMarketToExchangePrices(
return server
}

// ExpectPricefeedDaemon registers the pricefeed daemon with the server under
// servertypes.PricefeedDaemonServiceName. Registration is required in order to ensure that the daemon
// service is called at least once during every maximumAcceptableUpdateDelay duration. It will cause the
// protocol to panic if the daemon does not respond within maximumAcceptableUpdateDelay duration.
func (server *Server) ExpectPricefeedDaemon(maximumAcceptableUpdateDelay time.Duration) {
server.registerDaemon(servertypes.PricefeedDaemonServiceName, maximumAcceptableUpdateDelay)
}

// UpdateMarketPrices updates prices from exchanges for each market provided.
func (s *Server) UpdateMarketPrices(
ctx context.Context,
Expand All @@ -52,12 +43,6 @@ func (s *Server) UpdateMarketPrices(
metrics.Latency,
)

// If the daemon is unable to report a response, there is either an error in the registration of
// this daemon, or another one. In either case, the protocol should panic.
if err := s.reportResponse(servertypes.PricefeedDaemonServiceName); err != nil {
panic(err)
}

if s.marketToExchange == nil {
panic(
errorsmod.Wrapf(
Expand Down
Loading

0 comments on commit 42bc296

Please sign in to comment.