Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-354] - Add tracking for market availability in order to alert in datadog (backport #706) #712

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions protocol/daemons/pricefeed/client/price_fetcher/price_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package price_fetcher
import (
"context"
"fmt"
"github.com/cosmos/cosmos-sdk/telemetry"
daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
"math/rand"
"sync"
"time"

Expand Down Expand Up @@ -183,6 +185,27 @@ func (pf *PriceFetcher) RunTaskLoop(requestHandler daemontypes.RequestHandler) {
}
}

// emitMarketAvailabilityMetrics emits telemetry that tracks whether a market was available when queried on an exchange.
// Success is tracked by (market, exchange) so that we can track the availability of each market on each exchange.
func emitMarketAvailabilityMetrics(exchangeId types.ExchangeId, id types.MarketId, available bool) {
success := metrics.Success
if !available {
success = metrics.Error
}
telemetry.IncrCounterWithLabels(
[]string{
metrics.PricefeedDaemon,
metrics.PriceFetcherQueryForMarket,
success,
},
1,
[]gometrics.Label{
pricefeedmetrics.GetLabelForExchangeId(exchangeId),
pricefeedmetrics.GetLabelForMarketId(id),
},
)
}

// runSubTask makes a single query to an exchange for market prices. This query can be for 1 or
// n markets.
// For single market exchanges, a task loop execution will execute multiple runSubTask goroutines, where
Expand Down Expand Up @@ -237,11 +260,28 @@ func (pf *PriceFetcher) runSubTask(
taskLoopDefinition.marketExponents,
)

// Emit metrics at the `AvailableMarketsSampleRate`.
emitMetricsSample := rand.Float64() < metrics.AvailableMarketsSampleRate

if err != nil {
pf.writeToBufferedChannel(exchangeId, nil, err)

// Since the query failed, report all markets as unavailable, according to the sampling rate.
if emitMetricsSample {
for _, marketId := range marketIds {
emitMarketAvailabilityMetrics(exchangeId, marketId, false)
}
}

return
}

// Track which markets were available when queried, and which were not, for telemetry.
availableMarkets := make(map[types.MarketId]bool, len(marketIds))
for _, marketId := range marketIds {
availableMarkets[marketId] = false
}

for _, price := range prices {
// No price should validly be zero. A price of zero points to an error in the API queried.
if price.Price == uint64(0) {
Expand Down Expand Up @@ -269,8 +309,18 @@ func (pf *PriceFetcher) runSubTask(
price.LastUpdatedAt,
)

// Report market as available.
availableMarkets[price.MarketId] = true

pf.writeToBufferedChannel(exchangeId, price, err)
}

// Emit metrics on this exchange's market availability according to the sampling rate.
if emitMetricsSample {
for marketId, available := range availableMarkets {
emitMarketAvailabilityMetrics(exchangeId, marketId, available)
}
}
}

// writeToBufferedChannel writes the (price, error) generated during querying to the price fetcher's
Expand Down
6 changes: 5 additions & 1 deletion protocol/lib/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ const (
MarketUpdaterUpdateMarkets = "market_updater_update_markets"
PriceEncoderPriceConversion = "price_encoder_price_conversion"
PriceFetcherQueryExchange = "price_fetcher_query_exchange"
PriceFetcherQueryForMarket = "price_fetcher_query_for_market_sampled"
PriceFetcherSubtaskLoop = "price_fetcher_subtask_loop"
PriceFetcherSubtaskLoopAndSetCtxTimeout = "price_fetcher_subtask_loop_and_set_ctx_timeout"
PriceUpdateCount = "price_update_count"
Expand Down Expand Up @@ -388,4 +389,7 @@ const (
ValidatorVolumeQuoteQuantums = "validator_volume_quote_quantums"
)

const LatencyMetricSampleRate = 0.01
const (
LatencyMetricSampleRate = 0.01
AvailableMarketsSampleRate = .1
)
Loading