From e29a86368f7b6553d48fe7be729f7e6f8089dde5 Mon Sep 17 00:00:00 2001 From: Crystal Lemire Date: Wed, 25 Oct 2023 15:20:53 -0700 Subject: [PATCH] Add tracking for market availability in order to alert in datadog. --- .../client/price_fetcher/price_fetcher.go | 42 +++++++++++++++++++ protocol/lib/metrics/constants.go | 1 + 2 files changed, 43 insertions(+) diff --git a/protocol/daemons/pricefeed/client/price_fetcher/price_fetcher.go b/protocol/daemons/pricefeed/client/price_fetcher/price_fetcher.go index 8a8377b4dd..e685b120f6 100644 --- a/protocol/daemons/pricefeed/client/price_fetcher/price_fetcher.go +++ b/protocol/daemons/pricefeed/client/price_fetcher/price_fetcher.go @@ -3,6 +3,7 @@ package price_fetcher import ( "context" "fmt" + "github.com/cosmos/cosmos-sdk/telemetry" daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types" "sync" "time" @@ -183,6 +184,27 @@ func (pf *PriceFetcher) RunTaskLoop(requestHandler daemontypes.RequestHandler) { } } +// logMarketAvailability emits telemetry that tracks whether a market was available when queried on an exchange. +// Success is tracked by (market, exchange) so that we can track the availability of each market on each exchange. +func logMarketAvailability(exchangeId types.ExchangeId, id types.MarketId, available bool) { + success := metrics.Success + if !available { + success = metrics.Error + } + telemetry.IncrCounterWithLabels( + []string{ + metrics.PricefeedDaemon, + metrics.PriceFetcherQueryForMarket, + success, + }, + 1, + []gometrics.Label{ + pricefeedmetrics.GetLabelForExchangeId(exchangeId), + pricefeedmetrics.GetLabelForMarketId(id), + }, + ) +} + // runSubTask makes a single query to an exchange for market prices. This query can be for 1 or // n markets. // For single market exchanges, a task loop execution will execute multiple runSubTask goroutines, where @@ -239,9 +261,21 @@ func (pf *PriceFetcher) runSubTask( if err != nil { pf.writeToBufferedChannel(exchangeId, nil, err) + + // Since the query failed, report all markets as unavailable. + for _, marketId := range marketIds { + logMarketAvailability(exchangeId, marketId, false) + } + return } + // Track which markets were available when queried, and which were not, for telemetry. + availableMarkets := make(map[types.MarketId]bool, len(marketIds)) + for _, marketId := range marketIds { + availableMarkets[marketId] = false + } + for _, price := range prices { // No price should validly be zero. A price of zero points to an error in the API queried. if price.Price == uint64(0) { @@ -269,8 +303,16 @@ func (pf *PriceFetcher) runSubTask( price.LastUpdatedAt, ) + // Report market as available. + availableMarkets[price.MarketId] = true + pf.writeToBufferedChannel(exchangeId, price, err) } + + // Emit metrics on this exchange's market availability. + for marketId, available := range availableMarkets { + logMarketAvailability(exchangeId, marketId, available) + } } // writeToBufferedChannel writes the (price, error) generated during querying to the price fetcher's diff --git a/protocol/lib/metrics/constants.go b/protocol/lib/metrics/constants.go index 7e03df2b34..77ea2e5ef2 100644 --- a/protocol/lib/metrics/constants.go +++ b/protocol/lib/metrics/constants.go @@ -343,6 +343,7 @@ const ( MarketUpdaterUpdateMarkets = "market_updater_update_markets" PriceEncoderPriceConversion = "price_encoder_price_conversion" PriceFetcherQueryExchange = "price_fetcher_query_exchange" + PriceFetcherQueryForMarket = "price_fetcher_query_for_market" PriceFetcherSubtaskLoop = "price_fetcher_subtask_loop" PriceFetcherSubtaskLoopAndSetCtxTimeout = "price_fetcher_subtask_loop_and_set_ctx_timeout" PriceUpdateCount = "price_update_count"