Skip to content

Commit

Permalink
feat: Dynamic subscription filters
Browse files Browse the repository at this point in the history
  • Loading branch information
robdmoore committed Mar 29, 2024
1 parent bdb54b7 commit 67adb88
Show file tree
Hide file tree
Showing 6 changed files with 465 additions and 129 deletions.
100 changes: 81 additions & 19 deletions examples/data-history-museum/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { TransactionResult } from '@algorandfoundation/algokit-utils/types/index
import algosdk from 'algosdk'
import fs from 'fs'
import path from 'path'
import { AlgorandSubscriber } from '../../src/subscriber'
import { DynamicAlgorandSubscriber } from '../../src'
import TransactionType = algosdk.TransactionType

if (!fs.existsSync(path.join(__dirname, '..', '..', '.env')) && !process.env.ALGOD_SERVER) {
Expand All @@ -20,27 +20,54 @@ interface DHMAsset {
metadata: Record<string, unknown>
created: string
lastModified: string
owner: string
ownerModified: string
}

interface DHMFilterState {
assetIds: number[]
}

async function getDHMSubscriber() {
const algod = await algokit.getAlgoClient()
const indexer = await algokit.getAlgoIndexerClient()
const subscriber = new AlgorandSubscriber(
const subscriber = new DynamicAlgorandSubscriber<DHMFilterState>(
{
filters: [
{
name: 'dhm-asset',
filter: {
type: TransactionType.acfg,
// Data History Museum creator accounts
sender: (await algokit.isTestNet(algod))
? 'ER7AMZRPD5KDVFWTUUVOADSOWM4RQKEEV2EDYRVSA757UHXOIEKGMBQIVU'
: 'EHYQCYHUC6CIWZLBX5TDTLVJ4SSVE4RRTMKFDCG4Z4Q7QSQ2XWIQPMKBPU',
},
},
maxIndexerRoundsToSync: 5_000_000,
dynamicFilters: async (filterState, pollLevel) => [
...(pollLevel === 0
? [
{
name: 'dhm-asset',
filter: {
type: TransactionType.acfg,
// Data History Museum creator accounts
sender: (await algokit.isTestNet(algod))
? 'ER7AMZRPD5KDVFWTUUVOADSOWM4RQKEEV2EDYRVSA757UHXOIEKGMBQIVU'
: 'EHYQCYHUC6CIWZLBX5TDTLVJ4SSVE4RRTMKFDCG4Z4Q7QSQ2XWIQPMKBPU',
},
},
]
: []),
...(filterState.assetIds.length > 0
? [
{
name: 'dhm-ownership-change',
filter: {
type: TransactionType.axfer,
assetId: filterState.assetIds,
minAmount: 1,
},
},
]
: []),
],
frequencyInSeconds: 5,
maxRoundsToSync: 100,
filterStatePersistence: {
get: getFilterState,
set: saveFilterState,
},
frequencyInSeconds: 1,
maxRoundsToSync: 500,
syncBehaviour: 'catchup-with-indexer',
watermarkPersistence: {
get: getLastWatermark,
Expand All @@ -52,9 +79,18 @@ async function getDHMSubscriber() {
)
subscriber.onBatch('dhm-asset', async (events) => {
// eslint-disable-next-line no-console
console.log(`Received ${events.length} asset changes`)
console.log(`Received ${events.length} asset changes (${events.filter((t) => t['created-asset-index']).length} new assets)`)

// Append any new asset ids to the filter state so ownership is picked up of them
subscriber.appendFilterState({ assetIds: events.filter((e) => e['created-asset-index']).map((e) => e['created-asset-index']!) })
})
subscriber.onBatch('dhm-ownership-change', async (events) => {
// eslint-disable-next-line no-console
console.log(`Received ${events.length} ownership changes`)
})
subscriber.onPoll(async (pollMetadata) => {
// Save all of the Data History Museum Verifiably Authentic Digital Historical Artifacts
await saveDHMTransactions(events)
await saveDHMTransactions(pollMetadata.subscribedTransactions)
})
return subscriber
}
Expand All @@ -81,8 +117,10 @@ async function saveDHMTransactions(transactions: TransactionResult[]) {
metadata: getArc69Metadata(t),
created: new Date(t['round-time']! * 1000).toISOString(),
lastModified: new Date(t['round-time']! * 1000).toISOString(),
owner: t.sender,
ownerModified: new Date(t['round-time']! * 1000).toISOString(),
})
} else {
} else if (t['asset-config-transaction']) {
const asset = assets.find((a) => a.id === t['asset-config-transaction']!['asset-id'])
if (!asset) {
// eslint-disable-next-line no-console
Expand All @@ -96,6 +134,17 @@ async function saveDHMTransactions(transactions: TransactionResult[]) {
asset!.metadata = getArc69Metadata(t)
asset!.lastModified = new Date(t['round-time']! * 1000).toISOString()
}
} else if (t['asset-transfer-transaction']) {
const asset = assets.find((a) => a.id === t['asset-transfer-transaction']!['asset-id'])
if (!asset) {
// eslint-disable-next-line no-console
console.error(t)
throw new Error(`Unable to find existing asset data for ${t['asset-transfer-transaction']!['asset-id']}`)
}
if (t['asset-transfer-transaction'].amount > 0) {
asset.owner = t['asset-transfer-transaction']!.receiver
asset.ownerModified = new Date(t['round-time']! * 1000).toISOString()
}
}
}

Expand All @@ -104,12 +153,25 @@ async function saveDHMTransactions(transactions: TransactionResult[]) {

// Basic methods that persist using filesystem - for illustrative purposes only

async function saveFilterState(state: DHMFilterState) {
fs.writeFileSync(path.join(__dirname, 'filters.json'), JSON.stringify(state), { encoding: 'utf-8' })
}

async function getFilterState(): Promise<DHMFilterState> {
if (!fs.existsSync(path.join(__dirname, 'filters.json'))) return { assetIds: [] }
const existing = fs.readFileSync(path.join(__dirname, 'filters.json'), 'utf-8')
const existingData = JSON.parse(existing) as DHMFilterState
// eslint-disable-next-line no-console
console.log(`Found existing filter state in filters.json; syncing with ${existingData.assetIds.length} assets`)
return existingData
}

async function saveWatermark(watermark: number) {
fs.writeFileSync(path.join(__dirname, 'watermark.txt'), watermark.toString(), { encoding: 'utf-8' })
}

async function getLastWatermark(): Promise<number> {
if (!fs.existsSync(path.join(__dirname, 'watermark.txt'))) return 0
if (!fs.existsSync(path.join(__dirname, 'watermark.txt'))) return 15_000_000
const existing = fs.readFileSync(path.join(__dirname, 'watermark.txt'), 'utf-8')
// eslint-disable-next-line no-console
console.log(`Found existing sync watermark in watermark.txt; syncing from ${existing}`)
Expand Down
127 changes: 127 additions & 0 deletions src/dynamic-subscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import algosdk from 'algosdk'
import { AlgorandSubscriber } from './subscriber'
import {
getAlgodSubscribedTransactions,
getArc28EventsToProcess,
getIndexerCatchupTransactions,
prepareSubscriptionPoll,
processExtraSubscriptionTransactionFields,
} from './subscriptions'
import type {
DynamicAlgorandSubscriberConfig,
NamedTransactionFilter,
SubscribedTransaction,
TransactionSubscriptionResult,
} from './types/subscription'
import Algodv2 = algosdk.Algodv2
import Indexer = algosdk.Indexer

export class DynamicAlgorandSubscriber<T> extends AlgorandSubscriber {
private pendingStateChanges: { action: 'append' | 'delete' | 'set'; stateChange: Partial<T> }[] = []
private dynamicConfig: DynamicAlgorandSubscriberConfig<T>

constructor(config: DynamicAlgorandSubscriberConfig<T>, algod: Algodv2, indexer?: Indexer) {
super(
{
filters: [],
...config,
},
algod,
indexer,
)
this.dynamicConfig = config
}

protected override async _pollOnce(watermark: number): Promise<TransactionSubscriptionResult> {
let subscribedTransactions: SubscribedTransaction[] = []
let filterState: T = await this.dynamicConfig.filterStatePersistence.get()

const subscribe = async (filters: NamedTransactionFilter[]) => {
const catchupTransactions = await getIndexerCatchupTransactions(filters, pollMetadata, arc28EventsToProcess, this.indexer)
const algodTransactions = await getAlgodSubscribedTransactions(filters, pollMetadata, arc28EventsToProcess)
const subscribedTransactions = catchupTransactions
.concat(algodTransactions)
.map((t) => processExtraSubscriptionTransactionFields(t, arc28EventsToProcess, this.config.arc28Events ?? []))
this._processFilters({ subscribedTransactions, ...pollMetadata })
return subscribedTransactions
}

const filters = await this.dynamicConfig.dynamicFilters(filterState, 0)
this.filterNames = filters
.map((f) => f.name)
.filter((value, index, self) => {
// Remove duplicates
return self.findIndex((x) => x === value) === index
})
const pollMetadata = await prepareSubscriptionPoll({ ...this.config, watermark, filters }, this.algod)
const arc28EventsToProcess = getArc28EventsToProcess(this.config.arc28Events ?? [])

subscribedTransactions = await subscribe(filters)

let pollLevel = 0
while (this.pendingStateChanges.length > 0) {
let filterStateToProcess = { ...filterState }
for (const change of this.pendingStateChanges) {
switch (change.action) {
case 'append':
for (const key of Object.keys(change.stateChange)) {
const k = key as keyof T
if (!filterState[k] || !Array.isArray(filterState[k])) {
filterState[k] = change.stateChange[k]!
} else {
filterState[k] = (filterState[k] as unknown[]).concat(change.stateChange[k]) as T[keyof T]
}
}
filterStateToProcess = { ...filterStateToProcess, ...change.stateChange }
break
case 'delete':
for (const key of Object.keys(change.stateChange)) {
const k = key as keyof T
delete filterState[k]
delete filterStateToProcess[k]
}
break
case 'set':
filterState = { ...filterState, ...change.stateChange }
filterStateToProcess = { ...filterState, ...change.stateChange }
break
}
}
this.pendingStateChanges = []
const newFilters = await this.dynamicConfig.dynamicFilters(filterStateToProcess, ++pollLevel)
this.filterNames = newFilters
.map((f) => f.name)
.filter((value, index, self) => {
// Remove duplicates
return self.findIndex((x) => x === value) === index
})
subscribedTransactions = subscribedTransactions.concat(await subscribe(newFilters))
}

await this.dynamicConfig.filterStatePersistence.set(filterState)

return {
syncedRoundRange: pollMetadata.syncedRoundRange,
newWatermark: pollMetadata.newWatermark,
currentRound: pollMetadata.currentRound,
subscribedTransactions: subscribedTransactions.sort(
(a, b) => a['confirmed-round']! - b['confirmed-round']! || a['intra-round-offset']! - b['intra-round-offset']!,
),
}
}

appendFilterState(stateChange: Partial<T>) {
this.pendingStateChanges.push({ action: 'append', stateChange })
}

deleteFilterState(stateChange: (keyof T)[]) {
this.pendingStateChanges.push({
action: 'delete',
stateChange: stateChange.reduce((acc, key) => ({ ...acc, [key]: true }), {} as Partial<T>),
})
}

setFilterState(stateChange: Partial<T>) {
this.pendingStateChanges.push({ action: 'set', stateChange })
}
}
5 changes: 3 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './subscriber'
export * from './subscriptions'
export { DynamicAlgorandSubscriber } from './dynamic-subscriber'
export { AlgorandSubscriber } from './subscriber'
export { getSubscribedTransactions } from './subscriptions'
Loading

0 comments on commit 67adb88

Please sign in to comment.