Skip to content

Commit

Permalink
Add tracking for market availability in order to alert in datadog.
Browse files Browse the repository at this point in the history
  • Loading branch information
Crystal Lemire committed Oct 25, 2023
1 parent 9b416b3 commit e29a863
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
42 changes: 42 additions & 0 deletions protocol/daemons/pricefeed/client/price_fetcher/price_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package price_fetcher
import (
"context"
"fmt"
"github.com/cosmos/cosmos-sdk/telemetry"
daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
"sync"
"time"
Expand Down Expand Up @@ -183,6 +184,27 @@ func (pf *PriceFetcher) RunTaskLoop(requestHandler daemontypes.RequestHandler) {
}
}

// logMarketAvailability emits telemetry that tracks whether a market was available when queried on an exchange.
// Success is tracked by (market, exchange) so that we can track the availability of each market on each exchange.
func logMarketAvailability(exchangeId types.ExchangeId, id types.MarketId, available bool) {
success := metrics.Success
if !available {
success = metrics.Error
}
telemetry.IncrCounterWithLabels(
[]string{
metrics.PricefeedDaemon,
metrics.PriceFetcherQueryForMarket,
success,
},
1,
[]gometrics.Label{
pricefeedmetrics.GetLabelForExchangeId(exchangeId),
pricefeedmetrics.GetLabelForMarketId(id),
},
)
}

// runSubTask makes a single query to an exchange for market prices. This query can be for 1 or
// n markets.
// For single market exchanges, a task loop execution will execute multiple runSubTask goroutines, where
Expand Down Expand Up @@ -239,9 +261,21 @@ func (pf *PriceFetcher) runSubTask(

if err != nil {
pf.writeToBufferedChannel(exchangeId, nil, err)

// Since the query failed, report all markets as unavailable.
for _, marketId := range marketIds {
logMarketAvailability(exchangeId, marketId, false)
}

return
}

// Track which markets were available when queried, and which were not, for telemetry.
availableMarkets := make(map[types.MarketId]bool, len(marketIds))
for _, marketId := range marketIds {
availableMarkets[marketId] = false
}

for _, price := range prices {
// No price should validly be zero. A price of zero points to an error in the API queried.
if price.Price == uint64(0) {
Expand Down Expand Up @@ -269,8 +303,16 @@ func (pf *PriceFetcher) runSubTask(
price.LastUpdatedAt,
)

// Report market as available.
availableMarkets[price.MarketId] = true

pf.writeToBufferedChannel(exchangeId, price, err)
}

// Emit metrics on this exchange's market availability.
for marketId, available := range availableMarkets {
logMarketAvailability(exchangeId, marketId, available)
}
}

// writeToBufferedChannel writes the (price, error) generated during querying to the price fetcher's
Expand Down
1 change: 1 addition & 0 deletions protocol/lib/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ const (
MarketUpdaterUpdateMarkets = "market_updater_update_markets"
PriceEncoderPriceConversion = "price_encoder_price_conversion"
PriceFetcherQueryExchange = "price_fetcher_query_exchange"
PriceFetcherQueryForMarket = "price_fetcher_query_for_market"
PriceFetcherSubtaskLoop = "price_fetcher_subtask_loop"
PriceFetcherSubtaskLoopAndSetCtxTimeout = "price_fetcher_subtask_loop_and_set_ctx_timeout"
PriceUpdateCount = "price_update_count"
Expand Down

0 comments on commit e29a863

Please sign in to comment.