diff --git a/indexer/services/roundtable/__tests__/tasks/aggregate-trading-rewards.test.ts b/indexer/services/roundtable/__tests__/tasks/aggregate-trading-rewards.test.ts new file mode 100644 index 0000000000..e1111c1e1e --- /dev/null +++ b/indexer/services/roundtable/__tests__/tasks/aggregate-trading-rewards.test.ts @@ -0,0 +1,27 @@ +import { + dbHelpers, testMocks, +} from '@dydxprotocol-indexer/postgres'; + +describe('aggregate-trading-rewards', () => { + beforeAll(async () => { + await dbHelpers.migrate(); + await dbHelpers.clearData(); + }); + + beforeEach(async () => { + await testMocks.seedData(); + }); + + afterEach(async () => { + await dbHelpers.clearData(); + jest.resetAllMocks(); + }); + + afterAll(async () => { + await dbHelpers.teardown(); + }); + + it('temp', () => { + expect(true).toEqual(true); + }); +}); diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 30a96ed643..06bd1ac273 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -42,6 +42,7 @@ export const configSchema = { LOOPS_ENABLED_UPDATE_RESEARCH_ENVIRONMENT: parseBoolean({ default: false }), LOOPS_ENABLED_TRACK_LAG: parseBoolean({ default: false }), LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES: parseBoolean({ default: true }), + LOOPS_ENABLED_AGGREGATE_TRADING_REWARDS: parseBoolean({ default: true }), // Loop Timing LOOPS_INTERVAL_MS_MARKET_UPDATER: parseInteger({ @@ -74,6 +75,9 @@ export const configSchema = { LOOPS_INTERVAL_MS_REMOVE_OLD_ORDER_UPDATES: parseInteger({ default: THIRTY_SECONDS_IN_MILLISECONDS, }), + LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS: parseInteger({ + default: ONE_MINUTE_IN_MILLISECONDS, + }), // Start delay START_DELAY_ENABLED: parseBoolean({ default: true }), diff --git a/indexer/services/roundtable/src/index.ts b/indexer/services/roundtable/src/index.ts index fb40d616d8..689abb32e0 100644 --- a/indexer/services/roundtable/src/index.ts +++ b/indexer/services/roundtable/src/index.ts @@ -8,6 +8,7 @@ import { redisClient, connect as connectToRedis, } from './helpers/redis'; +import aggregateTradingRewardsTasks from './tasks/aggregate-trading-rewards'; import cancelStaleOrdersTask from './tasks/cancel-stale-orders'; import createPnlTicksTask from './tasks/create-pnl-ticks'; import deleteZeroPriceLevelsTask from './tasks/delete-zero-price-levels'; @@ -121,6 +122,14 @@ async function start(): Promise { ); } + if (config.LOOPS_ENABLED_AGGREGATE_TRADING_REWARDS) { + startLoop( + aggregateTradingRewardsTasks, + 'aggregate_trading_rewards', + config.LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS, + ); + } + logger.info({ at: 'index', message: 'Successfully started', diff --git a/indexer/services/roundtable/src/tasks/aggregate-trading-rewards.ts b/indexer/services/roundtable/src/tasks/aggregate-trading-rewards.ts new file mode 100644 index 0000000000..4d1b9cc5d5 --- /dev/null +++ b/indexer/services/roundtable/src/tasks/aggregate-trading-rewards.ts @@ -0,0 +1,63 @@ +import { + BlockFromDatabase, + BlockTable, + TradingRewardFromDatabase, +} from '@dydxprotocol-indexer/postgres'; +import { DateTime } from 'luxon'; + +/** + * Task: Aggregate Trading Rewards + * Description: This task aggregates trading rewards for a specific period of time. + * It retrieves trading data from the database, calculates the rewards, and stores the aggregated + * results. + */ +interface Interval { + start: DateTime; + end: DateTime; +} + +interface SortedTradingRewardData { + [address: string]: TradingRewardFromDatabase[]; +} + +export default async function runTask(): Promise { + // TODO(IND-499): Add resetting aggregation data when cache is empty + const interval: Interval | undefined = await getTradingRewardDataToProcessInterval(); + + const tradingRewardData: TradingRewardFromDatabase[] = await getTradingRewardDataToProcess( + interval, + ); + const sortedTradingRewardData: SortedTradingRewardData = sortTradingRewardData(tradingRewardData); + await updateTradingRewardsAggregation(sortedTradingRewardData); + // TODO(IND-499): Update AggregateTradingRewardsProcessedCache +} + +async function getTradingRewardDataToProcessInterval(): Promise { + const latestBlock: BlockFromDatabase = await BlockTable.getLatest(); + + // TODO(IND-499): Setup AggregateTradingRewardsProcessedCache for start time and add end time + return { + start: DateTime.fromISO(latestBlock.time), + end: DateTime.fromISO(latestBlock.time), + }; +} + +async function getTradingRewardDataToProcess( + _interval: Interval, +): Promise { + // TODO: Implement + return Promise.resolve([]); +} + +function sortTradingRewardData( + _tradingRewardData: TradingRewardFromDatabase[], +): SortedTradingRewardData { + // TODO: Implement + return {}; +} + +async function updateTradingRewardsAggregation( + _sortedTradingRewardData: SortedTradingRewardData, +): Promise { + // TODO: Implement +}