diff --git a/indexer/packages/postgres/src/index.ts b/indexer/packages/postgres/src/index.ts index 70c0d719a56..fed23f558d0 100644 --- a/indexer/packages/postgres/src/index.ts +++ b/indexer/packages/postgres/src/index.ts @@ -70,3 +70,4 @@ export * as testConstants from '../__tests__/helpers/constants'; export * as testConversionHelpers from '../__tests__/helpers/conversion-helpers'; export * as helpers from './db/helpers'; +export * as loopHelpers from './loops/loopHelper'; diff --git a/indexer/services/ender/__tests__/caches/orderbook-mid-price-memory-cache.test.ts b/indexer/services/ender/__tests__/caches/orderbook-mid-price-memory-cache.test.ts new file mode 100644 index 00000000000..62f21dac932 --- /dev/null +++ b/indexer/services/ender/__tests__/caches/orderbook-mid-price-memory-cache.test.ts @@ -0,0 +1,83 @@ +import { OrderbookMidPricesCache } from '@dydxprotocol-indexer/redis'; +import * as orderbookMidPriceMemoryCache from '../../src/caches/orderbook-mid-price-memory-cache'; +import { + dbHelpers, + testMocks, +} from '@dydxprotocol-indexer/postgres'; +import config from '../../src/config'; +import { logger, stats } from '@dydxprotocol-indexer/base'; + +describe('orderbook-mid-price-memory-cache', () => { + + beforeAll(async () => { + await dbHelpers.migrate(); + await dbHelpers.clearData(); + }); + + beforeEach(async () => { + await testMocks.seedData(); + }); + + afterEach(async () => { + await dbHelpers.clearData(); + }); + + describe('getOrderbookMidPrice', () => { + it('should return the mid price for a given ticker', async () => { + jest.spyOn(OrderbookMidPricesCache, 'getMedianPrices') + .mockReturnValue(Promise.resolve({ 'BTC-USD': '300', 'ETH-USD': '200' })); + + await orderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + + expect(orderbookMidPriceMemoryCache.getOrderbookMidPrice('BTC-USD')).toBe('300'); + expect(orderbookMidPriceMemoryCache.getOrderbookMidPrice('ETH-USD')).toBe('200'); + }); + }); + + describe('updateOrderbookMidPrices', () => { + it('should update the orderbook mid price cache', async () => { + const mockMedianPrices = { + 'BTC-USD': '50000', + 'ETH-USD': '3000', + 'SOL-USD': '1000', + }; + + jest.spyOn(OrderbookMidPricesCache, 'getMedianPrices') + .mockResolvedValue(mockMedianPrices); + + await orderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + + expect(orderbookMidPriceMemoryCache.getOrderbookMidPrice('BTC-USD')).toBe('50000'); + expect(orderbookMidPriceMemoryCache.getOrderbookMidPrice('ETH-USD')).toBe('3000'); + expect(orderbookMidPriceMemoryCache.getOrderbookMidPrice('SOL-USD')).toBe('1000'); + }); + + it('should handle errors and log them', async () => { + const mockError = new Error('Test error'); + jest.spyOn(OrderbookMidPricesCache, 'getMedianPrices').mockImplementation(() => { + throw mockError; + }); + + jest.spyOn(logger, 'error'); + await orderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + + expect(logger.error).toHaveBeenCalledWith( + expect.objectContaining({ + at: 'orderbook-mid-price-cache#updateOrderbookMidPrices', + message: 'Failed to fetch OrderbookMidPrices', + error: mockError, + }), + ); + }); + + it('should record timing stats', async () => { + jest.spyOn(stats, 'timing'); + await orderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + + expect(stats.timing).toHaveBeenCalledWith( + `${config.SERVICE_NAME}.update_orderbook_mid_prices_cache.timing`, + expect.any(Number), + ); + }); + }); +}); diff --git a/indexer/services/ender/__tests__/lib/candles-generator.test.ts b/indexer/services/ender/__tests__/lib/candles-generator.test.ts index e0e6565dc85..f20b68354c6 100644 --- a/indexer/services/ender/__tests__/lib/candles-generator.test.ts +++ b/indexer/services/ender/__tests__/lib/candles-generator.test.ts @@ -25,6 +25,7 @@ import _ from 'lodash'; import { clearCandlesMap, getCandlesMap, startCandleCache, } from '../../src/caches/candle-cache'; +import * as OrderbookMidPriceMemoryCache from '../../src/caches/orderbook-mid-price-memory-cache'; import config from '../../src/config'; import { CandlesGenerator, getOrderbookMidPriceMap } from '../../src/lib/candles-generator'; import { KafkaPublisher } from '../../src/lib/kafka-publisher'; @@ -124,6 +125,7 @@ describe('candleHelper', () => { setCachePrice(ticker, '100000'); setCachePrice(ticker, '105000'); setCachePrice(ticker, '110000'); + await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); await runUpdateCandles(publisher); @@ -141,8 +143,8 @@ describe('candleHelper', () => { id: CandleTable.uuid(currentStartedAt, defaultCandle.ticker, resolution), startedAt: currentStartedAt, resolution, - orderbookMidPriceClose: null, - orderbookMidPriceOpen: null, + orderbookMidPriceClose: '105000', + orderbookMidPriceOpen: '105000', }; }, ); @@ -167,6 +169,7 @@ describe('candleHelper', () => { setCachePrice(ticker, '80000'); setCachePrice(ticker, '81000'); setCachePrice(ticker, '80500'); + await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); // Create Perpetual Position to set open position const openInterest: string = '100'; @@ -189,8 +192,8 @@ describe('candleHelper', () => { startedAt: currentStartedAt, resolution, startingOpenInterest: openInterest, - orderbookMidPriceClose: null, - orderbookMidPriceOpen: null, + orderbookMidPriceClose: '80500', + orderbookMidPriceOpen: '80500', }; }, ); @@ -313,8 +316,8 @@ describe('candleHelper', () => { usdVolume: '0', trades: 0, startingOpenInterest: '100', - orderbookMidPriceClose: null, - orderbookMidPriceOpen: null, + orderbookMidPriceClose: '1000', + orderbookMidPriceOpen: '1000', }, true, 1000, @@ -344,8 +347,8 @@ describe('candleHelper', () => { startedAt, resolution: CandleResolution.ONE_MINUTE, startingOpenInterest: '100', - orderbookMidPriceClose: null, - orderbookMidPriceOpen: null, + orderbookMidPriceClose: '1000', + orderbookMidPriceOpen: '1000', }, true, // contains kafka messages 1000, // orderbook mid price @@ -438,6 +441,7 @@ describe('candleHelper', () => { orderbookMidPrice: number, ) => { setCachePrice('BTC-USD', orderbookMidPrice.toFixed()); + await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); if (initialCandle !== undefined) { await CandleTable.create(initialCandle); @@ -473,16 +477,209 @@ describe('candleHelper', () => { expectTimingStats(); }); + it('Updates previous candle orderBookMidPriceClose if startTime is past candle resolution', async () => { + // Create existing candles + const existingPrice: string = '7000'; + const startingOpenInterest: string = '200'; + const baseTokenVolume: string = '10'; + const usdVolume: string = Big(existingPrice).times(baseTokenVolume).toString(); + const orderbookMidPriceClose = '7500'; + const orderbookMidPriceOpen = '8000'; + await Promise.all( + _.map(Object.values(CandleResolution), (resolution: CandleResolution) => { + return CandleTable.create({ + startedAt: previousStartedAt, + ticker: testConstants.defaultPerpetualMarket.ticker, + resolution, + low: existingPrice, + high: existingPrice, + open: existingPrice, + close: existingPrice, + baseTokenVolume, + usdVolume, + trades: existingTrades, + startingOpenInterest, + orderbookMidPriceClose, + orderbookMidPriceOpen, + }); + }), + ); + await startCandleCache(); + + setCachePrice('BTC-USD', '10005'); + await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + + const publisher: KafkaPublisher = new KafkaPublisher(); + publisher.addEvents([ + defaultTradeKafkaEvent, + defaultTradeKafkaEvent2, + ]); + + // Create new candles, with trades + await runUpdateCandles(publisher).then(async () => { + + // Verify previous candles have orderbookMidPriceClose updated + const previousExpectedCandles: CandleFromDatabase[] = _.map( + Object.values(CandleResolution), + (resolution: CandleResolution) => { + return { + id: CandleTable.uuid(previousStartedAt, defaultCandle.ticker, resolution), + startedAt: previousStartedAt, + ticker: defaultCandle.ticker, + resolution, + low: existingPrice, + high: existingPrice, + open: existingPrice, + close: existingPrice, + baseTokenVolume, + usdVolume, + trades: existingTrades, + startingOpenInterest, + orderbookMidPriceClose: '10005', + orderbookMidPriceOpen, + }; + }, + ); + await verifyCandlesInPostgres(previousExpectedCandles); + }); + + // Verify new candles were created + const expectedCandles: CandleFromDatabase[] = _.map( + Object.values(CandleResolution), + (resolution: CandleResolution) => { + const currentStartedAt: IsoString = helpers.calculateNormalizedCandleStartTime( + testConstants.createdDateTime, + resolution, + ).toISO(); + + return { + id: CandleTable.uuid(currentStartedAt, defaultCandle.ticker, resolution), + startedAt: currentStartedAt, + ticker: defaultCandle.ticker, + resolution, + low: '10000', + high: defaultPrice2, + open: '10000', + close: defaultPrice2, + baseTokenVolume: '20', + usdVolume: '250000', + trades: 2, + startingOpenInterest: '0', + orderbookMidPriceClose: '10005', + orderbookMidPriceOpen: '10005', + }; + }, + ); + await verifyCandlesInPostgres(expectedCandles); + await validateCandlesCache(); + expectTimingStats(); + }); + + it('creates an empty candle and updates the previous candle orderBookMidPriceClose if startTime is past candle resolution', async () => { + // Create existing candles + const existingPrice: string = '7000'; + const startingOpenInterest: string = '200'; + const baseTokenVolume: string = '10'; + const usdVolume: string = Big(existingPrice).times(baseTokenVolume).toString(); + const orderbookMidPriceClose = '7500'; + const orderbookMidPriceOpen = '8000'; + + await Promise.all( + _.map(Object.values(CandleResolution), (resolution: CandleResolution) => { + return CandleTable.create({ + startedAt: previousStartedAt, + ticker: testConstants.defaultPerpetualMarket.ticker, + resolution, + low: existingPrice, + high: existingPrice, + open: existingPrice, + close: existingPrice, + baseTokenVolume, + usdVolume, + trades: existingTrades, + startingOpenInterest, + orderbookMidPriceClose, + orderbookMidPriceOpen, + }); + }), + ); + await startCandleCache(); + + setCachePrice('BTC-USD', '10005'); + await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + + const publisher: KafkaPublisher = new KafkaPublisher(); + publisher.addEvents([]); + + // Create new candles, without trades + await runUpdateCandles(publisher); + + // Verify previous candles have orderbookMidPriceClose updated + const previousExpectedCandles: CandleFromDatabase[] = _.map( + Object.values(CandleResolution), + (resolution: CandleResolution) => { + return { + id: CandleTable.uuid(previousStartedAt, defaultCandle.ticker, resolution), + startedAt: previousStartedAt, + ticker: defaultCandle.ticker, + resolution, + low: existingPrice, + high: existingPrice, + open: existingPrice, + close: existingPrice, + baseTokenVolume, + usdVolume, + trades: existingTrades, + startingOpenInterest, + orderbookMidPriceClose: '10005', + orderbookMidPriceOpen, + }; + }, + ); + await verifyCandlesInPostgres(previousExpectedCandles); + + // Verify new empty candle was created + const expectedCandles: CandleFromDatabase[] = _.map( + Object.values(CandleResolution), + (resolution: CandleResolution) => { + const currentStartedAt: IsoString = helpers.calculateNormalizedCandleStartTime( + testConstants.createdDateTime, + resolution, + ).toISO(); + + return { + id: CandleTable.uuid(currentStartedAt, defaultCandle.ticker, resolution), + startedAt: currentStartedAt, + ticker: defaultCandle.ticker, + resolution, + low: existingPrice, + high: existingPrice, + open: existingPrice, + close: existingPrice, + baseTokenVolume: '0', + usdVolume: '0', + trades: 0, + startingOpenInterest: '0', + orderbookMidPriceClose: '10005', + orderbookMidPriceOpen: '10005', + }; + }, + ); + await verifyCandlesInPostgres(expectedCandles); + + }); + it('successfully creates an orderbook price map for each market', async () => { setCachePrice('BTC-USD', '105000'); setCachePrice('ISO-USD', '115000'); setCachePrice('ETH-USD', '150000'); + await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); const map = await getOrderbookMidPriceMap(); expect(map).toEqual({ - 'BTC-USD': undefined, - 'ETH-USD': undefined, - 'ISO-USD': undefined, + 'BTC-USD': '105000', + 'ETH-USD': '150000', + 'ISO-USD': '115000', 'ISO2-USD': undefined, 'SHIB-USD': undefined, }); diff --git a/indexer/services/ender/src/caches/orderbook-mid-price-memory-cache.ts b/indexer/services/ender/src/caches/orderbook-mid-price-memory-cache.ts new file mode 100644 index 00000000000..94fca2428ef --- /dev/null +++ b/indexer/services/ender/src/caches/orderbook-mid-price-memory-cache.ts @@ -0,0 +1,58 @@ +import { logger, stats } from '@dydxprotocol-indexer/base'; +import { + PerpetualMarketFromDatabase, + perpetualMarketRefresher, + loopHelpers, +} from '@dydxprotocol-indexer/postgres'; +import { OrderbookMidPricesCache } from '@dydxprotocol-indexer/redis'; + +import config from '../config'; +import { redisClient } from '../helpers/redis/redis-controller'; + +interface OrderbookMidPriceCache { + [ticker: string]: string | undefined, +} + +let orderbookMidPriceCache: OrderbookMidPriceCache = {}; + +/** + * Refresh loop to cache the list of all perpetual markets from the database in-memory. + */ +export async function start(): Promise { + await loopHelpers.startUpdateLoop( + updateOrderbookMidPrices, + config.ORDERBOOK_MID_PRICE_REFRESH_INTERVAL_MS, + 'updateOrderbookMidPrices', + ); +} + +export function getOrderbookMidPrice(ticker: string): string | undefined { + return orderbookMidPriceCache[ticker]; +} + +export async function updateOrderbookMidPrices(): Promise { + const startTime: number = Date.now(); + try { + const perpetualMarkets: PerpetualMarketFromDatabase[] = Object.values( + perpetualMarketRefresher.getPerpetualMarketsMap(), + ); + + const tickers: string[] = perpetualMarkets.map((market) => market.ticker); + + orderbookMidPriceCache = await OrderbookMidPricesCache.getMedianPrices( + redisClient, + tickers, + ); + } catch (error) { + logger.error({ + at: 'orderbook-mid-price-cache#updateOrderbookMidPrices', + message: 'Failed to fetch OrderbookMidPrices', + error, + }); + } finally { + stats.timing( + `${config.SERVICE_NAME}.update_orderbook_mid_prices_cache.timing`, + Date.now() - startTime, + ); + } +} diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index c22122b318f..413872ec869 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -6,6 +6,7 @@ import { parseSchema, baseConfigSchema, parseBoolean, + parseInteger, } from '@dydxprotocol-indexer/base'; import { kafkaConfigSchema, @@ -23,6 +24,7 @@ export const configSchema = { SEND_WEBSOCKET_MESSAGES: parseBoolean({ default: true, }), + ORDERBOOK_MID_PRICE_REFRESH_INTERVAL_MS: parseInteger({ default: 10_000 }), // 10 seconds }; export default parseSchema(configSchema); diff --git a/indexer/services/ender/src/index.ts b/indexer/services/ender/src/index.ts index c72d8542eee..07702f9b7d4 100644 --- a/indexer/services/ender/src/index.ts +++ b/indexer/services/ender/src/index.ts @@ -5,6 +5,7 @@ import { } from '@dydxprotocol-indexer/postgres'; import { initializeAllCaches } from './caches/block-cache'; +import * as OrderbookMidPriceMemoryCache from './caches/orderbook-mid-price-memory-cache'; import config from './config'; import { connect } from './helpers/kafka/kafka-controller'; import { createPostgresFunctions } from './helpers/postgres/postgres-functions'; @@ -28,8 +29,10 @@ async function startKafka(): Promise { ]); // Ender does not need to refresh its caches in a loop because Ender is the only service that // writes to the key attributes of perpetual_markets, asset_refresher, and market_refresher - // The only exception are the aggregated properties of perpetual_markets + // The two exceptions are the aggregated properties of perpetual_markets and the + // OrderbookMidPriceMemoryCache await initializeAllCaches(); + wrapBackgroundTask(OrderbookMidPriceMemoryCache.start(), true, 'startUpdateOrderbookMidPrices'); await connect(); await startConsumer(); diff --git a/indexer/services/ender/src/lib/candles-generator.ts b/indexer/services/ender/src/lib/candles-generator.ts index f1daa75f065..744c17f92e8 100644 --- a/indexer/services/ender/src/lib/candles-generator.ts +++ b/indexer/services/ender/src/lib/candles-generator.ts @@ -26,6 +26,7 @@ import _ from 'lodash'; import { DateTime } from 'luxon'; import { getCandle } from '../caches/candle-cache'; +import { getOrderbookMidPrice } from '../caches/orderbook-mid-price-memory-cache'; import config from '../config'; import { KafkaPublisher } from './kafka-publisher'; import { ConsolidatedKafkaEvent, SingleTradeMessage } from './types'; @@ -169,7 +170,7 @@ export class CandlesGenerator { const promises: Promise[] = []; const openInterestMap: OpenInterestMap = await this.getOpenInterestMap(); - const orderbookMidPriceMap = await getOrderbookMidPriceMap(); + const orderbookMidPriceMap = getOrderbookMidPriceMap(); _.forEach( Object.values(perpetualMarketRefresher.getPerpetualMarketsMap()), (perpetualMarket: PerpetualMarketFromDatabase) => { @@ -531,18 +532,13 @@ export class CandlesGenerator { /** * Get the cached orderbook mid price for a given ticker */ -export async function getOrderbookMidPriceMap(): Promise<{ [ticker: string]: OrderbookMidPrice }> { +export function getOrderbookMidPriceMap(): { [ticker: string]: OrderbookMidPrice } { const start: number = Date.now(); const perpetualMarkets = Object.values(perpetualMarketRefresher.getPerpetualMarketsMap()); - const promises = perpetualMarkets.map(async (perpetualMarket: PerpetualMarketFromDatabase) => { - return Promise.resolve({ [perpetualMarket.ticker]: undefined }); - }); - - const pricesArray = await Promise.all(promises); const priceMap: { [ticker: string]: OrderbookMidPrice } = {}; - pricesArray.forEach((price) => { - Object.assign(priceMap, price); + perpetualMarkets.forEach((perpetualMarket: PerpetualMarketFromDatabase) => { + priceMap[perpetualMarket.ticker] = getOrderbookMidPrice(perpetualMarket.ticker); }); stats.timing(`${config.SERVICE_NAME}.get_orderbook_mid_price_map.timing`, Date.now() - start);