Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IND-552] add roundtable task to take fast sync Postgres snapshots every 4 hours #912

Merged
merged 15 commits into from
Jan 5, 2024
1 change: 1 addition & 0 deletions indexer/packages/base/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ export const ONE_MINUTE_IN_MILLISECONDS: number = 60 * ONE_SECOND_IN_MILLISECOND
// Duration constants expressed in milliseconds, each derived from the next smaller unit.
export const FIVE_MINUTES_IN_MILLISECONDS: number = 5 * ONE_MINUTE_IN_MILLISECONDS;
export const TEN_MINUTES_IN_MILLISECONDS: number = 10 * ONE_MINUTE_IN_MILLISECONDS;
export const ONE_HOUR_IN_MILLISECONDS: number = 60 * ONE_MINUTE_IN_MILLISECONDS;
// Four hours in milliseconds.
export const FOUR_HOURS_IN_MILLISECONDS: number = 4 * ONE_HOUR_IN_MILLISECONDS;
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import config from '../../src/config';
import { asMock } from '@dydxprotocol-indexer/dev';
import {
createDBSnapshot,
deleteOldFastSyncSnapshots,
getMostRecentDBSnapshotIdentifier,
} from '../../src/helpers/aws';
import takeFastSyncSnapshotTask from '../../src/tasks/take-fast-sync-snapshot';
import { DateTime } from 'luxon';

jest.mock('../../src/helpers/aws');

// Exercises the take-fast-sync-snapshot task against the mocked AWS helper module.
// A new snapshot should be created (and old ones pruned) unless the most recent
// fast sync snapshot is younger than the configured loop interval.
describe('fast-sync-export-db-snapshot', () => {
  const staleSnapshotId: string = `${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-postgres-main-staging-2022-05-03-04-16`;

  // Makes the mocked helper report `identifier` as the most recent snapshot.
  const stubMostRecentSnapshot = (identifier: string | undefined): void => {
    asMock(getMostRecentDBSnapshotIdentifier).mockImplementation(
      () => Promise.resolve(identifier),
    );
  };

  beforeAll(() => {
    config.RDS_INSTANCE_NAME = 'postgres-main-staging';
  });

  beforeEach(() => {
    jest.resetAllMocks();
    // Default scenario: the latest snapshot dates from 2022, far older than the interval.
    stubMostRecentSnapshot(staleSnapshotId);
  });

  afterAll(() => {
    jest.resetAllMocks();
  });

  it('Last snapshot was taken more than interval ago', async () => {
    await takeFastSyncSnapshotTask();

    expect(createDBSnapshot).toHaveBeenCalled();
    expect(deleteOldFastSyncSnapshots).toHaveBeenCalled();
  });

  it('Last snapshot was taken less than interval ago', async () => {
    // A snapshot stamped one minute ago must suppress both creation and cleanup.
    const timestamp: string = DateTime.utc().minus({ minutes: 1 }).toFormat('yyyy-MM-dd-HH-mm');
    stubMostRecentSnapshot(
      `${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-postgres-main-staging-${timestamp}`,
    );

    await takeFastSyncSnapshotTask();

    expect(createDBSnapshot).not.toHaveBeenCalled();
    expect(deleteOldFastSyncSnapshots).not.toHaveBeenCalled();
  });

  it('No existing snapshot', async () => {
    // With nothing to compare against, the task should go ahead and snapshot.
    stubMostRecentSnapshot(undefined);

    await takeFastSyncSnapshotTask();

    expect(createDBSnapshot).toHaveBeenCalled();
    expect(deleteOldFastSyncSnapshots).toHaveBeenCalled();
  });
});
6 changes: 6 additions & 0 deletions indexer/services/roundtable/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
ONE_HOUR_IN_MILLISECONDS,
ONE_SECOND_IN_MILLISECONDS,
TEN_SECONDS_IN_MILLISECONDS,
FOUR_HOURS_IN_MILLISECONDS,
} from '@dydxprotocol-indexer/base';
import {
kafkaConfigSchema,
Expand All @@ -40,6 +41,7 @@ export const configSchema = {
LOOPS_ORDERBOOK_INSTRUMENTATION: parseBoolean({ default: true }),
LOOPS_CANCEL_STALE_ORDERS: parseBoolean({ default: true }),
LOOPS_ENABLED_UPDATE_RESEARCH_ENVIRONMENT: parseBoolean({ default: false }),
LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS: parseBoolean({ default: true }),
LOOPS_ENABLED_TRACK_LAG: parseBoolean({ default: false }),
LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES: parseBoolean({ default: true }),
LOOPS_ENABLED_AGGREGATE_TRADING_REWARDS: parseBoolean({ default: true }),
Expand All @@ -66,6 +68,9 @@ export const configSchema = {
LOOPS_INTERVAL_MS_UPDATE_RESEARCH_ENVIRONMENT: parseInteger({
default: ONE_HOUR_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS: parseInteger({
default: FOUR_HOURS_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_UPDATE_COMPLIANCE_DATA: parseInteger({
default: FIVE_MINUTES_IN_MILLISECONDS,
}),
Expand Down Expand Up @@ -112,6 +117,7 @@ export const configSchema = {
AWS_ACCOUNT_ID: parseString(),
AWS_REGION: parseString(),
S3_BUCKET_ARN: parseString(),
FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX: parseString({ default: 'fast-sync' }),
ECS_TASK_ROLE_ARN: parseString(),
KMS_KEY_ARN: parseString(),
RDS_INSTANCE_NAME: parseString(),
Expand Down
167 changes: 157 additions & 10 deletions indexer/services/roundtable/src/helpers/aws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,177 @@ enum ExportTaskStatus {
COMPLETE = 'complete',
}

const S3_BUCKET_NAME = config.S3_BUCKET_ARN.split(':::')[1];
export const S3_BUCKET_NAME = config.S3_BUCKET_ARN.split(':::')[1];
export const S3_LOCATION_PREFIX = `s3://${S3_BUCKET_NAME}`;

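/**
 * Delete fast sync snapshots of the RDS instance (snapshots whose identifier starts with
 * FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX) that are older than the specified number of days.
 * Snapshots without that prefix are left untouched.
 * Defaults to 7 days.
 * @param rds - RDS client
 * @param daysOld - Age in days beyond which a fast sync snapshot is deleted
 */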
export async function deleteOldFastSyncSnapshots(rds: RDS, daysOld: number = 7): Promise<void> {
try {
const cutoffTime: number = new Date().getTime() - daysOld * 24 * 60 * 60 * 1000;
let marker;
do {
const response: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({
DBInstanceIdentifier: config.RDS_INSTANCE_NAME,
MaxRecords: 20, // Maximum number of records per page
Marker: marker, // Marker for pagination
}).promise();

if (response.DBSnapshots === undefined) {
logger.error({
at: `${atStart}deleteOldSnapshots`,
message: `No DB snapshots found with identifier: ${config.RDS_INSTANCE_NAME}`,
});
return;
}

// Filter for fast sync snapshots older than cutoffTime
const oldFastSyncSnapshots = response.DBSnapshots.filter((snapshot) => {
if (!snapshot.DBSnapshotIdentifier!.startsWith(
config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we log an error here? Technically there shouldn't be anything that doesn't match our fast sync identifier right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the rds automated daily snapshots will also be included here

)) {
return false;
}
const snapshotDate = snapshot.SnapshotCreateTime!.getTime();
return snapshotDate < cutoffTime;
});

// Delete each old snapshot
for (const snapshot of oldFastSyncSnapshots) {
logger.info({
at: `${atStart}deleteOldSnapshots`,
message: 'Deleting snapshot',
snapshotIdentifier: snapshot.DBSnapshotIdentifier,
});
const snapshotResult: RDS.Types.DeleteDBSnapshotResult = await rds.deleteDBSnapshot(
{ DBSnapshotIdentifier: snapshot.DBSnapshotIdentifier! },
).promise();
logger.info({
at: `${atStart}deleteOldSnapshots`,
message: 'Snapshot deleted',
snapshotIdentifier: snapshotResult.DBSnapshot!.DBSnapshotIdentifier!,
});
}

marker = response.Marker;
} while (marker);
} catch (error) {
logger.error({
at: `${atStart}deleteOldSnapshots`,
message: 'Error deleting old snapshots',
error,
});
throw error;
}
}

/**
* @description Get most recent snapshot identifier for an RDS database.
* @param rds - RDS client
* @param snapshotIdentifierPrefixInclude - Only include snapshots with snapshot identifier
* that starts with snapshotIdentifierPrefixInclude
* @param snapshotIdentifierPrefixExclude - Exclude snapshots with snapshot identifier
* that starts with snapshotIdentifierPrefixExclude
*/
// TODO(CLOB-672): Verify this function returns the most recent DB snapshot.
export async function getMostRecentDBSnapshotIdentifier(rds: RDS): Promise<string> {
const awsResponse: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({
DBInstanceIdentifier: config.RDS_INSTANCE_NAME,
MaxRecords: 20, // this is the minimum
}).promise();
export async function getMostRecentDBSnapshotIdentifier(
rds: RDS,
snapshotIdentifierPrefixInclude?: string,
snapshotIdentifierPrefixExclude?: string,
): Promise<string | undefined> {
let snapshots: RDS.DBSnapshotList = [];
let marker: string | undefined;

do {
const awsResponse: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({
DBInstanceIdentifier: config.RDS_INSTANCE_NAME,
MaxRecords: 20, // Maximum number of records per page
Marker: marker, // Marker for pagination
}).promise();

if (awsResponse.DBSnapshots === undefined) {
throw Error(`No DB snapshots found with identifier: ${config.RDS_INSTANCE_NAME}`);
}

if (awsResponse.DBSnapshots === undefined) {
throw Error(`No DB snapshots found with identifier: ${config.RDS_INSTANCE_NAME}`);
snapshots = snapshots.concat(awsResponse.DBSnapshots);
marker = awsResponse.Marker;
} while (marker);

// Filter snapshots based on include/exclude prefixes
if (snapshotIdentifierPrefixInclude !== undefined) {
snapshots = snapshots
.filter((snapshot) => snapshot.DBSnapshotIdentifier &&
snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixInclude));
}
if (snapshotIdentifierPrefixExclude !== undefined) {
snapshots = snapshots
.filter((snapshot) => snapshot.DBSnapshotIdentifier &&
!snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixExclude));
}

// Sort snapshots by creation time in descending order
snapshots.sort((a, b) => b.SnapshotCreateTime!.getTime() - a.SnapshotCreateTime!.getTime());

logger.info({
at: `${atStart}getMostRecentDBSnapshotIdentifier`,
message: 'Described snapshots for database',
mostRecentSnapshot: awsResponse.DBSnapshots[awsResponse.DBSnapshots.length - 1],
mostRecentSnapshot: snapshots[0],
});

return awsResponse.DBSnapshots[awsResponse.DBSnapshots.length - 1].DBSnapshotIdentifier!;
// Return the latest snapshot identifier
return snapshots[0]?.DBSnapshotIdentifier;
}

/**
 * @description Create a DB snapshot for an RDS database. Only returns when the
 * snapshot is available. After requesting the snapshot, waits on the
 * `dBSnapshotAvailable` waiter (60 second delay, at most 10 attempts) and then
 * re-reads the snapshot to confirm its status.
 * @param rds - RDS client
 * @param snapshotIdentifier - Identifier to assign to the new snapshot
 * @param dbInstanceIdentifier - Identifier of the DB instance to snapshot
 * @returns The identifier of the created snapshot
 * @throws Any error raised by the RDS calls or the waiter, or an Error if the
 * snapshot cannot be found afterwards or is not in the `available` state. Every
 * failure is logged before being rethrown.
 */
export async function createDBSnapshot(
  rds: RDS,
  snapshotIdentifier: string,
  dbInstanceIdentifier: string,
): Promise<string> {
  try {
    await rds.createDBSnapshot({
      DBInstanceIdentifier: dbInstanceIdentifier,
      DBSnapshotIdentifier: snapshotIdentifier,
    }).promise();

    // Wait for the DB snapshot to become available with the specified waiter configuration
    await rds.waitFor('dBSnapshotAvailable', {
      DBSnapshotIdentifier: snapshotIdentifier,
      $waiter: {
        delay: 60, // 60 seconds delay between each request
        maxAttempts: 10, // Maximum of 10 attempts
      },
    }).promise();

    // Once it's available, retrieve its details
    const statusResponse = await rds.describeDBSnapshots(
      { DBSnapshotIdentifier: snapshotIdentifier },
    ).promise();

    // Guard explicitly instead of asserting non-null: an empty result would
    // otherwise surface as an opaque TypeError when reading `.Status`.
    const snapshot = statusResponse.DBSnapshots?.[0];
    if (snapshot === undefined) {
      throw Error(`Snapshot not found after creation: ${snapshotIdentifier}`);
    }
    if (snapshot.Status !== 'available') {
      throw Error(`Snapshot is not in the available state: Status is ${snapshot.Status}`);
    }

    // The lookup was made by identifier, so fall back to it if RDS omits the field.
    return snapshot.DBSnapshotIdentifier ?? snapshotIdentifier;
  } catch (error) {
    logger.error({
      at: `${atStart}createDBSnapshot`,
      message: 'Failed to create DB snapshot',
      error,
      snapshotIdentifier,
    });
    throw error;
  }
}

/**
Expand Down
9 changes: 9 additions & 0 deletions indexer/services/roundtable/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import marketUpdaterTask from './tasks/market-updater';
import orderbookInstrumentationTask from './tasks/orderbook-instrumentation';
import removeExpiredOrdersTask from './tasks/remove-expired-orders';
import removeOldOrderUpdatesTask from './tasks/remove-old-order-updates';
import takeFastSyncSnapshotTask from './tasks/take-fast-sync-snapshot';
import trackLag from './tasks/track-lag';
import updateComplianceDataTask from './tasks/update-compliance-data';
import updateResearchEnvironmentTask from './tasks/update-research-environment';
Expand Down Expand Up @@ -100,6 +101,14 @@ async function start(): Promise<void> {
);
}

if (config.LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS) {
startLoop(
takeFastSyncSnapshotTask,
'take_fast_sync_snapshot',
config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS,
);
}

startLoop(
() => updateComplianceDataTask(complianceProvider),
'update_compliance_data',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { logger, stats } from '@dydxprotocol-indexer/base';
import RDS from 'aws-sdk/clients/rds';
import { DateTime } from 'luxon';

import config from '../config';
import {
createDBSnapshot,
deleteOldFastSyncSnapshots,
getMostRecentDBSnapshotIdentifier,
} from '../helpers/aws';

const statStart: string = `${config.SERVICE_NAME}.fast_sync_export_db_snapshot`;

/**
* Checks if the difference between two dates is less than a given interval.
*
* @param startDate
* @param endDate
* @param intervalMs
*/
function isDifferenceLessThanInterval(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move this under runTask so that the top level function is at the top of the file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

startDate: string,
endDate: string,
intervalMs: number,
): boolean {
// Parses a `yyyy-MM-dd-HH-mm` timestamp into a Date, interpreting it as UTC.
// The month in the string is 1-based while JS Date months are 0-based, so it is
// shifted down by one; without the shift, days near a month end roll over into
// the following month and distort the computed difference. UTC is used so the
// result does not depend on the host time zone or DST transitions.
// Malformed input yields an Invalid Date (getTime() is NaN).
const parseDateString = (dateStr: string): Date => {
  const [year, month, day, hour, minute] = dateStr.split('-').map(Number);
  return new Date(Date.UTC(year, month - 1, day, hour, minute));
};

// Parse the date strings
const parsedDate1 = parseDateString(startDate);
const parsedDate2 = parseDateString(endDate);

// Calculate the difference in milliseconds
const differenceInMilliseconds = Math.abs(parsedDate1.getTime() - parsedDate2.getTime());

// Compare with the interval
return differenceInMilliseconds < intervalMs;
}

export default async function runTask(): Promise<void> {
const at: string = 'fast-sync-export-db-snapshot#runTask';
logger.info({ at, message: 'Starting task.' });

const rds: RDS = new RDS();

const dateString: string = DateTime.utc().toFormat('yyyy-MM-dd-HH-mm');
const snapshotIdentifier: string = `${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-${config.RDS_INSTANCE_NAME}-${dateString}`;
// check the time of the last snapshot
const lastSnapshotIdentifier: string | undefined = await getMostRecentDBSnapshotIdentifier(
rds,
config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX,
);
if (lastSnapshotIdentifier !== undefined) {
const s3Date: string = lastSnapshotIdentifier.split(config.RDS_INSTANCE_NAME)[1].slice(1);
if (
isDifferenceLessThanInterval(
s3Date,
dateString,
config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS,
)
) {
stats.increment(`${statStart}.existingDbSnapshot`, 1);
logger.info({
at,
message: 'Last fast sync db snapshot was taken less than the interval ago',
interval: config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: log the s3Date and dateString

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

});
return;
}
}
// Create the DB snapshot
const startSnapshot: number = Date.now();
const createdSnapshotIdentifier: string = await
createDBSnapshot(rds, snapshotIdentifier, config.RDS_INSTANCE_NAME);
logger.info({ at, message: 'Created DB snapshot.', snapshotIdentifier: createdSnapshotIdentifier });
stats.timing(`${statStart}.createDbSnapshot`, Date.now() - startSnapshot);
const startDeleteOldSnapshot: number = Date.now();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: separate into a separate function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// Delete old snapshots.
await deleteOldFastSyncSnapshots(rds);
stats.timing(`${statStart}.deleteOldSnapshots`, Date.now() - startDeleteOldSnapshot);
}
Loading