Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(db): Improve market-updater task and getLatestPrice query (backport #2730) #2731

Open
wants to merge 1 commit into
base: release/indexer/v8.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ describe('Oracle price store', () => {
]);

const oraclePrices: PriceMap = await OraclePriceTable
.findLatestPrices(
.findLatestPricesBeforeOrAtHeight(
updatedHeight,
);

Expand Down Expand Up @@ -278,8 +278,7 @@ describe('Oracle price store', () => {
const blockPromises = blockHeights.map((height) => BlockTable.create({
...defaultBlock,
blockHeight: height,
}),
);
}));

await Promise.all(blockPromises);
await Promise.all([
Expand Down Expand Up @@ -319,12 +318,79 @@ describe('Oracle price store', () => {
]);

const oraclePrices: PriceMap = await OraclePriceTable
.findLatestPrices(
.findLatestPricesBeforeOrAtHeight(
defaultOraclePrice.effectiveAtHeight,
);

expect(oraclePrices).toEqual(expect.objectContaining({
[defaultOraclePrice.marketId]: defaultOraclePrice.price,
}));
});

it('Successfully finds latest prices by dateTime using LEFT JOIN LATERAL', async () => {
const now: string = DateTime.utc().toISO();
const yesterday: string = DateTime.utc().minus({ days: 1 }).toISO();
const twoDaysAgo: string = DateTime.utc().minus({ days: 2 }).toISO();

const recentPrice: OraclePriceCreateObject = {
...defaultOraclePrice,
price: '10000.05',
effectiveAtHeight: '10',
effectiveAt: now,
};

const olderPrice: OraclePriceCreateObject = {
...defaultOraclePrice,
price: '9500.75',
effectiveAtHeight: '9',
effectiveAt: yesterday,
};

const oldestPrice: OraclePriceCreateObject = {
...defaultOraclePrice,
price: '9000.50',
effectiveAtHeight: '8',
effectiveAt: twoDaysAgo,
};

const market2Price: OraclePriceCreateObject = {
...defaultOraclePrice2,
price: '500.25',
effectiveAtHeight: '11',
effectiveAt: yesterday,
};

const blockHeights = ['8', '9', '10', '11'];
const blockPromises = blockHeights.map((height) => BlockTable.create({
...defaultBlock,
blockHeight: height,
}));

await Promise.all(blockPromises);

await Promise.all([
OraclePriceTable.create(recentPrice),
OraclePriceTable.create(olderPrice),
OraclePriceTable.create(oldestPrice),
OraclePriceTable.create(market2Price),
]);

const yesterdayPrices: PriceMap = await OraclePriceTable.findLatestPricesByDateTime(yesterday);
expect(yesterdayPrices).toEqual({
[defaultMarket.id]: olderPrice.price,
[defaultMarket2.id]: market2Price.price,
});

const twoDaysAgoPrices: PriceMap = await
OraclePriceTable.findLatestPricesByDateTime(twoDaysAgo);
expect(twoDaysAgoPrices).toEqual({
[defaultMarket.id]: oldestPrice.price,
});

const currentPrices: PriceMap = await OraclePriceTable.findLatestPricesByDateTime(now);
expect(currentPrices).toEqual({
[defaultMarket.id]: recentPrice.price,
[defaultMarket2.id]: market2Price.price,
});
});
});
56 changes: 30 additions & 26 deletions indexer/packages/postgres/src/stores/oracle-price-table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,34 +178,36 @@ function constructPriceMap(oraclePrices: OraclePriceFromDatabase[]): PriceMap {
}, {});
}

async function findLatestPricesByDateTime(
export async function findLatestPricesByDateTime(
latestDateTimeISO: string,
): Promise<PriceMap> {
const baseQuery: QueryBuilder<OraclePriceModel> = setupBaseQuery<OraclePriceModel>(
OraclePriceModel,
{ readReplica: true },
);

const innerQuery: QueryBuilder<OraclePriceModel> = setupBaseQuery<OraclePriceModel>(
OraclePriceModel,
{ readReplica: true },
);

const subQuery = innerQuery
.select('marketId')
.max('effectiveAt as maxEffectiveAt')
.where('effectiveAt', '<=', latestDateTimeISO)
.groupBy('marketId');
// Use raw query with LEFT JOIN LATERAL for better performance.
// This query enables Postgres to utilize the index on the effectiveAt column
// for individual markets.
const query = `
SELECT m.id AS "marketId",
p."price",
p."effectiveAt",
p."effectiveAtHeight",
p."id"
FROM "markets" m
LEFT JOIN LATERAL (
SELECT "id", "price", "effectiveAt", "effectiveAtHeight"
FROM "oracle_prices"
WHERE "marketId" = m.id
AND "effectiveAt" <= ?
ORDER BY "effectiveAt" DESC
LIMIT 1
) p ON TRUE
WHERE p."price" IS NOT NULL
`;

const oraclePrices: OraclePriceFromDatabase[] = await baseQuery
.innerJoin(subQuery.as('sub'), function joinConditions() {
this
.on('oracle_prices.marketId', '=', 'sub.marketId')
.andOn('oracle_prices.effectiveAt', '=', 'sub.maxEffectiveAt');
})
.returning('*');
const result = await knexReadReplica.getConnection().raw(
query,
[latestDateTimeISO],
) as unknown as { rows: OraclePriceFromDatabase[] };

return constructPriceMap(oraclePrices);
return constructPriceMap(result.rows);
}

export async function getPricesFrom24hAgo(
Expand All @@ -219,7 +221,7 @@ export async function getLatestPrices(): Promise<PriceMap> {
return findLatestPricesByDateTime(now);
}

export async function findLatestPrices(
export async function findLatestPricesBeforeOrAtHeight(
effectiveBeforeOrAtHeight: string,
transaction?: Knex.Transaction,
): Promise<PriceMap> {
Expand All @@ -229,17 +231,19 @@ export async function findLatestPrices(
WHERE ("marketId", "effectiveAtHeight") IN (
SELECT "marketId", MAX("effectiveAtHeight")
FROM "oracle_prices"
WHERE "effectiveAtHeight" <= '${effectiveBeforeOrAtHeight}'
WHERE "effectiveAtHeight" <= ?
GROUP BY "marketId");
`;
let result: { rows: OraclePriceFromDatabase[] };
if (transaction === undefined) {
result = await knexReadReplica.getConnection().raw(
query,
[effectiveBeforeOrAtHeight],
) as unknown as { rows: OraclePriceFromDatabase[] };
} else {
result = await knexReadReplica.getConnection().raw(
query,
[effectiveBeforeOrAtHeight],
).transacting(transaction) as unknown as { rows: OraclePriceFromDatabase[] };
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ export async function getPnlTicksCreateObjects(
blockHeight,
txId,
),
OraclePriceTable.findLatestPrices(blockHeight),
OraclePriceTable.findLatestPricesBeforeOrAtHeight(blockHeight),
FundingIndexUpdatesTable.findFundingIndexMap(blockHeight),
]);
stats.timing(
Expand Down
27 changes: 23 additions & 4 deletions indexer/services/roundtable/src/tasks/market-updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,28 @@ export function getPriceChange(
export default async function runTask(): Promise<void> {
const start: number = Date.now();

const liquidityTiers:
LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll({}, []);
const perpetualMarkets:
PerpetualMarketFromDatabase[] = await PerpetualMarketTable.findAll({}, []);
// Run all initial database queries in parallel
const [
liquidityTiers,
perpetualMarkets,
latestPrices,
prices24hAgo,
] : [
LiquidityTiersFromDatabase[],
PerpetualMarketFromDatabase[],
PriceMap,
PriceMap,
] = await Promise.all([
LiquidityTiersTable.findAll({}, []),
PerpetualMarketTable.findAll({}, []),
OraclePriceTable.getLatestPrices(),
OraclePriceTable.getPricesFrom24hAgo(),
]);

// Derive data from perpetual markets
const perpetualMarketIds: string[] = _.map(perpetualMarkets, PerpetualMarketColumns.id);
const clobPairIds: string[] = _.map(perpetualMarkets, PerpetualMarketColumns.clobPairId);
<<<<<<< HEAD
const tickerDefaultFundingRate1HPairs: [string, string][] = _.map(
perpetualMarkets,
(market) => [
Expand All @@ -64,6 +80,9 @@ export default async function runTask(): Promise<void> {
);
const latestPrices: PriceMap = await OraclePriceTable.getLatestPrices();
const prices24hAgo: PriceMap = await OraclePriceTable.getPricesFrom24hAgo();
=======
const tickers: string[] = _.map(perpetualMarkets, PerpetualMarketColumns.ticker);
>>>>>>> 2bec0a29 (perf(db): Improve `market-updater` task and `getLatestPrice` query (#2730))

stats.timing(
`${config.SERVICE_NAME}.market_updater_initial_queries`,
Expand Down
Loading