From 4996f97aabbe34e1f474b48192e67f9d5770d7c9 Mon Sep 17 00:00:00 2001 From: bry Date: Tue, 17 Dec 2024 12:24:17 -0600 Subject: [PATCH] Add substream cursor reuse strategy --- .../account-postgres-sink-service/src/env.ts | 2 ++ .../src/services/substream.ts | 31 ++++++++++++++++--- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/packages/account-postgres-sink-service/src/env.ts b/packages/account-postgres-sink-service/src/env.ts index 22f597a2e..39ecb8724 100644 --- a/packages/account-postgres-sink-service/src/env.ts +++ b/packages/account-postgres-sink-service/src/env.ts @@ -27,6 +27,8 @@ export const USE_SUBSTREAM = getEnvBoolean("USE_SUBSTREAM"); export const SUBSTREAM_API_KEY = process.env.SUBSTREAM_API_KEY; export const SUBSTREAM_URL = process.env.SUBSTREAM_URL; export const SUBSTREAM = process.env.SUBSTREAM; +export const SUBSTREAM_CURSOR_MAX_AGE_DAYS = + Number(process.env.SUBSTREAM_CURSOR_MAX_AGE_DAYS) || 5; export const USE_KAFKA = getEnvBoolean("USE_KAFKA"); export const KAFKA_USER = process.env.KAFKA_USER; diff --git a/packages/account-postgres-sink-service/src/services/substream.ts b/packages/account-postgres-sink-service/src/services/substream.ts index 67972e221..bd6b9d302 100644 --- a/packages/account-postgres-sink-service/src/services/substream.ts +++ b/packages/account-postgres-sink-service/src/services/substream.ts @@ -18,6 +18,7 @@ import { SUBSTREAM, SUBSTREAM_API_KEY, SUBSTREAM_URL, + SUBSTREAM_CURSOR_MAX_AGE_DAYS, } from "../env"; import { getPluginsByAccountTypeByProgram } from "../plugins"; import { IConfig } from "../types"; @@ -77,9 +78,33 @@ export const setupSubstream = async ( await Cursor.sync({ alter: true }); const lastCursor = await Cursor.findOne({ order: [["createdAt", "DESC"]] }); + let cursor: string | undefined; try { - let cursor = lastCursor?.cursor; + console.log("Connected to Substream"); + if (lastCursor) { + const cursorDate = new Date(lastCursor.dataValues.createdAt); + const cursorAge = + (Date.now() - cursorDate.getTime()) / (24 * 60 * 60 * 1000); + + if (cursorAge > SUBSTREAM_CURSOR_MAX_AGE_DAYS) { + console.log( + `Cursor is ${Math.floor( + cursorAge + )} days old, starting from current block` + ); + cursor = undefined; + } else { + cursor = lastCursor.cursor; + console.log( + `Using existing cursor from ${Math.floor(cursorAge)} days ago` + ); + } + } else { + cursor = undefined; + console.log("No existing cursor found, starting from current block"); + } + const currentBlock = await provider.connection.getSlot("finalized"); const request = createRequest({ substreamPackage: substream, @@ -89,10 +114,9 @@ export const setupSubstream = async ( startCursor: cursor, }); - console.log("Connected to Substream"); console.log( `Substream: Streaming from ${ - lastCursor ? `cursor ${lastCursor.cursor}` : `block ${currentBlock}` + cursor ? `cursor ${cursor}` : `block ${currentBlock}` }` ); @@ -111,7 +135,6 @@ export const setupSubstream = async ( try { const output = unpackMapOutput(response, registry); const cursor = message.value.cursor; - if (output !== undefined && !isEmptyMessage(output)) { const accountPromises = (output as any).accounts .map(async (account: IOuputAccount) => {