diff --git a/examples/data-history-museum/index.ts b/examples/data-history-museum/index.ts index 12d7db5..766ced8 100644 --- a/examples/data-history-museum/index.ts +++ b/examples/data-history-museum/index.ts @@ -3,7 +3,7 @@ import { TransactionResult } from '@algorandfoundation/algokit-utils/types/index import algosdk from 'algosdk' import fs from 'fs' import path from 'path' -import { AlgorandSubscriber } from '../../src/subscriber' +import { DynamicAlgorandSubscriber } from '../../src' import TransactionType = algosdk.TransactionType if (!fs.existsSync(path.join(__dirname, '..', '..', '.env')) && !process.env.ALGOD_SERVER) { @@ -20,27 +20,54 @@ interface DHMAsset { metadata: Record created: string lastModified: string + owner: string + ownerModified: string +} + +interface DHMFilterState { + assetIds: number[] } async function getDHMSubscriber() { const algod = await algokit.getAlgoClient() const indexer = await algokit.getAlgoIndexerClient() - const subscriber = new AlgorandSubscriber( + const subscriber = new DynamicAlgorandSubscriber( { - filters: [ - { - name: 'dhm-asset', - filter: { - type: TransactionType.acfg, - // Data History Museum creator accounts - sender: (await algokit.isTestNet(algod)) - ? 'ER7AMZRPD5KDVFWTUUVOADSOWM4RQKEEV2EDYRVSA757UHXOIEKGMBQIVU' - : 'EHYQCYHUC6CIWZLBX5TDTLVJ4SSVE4RRTMKFDCG4Z4Q7QSQ2XWIQPMKBPU', - }, - }, + maxIndexerRoundsToSync: 5_000_000, + dynamicFilters: async (filterState, pollLevel) => [ + ...(pollLevel === 0 + ? [ + { + name: 'dhm-asset', + filter: { + type: TransactionType.acfg, + // Data History Museum creator accounts + sender: (await algokit.isTestNet(algod)) + ? 'ER7AMZRPD5KDVFWTUUVOADSOWM4RQKEEV2EDYRVSA757UHXOIEKGMBQIVU' + : 'EHYQCYHUC6CIWZLBX5TDTLVJ4SSVE4RRTMKFDCG4Z4Q7QSQ2XWIQPMKBPU', + }, + }, + ] + : []), + ...(filterState.assetIds.length > 0 + ? [ + { + name: 'dhm-ownership-change', + filter: { + type: TransactionType.axfer, + assetId: filterState.assetIds, + minAmount: 1, + }, + }, + ] + : []), ], - frequencyInSeconds: 5, - maxRoundsToSync: 100, + filterStatePersistence: { + get: getFilterState, + set: saveFilterState, + }, + frequencyInSeconds: 1, + maxRoundsToSync: 500, syncBehaviour: 'catchup-with-indexer', watermarkPersistence: { get: getLastWatermark, @@ -52,9 +79,18 @@ async function getDHMSubscriber() { ) subscriber.onBatch('dhm-asset', async (events) => { // eslint-disable-next-line no-console - console.log(`Received ${events.length} asset changes`) + console.log(`Received ${events.length} asset changes (${events.filter((t) => t['created-asset-index']).length} new assets)`) + + // Append any new asset ids to the filter state so ownership is picked up of them + subscriber.appendFilterState({ assetIds: events.filter((e) => e['created-asset-index']).map((e) => e['created-asset-index']!) }) + }) + subscriber.onBatch('dhm-ownership-change', async (events) => { + // eslint-disable-next-line no-console + console.log(`Received ${events.length} ownership changes`) + }) + subscriber.onPoll(async (pollMetadata) => { // Save all of the Data History Museum Verifiably Authentic Digital Historical Artifacts - await saveDHMTransactions(events) + await saveDHMTransactions(pollMetadata.subscribedTransactions) }) return subscriber } @@ -81,8 +117,10 @@ async function saveDHMTransactions(transactions: TransactionResult[]) { metadata: getArc69Metadata(t), created: new Date(t['round-time']! * 1000).toISOString(), lastModified: new Date(t['round-time']! * 1000).toISOString(), + owner: t.sender, + ownerModified: new Date(t['round-time']! * 1000).toISOString(), }) - } else { + } else if (t['asset-config-transaction']) { const asset = assets.find((a) => a.id === t['asset-config-transaction']!['asset-id']) if (!asset) { // eslint-disable-next-line no-console @@ -96,6 +134,17 @@ async function saveDHMTransactions(transactions: TransactionResult[]) { asset!.metadata = getArc69Metadata(t) asset!.lastModified = new Date(t['round-time']! * 1000).toISOString() } + } else if (t['asset-transfer-transaction']) { + const asset = assets.find((a) => a.id === t['asset-transfer-transaction']!['asset-id']) + if (!asset) { + // eslint-disable-next-line no-console + console.error(t) + throw new Error(`Unable to find existing asset data for ${t['asset-transfer-transaction']!['asset-id']}`) + } + if (t['asset-transfer-transaction'].amount > 0) { + asset.owner = t['asset-transfer-transaction']!.receiver + asset.ownerModified = new Date(t['round-time']! * 1000).toISOString() + } } } @@ -104,12 +153,25 @@ async function saveDHMTransactions(transactions: TransactionResult[]) { // Basic methods that persist using filesystem - for illustrative purposes only +async function saveFilterState(state: DHMFilterState) { + fs.writeFileSync(path.join(__dirname, 'filters.json'), JSON.stringify(state), { encoding: 'utf-8' }) +} + +async function getFilterState(): Promise { + if (!fs.existsSync(path.join(__dirname, 'filters.json'))) return { assetIds: [] } + const existing = fs.readFileSync(path.join(__dirname, 'filters.json'), 'utf-8') + const existingData = JSON.parse(existing) as DHMFilterState + // eslint-disable-next-line no-console + console.log(`Found existing filter state in filters.json; syncing with ${existingData.assetIds.length} assets`) + return existingData +} + async function saveWatermark(watermark: number) { fs.writeFileSync(path.join(__dirname, 'watermark.txt'), watermark.toString(), { encoding: 'utf-8' }) } async function getLastWatermark(): Promise { - if (!fs.existsSync(path.join(__dirname, 'watermark.txt'))) return 0 + if (!fs.existsSync(path.join(__dirname, 'watermark.txt'))) return 15_000_000 const existing = fs.readFileSync(path.join(__dirname, 'watermark.txt'), 'utf-8') // eslint-disable-next-line no-console console.log(`Found existing sync watermark in watermark.txt; syncing from ${existing}`) diff --git a/src/dynamic-subscriber.ts b/src/dynamic-subscriber.ts new file mode 100644 index 0000000..d4e7138 --- /dev/null +++ b/src/dynamic-subscriber.ts @@ -0,0 +1,127 @@ +import algosdk from 'algosdk' +import { AlgorandSubscriber } from './subscriber' +import { + getAlgodSubscribedTransactions, + getArc28EventsToProcess, + getIndexerCatchupTransactions, + prepareSubscriptionPoll, + processExtraSubscriptionTransactionFields, +} from './subscriptions' +import type { + DynamicAlgorandSubscriberConfig, + NamedTransactionFilter, + SubscribedTransaction, + TransactionSubscriptionResult, +} from './types/subscription' +import Algodv2 = algosdk.Algodv2 +import Indexer = algosdk.Indexer + +export class DynamicAlgorandSubscriber extends AlgorandSubscriber { + private pendingStateChanges: { action: 'append' | 'delete' | 'set'; stateChange: Partial }[] = [] + private dynamicConfig: DynamicAlgorandSubscriberConfig + + constructor(config: DynamicAlgorandSubscriberConfig, algod: Algodv2, indexer?: Indexer) { + super( + { + filters: [], + ...config, + }, + algod, + indexer, + ) + this.dynamicConfig = config + } + + protected override async _pollOnce(watermark: number): Promise { + let subscribedTransactions: SubscribedTransaction[] = [] + let filterState: T = await this.dynamicConfig.filterStatePersistence.get() + + const subscribe = async (filters: NamedTransactionFilter[]) => { + const catchupTransactions = await getIndexerCatchupTransactions(filters, pollMetadata, arc28EventsToProcess, this.indexer) + const algodTransactions = await getAlgodSubscribedTransactions(filters, pollMetadata, arc28EventsToProcess) + const subscribedTransactions = catchupTransactions + .concat(algodTransactions) + .map((t) => processExtraSubscriptionTransactionFields(t, arc28EventsToProcess, this.config.arc28Events ?? [])) + this._processFilters({ subscribedTransactions, ...pollMetadata }) + return subscribedTransactions + } + + const filters = await this.dynamicConfig.dynamicFilters(filterState, 0) + this.filterNames = filters + .map((f) => f.name) + .filter((value, index, self) => { + // Remove duplicates + return self.findIndex((x) => x === value) === index + }) + const pollMetadata = await prepareSubscriptionPoll({ ...this.config, watermark, filters }, this.algod) + const arc28EventsToProcess = getArc28EventsToProcess(this.config.arc28Events ?? []) + + subscribedTransactions = await subscribe(filters) + + let pollLevel = 0 + while (this.pendingStateChanges.length > 0) { + let filterStateToProcess = { ...filterState } + for (const change of this.pendingStateChanges) { + switch (change.action) { + case 'append': + for (const key of Object.keys(change.stateChange)) { + const k = key as keyof T + if (!filterState[k] || !Array.isArray(filterState[k])) { + filterState[k] = change.stateChange[k]! + } else { + filterState[k] = (filterState[k] as unknown[]).concat(change.stateChange[k]) as T[keyof T] + } + } + filterStateToProcess = { ...filterStateToProcess, ...change.stateChange } + break + case 'delete': + for (const key of Object.keys(change.stateChange)) { + const k = key as keyof T + delete filterState[k] + delete filterStateToProcess[k] + } + break + case 'set': + filterState = { ...filterState, ...change.stateChange } + filterStateToProcess = { ...filterState, ...change.stateChange } + break + } + } + this.pendingStateChanges = [] + const newFilters = await this.dynamicConfig.dynamicFilters(filterStateToProcess, ++pollLevel) + this.filterNames = newFilters + .map((f) => f.name) + .filter((value, index, self) => { + // Remove duplicates + return self.findIndex((x) => x === value) === index + }) + subscribedTransactions = subscribedTransactions.concat(await subscribe(newFilters)) + } + + await this.dynamicConfig.filterStatePersistence.set(filterState) + + return { + syncedRoundRange: pollMetadata.syncedRoundRange, + newWatermark: pollMetadata.newWatermark, + currentRound: pollMetadata.currentRound, + subscribedTransactions: subscribedTransactions.sort( + (a, b) => a['confirmed-round']! - b['confirmed-round']! || a['intra-round-offset']! - b['intra-round-offset']!, + ), + } + } + + appendFilterState(stateChange: Partial) { + this.pendingStateChanges.push({ action: 'append', stateChange }) + } + + deleteFilterState(stateChange: (keyof T)[]) { + this.pendingStateChanges.push({ + action: 'delete', + stateChange: stateChange.reduce((acc, key) => ({ ...acc, [key]: true }), {} as Partial), + }) + } + + setFilterState(stateChange: Partial) { + this.pendingStateChanges.push({ action: 'set', stateChange }) + } +} diff --git a/src/index.ts b/src/index.ts index afe6820..46dde84 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,2 +1,3 @@ -export * from './subscriber' -export * from './subscriptions' +export { DynamicAlgorandSubscriber } from './dynamic-subscriber' +export { AlgorandSubscriber } from './subscriber' +export { getSubscribedTransactions } from './subscriptions' diff --git a/src/subscriber.ts b/src/subscriber.ts index 07e5800..498b79d 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -4,7 +4,7 @@ import { getSubscribedTransactions } from './subscriptions' import { AsyncEventEmitter, AsyncEventListener } from './types/async-event-emitter' import type { AlgorandSubscriberConfig, - BeforePollMetadata, + BeforeSubscriptionPollMetadata, SubscribedTransaction, TransactionSubscriptionResult, TypedAsyncEventListener, @@ -17,14 +17,14 @@ import Indexer = algosdk.Indexer * Handles the logic for subscribing to the Algorand blockchain and emitting events. */ export class AlgorandSubscriber { - private algod: Algodv2 - private indexer: Indexer | undefined - private config: AlgorandSubscriberConfig - private abortController: AbortController - private eventEmitter: AsyncEventEmitter - private started: boolean = false - private startPromise: Promise | undefined - private filterNames: string[] + protected algod: Algodv2 + protected indexer: Indexer | undefined + protected config: AlgorandSubscriberConfig + protected abortController: AbortController + protected eventEmitter: AsyncEventEmitter + protected started: boolean = false + protected startPromise: Promise | undefined + protected filterNames: string[] /** * Create a new `AlgorandSubscriber`. @@ -51,31 +51,20 @@ export class AlgorandSubscriber { } } - /** - * Execute a single subscription poll. - * - * This is useful when executing in the context of a process - * triggered by a recurring schedule / cron. - * @returns The poll result - */ - async pollOnce(): Promise { - const watermark = await this.config.watermarkPersistence.get() - - const currentRound = (await this.algod.status().do())['last-round'] as number - await this.eventEmitter.emitAsync('before:poll', { - watermark, - currentRound, - } satisfies BeforePollMetadata) - - const pollResult = await getSubscribedTransactions( + /** Perform a single subscribe for a given watermark and subscription config */ + protected async _pollSubscribe(watermark: number, config: AlgorandSubscriberConfig): Promise { + return await getSubscribedTransactions( { watermark, - ...this.config, + ...config, }, this.algod, this.indexer, ) + } + /** Process the filters and event emittance for the result of a single poll */ + protected async _processFilters(pollResult: TransactionSubscriptionResult) { try { for (const filterName of this.filterNames) { const mapper = this.config.filters.find((f) => f.name === filterName)?.mapper @@ -91,6 +80,34 @@ export class AlgorandSubscriber { algokit.Config.logger.error(`Error processing event emittance`, e) throw e } + } + + /** Perform a single poll for the given watermark */ + protected async _pollOnce(watermark: number): Promise { + const pollResult = await this._pollSubscribe(watermark, this.config) + this._processFilters(pollResult) + + return pollResult + } + + /** + * Execute a single subscription poll. + * + * This is useful when executing in the context of a process + * triggered by a recurring schedule / cron. + * @returns The poll result + */ + async pollOnce(): Promise { + const watermark = await this.config.watermarkPersistence.get() + + const currentRound = (await this.algod.status().do())['last-round'] as number + await this.eventEmitter.emitAsync('before:poll', { + watermark, + currentRound, + } satisfies BeforeSubscriptionPollMetadata) + + const pollResult = await this._pollOnce(watermark) + await this.config.watermarkPersistence.set(pollResult.newWatermark) return pollResult } @@ -217,7 +234,7 @@ export class AlgorandSubscriber { * @param listener The listener function to invoke with the pre-poll metadata * @returns The subscriber so `on*` calls can be chained */ - onBeforePoll(listener: TypedAsyncEventListener) { + onBeforePoll(listener: TypedAsyncEventListener) { this.eventEmitter.on('before:poll', listener as AsyncEventListener) return this } diff --git a/src/subscriptions.ts b/src/subscriptions.ts index a29af20..5898558 100644 --- a/src/subscriptions.ts +++ b/src/subscriptions.ts @@ -14,6 +14,7 @@ import type { Arc28EventGroup, Arc28EventToProcess, EmittedArc28Event } from './ import type { Block, BlockInnerTransaction, BlockTransaction } from './types/block' import { BalanceChangeRole, + SubscriptionPollMetadata, type BalanceChange, type NamedTransactionFilter, type SubscribedTransaction, @@ -44,24 +45,12 @@ const deduplicateSubscribedTransactionsReducer = (dedupedTransactions: Subscribe } /** - * Executes a single pull/poll to subscribe to transactions on the configured Algorand - * blockchain for the given subscription context. - * @param subscription The subscription context. - * @param algod An Algod client. - * @param indexer An optional indexer client, only needed when `onMaxRounds` is `catchup-with-indexer`. - * @returns The result of this subscription pull/poll. + * Returns a flat list of arc-28 event definitions ready for processing. + * @param arc28Events The definition of ARC-28 event groups + * @returns The individual event definitions */ -export async function getSubscribedTransactions( - subscription: TransactionSubscriptionParams, - algod: Algodv2, - indexer?: Indexer, -): Promise { - const { watermark, filters, maxRoundsToSync: _maxRoundsToSync, syncBehaviour: onMaxRounds } = subscription - const maxRoundsToSync = _maxRoundsToSync ?? 500 - const currentRound = (await algod.status().do())['last-round'] as number - - // Pre-calculate a flat list of all ARC-28 events to process - const arc28Events = (subscription.arc28Events ?? []).flatMap((g) => +export function getArc28EventsToProcess(arc28Events: Arc28EventGroup[]) { + return (arc28Events ?? []).flatMap((g) => g.events.map((e) => { // https://github.com/algorandfoundation/ARCs/blob/main/ARCs/arc-0028.md#sample-interpretation-of-event-log-data const eventSignature = `${e.name}(${e.args.map((a) => a.type).join(',')})` @@ -77,14 +66,29 @@ export async function getSubscribedTransactions( } satisfies Arc28EventToProcess }), ) +} + +/** + * Creates the metadata needed to perform a single subscription poll. + * @param subscription The subscription configuration + * @param algod An algod instance + * @returns The metadata for the poll + */ +export async function prepareSubscriptionPoll( + subscription: TransactionSubscriptionParams, + algod: Algodv2, +): Promise { + const { watermark, maxRoundsToSync: _maxRoundsToSync, syncBehaviour: onMaxRounds } = subscription + const maxRoundsToSync = _maxRoundsToSync ?? 500 + const currentRound = (await algod.status().do())['last-round'] as number // Nothing to sync we at the tip of the chain already if (currentRound <= watermark) { return { currentRound: currentRound, newWatermark: watermark, - subscribedTransactions: [], syncedRoundRange: [currentRound, currentRound], + arc28EventGroups: subscription.arc28Events ?? [], } } @@ -92,8 +96,6 @@ export async function getSubscribedTransactions( let algodSyncFromRoundNumber = watermark + 1 let startRound = algodSyncFromRoundNumber let endRound = currentRound - let catchupTransactions: SubscribedTransaction[] = [] - let start = +new Date() let skipAlgodSync = false // If we are less than `maxRoundsToSync` from the tip of the chain then we consult the `syncBehaviour` to determine what to do @@ -119,10 +121,6 @@ export async function getSubscribedTransactions( } break case 'catchup-with-indexer': - if (!indexer) { - throw new Error("Can't catch up using indexer since it's not provided") - } - // If we have more than `maxIndexerRoundsToSync` rounds to sync from indexer then we skip algod sync and just sync that many rounds from indexer indexerSyncToRoundNumber = currentRound - maxRoundsToSync if (subscription.maxIndexerRoundsToSync && indexerSyncToRoundNumber - startRound + 1 > subscription.maxIndexerRoundsToSync) { @@ -133,43 +131,6 @@ export async function getSubscribedTransactions( algodSyncFromRoundNumber = indexerSyncToRoundNumber + 1 } - algokit.Config.logger.debug( - `Catching up from round ${startRound} to round ${indexerSyncToRoundNumber} via indexer; this may take a few seconds`, - ) - - // Retrieve and process transactions from indexer in groups of 30 so we don't get rate limited - for (const chunkedFilters of chunkArray(filters, 30)) { - catchupTransactions = catchupTransactions.concat( - ( - await Promise.all( - // For each filter - chunkedFilters.map(async (f) => - // Retrieve all pre-filtered transactions from the indexer - (await algokit.searchTransactions(indexer, indexerPreFilter(f.filter, startRound, indexerSyncToRoundNumber))).transactions - // Re-run the pre-filter in-memory so we properly extract inner transactions - .flatMap((t) => getFilteredIndexerTransactions(t, f)) - // Run the post-filter so we get the final list of matching transactions - .filter(indexerPostFilter(f.filter, arc28Events, subscription.arc28Events ?? [])), - ), - ) - ) - // Collapse the filtered transactions into a single array - .flat(), - ) - } - - catchupTransactions = catchupTransactions - // Sort by transaction order - .sort((a, b) => a['confirmed-round']! - b['confirmed-round']! || a['intra-round-offset']! - b['intra-round-offset']!) - // Collapse duplicate transactions - .reduce(deduplicateSubscribedTransactionsReducer, [] as SubscribedTransaction[]) - - algokit.Config.logger.debug( - `Retrieved ${catchupTransactions.length} transactions from round ${startRound} to round ${ - algodSyncFromRoundNumber - 1 - } via indexer in ${(+new Date() - start) / 1000}s`, - ) - break default: throw new Error('Not implemented') @@ -177,37 +138,163 @@ export async function getSubscribedTransactions( } // Retrieve and process blocks from algod - let algodTransactions: SubscribedTransaction[] = [] + let blockTransactions: TransactionInBlock[] | undefined = undefined if (!skipAlgodSync) { - start = +new Date() + const start = +new Date() const blocks = await getBlocksBulk({ startRound: algodSyncFromRoundNumber, maxRound: endRound }, algod) - const blockTransactions = blocks.flatMap((b) => getBlockTransactions(b.block)) - algodTransactions = filters - .flatMap((f) => - blockTransactions - .filter((t) => transactionFilter(f.filter, arc28Events, subscription.arc28Events ?? [])(t!)) - .map((t) => getIndexerTransactionFromAlgodTransaction(t, f.name)), - ) - .reduce(deduplicateSubscribedTransactionsReducer, []) - + blockTransactions = blocks.flatMap((b) => getBlockTransactions(b.block)) algokit.Config.logger.debug( `Retrieved ${blockTransactions.length} transactions from algod via round(s) ${algodSyncFromRoundNumber}-${endRound} in ${ (+new Date() - start) / 1000 }s`, ) - } else { - algokit.Config.logger.debug( - `Skipping algod sync since we have more than ${subscription.maxIndexerRoundsToSync} rounds to sync from indexer.`, - ) } return { + algodSyncRange: !skipAlgodSync ? [algodSyncFromRoundNumber, endRound] : undefined, + indexerSyncRange: indexerSyncToRoundNumber ? [startRound, indexerSyncToRoundNumber] : undefined, syncedRoundRange: [startRound, endRound], newWatermark: endRound, currentRound, + blockTransactions, + arc28EventGroups: subscription.arc28Events ?? [], + } +} + +/** + * Run indexer catchup for the given filters and poll metadata + * @param filters The filters to apply + * @param pollMetadata The metadata for the poll + * @param indexer The indexer instance + * @returns The set of caught up, filtered transactions + */ +export async function getIndexerCatchupTransactions( + filters: NamedTransactionFilter[], + pollMetadata: SubscriptionPollMetadata, + arc28EventsToProcess: Arc28EventToProcess[], + indexer?: Indexer, +): Promise { + const { indexerSyncRange, arc28EventGroups } = pollMetadata + + if (!indexerSyncRange) { + return [] + } else if (!indexer) { + throw new Error("Can't catch up using indexer since it's not provided") + } + + const [startRound, endRound] = indexerSyncRange + const start = +new Date() + + let catchupTransactions: SubscribedTransaction[] = [] + + algokit.Config.logger.debug(`Catching up from round ${startRound} to round ${endRound} via indexer; this may take a few seconds`) + + const filtersToRetrieve = filters.flatMap((f) => + Array.isArray(f.filter.assetId) + ? f.filter.assetId.map((id) => ({ name: f.name, filter: { ...f.filter, assetId: id } })) + : Array.isArray(f.filter.appId) + ? f.filter.appId.map((id) => ({ name: f.name, filter: { ...f.filter, appId: id } })) + : [f], + ) + + // Retrieve and process transactions from indexer in groups of 10 so we don't get rate limited + for (const chunkedFilters of chunkArray(filtersToRetrieve, 10)) { + catchupTransactions = catchupTransactions.concat( + ( + await Promise.all( + // For each filter + chunkedFilters.map(async (f) => + // Retrieve all pre-filtered transactions from the indexer + (await algokit.searchTransactions(indexer, indexerPreFilter(f.filter, startRound, endRound))).transactions + // Re-run the pre-filter in-memory so we properly extract inner transactions + .flatMap((t) => getFilteredIndexerTransactions(t, f)) + // Run the post-filter so we get the final list of matching transactions + .filter(indexerPostFilter(f.filter, arc28EventsToProcess, arc28EventGroups ?? [])), + ), + ) + ) + // Collapse the filtered transactions into a single array + .flat(), + ) + } + + catchupTransactions = catchupTransactions + // Sort by transaction order + .sort((a, b) => a['confirmed-round']! - b['confirmed-round']! || a['intra-round-offset']! - b['intra-round-offset']!) + // Collapse duplicate transactions + .reduce(deduplicateSubscribedTransactionsReducer, [] as SubscribedTransaction[]) + + algokit.Config.logger.debug( + `Retrieved ${catchupTransactions.length} transactions from round ${startRound} to round ${endRound} via indexer in ${(+new Date() - start) / 1000}s`, + ) + return catchupTransactions +} + +/** + * Run indexer catchup for the given filters and poll metadata + * @param filters The filters to apply + * @param pollMetadata The metadata for the poll + * @param indexer The indexer instance + * @returns The set of caught up, filtered transactions + */ +export async function getAlgodSubscribedTransactions( + filters: NamedTransactionFilter[], + pollMetadata: SubscriptionPollMetadata, + arc28EventsToProcess: Arc28EventToProcess[], +): Promise { + const { algodSyncRange, arc28EventGroups, blockTransactions } = pollMetadata + + if (!algodSyncRange) { + return [] + } else if (blockTransactions === undefined) { + throw new Error("Can't catch up using algod since no block transactions were provided") + } + + const [startRound, endRound] = algodSyncRange + const start = +new Date() + + const algodTransactions = filters + .flatMap((f) => + blockTransactions + .filter((t) => transactionFilter(f.filter, arc28EventsToProcess, arc28EventGroups)(t!)) + .map((t) => getIndexerTransactionFromAlgodTransaction(t, f.name)), + ) + .reduce(deduplicateSubscribedTransactionsReducer, []) + + algokit.Config.logger.debug( + `Retrieved ${blockTransactions.length} transactions from algod via round(s) ${startRound}-${endRound} in ${ + (+new Date() - start) / 1000 + }s`, + ) + + return algodTransactions +} + +/** + * Executes a single pull/poll to subscribe to transactions on the configured Algorand + * blockchain for the given subscription context. + * @param subscription The subscription context. + * @param algod An Algod client. + * @param indexer An optional indexer client, only needed when `onMaxRounds` is `catchup-with-indexer`. + * @returns The result of this subscription pull/poll. + */ +export async function getSubscribedTransactions( + subscription: TransactionSubscriptionParams, + algod: Algodv2, + indexer?: Indexer, +): Promise { + const pollMetadata = await prepareSubscriptionPoll(subscription, algod) + const arc28EventsToProcess = getArc28EventsToProcess(subscription.arc28Events ?? []) + const catchupTransactions = await getIndexerCatchupTransactions(subscription.filters, pollMetadata, arc28EventsToProcess, indexer) + const algodTransactions = await getAlgodSubscribedTransactions(subscription.filters, pollMetadata, arc28EventsToProcess) + + return { + syncedRoundRange: pollMetadata.syncedRoundRange, + newWatermark: pollMetadata.newWatermark, + currentRound: pollMetadata.currentRound, subscribedTransactions: catchupTransactions .concat(algodTransactions) - .map((t) => processExtraFields(t, arc28Events, subscription.arc28Events ?? [])), + .map((t) => processExtraSubscriptionTransactionFields(t, arc28EventsToProcess, subscription.arc28Events ?? [])), } } @@ -219,7 +306,7 @@ function transactionIsInArc28EventGroup(group: Arc28EventGroup, appId: number, t ) } -function processExtraFields( +export function processExtraSubscriptionTransactionFields( transaction: TransactionResult | SubscribedTransaction, arc28Events: Arc28EventToProcess[], arc28Groups: Arc28EventGroup[], @@ -245,7 +332,7 @@ function processExtraFields( ), balanceChanges: extractBalanceChanges(transaction), 'inner-txns': transaction['inner-txns'] - ? transaction['inner-txns'].map((inner) => processExtraFields(inner, arc28Events, arc28Groups)) + ? transaction['inner-txns'].map((inner) => processExtraSubscriptionTransactionFields(inner, arc28Events, arc28Groups)) : undefined, } } diff --git a/src/types/subscription.ts b/src/types/subscription.ts index 4530e47..bbe436d 100644 --- a/src/types/subscription.ts +++ b/src/types/subscription.ts @@ -1,5 +1,6 @@ import type { ApplicationOnComplete, TransactionResult } from '@algorandfoundation/algokit-utils/types/indexer' import algosdk from 'algosdk' +import { TransactionInBlock } from '../transform' import { Arc28EventGroup, EmittedArc28Event } from './arc-28' import TransactionType = algosdk.TransactionType @@ -67,13 +68,31 @@ export enum BalanceChangeRole { } /** Metadata about an impending subscription poll. */ -export interface BeforePollMetadata { +export interface BeforeSubscriptionPollMetadata { /** The current watermark of the subscriber */ watermark: number /** The current round of algod */ currentRound: number } +/** Metadata needed to conduct a single subscription poll. */ +export interface SubscriptionPollMetadata { + /** The range of rounds to sync using algod; if undefined then algod sync not needed. */ + algodSyncRange?: [startRound: number, endRound: number] + /** The range of rounds to sync using indexer; if undefined then indexer sync not needed. */ + indexerSyncRange?: [startRound: number, endRound: number] + /** The range of rounds being synced. */ + syncedRoundRange: [startRound: number, endRound: number] + /** The new watermark to persist after this poll is complete. */ + newWatermark: number + /** The current round according to algod when the poll was started. */ + currentRound: number + /** The full set of transactions from algod for `algodSyncRange` or `undefined` if `algodSyncRange` is `undefined. */ + blockTransactions?: TransactionInBlock[] + /** The set of ARC-28 event groups to process against the subscribed transactions */ + arc28EventGroups: Arc28EventGroup[] +} + /** Common parameters to control a single subscription pull/poll for both `AlgorandSubscriber` and `getSubscribedTransactions`. */ export interface CoreTransactionSubscriptionParams { /** The filter(s) to apply to find transactions of interest. @@ -219,6 +238,29 @@ export interface TransactionSubscriptionParams extends CoreTransactionSubscripti watermark: number } +/** Configuration for a `DynamicAlgorandSubscriber` */ +export interface DynamicAlgorandSubscriberConfig extends Omit { + /** + * A function that returns a set of filters based on a given filter state and hierarchical poll level. + * @param state The filter state to return filters for + * @param pollLevel The hierarchical poll level; starts at 0 and increments by 1 each time a new poll is needed because of filter changes caused by the previous poll + * @returns The set of filters to subscribe to / emit events for + */ + dynamicFilters: (state: T, pollLevel: number) => Promise[]> + + /** Methods to retrieve and persist the current filter state so syncing is resilient */ + filterStatePersistence: { + /** Returns the current filter state that syncing has previously been processed to */ + get: () => Promise + /** Persist the new filter state that has been created */ + set: (newState: T) => Promise + } + /** The frequency to poll for new blocks in seconds; defaults to 1s */ + frequencyInSeconds?: number + /** Whether to wait via algod `/status/wait-for-block-after` endpoint when at the tip of the chain; reduces latency of subscription */ + waitForBlockWhenAtTip?: boolean +} + /** Configuration for an `AlgorandSubscriber`. */ export interface AlgorandSubscriberConfig extends CoreTransactionSubscriptionParams { /** The set of filters to subscribe to / emit events for, along with optional data mappers. */