From d15697e8d9f7b3e2af1dca2601d2222afb30a755 Mon Sep 17 00:00:00 2001
From: jayy04 <103467857+jayy04@users.noreply.github.com>
Date: Thu, 7 Dec 2023 21:49:43 -0500
Subject: [PATCH] [CLOB-1038] liquidation daemon - grpc to get all market
 prices (#857)

---
 protocol/daemons/liquidation/client/client.go |  3 +
 .../daemons/liquidation/client/grpc_helper.go | 92 +++++++++++++++++-
 .../liquidation/client/grpc_helper_test.go    | 95 +++++++++++++++++++
 protocol/lib/metrics/metric_keys.go           |  2 +
 4 files changed, 189 insertions(+), 3 deletions(-)

diff --git a/protocol/daemons/liquidation/client/client.go b/protocol/daemons/liquidation/client/client.go
index 5be3317676..d078695d8d 100644
--- a/protocol/daemons/liquidation/client/client.go
+++ b/protocol/daemons/liquidation/client/client.go
@@ -14,6 +14,7 @@ import (
 	daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
 	blocktimetypes "github.com/dydxprotocol/v4-chain/protocol/x/blocktime/types"
 	clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
+	pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
 	satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
 )
 
@@ -23,6 +24,7 @@ type Client struct {
 	// Query clients
 	BlocktimeQueryClient     blocktimetypes.QueryClient
 	SubaccountQueryClient    satypes.QueryClient
+	PricesQueryClient        pricestypes.QueryClient
 	ClobQueryClient          clobtypes.QueryClient
 	LiquidationServiceClient api.LiquidationServiceClient
 
@@ -89,6 +91,7 @@ func (c *Client) Start(
 	// Initialize the query clients. These are used to query the Cosmos gRPC query services.
 	c.BlocktimeQueryClient = blocktimetypes.NewQueryClient(queryConn)
 	c.SubaccountQueryClient = satypes.NewQueryClient(queryConn)
+	c.PricesQueryClient = pricestypes.NewQueryClient(queryConn)
 	c.ClobQueryClient = clobtypes.NewQueryClient(queryConn)
 	c.LiquidationServiceClient = api.NewLiquidationServiceClient(daemonConn)
 
diff --git a/protocol/daemons/liquidation/client/grpc_helper.go b/protocol/daemons/liquidation/client/grpc_helper.go
index 6373c9a37a..64233903a6 100644
--- a/protocol/daemons/liquidation/client/grpc_helper.go
+++ b/protocol/daemons/liquidation/client/grpc_helper.go
@@ -2,16 +2,20 @@ package client
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	gometrics "github.com/armon/go-metrics"
 	"github.com/cosmos/cosmos-sdk/telemetry"
+	"github.com/cosmos/cosmos-sdk/types/grpc"
 	"github.com/cosmos/cosmos-sdk/types/query"
 	"github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api"
 	"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
 	blocktimetypes "github.com/dydxprotocol/v4-chain/protocol/x/blocktime/types"
 	clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
+	pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
 	satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
+	"google.golang.org/grpc/metadata"
 )
 
 // GetPreviousBlockInfo queries a gRPC server using `QueryPreviousBlockInfoRequest`
@@ -37,11 +41,60 @@ func (c *Client) GetPreviousBlockInfo(
 	return response.Info.Height, nil
 }
 
+// GetAllMarketPrices queries gRPC server and returns a list of market prices.
+func (c *Client) GetAllMarketPrices(
+	ctx context.Context,
+	blockHeight uint32,
+	pageLimit uint64,
+) (
+	marketPrices []pricestypes.MarketPrice,
+	err error,
+) {
+	defer metrics.ModuleMeasureSince(
+		metrics.LiquidationDaemon,
+		metrics.DaemonGetAllMarketPricesLatency,
+		time.Now(),
+	)
+
+	marketPrices = make([]pricestypes.MarketPrice, 0)
+
+	// Set the block height header to the block height of the previous block.
+	ctx = metadata.NewOutgoingContext(
+		ctx,
+		metadata.Pairs(
+			grpc.GRPCBlockHeightHeader,
+			fmt.Sprintf("%d", blockHeight),
+		),
+	)
+
+	var nextKey []byte
+	for {
+		marketPricesFromKey, next, err := getMarketPricesFromKey(
+			ctx,
+			c.PricesQueryClient,
+			nextKey,
+			pageLimit,
+		)
+
+		if err != nil {
+			return nil, err
+		}
+
+		marketPrices = append(marketPrices, marketPricesFromKey...)
+		nextKey = next
+
+		if len(nextKey) == 0 {
+			break
+		}
+	}
+	return marketPrices, nil
+}
+
 // GetAllSubaccounts queries a gRPC server and returns a list of subaccounts and
 // their balances and open positions.
 func (c *Client) GetAllSubaccounts(
 	ctx context.Context,
-	limit uint64,
+	pageLimit uint64,
 ) (
 	subaccounts []satypes.Subaccount,
 	err error,
@@ -54,8 +107,8 @@ func (c *Client) GetAllSubaccounts(
 		subaccountsFromKey, next, err := getSubaccountsFromKey(
 			ctx,
 			c.SubaccountQueryClient,
-			limit,
 			nextKey,
+			pageLimit,
 		)
 
 		if err != nil {
@@ -140,8 +193,8 @@ func (c *Client) SendLiquidatableSubaccountIds(
 func getSubaccountsFromKey(
 	ctx context.Context,
 	client satypes.QueryClient,
-	limit uint64,
 	pageRequestKey []byte,
+	limit uint64,
 ) (
 	subaccounts []satypes.Subaccount,
 	nextKey []byte,
@@ -172,3 +225,36 @@ func getSubaccountsFromKey(
 	}
 	return response.Subaccount, nextKey, nil
 }
+
+func getMarketPricesFromKey(
+	ctx context.Context,
+	client pricestypes.QueryClient,
+	pageRequestKey []byte,
+	limit uint64,
+) (
+	marketPrices []pricestypes.MarketPrice,
+	nextKey []byte,
+	err error,
+) {
+	defer metrics.ModuleMeasureSince(
+		metrics.LiquidationDaemon,
+		metrics.DaemonGetMarketPricesPaginatedLatency,
+		time.Now(),
+	)
+
+	query := &pricestypes.QueryAllMarketPricesRequest{
+		Pagination: &query.PageRequest{
+			Key:   pageRequestKey,
+			Limit: limit,
+		},
+	}
+
+	response, err := client.AllMarketPrices(ctx, query)
+	if err != nil {
+		return nil, nil, err
+	}
+	if response.Pagination != nil {
+		nextKey = response.Pagination.NextKey
+	}
+	return response.MarketPrices, nextKey, nil
+}
diff --git a/protocol/daemons/liquidation/client/grpc_helper_test.go b/protocol/daemons/liquidation/client/grpc_helper_test.go
index b2a25a3a80..3d75d2450a 100644
--- a/protocol/daemons/liquidation/client/grpc_helper_test.go
+++ b/protocol/daemons/liquidation/client/grpc_helper_test.go
@@ -15,6 +15,7 @@ import (
 	"github.com/dydxprotocol/v4-chain/protocol/testutil/grpc"
 	blocktimetypes "github.com/dydxprotocol/v4-chain/protocol/x/blocktime/types"
 	clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
+	pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
 	satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
@@ -175,6 +176,100 @@ func TestGetAllSubaccounts(t *testing.T) {
 	}
 }
 
+func TestGetAllMarketPrices(t *testing.T) {
+	tests := map[string]struct {
+		// mocks
+		setupMocks func(ctx context.Context, mck *mocks.QueryClient)
+		limit      uint64
+
+		// expectations
+		expectedMarketPrices []pricestypes.MarketPrice
+		expectedError        error
+	}{
+		"Success": {
+			setupMocks: func(ctx context.Context, mck *mocks.QueryClient) {
+				req := &pricestypes.QueryAllMarketPricesRequest{
+					Pagination: &query.PageRequest{
+						Limit: 1_000,
+					},
+				}
+				response := &pricestypes.QueryAllMarketPricesResponse{
+					MarketPrices: constants.TestMarketPrices,
+				}
+				mck.On("AllMarketPrices", mock.Anything, req).Return(response, nil)
+			},
+			limit:                1_000,
+			expectedMarketPrices: constants.TestMarketPrices,
+		},
+		"Success Paginated": {
+			setupMocks: func(ctx context.Context, mck *mocks.QueryClient) {
+				req := &pricestypes.QueryAllMarketPricesRequest{
+					Pagination: &query.PageRequest{
+						Limit: 2,
+					},
+				}
+				nextKey := []byte("next key")
+				response := &pricestypes.QueryAllMarketPricesResponse{
+					MarketPrices: []pricestypes.MarketPrice{
+						constants.TestMarketPrices[0],
+						constants.TestMarketPrices[1],
+					},
+					Pagination: &query.PageResponse{
+						NextKey: nextKey,
+					},
+				}
+				mck.On("AllMarketPrices", mock.Anything, req).Return(response, nil)
+				req2 := &pricestypes.QueryAllMarketPricesRequest{
+					Pagination: &query.PageRequest{
+						Key:   nextKey,
+						Limit: 2,
+					},
+				}
+				response2 := &pricestypes.QueryAllMarketPricesResponse{
+					MarketPrices: []pricestypes.MarketPrice{
+						constants.TestMarketPrices[2],
+					},
+				}
+				mck.On("AllMarketPrices", mock.Anything, req2).Return(response2, nil)
+			},
+			limit:                2,
+			expectedMarketPrices: constants.TestMarketPrices,
+		},
+		"Errors are propagated": {
+			setupMocks: func(ctx context.Context, mck *mocks.QueryClient) {
+				req := &pricestypes.QueryAllMarketPricesRequest{
+					Pagination: &query.PageRequest{
+						Limit: 1_000,
+					},
+				}
+				mck.On("AllMarketPrices", mock.Anything, req).Return(nil, errors.New("test error"))
+			},
+			limit:         1_000,
+			expectedError: errors.New("test error"),
+		},
+	}
+
+	for name, tc := range tests {
+		t.Run(name, func(t *testing.T) {
+			queryClientMock := &mocks.QueryClient{}
+			tc.setupMocks(grpc.Ctx, queryClientMock)
+
+			daemon := client.NewClient(log.NewNopLogger())
+			daemon.PricesQueryClient = queryClientMock
+			actual, err := daemon.GetAllMarketPrices(
+				grpc.Ctx,
+				uint32(50),
+				tc.limit,
+			)
+			if err != nil {
+				require.EqualError(t, err, tc.expectedError.Error())
+			} else {
+				require.Equal(t, tc.expectedMarketPrices, actual)
+			}
+		})
+	}
+}
+
 func TestCheckCollateralizationForSubaccounts(t *testing.T) {
 	tests := map[string]struct {
 		// mocks
diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go
index ad05127cd6..e342f7af1b 100644
--- a/protocol/lib/metrics/metric_keys.go
+++ b/protocol/lib/metrics/metric_keys.go
@@ -39,5 +39,7 @@ const (
 	// Measure Since
 	ClobOffsettingSubaccountPerpetualPosition = "clob_offsetting_subaccount_perpetual_position"
 	DaemonGetPreviousBlockInfoLatency         = "daemon_get_previous_block_info_latency"
+	DaemonGetAllMarketPricesLatency           = "daemon_get_all_market_prices_latency"
+	DaemonGetMarketPricesPaginatedLatency     = "daemon_get_market_prices_paginated_latency"
 	MevLatency                                = "mev_latency"
 )