From 6c0486830728a7b8be6ef53e30b2d340be9d5d10 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 2 Jan 2024 12:38:45 -0500 Subject: [PATCH 01/11] add roundtable task to take fast sync snapshots at shorter cadence --- indexer/packages/base/src/constants.ts | 1 + indexer/services/roundtable/.env.test | 3 +- .../tasks/take-fast-sync-snapshot.test.ts | 48 +++++++++ indexer/services/roundtable/src/config.ts | 8 +- .../services/roundtable/src/helpers/aws.ts | 17 ++-- .../services/roundtable/src/helpers/sql.ts | 4 +- indexer/services/roundtable/src/index.ts | 9 ++ .../src/tasks/take-fast-sync-snapshot.ts | 98 +++++++++++++++++++ .../src/tasks/update-research-environment.ts | 16 ++- 9 files changed, 190 insertions(+), 14 deletions(-) create mode 100644 indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts create mode 100644 indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts diff --git a/indexer/packages/base/src/constants.ts b/indexer/packages/base/src/constants.ts index d3c3442ac2..0699deea56 100644 --- a/indexer/packages/base/src/constants.ts +++ b/indexer/packages/base/src/constants.ts @@ -12,3 +12,4 @@ export const ONE_MINUTE_IN_MILLISECONDS: number = 60 * ONE_SECOND_IN_MILLISECOND export const FIVE_MINUTES_IN_MILLISECONDS: number = 5 * ONE_MINUTE_IN_MILLISECONDS; export const TEN_MINUTES_IN_MILLISECONDS: number = 10 * ONE_MINUTE_IN_MILLISECONDS; export const ONE_HOUR_IN_MILLISECONDS: number = 60 * ONE_MINUTE_IN_MILLISECONDS; +export const FOUR_HOURS_IN_MILLISECONDS: number = 4 * ONE_HOUR_IN_MILLISECONDS; diff --git a/indexer/services/roundtable/.env.test b/indexer/services/roundtable/.env.test index f505b5f05a..aa153fcf6b 100644 --- a/indexer/services/roundtable/.env.test +++ b/indexer/services/roundtable/.env.test @@ -6,7 +6,8 @@ DB_PASSWORD=dydxserver123 DB_PORT=5436 AWS_ACCOUNT_ID=invalid-test-account-id AWS_REGION=invalid-test-region -S3_BUCKET_ARN=invalid-test-bucket +RESEARCH_SNAPSHOT_S3_BUCKET_ARN=invalid-research-test-bucket +FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN=invalid-snapshot-test-bucket ECS_TASK_ROLE_ARN=invalid-test-arn KMS_KEY_ARN=invalid-kms-key-arn RDS_INSTANCE_NAME=invalid-rds-instance-name diff --git a/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts b/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts new file mode 100644 index 0000000000..5c0c41ed00 --- /dev/null +++ b/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts @@ -0,0 +1,48 @@ +import config from '../../src/config'; +import { asMock } from '@dydxprotocol-indexer/dev'; +import { + checkIfExportJobToS3IsOngoing, + checkIfS3ObjectExists, + getMostRecentDBSnapshotIdentifier, + startExportTask, +} from '../../src/helpers/aws'; +import takeFastSyncSnapshotTask from '../../src/tasks/take-fast-sync-snapshot'; + +jest.mock('../../src/helpers/aws'); + +describe('fast-sync-export-db-snapshot', () => { + beforeAll(() => { + config.RDS_INSTANCE_NAME = 'postgres-main-staging'; + }); + + beforeEach(() => { + jest.resetAllMocks(); + asMock(getMostRecentDBSnapshotIdentifier).mockImplementation(async () => Promise.resolve('postgres-main-staging-2022-05-03-04-16')); + }); + + afterAll(jest.resetAllMocks); + + it('s3Object exists', async () => { + asMock(checkIfS3ObjectExists).mockImplementation(async () => Promise.resolve(true)); + + await takeFastSyncSnapshotTask(); + + expect(checkIfExportJobToS3IsOngoing).not.toHaveBeenCalled(); + expect(startExportTask).not.toHaveBeenCalled(); + }); + + it('export job in progress', async () => { + asMock(checkIfExportJobToS3IsOngoing).mockImplementation( + async () => Promise.resolve(true)); + + await takeFastSyncSnapshotTask(); + + expect(startExportTask).not.toHaveBeenCalled(); + }); + + it('start export job', async () => { + await takeFastSyncSnapshotTask(); + + expect(startExportTask).toHaveBeenCalled(); + }); +}); diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 30a96ed643..01fdbd2eff 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -15,6 +15,7 @@ import { ONE_HOUR_IN_MILLISECONDS, ONE_SECOND_IN_MILLISECONDS, TEN_SECONDS_IN_MILLISECONDS, + FOUR_HOURS_IN_MILLISECONDS, } from '@dydxprotocol-indexer/base'; import { kafkaConfigSchema, @@ -40,6 +41,7 @@ export const configSchema = { LOOPS_ORDERBOOK_INSTRUMENTATION: parseBoolean({ default: true }), LOOPS_CANCEL_STALE_ORDERS: parseBoolean({ default: true }), LOOPS_ENABLED_UPDATE_RESEARCH_ENVIRONMENT: parseBoolean({ default: false }), + LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS: parseBoolean({ default: false }), LOOPS_ENABLED_TRACK_LAG: parseBoolean({ default: false }), LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES: parseBoolean({ default: true }), @@ -65,6 +67,9 @@ export const configSchema = { LOOPS_INTERVAL_MS_UPDATE_RESEARCH_ENVIRONMENT: parseInteger({ default: ONE_HOUR_IN_MILLISECONDS, }), + LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS: parseInteger({ + default: FOUR_HOURS_IN_MILLISECONDS, + }), LOOPS_INTERVAL_MS_UPDATE_COMPLIANCE_DATA: parseInteger({ default: FIVE_MINUTES_IN_MILLISECONDS, }), @@ -107,7 +112,8 @@ export const configSchema = { // Update research environment AWS_ACCOUNT_ID: parseString(), AWS_REGION: parseString(), - S3_BUCKET_ARN: parseString(), + RESEARCH_SNAPSHOT_S3_BUCKET_ARN: parseString(), + FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN: parseString(), ECS_TASK_ROLE_ARN: parseString(), KMS_KEY_ARN: parseString(), RDS_INSTANCE_NAME: parseString(), diff --git a/indexer/services/roundtable/src/helpers/aws.ts b/indexer/services/roundtable/src/helpers/aws.ts index bb3a2741b3..4ff27039cf 100644 --- a/indexer/services/roundtable/src/helpers/aws.ts +++ b/indexer/services/roundtable/src/helpers/aws.ts @@ -16,8 +16,9 @@ enum ExportTaskStatus { COMPLETE = 'complete', } -const S3_BUCKET_NAME = config.S3_BUCKET_ARN.split(':::')[1]; -export const S3_LOCATION_PREFIX = `s3://${S3_BUCKET_NAME}`; +export const RESEARCH_SNAPSHOT_S3_BUCKET_NAME = config.RESEARCH_SNAPSHOT_S3_BUCKET_ARN.split(':::')[1]; +export const RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX = `s3://${RESEARCH_SNAPSHOT_S3_BUCKET_NAME}`; +export const FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME = config.FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN.split(':::')[1]; /** * @description Get most recent snapshot identifier for an RDS database. @@ -45,9 +46,12 @@ export async function getMostRecentDBSnapshotIdentifier(rds: RDS): Promise { +export async function checkIfS3ObjectExists( + s3: S3, + s3Date: string, + bucket: string, +): Promise { const at: string = `${atStart}checkIfS3ObjectExists`; - const bucket: string = S3_BUCKET_NAME; const key: string = `${config.RDS_INSTANCE_NAME}-${s3Date}/export_info_${config.RDS_INSTANCE_NAME}-${s3Date}.json`; logger.info({ @@ -143,12 +147,13 @@ export async function checkIfExportJobToS3IsOngoing( export async function startExportTask( rds: RDS, rdsExportIdentifier: string, + bucket: string, ): Promise { // TODO: Add validation const sourceArnPrefix = `arn:aws:rds:${config.AWS_REGION}:${config.AWS_ACCOUNT_ID}:snapshot:rds:`; const awsResponse: RDS.ExportTask = await rds.startExportTask({ ExportTaskIdentifier: rdsExportIdentifier, - S3BucketName: S3_BUCKET_NAME, + S3BucketName: bucket, KmsKeyId: config.KMS_KEY_ARN, IamRoleArn: config.ECS_TASK_ROLE_ARN, SourceArn: `${sourceArnPrefix}${rdsExportIdentifier}`, @@ -216,7 +221,7 @@ export async function startAthenaQuery( Database: config.ATHENA_DATABASE_NAME, }, ResultConfiguration: { - OutputLocation: `${S3_LOCATION_PREFIX}/output/${timestamp}`, + OutputLocation: `${RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX}/output/${timestamp}`, }, WorkGroup: config.ATHENA_WORKING_GROUP, }).promise(); diff --git a/indexer/services/roundtable/src/helpers/sql.ts b/indexer/services/roundtable/src/helpers/sql.ts index 24175fe557..43a3ae9ff0 100644 --- a/indexer/services/roundtable/src/helpers/sql.ts +++ b/indexer/services/roundtable/src/helpers/sql.ts @@ -1,4 +1,4 @@ -import { S3_LOCATION_PREFIX } from './aws'; +import { RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX } from './aws'; export function castToTimestamp(column: string): string { return `CAST("${column}" AS timestamp) as "${column}"`; @@ -23,7 +23,7 @@ export function getExternalAthenaTableCreationStatement( 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - LOCATION '${S3_LOCATION_PREFIX}/${rdsExportIdentifier}/dydx/public.${tableName}' + LOCATION '${RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX}/${rdsExportIdentifier}/dydx/public.${tableName}' TBLPROPERTIES ('has_encrypted_data'='false'); `; } diff --git a/indexer/services/roundtable/src/index.ts b/indexer/services/roundtable/src/index.ts index fb40d616d8..258ea5ebed 100644 --- a/indexer/services/roundtable/src/index.ts +++ b/indexer/services/roundtable/src/index.ts @@ -11,6 +11,7 @@ import { import cancelStaleOrdersTask from './tasks/cancel-stale-orders'; import createPnlTicksTask from './tasks/create-pnl-ticks'; import deleteZeroPriceLevelsTask from './tasks/delete-zero-price-levels'; +import takeFastSyncSnapshotTask from './tasks/take-fast-sync-snapshot'; import marketUpdaterTask from './tasks/market-updater'; import orderbookInstrumentationTask from './tasks/orderbook-instrumentation'; import removeExpiredOrdersTask from './tasks/remove-expired-orders'; @@ -99,6 +100,14 @@ async function start(): Promise { ); } + if (config.LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS) { + startLoop( + takeFastSyncSnapshotTask, + 'take_fast_sync_snapshot', + config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS, + ); + } + startLoop( () => updateComplianceDataTask(complianceProvider), 'update_compliance_data', diff --git a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts new file mode 100644 index 0000000000..0fc26f0689 --- /dev/null +++ b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts @@ -0,0 +1,98 @@ +import { InfoObject, logger, stats } from '@dydxprotocol-indexer/base'; +import RDS from 'aws-sdk/clients/rds'; +import S3 from 'aws-sdk/clients/s3'; +import { DateTime } from 'luxon'; + +import config from '../config'; +import { + checkIfExportJobToS3IsOngoing, + checkIfS3ObjectExists, + FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME, + getMostRecentDBSnapshotIdentifier, + startExportTask, +} from '../helpers/aws'; + +const statStart: string = `${config.SERVICE_NAME}.fast_sync_export_db_snapshot`; + +export default async function runTask(): Promise { + const at: string = 'fast-sync-export-db-snapshot#runTask'; + + const rds: RDS = new RDS(); + + // get most recent rds snapshot + const startDescribe: number = Date.now(); + const dateString: string = DateTime.utc().toFormat('yyyy-MM-dd'); + const mostRecentSnapshot: string = await getMostRecentDBSnapshotIdentifier(rds); + stats.timing(`${statStart}.describe_rds_snapshots`, Date.now() - startDescribe); + + // dev example: rds:dev-indexer-apne1-db-2023-06-25-18-34 + const s3Date: string = mostRecentSnapshot.split(config.RDS_INSTANCE_NAME)[1].slice(1); + const s3: S3 = new S3(); + + // check if s3 object exists + const startS3Check: number = Date.now(); + const s3ObjectExists: boolean = await checkIfS3ObjectExists( + s3, + s3Date, + FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME, + ); + stats.timing(`${statStart}.checkS3Object`, Date.now() - startS3Check); + + const rdsExportIdentifier: string = `${config.RDS_INSTANCE_NAME}-fast-sync-${s3Date}`; + + // If the s3 object exists, return + if (s3ObjectExists) { + logger.info({ + at, + dateString, + message: 'S3 object exists.', + }); + return; + } + + // if we haven't created the object, check if it is being created + const rdsExportCheck: number = Date.now(); + const exportJobOngoing: boolean = await checkIfExportJobToS3IsOngoing(rds, rdsExportIdentifier); + stats.timing(`${statStart}.checkRdsExport`, Date.now() - rdsExportCheck); + + if (exportJobOngoing) { + logger.info({ + at, + dateString, + message: 'Will wait for export job to finish', + }); + return; + } + // start Export Job if S3 Object does not exist + const startExport: number = Date.now(); + try { + const exportData: RDS.ExportTask = await startExportTask( + rds, + rdsExportIdentifier, + FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME, + ); + + logger.info({ + at, + message: 'Started an export task', + exportData, + }); + } catch (error) { // TODO handle this by finding the most recent snapshot earlier + const message: InfoObject = { + at, + message: 'export to S3 failed', + error, + }; + + if (error.name === 'DBSnapshotNotFound') { + stats.increment(`${statStart}.no_s3_snapshot`, 1); + + logger.info(message); + return; + } + + logger.error(message); + } finally { + stats.timing(`${statStart}.rdsSnapshotExport`, Date.now() - startExport); + } +} diff --git a/indexer/services/roundtable/src/tasks/update-research-environment.ts b/indexer/services/roundtable/src/tasks/update-research-environment.ts index a0a85d6f89..3deb3fd66f 100644 --- a/indexer/services/roundtable/src/tasks/update-research-environment.ts +++ b/indexer/services/roundtable/src/tasks/update-research-environment.ts @@ -15,7 +15,7 @@ import { checkIfS3ObjectExists, getMostRecentDBSnapshotIdentifier, startExportTask, - startAthenaQuery, + startAthenaQuery, RESEARCH_SNAPSHOT_S3_BUCKET_NAME, } from '../helpers/aws'; import { AthenaTableDDLQueries } from '../helpers/types'; import * as athenaAssetPositions from '../lib/athena-ddl-tables/asset_positions'; @@ -75,10 +75,14 @@ export default async function runTask(): Promise { // check if s3 object exists const startS3Check: number = Date.now(); - const s3ObjectExists: boolean = await checkIfS3ObjectExists(s3, s3Date); + const s3ObjectExists: boolean = await checkIfS3ObjectExists( + s3, + s3Date, + RESEARCH_SNAPSHOT_S3_BUCKET_NAME, + ); stats.timing(`${statStart}.checkS3Object`, Date.now() - startS3Check); - const rdsExportIdentifier: string = `${config.RDS_INSTANCE_NAME}-${s3Date}`; + const rdsExportIdentifier: string = `${config.RDS_INSTANCE_NAME}-research-${s3Date}`; // If the s3 object exists, attempt to add Athena tables or if we are skipping for test purposes if (s3ObjectExists || config.SKIP_TO_ATHENA_TABLE_WRITING) { @@ -110,7 +114,11 @@ export default async function runTask(): Promise { // start Export Job if S3 Object does not exist const startExport: number = Date.now(); try { - const exportData: RDS.ExportTask = await startExportTask(rds, rdsExportIdentifier); + const exportData: RDS.ExportTask = await startExportTask( + rds, + rdsExportIdentifier, + RESEARCH_SNAPSHOT_S3_BUCKET_NAME, + ); logger.info({ at, From ffe0b2f7230c8ef17ac682eff9604e453498205b Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 2 Jan 2024 12:44:41 -0500 Subject: [PATCH 02/11] lint --- indexer/services/roundtable/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/services/roundtable/src/index.ts b/indexer/services/roundtable/src/index.ts index 258ea5ebed..6ae7ab6664 100644 --- a/indexer/services/roundtable/src/index.ts +++ b/indexer/services/roundtable/src/index.ts @@ -11,11 +11,11 @@ import { import cancelStaleOrdersTask from './tasks/cancel-stale-orders'; import createPnlTicksTask from './tasks/create-pnl-ticks'; import deleteZeroPriceLevelsTask from './tasks/delete-zero-price-levels'; -import takeFastSyncSnapshotTask from './tasks/take-fast-sync-snapshot'; import marketUpdaterTask from './tasks/market-updater'; import orderbookInstrumentationTask from './tasks/orderbook-instrumentation'; import removeExpiredOrdersTask from './tasks/remove-expired-orders'; import removeOldOrderUpdatesTask from './tasks/remove-old-order-updates'; +import takeFastSyncSnapshotTask from './tasks/take-fast-sync-snapshot'; import trackLag from './tasks/track-lag'; import updateComplianceDataTask from './tasks/update-compliance-data'; import updateResearchEnvironmentTask from './tasks/update-research-environment'; From 697bc6ab058612a26243a9975f94e06476dc4cc4 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Wed, 3 Jan 2024 12:39:54 -0500 Subject: [PATCH 03/11] fix --- .../src/tasks/take-fast-sync-snapshot.ts | 41 +++---------------- 1 file changed, 6 insertions(+), 35 deletions(-) diff --git a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts index 0fc26f0689..e9774164a4 100644 --- a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts +++ b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts @@ -1,14 +1,11 @@ import { InfoObject, logger, stats } from '@dydxprotocol-indexer/base'; import RDS from 'aws-sdk/clients/rds'; -import S3 from 'aws-sdk/clients/s3'; import { DateTime } from 'luxon'; import config from '../config'; import { checkIfExportJobToS3IsOngoing, - checkIfS3ObjectExists, FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME, - getMostRecentDBSnapshotIdentifier, startExportTask, } from '../helpers/aws'; @@ -16,41 +13,14 @@ const statStart: string = `${config.SERVICE_NAME}.fast_sync_export_db_snapshot`; export default async function runTask(): Promise { const at: string = 'fast-sync-export-db-snapshot#runTask'; + logger.info({ at, message: 'Starting task.' }); const rds: RDS = new RDS(); - // get most recent rds snapshot - const startDescribe: number = Date.now(); - const dateString: string = DateTime.utc().toFormat('yyyy-MM-dd'); - const mostRecentSnapshot: string = await getMostRecentDBSnapshotIdentifier(rds); - stats.timing(`${statStart}.describe_rds_snapshots`, Date.now() - startDescribe); + const dateString: string = DateTime.utc().toFormat('yyyy-MM-dd-HH-mm'); + const rdsExportIdentifier: string = `${config.RDS_INSTANCE_NAME}-fast-sync-${dateString}`; - // dev example: rds:dev-indexer-apne1-db-2023-06-25-18-34 - const s3Date: string = mostRecentSnapshot.split(config.RDS_INSTANCE_NAME)[1].slice(1); - const s3: S3 = new S3(); - - // check if s3 object exists - const startS3Check: number = Date.now(); - const s3ObjectExists: boolean = await checkIfS3ObjectExists( - s3, - s3Date, - FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME, - ); - stats.timing(`${statStart}.checkS3Object`, Date.now() - startS3Check); - - const rdsExportIdentifier: string = `${config.RDS_INSTANCE_NAME}-fast-sync-${s3Date}`; - - // If the s3 object exists, return - if (s3ObjectExists) { - logger.info({ - at, - dateString, - message: 'S3 object exists.', - }); - return; - } - - // if we haven't created the object, check if it is being created + // check if it is being created const rdsExportCheck: number = Date.now(); const exportJobOngoing: boolean = await checkIfExportJobToS3IsOngoing(rds, rdsExportIdentifier); stats.timing(`${statStart}.checkRdsExport`, Date.now() - rdsExportCheck); @@ -63,7 +33,8 @@ export default async function runTask(): Promise { }); return; } - // start Export Job if S3 Object does not exist + + // start Export Job if not already started const startExport: number = Date.now(); try { const exportData: RDS.ExportTask = await startExportTask( From 64054d6bf62151550ef46471b51172868eff1f93 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Wed, 3 Jan 2024 13:29:14 -0500 Subject: [PATCH 04/11] latest --- .../tasks/take-fast-sync-snapshot.test.ts | 36 +++++------ indexer/services/roundtable/src/config.ts | 1 + .../services/roundtable/src/helpers/aws.ts | 50 ++++++++++++++- .../src/tasks/take-fast-sync-snapshot.ts | 61 +++++++++++++++---- .../src/tasks/update-research-environment.ts | 6 +- 5 files changed, 117 insertions(+), 37 deletions(-) diff --git a/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts b/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts index 5c0c41ed00..5cfd320399 100644 --- a/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts +++ b/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts @@ -1,48 +1,42 @@ import config from '../../src/config'; import { asMock } from '@dydxprotocol-indexer/dev'; -import { - checkIfExportJobToS3IsOngoing, - checkIfS3ObjectExists, - getMostRecentDBSnapshotIdentifier, - startExportTask, -} from '../../src/helpers/aws'; +import { createDBSnapshot, getMostRecentDBSnapshotIdentifier, startExportTask } from '../../src/helpers/aws'; import takeFastSyncSnapshotTask from '../../src/tasks/take-fast-sync-snapshot'; +import { DateTime } from 'luxon'; jest.mock('../../src/helpers/aws'); describe('fast-sync-export-db-snapshot', () => { + const snapshotIdentifier: string = `${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-postgres-main-staging-2022-05-03-04-16`; beforeAll(() => { config.RDS_INSTANCE_NAME = 'postgres-main-staging'; }); beforeEach(() => { jest.resetAllMocks(); - asMock(getMostRecentDBSnapshotIdentifier).mockImplementation(async () => Promise.resolve('postgres-main-staging-2022-05-03-04-16')); + asMock(getMostRecentDBSnapshotIdentifier).mockImplementation( + async () => Promise.resolve(snapshotIdentifier), + ); }); afterAll(jest.resetAllMocks); - it('s3Object exists', async () => { - asMock(checkIfS3ObjectExists).mockImplementation(async () => Promise.resolve(true)); - + it('Last snapshot was taken more than interval ago', async () => { await takeFastSyncSnapshotTask(); - expect(checkIfExportJobToS3IsOngoing).not.toHaveBeenCalled(); - expect(startExportTask).not.toHaveBeenCalled(); + expect(createDBSnapshot).toHaveBeenCalled(); + expect(startExportTask).toHaveBeenCalled(); }); - it('export job in progress', async () => { - asMock(checkIfExportJobToS3IsOngoing).mockImplementation( - async () => Promise.resolve(true)); + it('Last snapshot was taken less than interval ago', async () => { + const timestamp: string = DateTime.utc().minus({ minutes: 1 }).toFormat('yyyy-MM-dd-HH-mm'); + asMock(getMostRecentDBSnapshotIdentifier).mockImplementation( + async () => Promise.resolve(`${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-postgres-main-staging-${timestamp}`), + ); await takeFastSyncSnapshotTask(); + expect(createDBSnapshot).not.toHaveBeenCalled(); expect(startExportTask).not.toHaveBeenCalled(); }); - - it('start export job', async () => { - await takeFastSyncSnapshotTask(); - - expect(startExportTask).toHaveBeenCalled(); - }); }); diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 119a17c712..7ce91f2334 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -118,6 +118,7 @@ export const configSchema = { AWS_REGION: parseString(), RESEARCH_SNAPSHOT_S3_BUCKET_ARN: parseString(), FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN: parseString(), + FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX: parseString({ default: 'fast-sync' }), ECS_TASK_ROLE_ARN: parseString(), KMS_KEY_ARN: parseString(), RDS_INSTANCE_NAME: parseString(), diff --git a/indexer/services/roundtable/src/helpers/aws.ts b/indexer/services/roundtable/src/helpers/aws.ts index 4ff27039cf..01bac25ee9 100644 --- a/indexer/services/roundtable/src/helpers/aws.ts +++ b/indexer/services/roundtable/src/helpers/aws.ts @@ -22,9 +22,18 @@ export const FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME = config.FAST_SYNC_SNAPSHOT_S3_BU /** * @description Get most recent snapshot identifier for an RDS database. + * @param rds - RDS client + * @param snapshotIdentifierPrefixInclude - Only include snapshots with snapshot identifier + * that starts with prefixInclude + * @param snapshotIdentifierPrefixExclude - Only include snapshots with snapshot identifier + * that does not start with prefixExclude */ // TODO(CLOB-672): Verify this function returns the most recent DB snapshot. -export async function getMostRecentDBSnapshotIdentifier(rds: RDS): Promise { +export async function getMostRecentDBSnapshotIdentifier( + rds: RDS, + snapshotIdentifierPrefixInclude?: string, + snapshotIdentifierPrefixExclude?: string, +): Promise { const awsResponse: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({ DBInstanceIdentifier: config.RDS_INSTANCE_NAME, MaxRecords: 20, // this is the minimum @@ -34,13 +43,48 @@ export async function getMostRecentDBSnapshotIdentifier(rds: RDS): Promise snapshot.DBSnapshotIdentifier && + snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixInclude), + ); + } + if (snapshotIdentifierPrefixExclude !== undefined) { + snapshots = awsResponse.DBSnapshots + .filter((snapshot) => snapshot.DBSnapshotIdentifier && + !snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixExclude), + ); + } + logger.info({ at: `${atStart}getMostRecentDBSnapshotIdentifier`, message: 'Described snapshots for database', - mostRecentSnapshot: awsResponse.DBSnapshots[awsResponse.DBSnapshots.length - 1], + mostRecentSnapshot: snapshots[snapshots.length - 1], }); - return awsResponse.DBSnapshots[awsResponse.DBSnapshots.length - 1].DBSnapshotIdentifier!; + return snapshots[snapshots.length - 1].DBSnapshotIdentifier!; +} + +/** + * @description Create DB snapshot for an RDS database. + */ +export async function createDBSnapshot( + rds: RDS, + snapshotIdentifier: string, + dbInstanceIdentifier: string, +): Promise { + const params = { + DBInstanceIdentifier: dbInstanceIdentifier, + DBSnapshotIdentifier: snapshotIdentifier, + }; + + const awsResponse: RDS.CreateDBSnapshotResult = await rds.createDBSnapshot(params).promise(); + if (awsResponse.DBSnapshot === undefined) { + throw Error(`No DB snapshot was created with identifier: ${snapshotIdentifier}`); + } + return awsResponse.DBSnapshot.DBSnapshotIdentifier!; } /** diff --git a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts index e9774164a4..324d97203f 100644 --- a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts +++ b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts @@ -4,13 +4,42 @@ import { DateTime } from 'luxon'; import config from '../config'; import { - checkIfExportJobToS3IsOngoing, + createDBSnapshot, FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME, + getMostRecentDBSnapshotIdentifier, startExportTask, } from '../helpers/aws'; const statStart: string = `${config.SERVICE_NAME}.fast_sync_export_db_snapshot`; +/** + * Checks if the difference between two dates is less than a given interval. + * + * @param startDate + * @param endDate + * @param intervalMs + */ +function isDifferenceLessThanInterval( + startDate: string, + endDate: string, + intervalMs: number, +): boolean { + const parseDateString = (dateStr: string): Date => { + const [year, month, day, hour, minute] = dateStr.split('-').map(Number); + return new Date(year, month, day, hour, minute); + }; + + // Parse the date strings + const parsedDate1 = parseDateString(startDate); + const parsedDate2 = parseDateString(endDate); + + // Calculate the difference in milliseconds + const differenceInMilliseconds = Math.abs(parsedDate1.getTime() - parsedDate2.getTime()); + + // Compare with the interval + return differenceInMilliseconds < intervalMs; +} + export default async function runTask(): Promise { const at: string = 'fast-sync-export-db-snapshot#runTask'; logger.info({ at, message: 'Starting task.' }); @@ -18,23 +47,31 @@ export default async function runTask(): Promise { const rds: RDS = new RDS(); const dateString: string = DateTime.utc().toFormat('yyyy-MM-dd-HH-mm'); - const rdsExportIdentifier: string = `${config.RDS_INSTANCE_NAME}-fast-sync-${dateString}`; - - // check if it is being created - const rdsExportCheck: number = Date.now(); - const exportJobOngoing: boolean = await checkIfExportJobToS3IsOngoing(rds, rdsExportIdentifier); - stats.timing(`${statStart}.checkRdsExport`, Date.now() - rdsExportCheck); - - if (exportJobOngoing) { + const rdsExportIdentifier: string = `${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-${config.RDS_INSTANCE_NAME}-${dateString}`; + // check the time of the last snapshot + const lastSnapshotIdentifier: string = await getMostRecentDBSnapshotIdentifier( + rds, + config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX, + ); + const s3Date: string = lastSnapshotIdentifier.split(config.RDS_INSTANCE_NAME)[1].slice(1); + if ( + isDifferenceLessThanInterval( + s3Date, + dateString, + config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS, + ) + ) { logger.info({ at, - dateString, - message: 'Will wait for export job to finish', + message: 'Last fast sync db snapshot was taken less than the interval ago', + interval: config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS, }); return; } + // Create the DB snapshot + await createDBSnapshot(rds, rdsExportIdentifier, config.RDS_INSTANCE_NAME); - // start Export Job if not already started + // start S3 Export Job. const startExport: number = Date.now(); try { const exportData: RDS.ExportTask = await startExportTask( diff --git a/indexer/services/roundtable/src/tasks/update-research-environment.ts b/indexer/services/roundtable/src/tasks/update-research-environment.ts index 3deb3fd66f..ad2017d0d2 100644 --- a/indexer/services/roundtable/src/tasks/update-research-environment.ts +++ b/indexer/services/roundtable/src/tasks/update-research-environment.ts @@ -66,7 +66,11 @@ export default async function runTask(): Promise { // get most recent rds snapshot const startDescribe: number = Date.now(); const dateString: string = DateTime.utc().toFormat('yyyy-MM-dd'); - const mostRecentSnapshot: string = await getMostRecentDBSnapshotIdentifier(rds); + const mostRecentSnapshot: string = await getMostRecentDBSnapshotIdentifier( + rds, + undefined, + config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX, + ); stats.timing(`${statStart}.describe_rds_snapshots`, Date.now() - startDescribe); // dev example: rds:dev-indexer-apne1-db-2023-06-25-18-34 From 424ca36e1099f2b4e719fb07855e429724cc7cc3 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Wed, 3 Jan 2024 16:14:14 -0500 Subject: [PATCH 05/11] latest --- .../tasks/take-fast-sync-snapshot.test.ts | 11 ++++ .../services/roundtable/src/helpers/aws.ts | 51 +++++++++++++++---- .../src/tasks/take-fast-sync-snapshot.ts | 33 ++++++------ .../src/tasks/update-research-environment.ts | 3 +- 4 files changed, 72 insertions(+), 26 deletions(-) diff --git a/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts b/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts index 5cfd320399..70cf7f4faf 100644 --- a/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts +++ b/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts @@ -39,4 +39,15 @@ describe('fast-sync-export-db-snapshot', () => { expect(createDBSnapshot).not.toHaveBeenCalled(); expect(startExportTask).not.toHaveBeenCalled(); }); + + it('No existing snapshot', async () => { + asMock(getMostRecentDBSnapshotIdentifier).mockImplementation( + async () => Promise.resolve(undefined), + ); + + await takeFastSyncSnapshotTask(); + + expect(createDBSnapshot).toHaveBeenCalled(); + expect(startExportTask).toHaveBeenCalled(); + }); }); diff --git a/indexer/services/roundtable/src/helpers/aws.ts b/indexer/services/roundtable/src/helpers/aws.ts index 01bac25ee9..3f9552cd59 100644 --- a/indexer/services/roundtable/src/helpers/aws.ts +++ b/indexer/services/roundtable/src/helpers/aws.ts @@ -33,7 +33,7 @@ export async function getMostRecentDBSnapshotIdentifier( rds: RDS, snapshotIdentifierPrefixInclude?: string, snapshotIdentifierPrefixExclude?: string, -): Promise { +): Promise { const awsResponse: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({ DBInstanceIdentifier: config.RDS_INSTANCE_NAME, MaxRecords: 20, // this is the minimum @@ -46,13 +46,13 @@ export async function getMostRecentDBSnapshotIdentifier( let snapshots: RDS.DBSnapshotList = awsResponse.DBSnapshots; // Only include snapshots with snapshot identifier that starts with prefixInclude if (snapshotIdentifierPrefixInclude !== undefined) { - snapshots = awsResponse.DBSnapshots + snapshots = snapshots .filter((snapshot) => snapshot.DBSnapshotIdentifier && snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixInclude), ); } if (snapshotIdentifierPrefixExclude !== undefined) { - snapshots = awsResponse.DBSnapshots + snapshots = snapshots .filter((snapshot) => snapshot.DBSnapshotIdentifier && !snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixExclude), ); @@ -64,11 +64,12 @@ export async function getMostRecentDBSnapshotIdentifier( mostRecentSnapshot: snapshots[snapshots.length - 1], }); - return snapshots[snapshots.length - 1].DBSnapshotIdentifier!; + return snapshots[snapshots.length - 1]?.DBSnapshotIdentifier; } /** - * @description Create DB snapshot for an RDS database. + * @description Create DB snapshot for an RDS database. Only returns when the + * snapshot is available. */ export async function createDBSnapshot( rds: RDS, @@ -80,11 +81,37 @@ export async function createDBSnapshot( DBSnapshotIdentifier: snapshotIdentifier, }; - const awsResponse: RDS.CreateDBSnapshotResult = await rds.createDBSnapshot(params).promise(); - if (awsResponse.DBSnapshot === undefined) { - throw Error(`No DB snapshot was created with identifier: ${snapshotIdentifier}`); + try { + await rds.createDBSnapshot(params).promise(); + // Polling function to check snapshot status. Only return when the snapshot is available. + const waitForSnapshot = async () => { + // eslint-disable-next-line no-constant-condition + while (true) { + const statusResponse = await rds.describeDBSnapshots( + { DBSnapshotIdentifier: snapshotIdentifier }, + ).promise(); + const snapshot = statusResponse.DBSnapshots![0]; + if (snapshot.Status === 'available') { + return snapshot.DBSnapshotIdentifier!; + } else if (snapshot.Status === 'failed') { + throw Error(`Snapshot creation failed for identifier: ${snapshotIdentifier}`); + } + + // Wait for 1 minute before checking again + await new Promise((resolve) => setTimeout(resolve, 60000)); + } + }; + + return await waitForSnapshot(); + } catch (error) { + logger.error({ + at: `${atStart}createDBSnapshot`, + message: 'Failed to create DB snapshot', + error, + snapshotIdentifier, + }); + throw error; } - return awsResponse.DBSnapshot.DBSnapshotIdentifier!; } /** @@ -192,9 +219,13 @@ export async function startExportTask( rds: RDS, rdsExportIdentifier: string, bucket: string, + isAutomatedSnapshot: boolean, ): Promise { // TODO: Add validation - const sourceArnPrefix = `arn:aws:rds:${config.AWS_REGION}:${config.AWS_ACCOUNT_ID}:snapshot:rds:`; + let sourceArnPrefix: string = `arn:aws:rds:${config.AWS_REGION}:${config.AWS_ACCOUNT_ID}:snapshot:`; + if (isAutomatedSnapshot) { + sourceArnPrefix = sourceArnPrefix.concat('rds:'); + } const awsResponse: RDS.ExportTask = await rds.startExportTask({ ExportTaskIdentifier: rdsExportIdentifier, S3BucketName: bucket, diff --git a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts index 324d97203f..6b26d51108 100644 --- a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts +++ b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts @@ -49,24 +49,26 @@ export default async function runTask(): Promise { const dateString: string = DateTime.utc().toFormat('yyyy-MM-dd-HH-mm'); const rdsExportIdentifier: string = `${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-${config.RDS_INSTANCE_NAME}-${dateString}`; // check the time of the last snapshot - const lastSnapshotIdentifier: string = await getMostRecentDBSnapshotIdentifier( + const lastSnapshotIdentifier: string | undefined = await getMostRecentDBSnapshotIdentifier( rds, config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX, ); - const s3Date: string = lastSnapshotIdentifier.split(config.RDS_INSTANCE_NAME)[1].slice(1); - if ( - isDifferenceLessThanInterval( - s3Date, - dateString, - config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS, - ) - ) { - logger.info({ - at, - message: 'Last fast sync db snapshot was taken less than the interval ago', - interval: config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS, - }); - return; + if (lastSnapshotIdentifier !== undefined) { + const s3Date: string = lastSnapshotIdentifier.split(config.RDS_INSTANCE_NAME)[1].slice(1); + if ( + isDifferenceLessThanInterval( + s3Date, + dateString, + config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS, + ) + ) { + logger.info({ + at, + message: 'Last fast sync db snapshot was taken less than the interval ago', + interval: config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS, + }); + return; + } } // Create the DB snapshot await createDBSnapshot(rds, rdsExportIdentifier, config.RDS_INSTANCE_NAME); @@ -78,6 +80,7 @@ export default async function runTask(): Promise { rds, rdsExportIdentifier, FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME, + false, ); logger.info({ diff --git a/indexer/services/roundtable/src/tasks/update-research-environment.ts b/indexer/services/roundtable/src/tasks/update-research-environment.ts index ad2017d0d2..30989a5b2c 100644 --- a/indexer/services/roundtable/src/tasks/update-research-environment.ts +++ b/indexer/services/roundtable/src/tasks/update-research-environment.ts @@ -70,7 +70,7 @@ export default async function runTask(): Promise { rds, undefined, config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX, - ); + ) as string; stats.timing(`${statStart}.describe_rds_snapshots`, Date.now() - startDescribe); // dev example: rds:dev-indexer-apne1-db-2023-06-25-18-34 @@ -122,6 +122,7 @@ export default async function runTask(): Promise { rds, rdsExportIdentifier, RESEARCH_SNAPSHOT_S3_BUCKET_NAME, + true, ); logger.info({ From 62a00b2b051e654aecf10f19a2b78f2bd85564a5 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Wed, 3 Jan 2024 16:27:43 -0500 Subject: [PATCH 06/11] s3 export --- indexer/services/roundtable/src/config.ts | 1 + .../src/tasks/take-fast-sync-snapshot.ts | 59 ++++++++++--------- 2 files changed, 33 insertions(+), 27 deletions(-) diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 7ce91f2334..c030c52b6e 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -119,6 +119,7 @@ export const configSchema = { RESEARCH_SNAPSHOT_S3_BUCKET_ARN: parseString(), FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN: parseString(), FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX: parseString({ default: 'fast-sync' }), + EXPORT_FAST_SYNC_SNAPSHOTS_TO_S3: parseBoolean({ default: false }), ECS_TASK_ROLE_ARN: parseString(), KMS_KEY_ARN: parseString(), RDS_INSTANCE_NAME: parseString(), diff --git a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts index 6b26d51108..8e3206e7e0 100644 --- a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts +++ b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts @@ -62,6 +62,7 @@ export default async function runTask(): Promise { config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS, ) ) { + stats.increment(`${statStart}.existingDbSnapshot`, 1); logger.info({ at, message: 'Last fast sync db snapshot was taken less than the interval ago', @@ -71,39 +72,43 @@ export default async function runTask(): Promise { } } // Create the DB snapshot + const startSnapshot: number = Date.now(); await createDBSnapshot(rds, rdsExportIdentifier, config.RDS_INSTANCE_NAME); + stats.timing(`${statStart}.createDbSnapshot`, Date.now() - startSnapshot); // start S3 Export Job. - const startExport: number = Date.now(); - try { - const exportData: RDS.ExportTask = await startExportTask( - rds, - rdsExportIdentifier, - FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME, - false, - ); + if (config.EXPORT_FAST_SYNC_SNAPSHOTS_TO_S3) { + const startExport: number = Date.now(); + try { + const exportData: RDS.ExportTask = await startExportTask( + rds, + rdsExportIdentifier, + FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME, + false, + ); - logger.info({ - at, - message: 'Started an export task', - exportData, - }); - } catch (error) { // TODO handle this by finding the most recent snapshot earlier - const message: InfoObject = { - at, - message: 'export to S3 failed', - error, - }; + logger.info({ + at, + message: 'Started an export task', + exportData, + }); + } catch (error) { // TODO handle this by finding the most recent snapshot earlier + const message: InfoObject = { + at, + message: 'export to S3 failed', + error, + }; - if (error.name === 'DBSnapshotNotFound') { - stats.increment(`${statStart}.no_s3_snapshot`, 1); + if (error.name === 'DBSnapshotNotFound') { + stats.increment(`${statStart}.no_s3_snapshot`, 1); - logger.info(message); - return; - } + logger.info(message); + return; + } - logger.error(message); - } finally { - stats.timing(`${statStart}.rdsSnapshotExport`, Date.now() - startExport); + logger.error(message); + } finally { + stats.timing(`${statStart}.rdsSnapshotExport`, Date.now() - startExport); + } } } From 8460623cdef1991374b8dd59c74cf7644107b2a9 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Thu, 4 Jan 2024 10:33:05 -0500 Subject: [PATCH 07/11] fix --- indexer/services/roundtable/.env.test | 3 +- .../tasks/take-fast-sync-snapshot.test.ts | 5 +- indexer/services/roundtable/src/config.ts | 4 +- .../services/roundtable/src/helpers/aws.ts | 23 +++------- .../services/roundtable/src/helpers/sql.ts | 4 +- .../src/tasks/take-fast-sync-snapshot.ts | 46 ++----------------- .../src/tasks/update-research-environment.ts | 17 ++----- 7 files changed, 21 insertions(+), 81 deletions(-) diff --git a/indexer/services/roundtable/.env.test b/indexer/services/roundtable/.env.test index aa153fcf6b..f505b5f05a 100644 --- a/indexer/services/roundtable/.env.test +++ b/indexer/services/roundtable/.env.test @@ -6,8 +6,7 @@ DB_PASSWORD=dydxserver123 DB_PORT=5436 AWS_ACCOUNT_ID=invalid-test-account-id AWS_REGION=invalid-test-region -RESEARCH_SNAPSHOT_S3_BUCKET_ARN=invalid-research-test-bucket -FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN=invalid-snapshot-test-bucket +S3_BUCKET_ARN=invalid-test-bucket ECS_TASK_ROLE_ARN=invalid-test-arn KMS_KEY_ARN=invalid-kms-key-arn RDS_INSTANCE_NAME=invalid-rds-instance-name diff --git a/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts b/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts index 70cf7f4faf..c41356c6eb 100644 --- a/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts +++ b/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts @@ -1,6 +1,6 @@ import config from '../../src/config'; import { asMock } from '@dydxprotocol-indexer/dev'; -import { createDBSnapshot, getMostRecentDBSnapshotIdentifier, startExportTask } from '../../src/helpers/aws'; +import { createDBSnapshot, getMostRecentDBSnapshotIdentifier } from '../../src/helpers/aws'; import takeFastSyncSnapshotTask from '../../src/tasks/take-fast-sync-snapshot'; import { DateTime } from 'luxon'; @@ -25,7 +25,6 @@ describe('fast-sync-export-db-snapshot', () => { await takeFastSyncSnapshotTask(); expect(createDBSnapshot).toHaveBeenCalled(); - expect(startExportTask).toHaveBeenCalled(); }); it('Last snapshot was taken less than interval ago', async () => { @@ -37,7 +36,6 @@ describe('fast-sync-export-db-snapshot', () => { await takeFastSyncSnapshotTask(); expect(createDBSnapshot).not.toHaveBeenCalled(); - expect(startExportTask).not.toHaveBeenCalled(); }); it('No existing snapshot', async () => { @@ -48,6 +46,5 @@ describe('fast-sync-export-db-snapshot', () => { await takeFastSyncSnapshotTask(); expect(createDBSnapshot).toHaveBeenCalled(); - expect(startExportTask).toHaveBeenCalled(); }); }); diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index c030c52b6e..9ec482f8d5 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -116,10 +116,8 @@ export const configSchema = { // Update research environment AWS_ACCOUNT_ID: parseString(), AWS_REGION: parseString(), - RESEARCH_SNAPSHOT_S3_BUCKET_ARN: parseString(), - FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN: parseString(), + S3_BUCKET_ARN: parseString(), FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX: parseString({ default: 'fast-sync' }), - EXPORT_FAST_SYNC_SNAPSHOTS_TO_S3: parseBoolean({ default: false }), ECS_TASK_ROLE_ARN: parseString(), KMS_KEY_ARN: parseString(), RDS_INSTANCE_NAME: parseString(), diff --git a/indexer/services/roundtable/src/helpers/aws.ts b/indexer/services/roundtable/src/helpers/aws.ts index 3f9552cd59..250e460972 100644 --- a/indexer/services/roundtable/src/helpers/aws.ts +++ b/indexer/services/roundtable/src/helpers/aws.ts @@ -16,9 +16,8 @@ enum ExportTaskStatus { COMPLETE = 'complete', } -export const RESEARCH_SNAPSHOT_S3_BUCKET_NAME = config.RESEARCH_SNAPSHOT_S3_BUCKET_ARN.split(':::')[1]; -export const RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX = `s3://${RESEARCH_SNAPSHOT_S3_BUCKET_NAME}`; -export const FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME = config.FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN.split(':::')[1]; +export const S3_BUCKET_NAME = config.S3_BUCKET_ARN.split(':::')[1]; +export const S3_LOCATION_PREFIX = `s3://${S3_BUCKET_NAME}`; /** * @description Get most recent snapshot identifier for an RDS database. @@ -117,12 +116,9 @@ export async function createDBSnapshot( /** * @description Check if an S3 Object already exists. */ -export async function checkIfS3ObjectExists( - s3: S3, - s3Date: string, - bucket: string, -): Promise { +export async function checkIfS3ObjectExists(s3: S3, s3Date: string): Promise { const at: string = `${atStart}checkIfS3ObjectExists`; + const bucket: string = S3_BUCKET_NAME; const key: string = `${config.RDS_INSTANCE_NAME}-${s3Date}/export_info_${config.RDS_INSTANCE_NAME}-${s3Date}.json`; logger.info({ @@ -218,17 +214,12 @@ export async function checkIfExportJobToS3IsOngoing( export async function startExportTask( rds: RDS, rdsExportIdentifier: string, - bucket: string, - isAutomatedSnapshot: boolean, ): Promise { // TODO: Add validation - let sourceArnPrefix: string = `arn:aws:rds:${config.AWS_REGION}:${config.AWS_ACCOUNT_ID}:snapshot:`; - if (isAutomatedSnapshot) { - sourceArnPrefix = sourceArnPrefix.concat('rds:'); - } + const sourceArnPrefix = `arn:aws:rds:${config.AWS_REGION}:${config.AWS_ACCOUNT_ID}:snapshot:rds:`; const awsResponse: RDS.ExportTask = await rds.startExportTask({ ExportTaskIdentifier: rdsExportIdentifier, - S3BucketName: bucket, + S3BucketName: S3_BUCKET_NAME, KmsKeyId: config.KMS_KEY_ARN, IamRoleArn: config.ECS_TASK_ROLE_ARN, SourceArn: `${sourceArnPrefix}${rdsExportIdentifier}`, @@ -296,7 +287,7 @@ export async function startAthenaQuery( Database: config.ATHENA_DATABASE_NAME, }, ResultConfiguration: { - OutputLocation: `${RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX}/output/${timestamp}`, + OutputLocation: `${S3_LOCATION_PREFIX}/output/${timestamp}`, }, WorkGroup: config.ATHENA_WORKING_GROUP, }).promise(); diff --git a/indexer/services/roundtable/src/helpers/sql.ts b/indexer/services/roundtable/src/helpers/sql.ts index 43a3ae9ff0..24175fe557 100644 --- a/indexer/services/roundtable/src/helpers/sql.ts +++ b/indexer/services/roundtable/src/helpers/sql.ts @@ -1,4 +1,4 @@ -import { RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX } from './aws'; +import { S3_LOCATION_PREFIX } from './aws'; export function castToTimestamp(column: string): string { return `CAST("${column}" AS timestamp) as "${column}"`; @@ -23,7 +23,7 @@ export function getExternalAthenaTableCreationStatement( 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - LOCATION '${RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX}/${rdsExportIdentifier}/dydx/public.${tableName}' + LOCATION '${S3_LOCATION_PREFIX}/${rdsExportIdentifier}/dydx/public.${tableName}' TBLPROPERTIES ('has_encrypted_data'='false'); `; } diff --git a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts index 8e3206e7e0..66d88dd539 100644 --- a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts +++ b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts @@ -1,13 +1,11 @@ -import { InfoObject, logger, stats } from '@dydxprotocol-indexer/base'; +import { logger, stats } from '@dydxprotocol-indexer/base'; import RDS from 'aws-sdk/clients/rds'; import { DateTime } from 'luxon'; import config from '../config'; import { createDBSnapshot, - FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME, getMostRecentDBSnapshotIdentifier, - startExportTask, } from '../helpers/aws'; const statStart: string = `${config.SERVICE_NAME}.fast_sync_export_db_snapshot`; @@ -47,7 +45,7 @@ export default async function runTask(): Promise { const rds: RDS = new RDS(); const dateString: string = DateTime.utc().toFormat('yyyy-MM-dd-HH-mm'); - const rdsExportIdentifier: string = `${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-${config.RDS_INSTANCE_NAME}-${dateString}`; + const snapshotIdentifier: string = `${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-${config.RDS_INSTANCE_NAME}-${dateString}`; // check the time of the last snapshot const lastSnapshotIdentifier: string | undefined = await getMostRecentDBSnapshotIdentifier( rds, @@ -73,42 +71,8 @@ export default async function runTask(): Promise { } // Create the DB snapshot const startSnapshot: number = Date.now(); - await createDBSnapshot(rds, rdsExportIdentifier, config.RDS_INSTANCE_NAME); + const createdSnapshotIdentifier: string = await + createDBSnapshot(rds, snapshotIdentifier, config.RDS_INSTANCE_NAME); + logger.info({ at, message: 'Created DB snapshot.', snapshotIdentifier: createdSnapshotIdentifier }); stats.timing(`${statStart}.createDbSnapshot`, Date.now() - startSnapshot); - - // start S3 Export Job. - if (config.EXPORT_FAST_SYNC_SNAPSHOTS_TO_S3) { - const startExport: number = Date.now(); - try { - const exportData: RDS.ExportTask = await startExportTask( - rds, - rdsExportIdentifier, - FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME, - false, - ); - - logger.info({ - at, - message: 'Started an export task', - exportData, - }); - } catch (error) { // TODO handle this by finding the most recent snapshot earlier - const message: InfoObject = { - at, - message: 'export to S3 failed', - error, - }; - - if (error.name === 'DBSnapshotNotFound') { - stats.increment(`${statStart}.no_s3_snapshot`, 1); - - logger.info(message); - return; - } - - logger.error(message); - } finally { - stats.timing(`${statStart}.rdsSnapshotExport`, Date.now() - startExport); - } - } } diff --git a/indexer/services/roundtable/src/tasks/update-research-environment.ts b/indexer/services/roundtable/src/tasks/update-research-environment.ts index 30989a5b2c..c18fb94f8c 100644 --- a/indexer/services/roundtable/src/tasks/update-research-environment.ts +++ b/indexer/services/roundtable/src/tasks/update-research-environment.ts @@ -15,7 +15,7 @@ import { checkIfS3ObjectExists, getMostRecentDBSnapshotIdentifier, startExportTask, - startAthenaQuery, RESEARCH_SNAPSHOT_S3_BUCKET_NAME, + startAthenaQuery, } from '../helpers/aws'; import { AthenaTableDDLQueries } from '../helpers/types'; import * as athenaAssetPositions from '../lib/athena-ddl-tables/asset_positions'; @@ -79,14 +79,10 @@ export default async function runTask(): Promise { // check if s3 object exists const startS3Check: number = Date.now(); - const s3ObjectExists: boolean = await checkIfS3ObjectExists( - s3, - s3Date, - RESEARCH_SNAPSHOT_S3_BUCKET_NAME, - ); + const s3ObjectExists: boolean = await checkIfS3ObjectExists(s3, s3Date); stats.timing(`${statStart}.checkS3Object`, Date.now() - startS3Check); - const rdsExportIdentifier: string = `${config.RDS_INSTANCE_NAME}-research-${s3Date}`; + const rdsExportIdentifier: string = `${config.RDS_INSTANCE_NAME}-${s3Date}`; // If the s3 object exists, attempt to add Athena tables or if we are skipping for test purposes if (s3ObjectExists || config.SKIP_TO_ATHENA_TABLE_WRITING) { @@ -118,12 +114,7 @@ export default async function runTask(): Promise { // start Export Job if S3 Object does not exist const startExport: number = Date.now(); try { - const exportData: RDS.ExportTask = await startExportTask( - rds, - rdsExportIdentifier, - RESEARCH_SNAPSHOT_S3_BUCKET_NAME, - true, - ); + const exportData: RDS.ExportTask = await startExportTask(rds, rdsExportIdentifier); logger.info({ at, From 4de0f177720c8d65348a09f385e4aa56f871dc7f Mon Sep 17 00:00:00 2001 From: Will Liu Date: Thu, 4 Jan 2024 10:35:37 -0500 Subject: [PATCH 08/11] update cmt --- indexer/services/roundtable/src/helpers/aws.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/indexer/services/roundtable/src/helpers/aws.ts b/indexer/services/roundtable/src/helpers/aws.ts index 250e460972..89e29dc591 100644 --- a/indexer/services/roundtable/src/helpers/aws.ts +++ b/indexer/services/roundtable/src/helpers/aws.ts @@ -23,9 +23,9 @@ export const S3_LOCATION_PREFIX = `s3://${S3_BUCKET_NAME}`; * @description Get most recent snapshot identifier for an RDS database. * @param rds - RDS client * @param snapshotIdentifierPrefixInclude - Only include snapshots with snapshot identifier - * that starts with prefixInclude - * @param snapshotIdentifierPrefixExclude - Only include snapshots with snapshot identifier - * that does not start with prefixExclude + * that starts with snapshotIdentifierPrefixInclude + * @param snapshotIdentifierPrefixExclude - Exclude snapshots with snapshot identifier + * that starts with snapshotIdentifierPrefixExclude */ // TODO(CLOB-672): Verify this function returns the most recent DB snapshot. export async function getMostRecentDBSnapshotIdentifier( From f5ad0415ba891e604527eb4d9553428b1379d17f Mon Sep 17 00:00:00 2001 From: Will Liu Date: Thu, 4 Jan 2024 13:43:52 -0500 Subject: [PATCH 09/11] address cmts --- .../services/roundtable/src/helpers/aws.ts | 142 ++++++++++++++---- .../src/tasks/take-fast-sync-snapshot.ts | 5 + 2 files changed, 114 insertions(+), 33 deletions(-) diff --git a/indexer/services/roundtable/src/helpers/aws.ts b/indexer/services/roundtable/src/helpers/aws.ts index 89e29dc591..6db10c8bef 100644 --- a/indexer/services/roundtable/src/helpers/aws.ts +++ b/indexer/services/roundtable/src/helpers/aws.ts @@ -19,6 +19,71 @@ enum ExportTaskStatus { export const S3_BUCKET_NAME = config.S3_BUCKET_ARN.split(':::')[1]; export const S3_LOCATION_PREFIX = `s3://${S3_BUCKET_NAME}`; +/** + * Delete snapshots for the RDS instance older than the specified number of days. + * Defaults to 7 days. + * @param rds + * @param daysOld + */ +export async function deleteOldFastSyncSnapshots(rds: RDS, daysOld: number = 7): Promise { + try { + const cutoffTime: number = new Date().getTime() - daysOld * 24 * 60 * 60 * 1000; + let marker; + do { + const response: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({ + DBInstanceIdentifier: config.RDS_INSTANCE_NAME, + MaxRecords: 20, // Maximum number of records per page + Marker: marker, // Marker for pagination + }).promise(); + + if (response.DBSnapshots === undefined) { + logger.error({ + at: `${atStart}deleteOldSnapshots`, + message: `No DB snapshots found with identifier: ${config.RDS_INSTANCE_NAME}`, + }); + return; + } + + // Filter for fast sync snapshots older than cutoffTime + const oldFastSyncSnapshots = response.DBSnapshots.filter((snapshot) => { + if (!snapshot.DBSnapshotIdentifier!.startsWith( + config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX, + )) { + return false; + } + const snapshotDate = snapshot.SnapshotCreateTime!.getTime(); + return snapshotDate < cutoffTime; + }); + + // Delete each old snapshot + for (const snapshot of oldFastSyncSnapshots) { + logger.info({ + at: `${atStart}deleteOldSnapshots`, + message: 'Deleting snapshot', + snapshotIdentifier: snapshot.DBSnapshotIdentifier, + }); + const snapshotResult: RDS.Types.DeleteDBSnapshotResult = await rds.deleteDBSnapshot( + { DBSnapshotIdentifier: snapshot.DBSnapshotIdentifier! }, + ).promise(); + logger.info({ + at: `${atStart}deleteOldSnapshots`, + message: 'Snapshot deleted', + snapshotIdentifier: snapshotResult.DBSnapshot!.DBSnapshotIdentifier!, + }); + } + + marker = response.Marker; + } while (marker); + } catch (error) { + logger.error({ + at: `${atStart}deleteOldSnapshots`, + message: 'Error deleting old snapshots', + error, + }); + throw error; + } +} + /** * @description Get most recent snapshot identifier for an RDS database. * @param rds - RDS client @@ -33,37 +98,47 @@ export async function getMostRecentDBSnapshotIdentifier( snapshotIdentifierPrefixInclude?: string, snapshotIdentifierPrefixExclude?: string, ): Promise { - const awsResponse: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({ - DBInstanceIdentifier: config.RDS_INSTANCE_NAME, - MaxRecords: 20, // this is the minimum - }).promise(); + let snapshots: RDS.DBSnapshotList = []; + let marker: string | undefined; + + do { + const awsResponse: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({ + DBInstanceIdentifier: config.RDS_INSTANCE_NAME, + MaxRecords: 20, // Maximum number of records per page + Marker: marker, // Marker for pagination + }).promise(); - if (awsResponse.DBSnapshots === undefined) { - throw Error(`No DB snapshots found with identifier: ${config.RDS_INSTANCE_NAME}`); - } + if (awsResponse.DBSnapshots === undefined) { + throw Error(`No DB snapshots found with identifier: ${config.RDS_INSTANCE_NAME}`); + } + + snapshots = snapshots.concat(awsResponse.DBSnapshots); + marker = awsResponse.Marker; + } while (marker); - let snapshots: RDS.DBSnapshotList = awsResponse.DBSnapshots; - // Only include snapshots with snapshot identifier that starts with prefixInclude + // Filter snapshots based on include/exclude prefixes if (snapshotIdentifierPrefixInclude !== undefined) { snapshots = snapshots .filter((snapshot) => snapshot.DBSnapshotIdentifier && - snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixInclude), - ); + snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixInclude)); } if (snapshotIdentifierPrefixExclude !== undefined) { snapshots = snapshots .filter((snapshot) => snapshot.DBSnapshotIdentifier && - !snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixExclude), - ); + !snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixExclude)); } + // Sort snapshots by creation time in descending order + snapshots.sort((a, b) => b.SnapshotCreateTime!.getTime() - a.SnapshotCreateTime!.getTime()); + logger.info({ at: `${atStart}getMostRecentDBSnapshotIdentifier`, message: 'Described snapshots for database', - mostRecentSnapshot: snapshots[snapshots.length - 1], + mostRecentSnapshot: snapshots[0], }); - return snapshots[snapshots.length - 1]?.DBSnapshotIdentifier; + // Return the latest snapshot identifier + return snapshots[0]?.DBSnapshotIdentifier; } /** @@ -82,26 +157,27 @@ export async function createDBSnapshot( try { await rds.createDBSnapshot(params).promise(); - // Polling function to check snapshot status. Only return when the snapshot is available. - const waitForSnapshot = async () => { - // eslint-disable-next-line no-constant-condition - while (true) { - const statusResponse = await rds.describeDBSnapshots( - { DBSnapshotIdentifier: snapshotIdentifier }, - ).promise(); - const snapshot = statusResponse.DBSnapshots![0]; - if (snapshot.Status === 'available') { - return snapshot.DBSnapshotIdentifier!; - } else if (snapshot.Status === 'failed') { - throw Error(`Snapshot creation failed for identifier: ${snapshotIdentifier}`); - } - // Wait for 1 minute before checking again - await new Promise((resolve) => setTimeout(resolve, 60000)); - } - }; + // Wait for the DB snapshot to become available with the specified waiter configuration + await rds.waitFor('dBSnapshotAvailable', { + DBSnapshotIdentifier: snapshotIdentifier, + $waiter: { + delay: 60, // 60 seconds delay between each request + maxAttempts: 10, // Maximum of 10 attempts + }, + }).promise(); + + // Once it's available, retrieve its details + const statusResponse = await rds.describeDBSnapshots( + { DBSnapshotIdentifier: snapshotIdentifier }, + ).promise(); - return await waitForSnapshot(); + const snapshot = statusResponse.DBSnapshots![0]; + if (snapshot.Status === 'available') { + return snapshot.DBSnapshotIdentifier!; + } else { + throw Error(`Snapshot is not in the available state: Status is ${snapshot.Status}`); + } } catch (error) { logger.error({ at: `${atStart}createDBSnapshot`, diff --git a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts index 66d88dd539..7e17973640 100644 --- a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts +++ b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts @@ -5,6 +5,7 @@ import { DateTime } from 'luxon'; import config from '../config'; import { createDBSnapshot, + deleteOldFastSyncSnapshots, getMostRecentDBSnapshotIdentifier, } from '../helpers/aws'; @@ -75,4 +76,8 @@ export default async function runTask(): Promise { createDBSnapshot(rds, snapshotIdentifier, config.RDS_INSTANCE_NAME); logger.info({ at, message: 'Created DB snapshot.', snapshotIdentifier: createdSnapshotIdentifier }); stats.timing(`${statStart}.createDbSnapshot`, Date.now() - startSnapshot); + const startDeleteOldSnapshot: number = Date.now(); + // Delete old snapshots. + await deleteOldFastSyncSnapshots(rds); + stats.timing(`${statStart}.deleteOldSnapshots`, Date.now() - startDeleteOldSnapshot); } From fa75ea878380b737ab09f9b91de4041cb487ed9d Mon Sep 17 00:00:00 2001 From: Will Liu Date: Thu, 4 Jan 2024 13:49:09 -0500 Subject: [PATCH 10/11] lint --- .../__tests__/tasks/take-fast-sync-snapshot.test.ts | 9 ++++++++- indexer/services/roundtable/src/config.ts | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts b/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts index c41356c6eb..4fcea6d903 100644 --- a/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts +++ b/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts @@ -1,6 +1,10 @@ import config from '../../src/config'; import { asMock } from '@dydxprotocol-indexer/dev'; -import { createDBSnapshot, getMostRecentDBSnapshotIdentifier } from '../../src/helpers/aws'; +import { + createDBSnapshot, + deleteOldFastSyncSnapshots, + getMostRecentDBSnapshotIdentifier, +} from '../../src/helpers/aws'; import takeFastSyncSnapshotTask from '../../src/tasks/take-fast-sync-snapshot'; import { DateTime } from 'luxon'; @@ -25,6 +29,7 @@ describe('fast-sync-export-db-snapshot', () => { await takeFastSyncSnapshotTask(); expect(createDBSnapshot).toHaveBeenCalled(); + expect(deleteOldFastSyncSnapshots).toHaveBeenCalled(); }); it('Last snapshot was taken less than interval ago', async () => { @@ -36,6 +41,7 @@ describe('fast-sync-export-db-snapshot', () => { await takeFastSyncSnapshotTask(); expect(createDBSnapshot).not.toHaveBeenCalled(); + expect(deleteOldFastSyncSnapshots).not.toHaveBeenCalled(); }); it('No existing snapshot', async () => { @@ -46,5 +52,6 @@ describe('fast-sync-export-db-snapshot', () => { await takeFastSyncSnapshotTask(); expect(createDBSnapshot).toHaveBeenCalled(); + expect(deleteOldFastSyncSnapshots).toHaveBeenCalled(); }); }); diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 9ec482f8d5..552744b010 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -41,7 +41,7 @@ export const configSchema = { LOOPS_ORDERBOOK_INSTRUMENTATION: parseBoolean({ default: true }), LOOPS_CANCEL_STALE_ORDERS: parseBoolean({ default: true }), LOOPS_ENABLED_UPDATE_RESEARCH_ENVIRONMENT: parseBoolean({ default: false }), - LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS: parseBoolean({ default: false }), + LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS: parseBoolean({ default: true }), LOOPS_ENABLED_TRACK_LAG: parseBoolean({ default: false }), LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES: parseBoolean({ default: true }), LOOPS_ENABLED_AGGREGATE_TRADING_REWARDS: parseBoolean({ default: true }), From 90b2f58cdd7ebcde376897800d7fa7d8e1772b2e Mon Sep 17 00:00:00 2001 From: Will Liu Date: Fri, 5 Jan 2024 15:09:26 -0500 Subject: [PATCH 11/11] address cmts --- indexer/packages/base/src/constants.ts | 1 + .../tasks/take-fast-sync-snapshot.test.ts | 4 -- indexer/services/roundtable/src/config.ts | 5 ++ indexer/services/roundtable/src/index.ts | 9 +++ .../tasks/delete-old-fast-sync-snapshots.ts | 19 ++++++ .../src/tasks/take-fast-sync-snapshot.ts | 63 +++++++++---------- 6 files changed, 64 insertions(+), 37 deletions(-) create mode 100644 indexer/services/roundtable/src/tasks/delete-old-fast-sync-snapshots.ts diff --git a/indexer/packages/base/src/constants.ts b/indexer/packages/base/src/constants.ts index 0699deea56..da817304ba 100644 --- a/indexer/packages/base/src/constants.ts +++ b/indexer/packages/base/src/constants.ts @@ -13,3 +13,4 @@ export const FIVE_MINUTES_IN_MILLISECONDS: number = 5 * ONE_MINUTE_IN_MILLISECON export const TEN_MINUTES_IN_MILLISECONDS: number = 10 * ONE_MINUTE_IN_MILLISECONDS; export const ONE_HOUR_IN_MILLISECONDS: number = 60 * ONE_MINUTE_IN_MILLISECONDS; export const FOUR_HOURS_IN_MILLISECONDS: number = 4 * ONE_HOUR_IN_MILLISECONDS; +export const ONE_DAY_IN_MILLISECONDS: number = 24 * ONE_HOUR_IN_MILLISECONDS; diff --git a/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts b/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts index 4fcea6d903..cf8f22a1b8 100644 --- a/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts +++ b/indexer/services/roundtable/__tests__/tasks/take-fast-sync-snapshot.test.ts @@ -2,7 +2,6 @@ import config from '../../src/config'; import { asMock } from '@dydxprotocol-indexer/dev'; import { createDBSnapshot, - deleteOldFastSyncSnapshots, getMostRecentDBSnapshotIdentifier, } from '../../src/helpers/aws'; import takeFastSyncSnapshotTask from '../../src/tasks/take-fast-sync-snapshot'; @@ -29,7 +28,6 @@ describe('fast-sync-export-db-snapshot', () => { await takeFastSyncSnapshotTask(); expect(createDBSnapshot).toHaveBeenCalled(); - expect(deleteOldFastSyncSnapshots).toHaveBeenCalled(); }); it('Last snapshot was taken less than interval ago', async () => { @@ -41,7 +39,6 @@ describe('fast-sync-export-db-snapshot', () => { await takeFastSyncSnapshotTask(); expect(createDBSnapshot).not.toHaveBeenCalled(); - expect(deleteOldFastSyncSnapshots).not.toHaveBeenCalled(); }); it('No existing snapshot', async () => { @@ -52,6 +49,5 @@ describe('fast-sync-export-db-snapshot', () => { await takeFastSyncSnapshotTask(); expect(createDBSnapshot).toHaveBeenCalled(); - expect(deleteOldFastSyncSnapshots).toHaveBeenCalled(); }); }); diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 552744b010..ef0978def3 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -16,6 +16,7 @@ import { ONE_SECOND_IN_MILLISECONDS, TEN_SECONDS_IN_MILLISECONDS, FOUR_HOURS_IN_MILLISECONDS, + ONE_DAY_IN_MILLISECONDS, } from '@dydxprotocol-indexer/base'; import { kafkaConfigSchema, @@ -42,6 +43,7 @@ export const configSchema = { LOOPS_CANCEL_STALE_ORDERS: parseBoolean({ default: true }), LOOPS_ENABLED_UPDATE_RESEARCH_ENVIRONMENT: parseBoolean({ default: false }), LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS: parseBoolean({ default: true }), + LOOPS_ENABLED_DELETE_OLD_FAST_SYNC_SNAPSHOTS: parseBoolean({ default: true }), LOOPS_ENABLED_TRACK_LAG: parseBoolean({ default: false }), LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES: parseBoolean({ default: true }), LOOPS_ENABLED_AGGREGATE_TRADING_REWARDS: parseBoolean({ default: true }), @@ -71,6 +73,9 @@ export const configSchema = { LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS: parseInteger({ default: FOUR_HOURS_IN_MILLISECONDS, }), + LOOPS_INTERVAL_MS_DELETE_OLD_FAST_SYNC_SNAPSHOTS: parseInteger({ + default: ONE_DAY_IN_MILLISECONDS, + }), LOOPS_INTERVAL_MS_UPDATE_COMPLIANCE_DATA: parseInteger({ default: FIVE_MINUTES_IN_MILLISECONDS, }), diff --git a/indexer/services/roundtable/src/index.ts b/indexer/services/roundtable/src/index.ts index 2804270c2b..6a9c5770ce 100644 --- a/indexer/services/roundtable/src/index.ts +++ b/indexer/services/roundtable/src/index.ts @@ -11,6 +11,7 @@ import { import aggregateTradingRewardsTasks from './tasks/aggregate-trading-rewards'; import cancelStaleOrdersTask from './tasks/cancel-stale-orders'; import createPnlTicksTask from './tasks/create-pnl-ticks'; +import deleteOldFastSyncSnapshots from './tasks/delete-old-fast-sync-snapshots'; import deleteZeroPriceLevelsTask from './tasks/delete-zero-price-levels'; import marketUpdaterTask from './tasks/market-updater'; import orderbookInstrumentationTask from './tasks/orderbook-instrumentation'; @@ -109,6 +110,14 @@ async function start(): Promise { ); } + if (config.LOOPS_ENABLED_DELETE_OLD_FAST_SYNC_SNAPSHOTS) { + startLoop( + deleteOldFastSyncSnapshots, + 'delete_old_fast_sync_snapshots', + config.LOOPS_INTERVAL_MS_DELETE_OLD_FAST_SYNC_SNAPSHOTS, + ); + } + startLoop( () => updateComplianceDataTask(complianceProvider), 'update_compliance_data', diff --git a/indexer/services/roundtable/src/tasks/delete-old-fast-sync-snapshots.ts b/indexer/services/roundtable/src/tasks/delete-old-fast-sync-snapshots.ts new file mode 100644 index 0000000000..20acedb692 --- /dev/null +++ b/indexer/services/roundtable/src/tasks/delete-old-fast-sync-snapshots.ts @@ -0,0 +1,19 @@ +import { logger, stats } from '@dydxprotocol-indexer/base'; +import RDS from 'aws-sdk/clients/rds'; + +import config from '../config'; +import { deleteOldFastSyncSnapshots } from '../helpers/aws'; + +const statStart: string = `${config.SERVICE_NAME}.delete_old_fast_sync_snapshots`; + +export default async function runTask(): Promise { + const at: string = 'delete-old-fast-sync-snapshots#runTask'; + logger.info({ at, message: 'Starting task.' }); + + const rds: RDS = new RDS(); + + const startDeleteOldSnapshot: number = Date.now(); + // Delete old snapshots. + await deleteOldFastSyncSnapshots(rds); + stats.timing(`${statStart}.deleteOldSnapshots`, Date.now() - startDeleteOldSnapshot); +} diff --git a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts index 7e17973640..e0b0c44369 100644 --- a/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts +++ b/indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts @@ -5,40 +5,11 @@ import { DateTime } from 'luxon'; import config from '../config'; import { createDBSnapshot, - deleteOldFastSyncSnapshots, getMostRecentDBSnapshotIdentifier, } from '../helpers/aws'; const statStart: string = `${config.SERVICE_NAME}.fast_sync_export_db_snapshot`; -/** - * Checks if the difference between two dates is less than a given interval. - * - * @param startDate - * @param endDate - * @param intervalMs - */ -function isDifferenceLessThanInterval( - startDate: string, - endDate: string, - intervalMs: number, -): boolean { - const parseDateString = (dateStr: string): Date => { - const [year, month, day, hour, minute] = dateStr.split('-').map(Number); - return new Date(year, month, day, hour, minute); - }; - - // Parse the date strings - const parsedDate1 = parseDateString(startDate); - const parsedDate2 = parseDateString(endDate); - - // Calculate the difference in milliseconds - const differenceInMilliseconds = Math.abs(parsedDate1.getTime() - parsedDate2.getTime()); - - // Compare with the interval - return differenceInMilliseconds < intervalMs; -} - export default async function runTask(): Promise { const at: string = 'fast-sync-export-db-snapshot#runTask'; logger.info({ at, message: 'Starting task.' }); @@ -66,6 +37,8 @@ export default async function runTask(): Promise { at, message: 'Last fast sync db snapshot was taken less than the interval ago', interval: config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS, + currentDate: dateString, + lastSnapshotDate: s3Date, }); return; } @@ -76,8 +49,32 @@ export default async function runTask(): Promise { createDBSnapshot(rds, snapshotIdentifier, config.RDS_INSTANCE_NAME); logger.info({ at, message: 'Created DB snapshot.', snapshotIdentifier: createdSnapshotIdentifier }); stats.timing(`${statStart}.createDbSnapshot`, Date.now() - startSnapshot); - const startDeleteOldSnapshot: number = Date.now(); - // Delete old snapshots. - await deleteOldFastSyncSnapshots(rds); - stats.timing(`${statStart}.deleteOldSnapshots`, Date.now() - startDeleteOldSnapshot); +} + +/** + * Checks if the difference between two dates is less than a given interval. + * + * @param startDate + * @param endDate + * @param intervalMs + */ +function isDifferenceLessThanInterval( + startDate: string, + endDate: string, + intervalMs: number, +): boolean { + const parseDateString = (dateStr: string): Date => { + const [year, month, day, hour, minute] = dateStr.split('-').map(Number); + return new Date(year, month, day, hour, minute); + }; + + // Parse the date strings + const parsedDate1 = parseDateString(startDate); + const parsedDate2 = parseDateString(endDate); + + // Calculate the difference in milliseconds + const differenceInMilliseconds = Math.abs(parsedDate1.getTime() - parsedDate2.getTime()); + + // Compare with the interval + return differenceInMilliseconds < intervalMs; }