From 42d30b27e9ce22947b7eb59075b17f2f857fa17c Mon Sep 17 00:00:00 2001
From: Gustavo Inacio <gustavo@semiotic.ai>
Date: Fri, 4 Oct 2024 20:16:19 +0200
Subject: [PATCH 1/2] refactor: split tap and scalar

Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai>
---
 .../src/allocations/__tests__/tap.test.ts     |  45 +-
 .../indexer-common/src/allocations/index.ts   |   1 +
 .../src/allocations/query-fees.ts             | 536 ---------------
 .../src/allocations/tap-collector.ts          | 608 ++++++++++++++++++
 packages/indexer-common/src/network.ts        |  33 +-
 5 files changed, 659 insertions(+), 564 deletions(-)
 create mode 100644 packages/indexer-common/src/allocations/tap-collector.ts

diff --git a/packages/indexer-common/src/allocations/__tests__/tap.test.ts b/packages/indexer-common/src/allocations/__tests__/tap.test.ts
index b644ba528..9006aa74e 100644
--- a/packages/indexer-common/src/allocations/__tests__/tap.test.ts
+++ b/packages/indexer-common/src/allocations/__tests__/tap.test.ts
@@ -1,5 +1,4 @@
 import {
-  AllocationReceiptCollector,
   defineQueryFeeModels,
   GraphNode,
   Network,
@@ -18,22 +17,20 @@ import {
 import { testNetworkSpecification } from '../../indexer-management/__tests__/util'
 import { Op, Sequelize } from 'sequelize'
 import { utils } from 'ethers'
+import { TapCollector } from '../tap-collector'
 
 // Make global Jest variables available
 // eslint-disable-next-line @typescript-eslint/no-explicit-any
 declare const __DATABASE__: any
 declare const __LOG_LEVEL__: never
 let logger: Logger
-let receiptCollector: AllocationReceiptCollector
+let tapCollector: TapCollector
 let metrics: Metrics
 let queryFeeModels: QueryFeeModels
 let sequelize: Sequelize
 const timeout = 30000
 
-const startRAVProcessing = jest.spyOn(
-  AllocationReceiptCollector.prototype,
-  'startRAVProcessing',
-)
+const startRAVProcessing = jest.spyOn(TapCollector.prototype, 'startRAVProcessing')
 const setup = async () => {
   logger = createLogger({
     name: 'Indexer API Client',
@@ -61,7 +58,7 @@ const setup = async () => {
     graphNode,
     metrics,
   )
-  receiptCollector = network.receiptCollector
+  tapCollector = network.tapCollector!
 }
 
 const ALLOCATION_ID_1 = toAddress('edde47df40c29949a75a6693c77834c00b8ad626')
@@ -97,7 +94,7 @@ const setupEach = async () => {
   await queryFeeModels.receiptAggregateVouchers.create(rav)
 
   jest
-    .spyOn(receiptCollector, 'findTransactionsForRavs')
+    .spyOn(tapCollector, 'findTransactionsForRavs')
     .mockImplementation(async (): Promise<TapSubgraphResponse> => {
       return {
         transactions: [],
@@ -134,7 +131,7 @@ describe('TAP', () => {
   test(
     'test getPendingRAVs',
     async () => {
-      const ravs = await receiptCollector['pendingRAVs']()
+      const ravs = await tapCollector['pendingRAVs']()
 
       expect(ravs).toEqual([
         expect.objectContaining({
@@ -174,7 +171,7 @@ describe('TAP', () => {
     // it's not showing on the subgraph on a specific point in time
     // the timestamp of the subgraph is greater than the receipt id
     // should revert the rav
-    await receiptCollector['revertRavsRedeemed'](ravList, nowSecs - 1)
+    await tapCollector['revertRavsRedeemed'](ravList, nowSecs - 1)
 
     let lastRedeemedRavs = await queryFeeModels.receiptAggregateVouchers.findAll({
       where: {
@@ -191,7 +188,7 @@ describe('TAP', () => {
       expect.objectContaining(ravList[2]),
     ])
 
-    await receiptCollector['revertRavsRedeemed'](ravList, nowSecs)
+    await tapCollector['revertRavsRedeemed'](ravList, nowSecs)
 
     lastRedeemedRavs = await queryFeeModels.receiptAggregateVouchers.findAll({
       where: {
@@ -207,7 +204,7 @@ describe('TAP', () => {
       expect.objectContaining(ravList[2]),
     ])
 
-    await receiptCollector['revertRavsRedeemed'](ravList, nowSecs + 1)
+    await tapCollector['revertRavsRedeemed'](ravList, nowSecs + 1)
 
     lastRedeemedRavs = await queryFeeModels.receiptAggregateVouchers.findAll({
       where: {
@@ -220,7 +217,7 @@ describe('TAP', () => {
     })
     expect(lastRedeemedRavs).toEqual([expect.objectContaining(ravList[2])])
 
-    await receiptCollector['revertRavsRedeemed'](ravList, nowSecs + 2)
+    await tapCollector['revertRavsRedeemed'](ravList, nowSecs + 2)
 
     lastRedeemedRavs = await queryFeeModels.receiptAggregateVouchers.findAll({
       where: {
@@ -253,7 +250,7 @@ describe('TAP', () => {
     await queryFeeModels.receiptAggregateVouchers.bulkCreate(ravList)
 
     // it's showing on the subgraph on a specific point in time
-    await receiptCollector['revertRavsRedeemed'](
+    await tapCollector['revertRavsRedeemed'](
       [
         {
           allocationId: ALLOCATION_ID_1,
@@ -305,7 +302,7 @@ describe('TAP', () => {
     ]
     await queryFeeModels.receiptAggregateVouchers.bulkCreate(ravList)
 
-    await receiptCollector['markRavsAsFinal'](nowSecs - 1)
+    await tapCollector['markRavsAsFinal'](nowSecs - 1)
 
     let finalRavs = await queryFeeModels.receiptAggregateVouchers.findAll({
       where: { last: true, final: true },
@@ -313,13 +310,13 @@ describe('TAP', () => {
 
     expect(finalRavs).toEqual([])
 
-    await receiptCollector['markRavsAsFinal'](nowSecs)
+    await tapCollector['markRavsAsFinal'](nowSecs)
     finalRavs = await queryFeeModels.receiptAggregateVouchers.findAll({
       where: { last: true, final: true },
     })
     expect(finalRavs).toEqual([expect.objectContaining({ ...ravList[0], final: true })])
 
-    await receiptCollector['markRavsAsFinal'](nowSecs + 1)
+    await tapCollector['markRavsAsFinal'](nowSecs + 1)
     finalRavs = await queryFeeModels.receiptAggregateVouchers.findAll({
       where: { last: true, final: true },
     })
@@ -328,7 +325,7 @@ describe('TAP', () => {
       expect.objectContaining({ ...ravList[1], final: true }),
     ])
 
-    await receiptCollector['markRavsAsFinal'](nowSecs + 2)
+    await tapCollector['markRavsAsFinal'](nowSecs + 2)
     finalRavs = await queryFeeModels.receiptAggregateVouchers.findAll({
       where: { last: true, final: true },
     })
@@ -354,7 +351,7 @@ describe('TAP', () => {
         redeemedAt: new Date(redeemDate),
       }
       await queryFeeModels.receiptAggregateVouchers.create(rav2)
-      const ravs = await receiptCollector['pendingRAVs']()
+      const ravs = await tapCollector['pendingRAVs']()
       // The point is it will only return the rav that is not final
       expect(ravs.length).toEqual(1)
       expect(ravs).toEqual([
@@ -390,8 +387,8 @@ describe('TAP', () => {
       }
       await queryFeeModels.receiptAggregateVouchers.create(rav2)
 
-      let ravs = await receiptCollector['pendingRAVs']()
-      ravs = await receiptCollector['filterAndUpdateRavs'](ravs)
+      let ravs = await tapCollector['pendingRAVs']()
+      ravs = await tapCollector['filterAndUpdateRavs'](ravs)
       // The point is it will only return the rav that is not final
 
       expect(ravs).toEqual([
@@ -458,7 +455,7 @@ describe('TAP', () => {
       const redeemDateSecs = Math.floor(redeemDate / 1000)
       const nowSecs = Math.floor(Date.now() / 1000)
       const anotherFuncSpy = jest
-        .spyOn(receiptCollector, 'findTransactionsForRavs')
+        .spyOn(tapCollector, 'findTransactionsForRavs')
         .mockImplementation(async (): Promise<TapSubgraphResponse> => {
           return {
             transactions: [
@@ -489,8 +486,8 @@ describe('TAP', () => {
         redeemedAt: new Date(redeemDate),
       }
       await queryFeeModels.receiptAggregateVouchers.create(rav2)
-      let ravs = await receiptCollector['pendingRAVs']()
-      ravs = await receiptCollector['filterAndUpdateRavs'](ravs)
+      let ravs = await tapCollector['pendingRAVs']()
+      ravs = await tapCollector['filterAndUpdateRavs'](ravs)
       expect(anotherFuncSpy).toBeCalled()
       const finalRavs = await queryFeeModels.receiptAggregateVouchers.findAll({
         where: { last: true, final: true },
diff --git a/packages/indexer-common/src/allocations/index.ts b/packages/indexer-common/src/allocations/index.ts
index 022489129..a52b572fb 100644
--- a/packages/indexer-common/src/allocations/index.ts
+++ b/packages/indexer-common/src/allocations/index.ts
@@ -1,4 +1,5 @@
 export * from './keys'
 export * from './query-fees'
+export * from './tap-collector'
 export * from './monitor'
 export * from './types'
diff --git a/packages/indexer-common/src/allocations/query-fees.ts b/packages/indexer-common/src/allocations/query-fees.ts
index a914b3d93..53fff3e66 100644
--- a/packages/indexer-common/src/allocations/query-fees.ts
+++ b/packages/indexer-common/src/allocations/query-fees.ts
@@ -9,9 +9,7 @@ import {
   Address,
   Metrics,
   Eventual,
-  join as joinEventual,
 } from '@graphprotocol/common-ts'
-import { NetworkContracts as TapContracts } from '@semiotic-labs/tap-contracts-bindings'
 import {
   Allocation,
   AllocationReceipt,
@@ -19,22 +17,15 @@ import {
   IndexerErrorCode,
   QueryFeeModels,
   Voucher,
-  ReceiptAggregateVoucher,
   ensureAllocationSummary,
   TransactionManager,
   specification as spec,
-  SignedRAV,
-  allocationSigner,
-  tapAllocationIdProof,
-  parseGraphQLAllocation,
 } from '..'
 import { DHeap } from '@thi.ng/heaps'
 import { BigNumber, BigNumberish, Contract } from 'ethers'
 import { Op } from 'sequelize'
 import pReduce from 'p-reduce'
-import { TAPSubgraph } from '../tap-subgraph'
 import { NetworkSubgraph } from '../network-subgraph'
-import gql from 'graphql-tag'
 
 // Receipts are collected with a delay of 20 minutes after
 // the corresponding allocation was closed
@@ -65,11 +56,6 @@ interface ReceiptMetrics {
   vouchersRedeemDuration: Histogram<string>
   vouchersBatchRedeemSize: Gauge<never>
   voucherCollectedFees: Gauge<string>
-  ravRedeemsSuccess: Counter<string>
-  ravRedeemsInvalid: Counter<string>
-  ravRedeemsFailed: Counter<string>
-  ravsRedeemDuration: Histogram<string>
-  ravCollectedFees: Gauge<string>
 }
 
 export interface AllocationPartialVouchers {
@@ -82,11 +68,9 @@ export interface AllocationReceiptCollectorOptions {
   metrics: Metrics
   transactionManager: TransactionManager
   allocationExchange: Contract
-  tapContracts?: TapContracts
   allocations: Eventual<Allocation[]>
   models: QueryFeeModels
   networkSpecification: spec.NetworkSpecification
-  tapSubgraph: TAPSubgraph | undefined
   networkSubgraph: NetworkSubgraph
 }
 
@@ -95,39 +79,12 @@ export interface ReceiptCollector {
   collectReceipts(actionID: number, allocation: Allocation): Promise<boolean>
 }
 
-interface ValidRavs {
-  belowThreshold: RavWithAllocation[]
-  eligible: RavWithAllocation[]
-}
-
-interface RavWithAllocation {
-  rav: SignedRAV
-  allocation: Allocation
-  sender: Address
-}
-
-export interface TapSubgraphResponse {
-  transactions: {
-    allocationID: string
-    timestamp: number
-    sender: {
-      id: string
-    }
-  }[]
-  _meta: {
-    block: {
-      timestamp: number
-    }
-  }
-}
-
 export class AllocationReceiptCollector implements ReceiptCollector {
   declare logger: Logger
   declare metrics: ReceiptMetrics
   declare models: QueryFeeModels
   declare transactionManager: TransactionManager
   declare allocationExchange: Contract
-  declare tapContracts: TapContracts | undefined
   declare allocations: Eventual<Allocation[]>
   declare collectEndpoint: URL
   declare partialVoucherEndpoint: URL
@@ -137,9 +94,7 @@ export class AllocationReceiptCollector implements ReceiptCollector {
   declare voucherRedemptionBatchThreshold: BigNumber
   declare voucherRedemptionMaxBatchSize: number
   declare protocolNetwork: string
-  declare tapSubgraph: TAPSubgraph | undefined
   declare networkSubgraph: NetworkSubgraph
-  declare finalityTime: number
 
   // eslint-disable-next-line @typescript-eslint/no-empty-function -- Private constructor to prevent direct instantiation
   private constructor() {}
@@ -150,10 +105,8 @@ export class AllocationReceiptCollector implements ReceiptCollector {
     transactionManager,
     models,
     allocationExchange,
-    tapContracts,
     allocations,
     networkSpecification,
-    tapSubgraph,
     networkSubgraph,
   }: AllocationReceiptCollectorOptions): Promise<AllocationReceiptCollector> {
     const collector = new AllocationReceiptCollector()
@@ -165,10 +118,8 @@ export class AllocationReceiptCollector implements ReceiptCollector {
     collector.transactionManager = transactionManager
     collector.models = models
     collector.allocationExchange = allocationExchange
-    collector.tapContracts = tapContracts
     collector.allocations = allocations
     collector.protocolNetwork = networkSpecification.networkIdentifier
-    collector.tapSubgraph = tapSubgraph
     collector.networkSubgraph = networkSubgraph
 
     // Process Gateway routes
@@ -181,26 +132,16 @@ export class AllocationReceiptCollector implements ReceiptCollector {
       voucherRedemptionThreshold,
       voucherRedemptionBatchThreshold,
       voucherRedemptionMaxBatchSize,
-      finalityTime,
     } = networkSpecification.indexerOptions
     collector.voucherRedemptionThreshold = voucherRedemptionThreshold
     collector.voucherRedemptionBatchThreshold = voucherRedemptionBatchThreshold
     collector.voucherRedemptionMaxBatchSize = voucherRedemptionMaxBatchSize
-    collector.finalityTime = finalityTime
 
     // Start the AllocationReceiptCollector
     // TODO: Consider calling methods conditionally based on a boolean
     // flag during startup.
     collector.startReceiptCollecting()
     collector.startVoucherProcessing()
-    if (collector.tapContracts && collector.tapSubgraph) {
-      collector.logger.info(`RAV processing is initiated`)
-      collector.startRAVProcessing()
-    } else {
-      collector.logger.info(`RAV process not initiated. 
-        Tap Contracts: ${!!collector.tapContracts}. 
-        Tap Subgraph: ${!!collector.tapSubgraph}.`)
-    }
     await collector.queuePendingReceiptsFromDatabase()
     return collector
   }
@@ -444,311 +385,6 @@ export class AllocationReceiptCollector implements ReceiptCollector {
     })
   }
 
-  startRAVProcessing() {
-    const notifyAndMapEligible = (signedRavs: ValidRavs) => {
-      if (signedRavs.belowThreshold.length > 0) {
-        const logger = this.logger.child({ function: 'startRAVProcessing()' })
-        const totalValueGRT = formatGRT(
-          signedRavs.belowThreshold.reduce(
-            (total, signedRav) =>
-              total.add(BigNumber.from(signedRav.rav.rav.valueAggregate)),
-            BigNumber.from(0),
-          ),
-        )
-        logger.info(`Query RAVs below the redemption threshold`, {
-          hint: 'If you would like to redeem vouchers like this, reduce the voucher redemption threshold',
-          voucherRedemptionThreshold: formatGRT(this.voucherRedemptionThreshold),
-          belowThresholdCount: signedRavs.belowThreshold.length,
-          totalValueGRT,
-          allocations: signedRavs.belowThreshold.map(
-            (signedRav) => signedRav.rav.rav.allocationId,
-          ),
-        })
-      }
-      return signedRavs.eligible
-    }
-
-    const pendingRAVs = this.getPendingRAVs()
-    const signedRAVs = this.getSignedRAVsEventual(pendingRAVs)
-    const eligibleRAVs = signedRAVs
-      .map(notifyAndMapEligible)
-      .filter((signedRavs) => signedRavs.length > 0)
-    eligibleRAVs.pipe(async (ravs) => await this.submitRAVs(ravs))
-  }
-
-  private getPendingRAVs(): Eventual<RavWithAllocation[]> {
-    return joinEventual({
-      timer: timer(30_000),
-    }).tryMap(
-      async () => {
-        let ravs = await this.pendingRAVs()
-        if (ravs.length === 0) {
-          this.logger.info(`No pending RAVs to process`)
-          return []
-        }
-        if (ravs.length > 0) {
-          ravs = await this.filterAndUpdateRavs(ravs)
-        }
-        const allocations: Allocation[] = await this.getAllocationsfromAllocationIds(ravs)
-        this.logger.info(
-          `Retrieved allocations for pending RAVs \n: ${JSON.stringify(allocations)}`,
-        )
-        return ravs
-          .map((rav) => {
-            const signedRav = rav.getSignedRAV()
-            return {
-              rav: signedRav,
-              allocation: allocations.find(
-                (a) => a.id === toAddress(signedRav.rav.allocationId),
-              ),
-              sender: rav.senderAddress,
-            }
-          })
-          .filter((rav) => rav.allocation !== undefined) as RavWithAllocation[] // this is safe because we filter out undefined allocations
-      },
-      { onError: (err) => this.logger.error(`Failed to query pending RAVs`, { err }) },
-    )
-  }
-
-  private async getAllocationsfromAllocationIds(
-    ravs: ReceiptAggregateVoucher[],
-  ): Promise<Allocation[]> {
-    const allocationIds: string[] = ravs.map((rav) =>
-      rav.getSignedRAV().rav.allocationId.toLowerCase(),
-    )
-    // eslint-disable-next-line @typescript-eslint/no-explicit-any
-    const returnedAllocations: any[] = (
-      await this.networkSubgraph.query(
-        gql`
-          query allocations($allocationIds: [String!]!) {
-            allocations(where: { id_in: $allocationIds }) {
-              id
-              status
-              subgraphDeployment {
-                id
-                stakedTokens
-                signalledTokens
-                queryFeesAmount
-                deniedAt
-              }
-              indexer {
-                id
-              }
-              allocatedTokens
-              createdAtEpoch
-              createdAtBlockHash
-              closedAtEpoch
-              closedAtEpoch
-              closedAtBlockHash
-              poi
-              queryFeeRebates
-              queryFeesCollected
-            }
-          }
-        `,
-        { allocationIds },
-      )
-    ).data.allocations
-
-    if (returnedAllocations.length == 0) {
-      this.logger.error(
-        `No allocations returned for ${allocationIds} in network subgraph`,
-      )
-    }
-    // eslint-disable-next-line @typescript-eslint/no-explicit-any
-    return returnedAllocations.map((x) => parseGraphQLAllocation(x, this.protocolNetwork))
-  }
-
-  private getSignedRAVsEventual(
-    pendingRAVs: Eventual<RavWithAllocation[]>,
-  ): Eventual<ValidRavs> {
-    return pendingRAVs.tryMap(
-      async (pendingRAVs) => {
-        return await pReduce(
-          pendingRAVs,
-          async (results, rav) => {
-            if (
-              BigNumber.from(rav.rav.rav.valueAggregate).lt(
-                this.voucherRedemptionThreshold,
-              )
-            ) {
-              results.belowThreshold.push(rav)
-            } else {
-              results.eligible.push(rav)
-            }
-            return results
-          },
-          { belowThreshold: <RavWithAllocation[]>[], eligible: <RavWithAllocation[]>[] },
-        )
-      },
-      { onError: (err) => this.logger.error(`Failed to reduce to signed RAVs`, { err }) },
-    )
-  }
-
-  // redeem only if last is true
-  // Later can add order and limit
-  private async pendingRAVs(): Promise<ReceiptAggregateVoucher[]> {
-    return await this.models.receiptAggregateVouchers.findAll({
-      where: { last: true, final: false },
-    })
-  }
-
-  private async filterAndUpdateRavs(
-    ravsLastNotFinal: ReceiptAggregateVoucher[],
-  ): Promise<ReceiptAggregateVoucher[]> {
-    const tapSubgraphResponse = await this.findTransactionsForRavs(ravsLastNotFinal)
-
-    const redeemedRavsNotOnOurDatabase = tapSubgraphResponse.transactions.filter(
-      (tx) =>
-        !ravsLastNotFinal.find(
-          (rav) =>
-            toAddress(rav.senderAddress) === toAddress(tx.sender.id) &&
-            toAddress(rav.allocationId) === toAddress(tx.allocationID),
-        ),
-    )
-
-    // for each transaction that is not redeemed on our database
-    // but was redeemed on the blockchain, update it to redeemed
-    if (redeemedRavsNotOnOurDatabase.length > 0) {
-      for (const rav of redeemedRavsNotOnOurDatabase) {
-        await this.markRavAsRedeemed(
-          toAddress(rav.allocationID),
-          toAddress(rav.sender.id),
-          rav.timestamp,
-        )
-      }
-    }
-
-    // Filter unfinalized RAVS fetched from DB, keeping RAVs that have not yet been redeemed on-chain
-    const nonRedeemedRavs = ravsLastNotFinal
-      .filter((rav) => !!rav.redeemedAt)
-      .filter(
-        (rav) =>
-          !tapSubgraphResponse.transactions.find(
-            (tx) =>
-              toAddress(rav.senderAddress) === toAddress(tx.sender.id) &&
-              toAddress(rav.allocationId) === toAddress(tx.allocationID),
-          ),
-      )
-
-    // we use the subgraph timestamp to make decisions
-    // block timestamp minus 1 minute (because of blockchain timestamp uncertainty)
-    const ONE_MINUTE = 60
-    const blockTimestampSecs = tapSubgraphResponse._meta.block.timestamp - ONE_MINUTE
-
-    // Mark RAVs as unredeemed in DB if the TAP subgraph couldn't find the redeem Tx.
-    // To handle a chain reorg that "unredeemed" the RAVs.
-    if (nonRedeemedRavs.length > 0) {
-      await this.revertRavsRedeemed(nonRedeemedRavs, blockTimestampSecs)
-    }
-
-    // For all RAVs that passed finality time, we mark it as final
-    await this.markRavsAsFinal(blockTimestampSecs)
-
-    return await this.models.receiptAggregateVouchers.findAll({
-      where: { redeemedAt: null, final: false, last: true },
-    })
-  }
-
-  public async findTransactionsForRavs(
-    ravs: ReceiptAggregateVoucher[],
-  ): Promise<TapSubgraphResponse> {
-    const response = await this.tapSubgraph!.query<TapSubgraphResponse>(
-      gql`
-        query transactions(
-          $unfinalizedRavsAllocationIds: [String!]!
-          $senderAddresses: [String!]!
-        ) {
-          transactions(
-            where: {
-              type: "redeem"
-              allocationID_in: $unfinalizedRavsAllocationIds
-              sender_: { id_in: $senderAddresses }
-            }
-          ) {
-            allocationID
-            timestamp
-            sender {
-              id
-            }
-          }
-          _meta {
-            block {
-              timestamp
-            }
-          }
-        }
-      `,
-      {
-        unfinalizedRavsAllocationIds: ravs.map((value) =>
-          toAddress(value.allocationId).toLowerCase(),
-        ),
-        senderAddresses: ravs.map((value) =>
-          toAddress(value.senderAddress).toLowerCase(),
-        ),
-      },
-    )
-    if (!response.data) {
-      throw `There was an error while querying Tap Subgraph. Errors: ${response.error}`
-    }
-
-    return response.data
-  }
-
-  // for every allocation_id of this list that contains the redeemedAt less than the current
-  // subgraph timestamp
-  private async revertRavsRedeemed(
-    ravsNotRedeemed: { allocationId: Address; senderAddress: Address }[],
-    blockTimestampSecs: number,
-  ) {
-    if (ravsNotRedeemed.length == 0) {
-      return
-    }
-
-    // WE use sql directly due to a bug in sequelize update:
-    // https://github.com/sequelize/sequelize/issues/7664 (bug been open for 7 years no fix yet or ever)
-    const query = `
-        UPDATE scalar_tap_ravs
-        SET redeemed_at = NULL
-        WHERE (allocation_id::char(40), sender_address::char(40)) IN (VALUES ${ravsNotRedeemed
-          .map(
-            (rav) =>
-              `('${rav.allocationId
-                .toString()
-                .toLowerCase()
-                .replace('0x', '')}'::char(40), '${rav.senderAddress
-                .toString()
-                .toLowerCase()
-                .replace('0x', '')}'::char(40))`,
-          )
-          .join(', ')})
-        AND redeemed_at < to_timestamp(${blockTimestampSecs})
-      `
-
-    await this.models.receiptAggregateVouchers.sequelize?.query(query)
-
-    this.logger.warn(
-      `Reverted Redeemed RAVs: ${ravsNotRedeemed
-        .map((rav) => `(${rav.senderAddress},${rav.allocationId})`)
-        .join(', ')}`,
-    )
-  }
-
-  // we use blockTimestamp instead of NOW() because we must be older than
-  // the subgraph timestamp
-  private async markRavsAsFinal(blockTimestampSecs: number) {
-    const query = `
-        UPDATE scalar_tap_ravs
-        SET final = TRUE
-        WHERE last = TRUE 
-        AND final = FALSE 
-        AND redeemed_at IS NOT NULL
-        AND redeemed_at < to_timestamp(${blockTimestampSecs - this.finalityTime})
-      `
-
-    await this.models.receiptAggregateVouchers.sequelize?.query(query)
-  }
-
   private encodeReceiptBatch(receipts: AllocationReceipt[]): BytesWriter {
     // Encode the receipt batch to a buffer
     // [allocationId, receipts[]] (in bytes)
@@ -998,143 +634,6 @@ export class AllocationReceiptCollector implements ReceiptCollector {
     }
   }
 
-  private async submitRAVs(signedRavs: RavWithAllocation[]): Promise<void> {
-    const logger = this.logger.child({
-      function: 'submitRAVs()',
-      ravsToSubmit: signedRavs.length,
-    })
-    if (!this.tapContracts) {
-      logger.error(
-        `Undefined escrow contracts, but this shouldn't happen as RAV process is only triggered when escrow is provided. \n
-        If this error is encountered please report and oepn an issue at https://github.com/graphprotocol/indexer/issues`,
-        {
-          signedRavs,
-        },
-      )
-      return
-    }
-    const escrow = this.tapContracts
-
-    logger.info(`Redeem last RAVs on chain individually`, {
-      signedRavs,
-    })
-
-    // Redeem RAV one-by-one as no plual version available
-    for (const { rav: signedRav, allocation, sender } of signedRavs) {
-      const { rav } = signedRav
-      const stopTimer = this.metrics.ravsRedeemDuration.startTimer({
-        allocation: rav.allocationId,
-      })
-      try {
-        const proof = await tapAllocationIdProof(
-          allocationSigner(this.transactionManager.wallet, allocation),
-          parseInt(this.protocolNetwork.split(':')[1]),
-          sender,
-          toAddress(rav.allocationId),
-          toAddress(escrow.escrow.address),
-        )
-        this.logger.debug(`Computed allocationIdProof`, {
-          allocationId: rav.allocationId,
-          proof,
-        })
-        // Submit the signed RAV on chain
-        const txReceipt = await this.transactionManager.executeTransaction(
-          () => escrow.escrow.estimateGas.redeem(signedRav, proof),
-          (gasLimit) =>
-            escrow.escrow.redeem(signedRav, proof, {
-              gasLimit,
-            }),
-          logger.child({ function: 'redeem' }),
-        )
-
-        // get tx receipt and post process
-        if (txReceipt === 'paused' || txReceipt === 'unauthorized') {
-          this.metrics.ravRedeemsInvalid.inc({ allocation: rav.allocationId })
-          return
-        }
-        this.metrics.ravCollectedFees.set(
-          { allocation: rav.allocationId },
-          parseFloat(rav.valueAggregate.toString()),
-        )
-
-        try {
-          await this.markRavAsRedeemed(toAddress(rav.allocationId), sender)
-          logger.info(
-            `Updated receipt aggregate vouchers table with redeemed_at for allocation ${rav.allocationId} and sender ${sender}`,
-          )
-        } catch (err) {
-          logger.warn(
-            `Failed to update receipt aggregate voucher table with redeemed_at for allocation ${rav.allocationId}`,
-            {
-              err,
-            },
-          )
-        }
-      } catch (err) {
-        this.metrics.ravRedeemsFailed.inc({ allocation: rav.allocationId })
-        logger.error(`Failed to redeem RAV`, {
-          err: indexerError(IndexerErrorCode.IE055, err),
-        })
-        return
-      }
-      stopTimer()
-    }
-
-    try {
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      await this.models.allocationSummaries.sequelize!.transaction(
-        async (transaction) => {
-          for (const { rav: signedRav } of signedRavs) {
-            const { rav } = signedRav
-            const [summary] = await ensureAllocationSummary(
-              this.models,
-              toAddress(rav.allocationId),
-              transaction,
-              this.protocolNetwork,
-            )
-            summary.withdrawnFees = BigNumber.from(summary.withdrawnFees)
-              .add(rav.valueAggregate)
-              .toString()
-            await summary.save({ transaction })
-          }
-        },
-      )
-
-      logger.info(`Updated allocation summaries table with withdrawn fees`)
-    } catch (err) {
-      logger.warn(`Failed to update allocation summaries`, {
-        err,
-      })
-    }
-
-    signedRavs.map((signedRav) =>
-      this.metrics.ravRedeemsSuccess.inc({ allocation: signedRav.allocation.id }),
-    )
-  }
-
-  private async markRavAsRedeemed(
-    allocationId: Address,
-    senderAddress: Address,
-    timestamp?: number,
-  ) {
-    // WE use sql directly due to a bug in sequelize update:
-    // https://github.com/sequelize/sequelize/issues/7664 (bug been open for 7 years no fix yet or ever)
-    const query = `
-            UPDATE scalar_tap_ravs
-            SET redeemed_at = ${timestamp ? timestamp : 'NOW()'}
-            WHERE allocation_id = '${allocationId
-              .toString()
-              .toLowerCase()
-              .replace('0x', '')}'
-            AND sender_address = '${senderAddress
-              .toString()
-              .toLowerCase()
-              .replace('0x', '')}'
-          `
-
-    await this.models.receiptAggregateVouchers.sequelize?.query(query)
-  }
-
   public async queuePendingReceiptsFromDatabase(): Promise<void> {
     // Obtain all closed allocations
     const closedAllocations = await this.models.allocationSummaries.findAll({
@@ -1280,41 +779,6 @@ const registerReceiptMetrics = (metrics: Metrics, networkIdentifier: string) =>
     registers: [metrics.registry],
     labelNames: ['allocation'],
   }),
-
-  ravRedeemsSuccess: new metrics.client.Counter({
-    name: `indexer_agent_rav_exchanges_ok_${networkIdentifier}`,
-    help: 'Successfully redeemed ravs',
-    registers: [metrics.registry],
-    labelNames: ['allocation'],
-  }),
-
-  ravRedeemsInvalid: new metrics.client.Counter({
-    name: `indexer_agent_rav_exchanges_invalid_${networkIdentifier}`,
-    help: 'Invalid ravs redeems - tx paused or unauthorized',
-    registers: [metrics.registry],
-    labelNames: ['allocation'],
-  }),
-
-  ravRedeemsFailed: new metrics.client.Counter({
-    name: `indexer_agent_rav_redeems_failed_${networkIdentifier}`,
-    help: 'Failed redeems for ravs',
-    registers: [metrics.registry],
-    labelNames: ['allocation'],
-  }),
-
-  ravsRedeemDuration: new metrics.client.Histogram({
-    name: `indexer_agent_ravs_redeem_duration_${networkIdentifier}`,
-    help: 'Duration of redeeming ravs',
-    registers: [metrics.registry],
-    labelNames: ['allocation'],
-  }),
-
-  ravCollectedFees: new metrics.client.Gauge({
-    name: `indexer_agent_rav_collected_fees_${networkIdentifier}`,
-    help: 'Amount of query fees collected for a rav',
-    registers: [metrics.registry],
-    labelNames: ['allocation'],
-  }),
 })
 
 interface GatewayRoutes {
diff --git a/packages/indexer-common/src/allocations/tap-collector.ts b/packages/indexer-common/src/allocations/tap-collector.ts
new file mode 100644
index 000000000..95a51564e
--- /dev/null
+++ b/packages/indexer-common/src/allocations/tap-collector.ts
@@ -0,0 +1,608 @@
+import { Counter, Gauge, Histogram } from 'prom-client'
+import {
+  Logger,
+  timer,
+  toAddress,
+  formatGRT,
+  Address,
+  Metrics,
+  Eventual,
+  join as joinEventual,
+} from '@graphprotocol/common-ts'
+import { NetworkContracts as TapContracts } from '@semiotic-labs/tap-contracts-bindings'
+import {
+  Allocation,
+  indexerError,
+  IndexerErrorCode,
+  QueryFeeModels,
+  ReceiptAggregateVoucher,
+  ensureAllocationSummary,
+  TransactionManager,
+  specification as spec,
+  SignedRAV,
+  allocationSigner,
+  tapAllocationIdProof,
+  parseGraphQLAllocation,
+} from '..'
+import { BigNumber } from 'ethers'
+import pReduce from 'p-reduce'
+import { TAPSubgraph } from '../tap-subgraph'
+import { NetworkSubgraph } from '../network-subgraph'
+import gql from 'graphql-tag'
+
+const RAV_CHECK_INTERVAL_MS = 30_000
+
+interface RavMetrics {
+  ravRedeemsSuccess: Counter<string>
+  ravRedeemsInvalid: Counter<string>
+  ravRedeemsFailed: Counter<string>
+  ravsRedeemDuration: Histogram<string>
+  ravCollectedFees: Gauge<string>
+}
+
+interface TapCollectorOptions {
+  logger: Logger
+  metrics: Metrics
+  transactionManager: TransactionManager
+  tapContracts: TapContracts
+  allocations: Eventual<Allocation[]>
+  models: QueryFeeModels
+  networkSpecification: spec.NetworkSpecification
+  tapSubgraph: TAPSubgraph
+  networkSubgraph: NetworkSubgraph
+}
+
+interface ValidRavs {
+  belowThreshold: RavWithAllocation[]
+  eligible: RavWithAllocation[]
+}
+
+interface RavWithAllocation {
+  rav: SignedRAV
+  allocation: Allocation
+  sender: Address
+}
+
+export interface TapSubgraphResponse {
+  transactions: {
+    allocationID: string
+    timestamp: number
+    sender: {
+      id: string
+    }
+  }[]
+  _meta: {
+    block: {
+      timestamp: number
+    }
+  }
+}
+
+export class TapCollector {
+  declare logger: Logger
+  declare metrics: RavMetrics
+  declare models: QueryFeeModels
+  declare transactionManager: TransactionManager
+  declare tapContracts: TapContracts
+  declare allocations: Eventual<Allocation[]>
+  declare ravRedemptionThreshold: BigNumber
+  declare protocolNetwork: string
+  declare tapSubgraph: TAPSubgraph
+  declare networkSubgraph: NetworkSubgraph
+  declare finalityTime: number
+
+  // eslint-disable-next-line @typescript-eslint/no-empty-function -- Private constructor to prevent direct instantiation
+  private constructor() {}
+
+  public static async create({
+    logger,
+    metrics,
+    transactionManager,
+    models,
+    tapContracts,
+    allocations,
+    networkSpecification,
+    tapSubgraph,
+    networkSubgraph,
+  }: TapCollectorOptions): Promise<TapCollector> {
+    const collector = new TapCollector()
+    collector.logger = logger.child({ component: 'AllocationReceiptCollector' })
+    collector.metrics = registerReceiptMetrics(
+      metrics,
+      networkSpecification.networkIdentifier,
+    )
+    collector.transactionManager = transactionManager
+    collector.models = models
+    collector.tapContracts = tapContracts
+    collector.allocations = allocations
+    collector.protocolNetwork = networkSpecification.networkIdentifier
+    collector.tapSubgraph = tapSubgraph
+    collector.networkSubgraph = networkSubgraph
+
+    const { voucherRedemptionThreshold, finalityTime } =
+      networkSpecification.indexerOptions
+    collector.ravRedemptionThreshold = voucherRedemptionThreshold
+    collector.finalityTime = finalityTime
+
+    collector.logger.info(`RAV processing is initiated`)
+    collector.startRAVProcessing()
+    return collector
+  }
+
+  startRAVProcessing() {
+    const notifyAndMapEligible = (signedRavs: ValidRavs) => {
+      if (signedRavs.belowThreshold.length > 0) {
+        const logger = this.logger.child({ function: 'startRAVProcessing()' })
+        const totalValueGRT = formatGRT(
+          signedRavs.belowThreshold.reduce(
+            (total, signedRav) =>
+              total.add(BigNumber.from(signedRav.rav.rav.valueAggregate)),
+            BigNumber.from(0),
+          ),
+        )
+        logger.info(`Query RAVs below the redemption threshold`, {
+          hint: 'If you would like to redeem RAVs like this, reduce the voucher redemption threshold',
+          ravRedemptionThreshold: formatGRT(this.ravRedemptionThreshold),
+          belowThresholdCount: signedRavs.belowThreshold.length,
+          totalValueGRT,
+          allocations: signedRavs.belowThreshold.map(
+            (signedRav) => signedRav.rav.rav.allocationId,
+          ),
+        })
+      }
+      return signedRavs.eligible
+    }
+
+    const pendingRAVs = this.getPendingRAVs()
+    const signedRAVs = this.getSignedRAVsEventual(pendingRAVs)
+    const eligibleRAVs = signedRAVs
+      .map(notifyAndMapEligible)
+      .filter((signedRavs) => signedRavs.length > 0)
+    eligibleRAVs.pipe(async (ravs) => await this.submitRAVs(ravs))
+  }
+
+  private getPendingRAVs(): Eventual<RavWithAllocation[]> {
+    return joinEventual({
+      timer: timer(RAV_CHECK_INTERVAL_MS),
+    }).tryMap(
+      async () => {
+        let ravs = await this.pendingRAVs()
+        if (ravs.length === 0) {
+          this.logger.info(`No pending RAVs to process`)
+          return []
+        }
+        if (ravs.length > 0) {
+          ravs = await this.filterAndUpdateRavs(ravs)
+        }
+        const allocations: Allocation[] = await this.getAllocationsfromAllocationIds(ravs)
+        this.logger.info(
+          `Retrieved allocations for pending RAVs \n: ${JSON.stringify(allocations)}`,
+        )
+        return ravs
+          .map((rav) => {
+            const signedRav = rav.getSignedRAV()
+            return {
+              rav: signedRav,
+              allocation: allocations.find(
+                (a) => a.id === toAddress(signedRav.rav.allocationId),
+              ),
+              sender: rav.senderAddress,
+            }
+          })
+          .filter((rav) => rav.allocation !== undefined) as RavWithAllocation[] // this is safe because we filter out undefined allocations
+      },
+      { onError: (err) => this.logger.error(`Failed to query pending RAVs`, { err }) },
+    )
+  }
+
+  private async getAllocationsfromAllocationIds(
+    ravs: ReceiptAggregateVoucher[],
+  ): Promise<Allocation[]> {
+    const allocationIds: string[] = ravs.map((rav) =>
+      rav.getSignedRAV().rav.allocationId.toLowerCase(),
+    )
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    const returnedAllocations: any[] = (
+      await this.networkSubgraph.query(
+        gql`
+          query allocations($allocationIds: [String!]!) {
+            allocations(where: { id_in: $allocationIds }) {
+              id
+              status
+              subgraphDeployment {
+                id
+                stakedTokens
+                signalledTokens
+                queryFeesAmount
+                deniedAt
+              }
+              indexer {
+                id
+              }
+              allocatedTokens
+              createdAtEpoch
+              createdAtBlockHash
+              closedAtEpoch
+              closedAtEpoch
+              closedAtBlockHash
+              poi
+              queryFeeRebates
+              queryFeesCollected
+            }
+          }
+        `,
+        { allocationIds },
+      )
+    ).data.allocations
+
+    if (returnedAllocations.length == 0) {
+      this.logger.error(
+        `No allocations returned for ${allocationIds} in network subgraph`,
+      )
+    }
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    return returnedAllocations.map((x) => parseGraphQLAllocation(x, this.protocolNetwork))
+  }
+
+  private getSignedRAVsEventual(
+    pendingRAVs: Eventual<RavWithAllocation[]>,
+  ): Eventual<ValidRavs> {
+    return pendingRAVs.tryMap(
+      async (pendingRAVs) => {
+        return await pReduce(
+          pendingRAVs,
+          async (results, rav) => {
+            if (
+              BigNumber.from(rav.rav.rav.valueAggregate).lt(this.ravRedemptionThreshold)
+            ) {
+              results.belowThreshold.push(rav)
+            } else {
+              results.eligible.push(rav)
+            }
+            return results
+          },
+          { belowThreshold: <RavWithAllocation[]>[], eligible: <RavWithAllocation[]>[] },
+        )
+      },
+      { onError: (err) => this.logger.error(`Failed to reduce to signed RAVs`, { err }) },
+    )
+  }
+
+  // redeem only if last is true
+  // Later can add order and limit
+  private async pendingRAVs(): Promise<ReceiptAggregateVoucher[]> {
+    return await this.models.receiptAggregateVouchers.findAll({
+      where: { last: true, final: false },
+    })
+  }
+
+  private async filterAndUpdateRavs(
+    ravsLastNotFinal: ReceiptAggregateVoucher[],
+  ): Promise<ReceiptAggregateVoucher[]> {
+    const tapSubgraphResponse = await this.findTransactionsForRavs(ravsLastNotFinal)
+
+    const redeemedRavsNotOnOurDatabase = tapSubgraphResponse.transactions.filter(
+      (tx) =>
+        !ravsLastNotFinal.find(
+          (rav) =>
+            toAddress(rav.senderAddress) === toAddress(tx.sender.id) &&
+            toAddress(rav.allocationId) === toAddress(tx.allocationID),
+        ),
+    )
+
+    // for each transaction that is not redeemed on our database
+    // but was redeemed on the blockchain, update it to redeemed
+    if (redeemedRavsNotOnOurDatabase.length > 0) {
+      for (const rav of redeemedRavsNotOnOurDatabase) {
+        await this.markRavAsRedeemed(
+          toAddress(rav.allocationID),
+          toAddress(rav.sender.id),
+          rav.timestamp,
+        )
+      }
+    }
+
+    // Filter unfinalized RAVS fetched from DB, keeping RAVs that have not yet been redeemed on-chain
+    const nonRedeemedRavs = ravsLastNotFinal
+      .filter((rav) => !!rav.redeemedAt)
+      .filter(
+        (rav) =>
+          !tapSubgraphResponse.transactions.find(
+            (tx) =>
+              toAddress(rav.senderAddress) === toAddress(tx.sender.id) &&
+              toAddress(rav.allocationId) === toAddress(tx.allocationID),
+          ),
+      )
+
+    // we use the subgraph timestamp to make decisions
+    // block timestamp minus 1 minute (because of blockchain timestamp uncertainty)
+    const ONE_MINUTE = 60
+    const blockTimestampSecs = tapSubgraphResponse._meta.block.timestamp - ONE_MINUTE
+
+    // Mark RAVs as unredeemed in DB if the TAP subgraph couldn't find the redeem Tx.
+    // To handle a chain reorg that "unredeemed" the RAVs.
+    if (nonRedeemedRavs.length > 0) {
+      await this.revertRavsRedeemed(nonRedeemedRavs, blockTimestampSecs)
+    }
+
+    // For all RAVs that passed finality time, we mark it as final
+    await this.markRavsAsFinal(blockTimestampSecs)
+
+    return await this.models.receiptAggregateVouchers.findAll({
+      where: { redeemedAt: null, final: false, last: true },
+    })
+  }
+
+  public async findTransactionsForRavs(
+    ravs: ReceiptAggregateVoucher[],
+  ): Promise<TapSubgraphResponse> {
+    const response = await this.tapSubgraph!.query<TapSubgraphResponse>(
+      gql`
+        query transactions(
+          $unfinalizedRavsAllocationIds: [String!]!
+          $senderAddresses: [String!]!
+        ) {
+          transactions(
+            where: {
+              type: "redeem"
+              allocationID_in: $unfinalizedRavsAllocationIds
+              sender_: { id_in: $senderAddresses }
+            }
+          ) {
+            allocationID
+            timestamp
+            sender {
+              id
+            }
+          }
+          _meta {
+            block {
+              timestamp
+            }
+          }
+        }
+      `,
+      {
+        unfinalizedRavsAllocationIds: ravs.map((value) =>
+          toAddress(value.allocationId).toLowerCase(),
+        ),
+        senderAddresses: ravs.map((value) =>
+          toAddress(value.senderAddress).toLowerCase(),
+        ),
+      },
+    )
+    if (!response.data) {
+      throw `There was an error while querying Tap Subgraph. Errors: ${response.error}`
+    }
+
+    return response.data
+  }
+
+  // for every allocation_id of this list that contains the redeemedAt less than the current
+  // subgraph timestamp
+  private async revertRavsRedeemed(
+    ravsNotRedeemed: { allocationId: Address; senderAddress: Address }[],
+    blockTimestampSecs: number,
+  ) {
+    if (ravsNotRedeemed.length == 0) {
+      return
+    }
+
+    // WE use sql directly due to a bug in sequelize update:
+    // https://github.com/sequelize/sequelize/issues/7664 (bug been open for 7 years no fix yet or ever)
+    const query = `
+        UPDATE scalar_tap_ravs
+        SET redeemed_at = NULL
+        WHERE (allocation_id::char(40), sender_address::char(40)) IN (VALUES ${ravsNotRedeemed
+          .map(
+            (rav) =>
+              `('${rav.allocationId
+                .toString()
+                .toLowerCase()
+                .replace('0x', '')}'::char(40), '${rav.senderAddress
+                .toString()
+                .toLowerCase()
+                .replace('0x', '')}'::char(40))`,
+          )
+          .join(', ')})
+        AND redeemed_at < to_timestamp(${blockTimestampSecs})
+      `
+
+    await this.models.receiptAggregateVouchers.sequelize?.query(query)
+
+    this.logger.warn(
+      `Reverted Redeemed RAVs: ${ravsNotRedeemed
+        .map((rav) => `(${rav.senderAddress},${rav.allocationId})`)
+        .join(', ')}`,
+    )
+  }
+
+  // we use blockTimestamp instead of NOW() because we must be older than
+  // the subgraph timestamp
+  private async markRavsAsFinal(blockTimestampSecs: number) {
+    const query = `
+        UPDATE scalar_tap_ravs
+        SET final = TRUE
+        WHERE last = TRUE 
+        AND final = FALSE 
+        AND redeemed_at IS NOT NULL
+        AND redeemed_at < to_timestamp(${blockTimestampSecs - this.finalityTime})
+      `
+
+    await this.models.receiptAggregateVouchers.sequelize?.query(query)
+  }
+
+  private async submitRAVs(signedRavs: RavWithAllocation[]): Promise<void> {
+    const logger = this.logger.child({
+      function: 'submitRAVs()',
+      ravsToSubmit: signedRavs.length,
+    })
+    if (!this.tapContracts) {
+      logger.error(
+        `Undefined escrow contracts, but this shouldn't happen as RAV process is only triggered when escrow is provided. \n
+        If this error is encountered please report and oepn an issue at https://github.com/graphprotocol/indexer/issues`,
+        {
+          signedRavs,
+        },
+      )
+      return
+    }
+    const escrow = this.tapContracts
+
+    logger.info(`Redeem last RAVs on chain individually`, {
+      signedRavs,
+    })
+
+    // Redeem RAV one-by-one as no plual version available
+    for (const { rav: signedRav, allocation, sender } of signedRavs) {
+      const { rav } = signedRav
+      const stopTimer = this.metrics.ravsRedeemDuration.startTimer({
+        allocation: rav.allocationId,
+      })
+      try {
+        const proof = await tapAllocationIdProof(
+          allocationSigner(this.transactionManager.wallet, allocation),
+          parseInt(this.protocolNetwork.split(':')[1]),
+          sender,
+          toAddress(rav.allocationId),
+          toAddress(escrow.escrow.address),
+        )
+        this.logger.debug(`Computed allocationIdProof`, {
+          allocationId: rav.allocationId,
+          proof,
+        })
+        // Submit the signed RAV on chain
+        const txReceipt = await this.transactionManager.executeTransaction(
+          () => escrow.escrow.estimateGas.redeem(signedRav, proof),
+          (gasLimit) =>
+            escrow.escrow.redeem(signedRav, proof, {
+              gasLimit,
+            }),
+          logger.child({ function: 'redeem' }),
+        )
+
+        // get tx receipt and post process
+        if (txReceipt === 'paused' || txReceipt === 'unauthorized') {
+          this.metrics.ravRedeemsInvalid.inc({ allocation: rav.allocationId })
+          return
+        }
+        this.metrics.ravCollectedFees.set(
+          { allocation: rav.allocationId },
+          parseFloat(rav.valueAggregate.toString()),
+        )
+
+        try {
+          await this.markRavAsRedeemed(toAddress(rav.allocationId), sender)
+          logger.info(
+            `Updated receipt aggregate vouchers table with redeemed_at for allocation ${rav.allocationId} and sender ${sender}`,
+          )
+        } catch (err) {
+          logger.warn(
+            `Failed to update receipt aggregate voucher table with redeemed_at for allocation ${rav.allocationId}`,
+            {
+              err,
+            },
+          )
+        }
+      } catch (err) {
+        this.metrics.ravRedeemsFailed.inc({ allocation: rav.allocationId })
+        logger.error(`Failed to redeem RAV`, {
+          err: indexerError(IndexerErrorCode.IE055, err),
+        })
+        return
+      }
+      stopTimer()
+    }
+
+    try {
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      await this.models.allocationSummaries.sequelize!.transaction(
+        async (transaction) => {
+          for (const { rav: signedRav } of signedRavs) {
+            const { rav } = signedRav
+            const [summary] = await ensureAllocationSummary(
+              this.models,
+              toAddress(rav.allocationId),
+              transaction,
+              this.protocolNetwork,
+            )
+            summary.withdrawnFees = BigNumber.from(summary.withdrawnFees)
+              .add(rav.valueAggregate)
+              .toString()
+            await summary.save({ transaction })
+          }
+        },
+      )
+
+      logger.info(`Updated allocation summaries table with withdrawn fees`)
+    } catch (err) {
+      logger.warn(`Failed to update allocation summaries`, {
+        err,
+      })
+    }
+
+    signedRavs.map((signedRav) =>
+      this.metrics.ravRedeemsSuccess.inc({ allocation: signedRav.allocation.id }),
+    )
+  }
+
+  private async markRavAsRedeemed(
+    allocationId: Address,
+    senderAddress: Address,
+    timestamp?: number,
+  ) {
+    // WE use sql directly due to a bug in sequelize update:
+    // https://github.com/sequelize/sequelize/issues/7664 (bug been open for 7 years no fix yet or ever)
+    const query = `
+            UPDATE scalar_tap_ravs
+            SET redeemed_at = ${timestamp ? timestamp : 'NOW()'}
+            WHERE allocation_id = '${allocationId
+              .toString()
+              .toLowerCase()
+              .replace('0x', '')}'
+            AND sender_address = '${senderAddress
+              .toString()
+              .toLowerCase()
+              .replace('0x', '')}'
+          `
+
+    await this.models.receiptAggregateVouchers.sequelize?.query(query)
+  }
+}
+
+const registerReceiptMetrics = (metrics: Metrics, networkIdentifier: string) => ({
+  ravRedeemsSuccess: new metrics.client.Counter({
+    name: `indexer_agent_rav_exchanges_ok_${networkIdentifier}`,
+    help: 'Successfully redeemed ravs',
+    registers: [metrics.registry],
+    labelNames: ['allocation'],
+  }),
+
+  ravRedeemsInvalid: new metrics.client.Counter({
+    name: `indexer_agent_rav_exchanges_invalid_${networkIdentifier}`,
+    help: 'Invalid ravs redeems - tx paused or unauthorized',
+    registers: [metrics.registry],
+    labelNames: ['allocation'],
+  }),
+
+  ravRedeemsFailed: new metrics.client.Counter({
+    name: `indexer_agent_rav_redeems_failed_${networkIdentifier}`,
+    help: 'Failed redeems for ravs',
+    registers: [metrics.registry],
+    labelNames: ['allocation'],
+  }),
+
+  ravsRedeemDuration: new metrics.client.Histogram({
+    name: `indexer_agent_ravs_redeem_duration_${networkIdentifier}`,
+    help: 'Duration of redeeming ravs',
+    registers: [metrics.registry],
+    labelNames: ['allocation'],
+  }),
+
+  ravCollectedFees: new metrics.client.Gauge({
+    name: `indexer_agent_rav_collected_fees_${networkIdentifier}`,
+    help: 'Amount of query fees collected for a rav',
+    registers: [metrics.registry],
+    labelNames: ['allocation'],
+  }),
+})
diff --git a/packages/indexer-common/src/network.ts b/packages/indexer-common/src/network.ts
index 2dda0d353..29f8fecb6 100644
--- a/packages/indexer-common/src/network.ts
+++ b/packages/indexer-common/src/network.ts
@@ -37,6 +37,7 @@ import { QueryFeeModels } from './query-fees'
 import { readFileSync } from 'fs'
 
 import { TAPSubgraph } from './tap-subgraph'
+import { TapCollector } from './allocations/tap-collector'
 
 export class Network {
   logger: Logger
@@ -47,6 +48,7 @@ export class Network {
   transactionManager: TransactionManager
   networkMonitor: NetworkMonitor
   receiptCollector: AllocationReceiptCollector
+  tapCollector: TapCollector | undefined
   specification: spec.NetworkSpecification
   paused: Eventual<boolean>
   isOperator: Eventual<boolean>
@@ -60,6 +62,7 @@ export class Network {
     transactionManager: TransactionManager,
     networkMonitor: NetworkMonitor,
     receiptCollector: AllocationReceiptCollector,
+    tapCollector: TapCollector | undefined,
     specification: spec.NetworkSpecification,
     paused: Eventual<boolean>,
     isOperator: Eventual<boolean>,
@@ -72,6 +75,7 @@ export class Network {
     this.transactionManager = transactionManager
     this.networkMonitor = networkMonitor
     this.receiptCollector = receiptCollector
+    this.tapCollector = tapCollector
     this.specification = specification
     this.paused = paused
     this.isOperator = isOperator
@@ -268,19 +272,39 @@ export class Network {
     // --------------------------------------------------------------------------------
     // * Allocation Receipt Collector
     // --------------------------------------------------------------------------------
-    const receiptCollector = await AllocationReceiptCollector.create({
+    const scalarCollector = await AllocationReceiptCollector.create({
       logger,
       metrics,
       transactionManager: transactionManager,
       models: queryFeeModels,
       allocationExchange: contracts.allocationExchange,
-      tapContracts,
       allocations,
       networkSpecification: specification,
-      tapSubgraph,
       networkSubgraph,
     })
 
+    // --------------------------------------------------------------------------------
+    // * TAP Collector
+    // --------------------------------------------------------------------------------
+    let tapCollector: TapCollector | undefined = undefined
+    if (tapContracts && tapSubgraph) {
+      tapCollector = await TapCollector.create({
+        logger,
+        metrics,
+        transactionManager: transactionManager,
+        models: queryFeeModels,
+        tapContracts,
+        allocations,
+        networkSpecification: specification,
+        tapSubgraph,
+        networkSubgraph,
+      })
+    } else {
+      logger.info(`RAV process not initiated. 
+        Tap Contracts: ${!!tapContracts}. 
+        Tap Subgraph: ${!!tapSubgraph}.`)
+    }
+
     // --------------------------------------------------------------------------------
     // * Network
     // --------------------------------------------------------------------------------
@@ -292,7 +316,8 @@ export class Network {
       networkProvider,
       transactionManager,
       networkMonitor,
-      receiptCollector,
+      scalarCollector,
+      tapCollector,
       specification,
       paused,
       isOperator,

From f3d7f933507b87620308ea535b33bf56d0e7598b Mon Sep 17 00:00:00 2001
From: Gustavo Inacio <gustavo@semiotic.ai>
Date: Mon, 7 Oct 2024 11:55:06 +0200
Subject: [PATCH 2/2] test: fix import

Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai>
---
 packages/indexer-common/src/allocations/__tests__/tap.test.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/packages/indexer-common/src/allocations/__tests__/tap.test.ts b/packages/indexer-common/src/allocations/__tests__/tap.test.ts
index 9006aa74e..f0697d4d3 100644
--- a/packages/indexer-common/src/allocations/__tests__/tap.test.ts
+++ b/packages/indexer-common/src/allocations/__tests__/tap.test.ts
@@ -4,6 +4,7 @@ import {
   Network,
   QueryFeeModels,
   TapSubgraphResponse,
+  TapCollector,
 } from '@graphprotocol/indexer-common'
 import {
   Address,
@@ -17,7 +18,6 @@ import {
 import { testNetworkSpecification } from '../../indexer-management/__tests__/util'
 import { Op, Sequelize } from 'sequelize'
 import { utils } from 'ethers'
-import { TapCollector } from '../tap-collector'
 
 // Make global Jest variables available
 // eslint-disable-next-line @typescript-eslint/no-explicit-any