diff --git a/packages/indexer-api/src/main.ts b/packages/indexer-api/src/main.ts index d8e32dde..f1b5646d 100644 --- a/packages/indexer-api/src/main.ts +++ b/packages/indexer-api/src/main.ts @@ -87,10 +87,10 @@ export async function Main( const redis = await initializeRedis(redisConfig, logger); const webhooks = Webhooks.WebhookFactory( { - requireApiKey: false, enabledWebhooks: [Webhooks.WebhookTypes.DepositStatus], + enabledWebhookRequestWorkers: false, }, - { postgres, logger }, + { postgres, logger, redis }, ); const allRouters: Record = { diff --git a/packages/indexer-database/src/entities/WebhookClient.ts b/packages/indexer-database/src/entities/WebhookClient.ts new file mode 100644 index 00000000..ecae8fa5 --- /dev/null +++ b/packages/indexer-database/src/entities/WebhookClient.ts @@ -0,0 +1,17 @@ +import { Entity, Column, PrimaryGeneratedColumn, Unique } from "typeorm"; + +@Entity() +@Unique("UK_webhook_client_api_key", ["apiKey"]) +export class WebhookClient { + @PrimaryGeneratedColumn() + id: number; + + @Column() + name: string; + + @Column() + apiKey: string; + + @Column("jsonb") + domains: string[]; +} diff --git a/packages/indexer-database/src/entities/WebhookRequest.ts b/packages/indexer-database/src/entities/WebhookRequest.ts new file mode 100644 index 00000000..4f35ef37 --- /dev/null +++ b/packages/indexer-database/src/entities/WebhookRequest.ts @@ -0,0 +1,28 @@ +import { + Entity, + PrimaryColumn, + Column, + Unique, + CreateDateColumn, + Index, +} from "typeorm"; + +@Entity() +@Unique("UK_webhook_request_clientId_filter", ["clientId", "filter"]) +@Index("IX_webhook_request_filter", ["filter"]) +export class WebhookRequest { + @PrimaryColumn() + id: string; + + @Column({ type: "integer" }) + clientId: number; + + @Column() + url: string; + + @Column() + filter: string; + + @CreateDateColumn() + createdAt: Date; +} diff --git a/packages/indexer-database/src/entities/index.ts b/packages/indexer-database/src/entities/index.ts index 58f7c660..19148c7e 100644 --- a/packages/indexer-database/src/entities/index.ts +++ b/packages/indexer-database/src/entities/index.ts @@ -19,3 +19,6 @@ export * from "./BundleEvent"; export * from "./BundleBlockRange"; export * from "./RootBundleExecutedJoinTable"; export * from "./RelayHashInfo"; + +export * from "./WebhookRequest"; +export * from "./WebhookClient"; diff --git a/packages/indexer-database/src/main.ts b/packages/indexer-database/src/main.ts index 7ba56df8..96c9bf46 100644 --- a/packages/indexer-database/src/main.ts +++ b/packages/indexer-database/src/main.ts @@ -36,6 +36,9 @@ export const createDataSource = (config: DatabaseConfig): DataSource => { entities.RootBundleExecutedJoinTable, // Others entities.RelayHashInfo, + // Webhooks + entities.WebhookRequest, + entities.WebhookClient, ], migrationsTableName: "_migrations", migrations: ["migrations/*.ts"], diff --git a/packages/indexer-database/src/migrations/1732297474910-WebhookClient.ts b/packages/indexer-database/src/migrations/1732297474910-WebhookClient.ts new file mode 100644 index 00000000..3cfaf4c6 --- /dev/null +++ b/packages/indexer-database/src/migrations/1732297474910-WebhookClient.ts @@ -0,0 +1,21 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class WebhookClient1732297474910 implements MigrationInterface { + name = "WebhookClient1732297474910"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + CREATE TABLE "webhook_client" ( + "id" SERIAL NOT NULL, + "name" character varying NOT NULL, + "apiKey" character varying NOT NULL, + "domains" jsonb NOT NULL, + CONSTRAINT "UK_webhook_client_api_key" UNIQUE ("apiKey"), + CONSTRAINT "PK_f7330fb3bdb2e19534eae691d44" PRIMARY KEY ("id") + )`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE "webhook_client"`); + } +} diff --git a/packages/indexer-database/src/migrations/1732297948190-WebhookRequest.ts b/packages/indexer-database/src/migrations/1732297948190-WebhookRequest.ts new file mode 100644 index 00000000..8c8190d1 --- /dev/null +++ b/packages/indexer-database/src/migrations/1732297948190-WebhookRequest.ts @@ -0,0 +1,22 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class WebhookRequest1732297948190 implements MigrationInterface { + name = "WebhookRequest1732297948190"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + CREATE TABLE "webhook_request" ( + "id" character varying NOT NULL, + "clientId" integer NOT NULL, + "url" character varying NOT NULL, + "filter" character varying NOT NULL, + "createdAt" TIMESTAMP NOT NULL DEFAULT now(), + CONSTRAINT "UK_webhook_request_clientId_filter" UNIQUE ("clientId", "filter"), + CONSTRAINT "PK_67a7784045de2d1b7139b611b93" PRIMARY KEY ("id") + )`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE "webhook_request"`); + } +} diff --git a/packages/indexer-database/src/migrations/1732310112989-WebhookRequest.ts b/packages/indexer-database/src/migrations/1732310112989-WebhookRequest.ts new file mode 100644 index 00000000..bbf1d238 --- /dev/null +++ b/packages/indexer-database/src/migrations/1732310112989-WebhookRequest.ts @@ -0,0 +1,15 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class WebhookRequest1732310112989 implements MigrationInterface { + name = "WebhookRequest1732310112989"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE INDEX "IX_webhook_request_filter" ON "webhook_request" ("filter") `, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "public"."IX_webhook_request_filter"`); + } +} diff --git a/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts b/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts index 4a8c740a..c5721c50 100644 --- a/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts +++ b/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts @@ -1,6 +1,7 @@ import { Logger } from "winston"; import { DataSource } from "@repo/indexer-database"; +import { eventProcessorManager } from "@repo/webhooks"; import { Config } from "../../parseEnv"; import { HubPoolRepository } from "../../database/HubPoolRepository"; @@ -39,6 +40,7 @@ export class AcrossIndexerManager { private spokePoolRepository: SpokePoolRepository, private redisCache: RedisCache, private indexerQueuesService: IndexerQueuesService, + private webhookWriteFn?: eventProcessorManager.WebhookWriteFn, ) {} public async start() { @@ -93,7 +95,12 @@ export class AcrossIndexerManager { this.hubPoolClientFactory, this.spokePoolClientFactory, this.spokePoolRepository, - new SpokePoolProcessor(this.postgres, this.logger, chainId), + new SpokePoolProcessor( + this.postgres, + this.logger, + chainId, + this.webhookWriteFn, + ), this.indexerQueuesService, ); const spokePoolIndexer = new Indexer( diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts index 56847a58..37672711 100644 --- a/packages/indexer/src/main.ts +++ b/packages/indexer/src/main.ts @@ -57,10 +57,10 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { // Call write to kick off webhook calls const { write } = WebhookFactory( { - requireApiKey: false, enabledWebhooks: [WebhookTypes.DepositStatus], + enabledWebhookRequestWorkers: true, }, - { postgres, logger }, + { postgres, logger, redis }, ); // Retry providers factory const retryProvidersFactory = new RetryProvidersFactory( @@ -96,6 +96,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { new SpokePoolRepository(postgres, logger), redisCache, indexerQueuesService, + write, ); const bundleServicesManager = new BundleServicesManager( config, diff --git a/packages/indexer/src/services/spokePoolProcessor.ts b/packages/indexer/src/services/spokePoolProcessor.ts index b913d3be..e91494a4 100644 --- a/packages/indexer/src/services/spokePoolProcessor.ts +++ b/packages/indexer/src/services/spokePoolProcessor.ts @@ -1,11 +1,14 @@ import { utils } from "@across-protocol/sdk"; +import winston from "winston"; + import { DataSource, entities, utils as dbUtils, SaveQueryResultType, } from "@repo/indexer-database"; -import winston from "winston"; +import { WebhookTypes, eventProcessorManager } from "@repo/webhooks"; + import { RelayStatus } from "../../../indexer-database/dist/src/entities"; import { StoreEventsResult } from "../data-indexing/service/SpokePoolIndexerDataHandler"; @@ -22,6 +25,7 @@ export class SpokePoolProcessor { private readonly postgres: DataSource, private readonly logger: winston.Logger, private readonly chainId: number, + private readonly webhookWriteFn?: eventProcessorManager.WebhookWriteFn, ) {} public async process(events: StoreEventsResult) { @@ -37,9 +41,19 @@ export class SpokePoolProcessor { SpokePoolEvents.V3FundsDeposited, [...newDeposits, ...updatedDeposits], ); - // TODO: for new deposits, notify status change to unfilled - // here... + // Notify webhook of new deposits + newDeposits.forEach((deposit) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: deposit.id, + originChainId: deposit.originChainId, + depositTxHash: deposit.transactionHash, + status: RelayStatus.Unfilled, + }, + }); + }); const newSlowFillRequests = dbUtils.filterSaveQueryResults( events.slowFillRequests, SaveQueryResultType.Inserted, @@ -52,8 +66,19 @@ export class SpokePoolProcessor { SpokePoolEvents.RequestedV3SlowFill, [...newSlowFillRequests, ...updatedSlowFillRequests], ); - // TODO: for new slow fill requests, notify status change to slow fill requested - // here... + + // Notify webhook of new slow fill requests + newSlowFillRequests.forEach((deposit) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: deposit.id, + originChainId: deposit.originChainId, + depositTxHash: deposit.transactionHash, + status: RelayStatus.SlowFillRequested, + }, + }); + }); const newFills = dbUtils.filterSaveQueryResults( events.fills, @@ -67,16 +92,48 @@ export class SpokePoolProcessor { ...newFills, ...updatedFills, ]); - // TODO: for new fills, notify status change to filled - // here... + + // Notify webhook of new fills + newFills.forEach((fill) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: fill.depositId, + originChainId: fill.originChainId, + depositTxHash: fill.transactionHash, + status: RelayStatus.Filled, + }, + }); + }); const expiredDeposits = await this.updateExpiredRelays(); - // TODO: for expired deposits, notify status change to expired - // here... + // Notify webhook of expired deposits + expiredDeposits.forEach((deposit) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: deposit.depositId, + originChainId: deposit.originChainId, + depositTxHash: deposit.depositTxHash, + status: RelayStatus.Expired, + }, + }); + }); const refundedDeposits = await this.updateRefundedDepositsStatus(); - // TODO: for refunded deposits, notify status change to refunded - // here... + + // Notify webhook of refunded deposits + refundedDeposits.forEach((deposit) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: deposit.depositId, + originChainId: deposit.originChainId, + depositTxHash: deposit.depositTxHash, + status: RelayStatus.Refunded, + }, + }); + }); } /** diff --git a/packages/webhooks/README.md b/packages/webhooks/README.md index 189a516e..0ec7ad7d 100644 --- a/packages/webhooks/README.md +++ b/packages/webhooks/README.md @@ -11,7 +11,6 @@ The `factory.ts` file provides a `WebhookFactory` function that sets up the webh To use the `WebhookFactory`, you need to provide a configuration object and dependencies: - **Config**: This object should include: - - requireApiKey: boolean; - Should registration of new webhooks require an api key - enabledWebhooks: WebhookTypes[]; - What event processors should be enabled, like 'DepositStatus' - **Dependencies**: This object should include: @@ -27,8 +26,8 @@ import { DataSource } from "@repo/indexer-database"; const webhooks = WebhookFactory( { - requireApiKey: false, - enableWebhooks: [WebhookTypes.DepositStatus], + enabledWebhooks: [WebhookTypes.DepositStatus], + enabledWebhookRequestWorkers: false, }, { postgres, logger }, ); diff --git a/packages/webhooks/package.json b/packages/webhooks/package.json index 8a0e0d03..7ecb36eb 100644 --- a/packages/webhooks/package.json +++ b/packages/webhooks/package.json @@ -22,10 +22,13 @@ "license": "ISC", "dependencies": { "@repo/indexer-database": "workspace:*", + "bullmq": "^5.12.12", "express": "^4.19.2", "express-bearer-token": "^3.0.0", + "ioredis": "^5.4.1", "redis": "^4.7.0", - "superstruct": "2.0.3-1" + "superstruct": "2.0.3-1", + "uuid": "^11.0.3" }, "exports": { ".": "./dist/index.js" diff --git a/packages/webhooks/src/adapter/messaging/WebhookRequestWorker.ts b/packages/webhooks/src/adapter/messaging/WebhookRequestWorker.ts new file mode 100644 index 00000000..2956077f --- /dev/null +++ b/packages/webhooks/src/adapter/messaging/WebhookRequestWorker.ts @@ -0,0 +1,84 @@ +import Redis from "ioredis"; +import winston from "winston"; +import { Job, Worker } from "bullmq"; + +import { DataSource, entities } from "@repo/indexer-database"; + +import { WebhooksQueues } from "./WebhooksQueuesService"; +import { WebhookTypes } from "../../factory"; +import { WebhookWriteFn } from "../../eventProcessorManager"; + +export type WebhookRequestQueueJob = { + webhookRequestId: string; + depositTxHash: string; + originChainId: number; +}; + +export class WebhookRequestWorker { + public worker: Worker; + + constructor( + private redis: Redis, + private postgres: DataSource, + private logger: winston.Logger, + private webhookWriteFn: WebhookWriteFn, + ) { + this.setWorker(); + } + + public setWorker() { + this.worker = new Worker( + WebhooksQueues.WebhookRequest, + async (job: Job) => { + try { + this.logger.debug({ + at: "WebhookRequestWorker", + message: `Processing job for webhook request ${job.data.webhookRequestId}`, + }); + await this.run(job.data); + } catch (error) { + this.logger.error({ + at: "WebhookRequestWorker", + message: `Error processing job for webhook request ${job.data.webhookRequestId}`, + error, + }); + throw error; + } + }, + { connection: this.redis, concurrency: 10 }, + ); + } + + private async run(webhookRequestJob: WebhookRequestQueueJob) { + const { depositTxHash, originChainId } = webhookRequestJob; + const relayHashInfo = await this.postgres + .getRepository(entities.RelayHashInfo) + .findOne({ + where: { + depositTxHash, + originChainId, + }, + }); + if (!relayHashInfo) { + this.logger.warn({ + at: "WebhookRequestWorker", + message: `Relay hash info not found for webhook request ${webhookRequestJob.webhookRequestId}`, + webhookRequestJob, + }); + return; + } + this.webhookWriteFn({ + type: WebhookTypes.DepositStatus, + event: { + originChainId: relayHashInfo.originChainId, + depositTxHash: relayHashInfo.depositTxHash, + depositId: relayHashInfo.depositId, + status: relayHashInfo.status, + }, + }); + } + + public async close() { + return this.worker.close(); + } +} diff --git a/packages/webhooks/src/adapter/messaging/WebhooksQueuesService.ts b/packages/webhooks/src/adapter/messaging/WebhooksQueuesService.ts new file mode 100644 index 00000000..f25cd1d0 --- /dev/null +++ b/packages/webhooks/src/adapter/messaging/WebhooksQueuesService.ts @@ -0,0 +1,53 @@ +import Redis from "ioredis"; +import { Queue, JobsOptions, BulkJobOptions } from "bullmq"; + +export enum WebhooksQueues { + WebhookRequest = "WebhookRequest", +} + +export class WebhooksQueuesService { + private queues = {} as Record; + + constructor(private connection: Redis) { + this.initializeQueues(); + } + + private initializeQueues() { + const queueNames = Object.values(WebhooksQueues); + queueNames.forEach( + (queueName) => + (this.queues[queueName] = new Queue(queueName, { + connection: this.connection, + defaultJobOptions: { + attempts: Number.MAX_SAFE_INTEGER, + removeOnComplete: true, + }, + })), + ); + } + + public async publishMessage( + queue: WebhooksQueues, + message: T, + options: JobsOptions = {}, + ) { + const q = this.queues[queue]; + if (q) { + await q.add(queue, message, options); + } + } + + public async publishMessagesBulk( + queue: WebhooksQueues, + jobName: string, + messages: T[], + options: BulkJobOptions = {}, + ) { + const q = this.queues[queue]; + if (q) { + await q.addBulk( + messages.map((m) => ({ name: jobName, data: m, opts: options })), + ); + } + } +} diff --git a/packages/webhooks/src/database/index.ts b/packages/webhooks/src/database/index.ts new file mode 100644 index 00000000..d2a452f6 --- /dev/null +++ b/packages/webhooks/src/database/index.ts @@ -0,0 +1,2 @@ +export * from "./webhookRequestRepository"; +export * from "./webhookClientRepository"; diff --git a/packages/webhooks/src/database/webhookClientRepository.ts b/packages/webhooks/src/database/webhookClientRepository.ts index dd40764b..32035aea 100644 --- a/packages/webhooks/src/database/webhookClientRepository.ts +++ b/packages/webhooks/src/database/webhookClientRepository.ts @@ -1,49 +1,58 @@ -import { AsyncStore } from "../store"; - -export interface WebhookClient { - id: string; - apiKey: string; - url: string; - domains: string[]; -} +import { entities, DataSource } from "@repo/indexer-database"; +import assert from "assert"; // This class is intended to store integration clients allowed to use the webhook service. export class WebhookClientRepository { - constructor(private store: AsyncStore) {} + private repository; + + constructor(private dataSource: DataSource) { + this.repository = this.dataSource.getRepository(entities.WebhookClient); + } - public async registerClient(client: WebhookClient): Promise { - if (await this.store.has(client.id)) { + public async registerClient(client: entities.WebhookClient): Promise { + const existingClient = await this.repository.findOne({ + where: { id: client.id }, + }); + if (existingClient) { throw new Error(`Client with id ${client.id} already exists.`); } - await this.store.set(client.id, client); + await this.repository.insert(client); } - public async unregisterClient(clientId: string): Promise { - if (!(await this.store.has(clientId))) { + public async unregisterClient(clientId: number): Promise { + const existingClient = await this.repository.findOne({ + where: { id: clientId }, + }); + if (!existingClient) { throw new Error(`Client with id ${clientId} does not exist.`); } - await this.store.delete(clientId); + await this.repository.delete({ id: clientId }); } - public async getClient(clientId: string): Promise { - return this.store.get(clientId); + public async getClient( + clientId: number, + ): Promise { + return ( + (await this.repository.findOne({ where: { id: clientId } })) ?? undefined + ); } - public async listClients(): Promise { - const clients: WebhookClient[] = []; - for await (const client of this.store.values()) { - clients.push(client); - } - return clients; + public async listClients(): Promise { + return this.repository.find(); } - public async findClientsByApiKey(apiKey: string): Promise { - const clients: WebhookClient[] = []; - for await (const client of this.store.values()) { - if (client.apiKey === apiKey) { - clients.push(client); - } - } - return clients; + public async getClientByApiKey( + apiKey: string, + ): Promise { + const result = await this.repository.findOne({ where: { apiKey } }); + assert(result, "Invalid api key"); + return result; + } + + public async getWebhookClientById( + id: number, + ): Promise { + const client = await this.repository.findOne({ where: { id } }); + return client ?? undefined; } } diff --git a/packages/webhooks/src/database/webhookRequestRepository.ts b/packages/webhooks/src/database/webhookRequestRepository.ts index fa97de6e..c632b332 100644 --- a/packages/webhooks/src/database/webhookRequestRepository.ts +++ b/packages/webhooks/src/database/webhookRequestRepository.ts @@ -1,48 +1,65 @@ -import { AsyncStore } from "../store"; -import { WebhookRequest } from "../types"; +import { entities, DataSource } from "@repo/indexer-database"; +import assert from "assert"; +import { exists } from "../utils"; export class WebhookRequestRepository { - constructor(private store: AsyncStore) {} + private repository; - public async register(webhook: WebhookRequest): Promise { - if (await this.store.has(webhook.id)) { + constructor(private dataSource: DataSource) { + this.repository = this.dataSource.getRepository(entities.WebhookRequest); + } + + public async register( + webhook: Omit, + ): Promise { + const existingWebhook = await this.repository.findOne({ + where: { id: webhook.id }, + }); + if (existingWebhook) { throw new Error(`Webhook with id ${webhook.id} already exists.`); } - await this.store.set(webhook.id, webhook); + await this.repository.insert(webhook); } public async unregister(webhookId: string): Promise { - if (!(await this.store.has(webhookId))) { + const existingWebhook = await this.repository.findOne({ + where: { id: webhookId }, + }); + if (!existingWebhook) { throw new Error(`Webhook with id ${webhookId} does not exist.`); } - await this.store.delete(webhookId); + await this.repository.delete({ id: webhookId }); } - public async getWebhook( + public async getWebhookRequest( webhookId: string, - ): Promise { - return this.store.get(webhookId); + ): Promise { + const result = await this.repository.findOne({ where: { id: webhookId } }); + assert(result, "Webhook request not found"); + return result; } - public async listWebhooks(): Promise { - const webhooks: WebhookRequest[] = []; - for await (const webhook of this.store.values()) { - webhooks.push(webhook); - } - return webhooks; + public async listWebhookRequests(): Promise { + return this.repository.find(); } - public async filterWebhooks(filter: string): Promise { - const webhooks: WebhookRequest[] = []; - for await (const webhook of this.store.values()) { - if (webhook.filter === filter) { - webhooks.push(webhook); - } - } - return webhooks; + public async findWebhookRequestsByFilter( + filter: string, + ): Promise { + return this.repository.find({ where: { filter } }); + } + + public async findWebhookRequestsByFilterAndClient( + filter: string, + clientId: number, + ): Promise { + return this.repository.find({ where: { filter, clientId } }); } - public async hasWebhook(webhookId: string): Promise { - return this.store.has(webhookId); + public async hasWebhookRequest(webhookId: string): Promise { + const result = await this.repository.findOne({ + where: { id: webhookId }, + }); + return exists(result); } } diff --git a/packages/webhooks/src/eventProcessorManager.ts b/packages/webhooks/src/eventProcessorManager.ts index 99a96414..12aa53ab 100644 --- a/packages/webhooks/src/eventProcessorManager.ts +++ b/packages/webhooks/src/eventProcessorManager.ts @@ -1,17 +1,23 @@ -import { MemoryStore } from "./store"; -import { - WebhookClientRepository, - WebhookClient, -} from "./database/webhookClientRepository"; -import { DataSource, entities } from "@repo/indexer-database"; import { Logger } from "winston"; import assert from "assert"; + +import { DataSource, entities } from "@repo/indexer-database"; + +import { WebhookClientRepository } from "./database/webhookClientRepository"; import { JSONValue, IEventProcessor } from "./types"; +import { + WebhooksQueues, + WebhooksQueuesService, +} from "./adapter/messaging/WebhooksQueuesService"; +import { WebhookTypes } from "./factory"; +import { WebhookRequestQueueJob } from "./adapter/messaging/WebhookRequestWorker"; export type EventProcessorRecord = Record; -type EventType = { - type: string; +export type WebhookWriteFn = (event: EventType) => void; + +export type EventType = { + type: WebhookTypes; event: JSONValue; }; @@ -22,64 +28,81 @@ export type Config = { export type Dependencies = { postgres: DataSource; logger: Logger; + webhooksQueuesService: WebhooksQueuesService; }; export class EventProcessorManager { private logger: Logger; private clientRepository: WebhookClientRepository; - private processors = new Map(); + private processors = new Map(); + private webhooksQueuesService: WebhooksQueuesService; - constructor( - private config: Config, - deps: Dependencies, - ) { + constructor(deps: Dependencies) { this.logger = deps.logger; - this.clientRepository = new WebhookClientRepository(new MemoryStore()); // Initialize the client manager + this.clientRepository = new WebhookClientRepository(deps.postgres); // Initialize the client manager + this.webhooksQueuesService = deps.webhooksQueuesService; } + // Register a new type of webhook processor able to be written to - public registerEventProcessor(name: string, webhook: IEventProcessor) { + public registerEventProcessor(name: WebhookTypes, webhook: IEventProcessor) { + this.logger.debug( + `Attempting to register event processor with name: ${name}`, + ); assert( !this.processors.has(name), `Webhook with that name already exists: ${name}`, ); this.processors.set(name, webhook); + this.logger.debug( + `Successfully registered event processor with name: ${name}`, + ); } - private getEventProcessor(name: string) { + private getEventProcessor(name: WebhookTypes) { const eventProcessor = this.processors.get(name); - assert( - eventProcessor, - "EventProcessor does not exist by type: ${event.type}", - ); + assert(eventProcessor, `EventProcessor does not exist by type: ${name}`); return eventProcessor; } - write = (event: EventType): void => { + write: WebhookWriteFn = (event: EventType): void => { const webhook = this.getEventProcessor(event.type); webhook.write(event.event); }; async registerWebhook( + id: string, params: { type: string; url: string; filter: JSONValue }, - apiKey?: string, + apiKey: string, ) { - if (this.config.requireApiKey) { - if (apiKey === undefined) throw new Error("Api Key required"); - const clients = await this.clientRepository.findClientsByApiKey(apiKey); - assert(clients.length > 0, "Invalid api key"); - const urlDomain = new URL(params.url).hostname; - const isDevDomain = - urlDomain === "localhost" || urlDomain.startsWith("127."); - if (!isDevDomain) { - const isDomainValid = clients.some((client) => - client.domains.includes(urlDomain), - ); - assert( - isDomainValid, - "The base URL of the provided webhook does not match any of the client domains", - ); - } - } - const webhook = this.getEventProcessor(params.type); - return webhook.register(params.url, params.filter); + this.logger.debug( + `Attempting to register webhook of type: ${params.type} with URL: ${params.url}`, + ); + const client = await this.clientRepository.getClientByApiKey(apiKey); + const urlDomain = new URL(params.url).hostname; + const isDomainValid = client.domains.includes(urlDomain); + assert( + isDomainValid, + "The base URL of the provided webhook does not match any of the client domains", + ); + assert((params.filter as any).depositTxHash, "depositTxHash is required"); + assert((params.filter as any).originChainId, "originChainId is required"); + const webhook = this.getEventProcessor(params.type as WebhookTypes); + const webhookRequestId = await webhook.register( + id, + params.url, + params.filter, + client.id, + ); + this.logger.debug( + `Successfully registered webhook with ID: ${webhookRequestId}`, + ); + this.webhooksQueuesService.publishMessage( + WebhooksQueues.WebhookRequest, + { + webhookRequestId, + depositTxHash: (params.filter as any).depositTxHash, + originChainId: (params.filter as any).originChainId, + }, + ); + return webhookRequestId; } // TODO: gaurd this with api key @@ -87,12 +110,19 @@ export class EventProcessorManager { params: { type: string; id: string }, apiKey?: string, ) { - // Assuming the IWebhook interface has an unregister method - const webhook = this.getEventProcessor(params.type); - return webhook.unregister(params.id); + this.logger.debug( + `Attempting to unregister webhook of type: ${params.type} with ID: ${params.id}`, + ); + const webhook = this.getEventProcessor(params.type as WebhookTypes); + await webhook.unregister(params.id); + this.logger.debug( + `Successfully unregistered webhook with ID: ${params.id}`, + ); } - async registerClient(client: WebhookClient) { - return this.clientRepository.registerClient(client); + async registerClient(client: entities.WebhookClient) { + this.logger.debug(`Attempting to register client with ID: ${client.id}`); + await this.clientRepository.registerClient(client); + this.logger.debug(`Successfully registered client with ID: ${client.id}`); } } diff --git a/packages/webhooks/src/eventProcessors/depositStatus.ts b/packages/webhooks/src/eventProcessors/depositStatus.ts index 88862c27..889a1191 100644 --- a/packages/webhooks/src/eventProcessors/depositStatus.ts +++ b/packages/webhooks/src/eventProcessors/depositStatus.ts @@ -1,14 +1,13 @@ import assert from "assert"; +import * as ss from "superstruct"; +import { Logger } from "winston"; + import { DataSource, entities } from "@repo/indexer-database"; import { WebhookRequestRepository } from "../database/webhookRequestRepository"; -import { - JSONValue, - IEventProcessor, - NotificationPayload, - WebhookRequest, -} from "../types"; +import { customId } from "../utils"; -import * as ss from "superstruct"; +import { IEventProcessor, NotificationPayload } from "../types"; +import { WebhookClientRepository } from "../database/webhookClientRepository"; export const DepositStatusEvent = ss.object({ originChainId: ss.number(), @@ -25,73 +24,115 @@ export const DepositStatusFilter = ss.object({ export type DepositStatusFilter = ss.Infer; export type Dependencies = { - webhookRequests: WebhookRequestRepository; notify: (params: NotificationPayload) => void; postgres: DataSource; + logger: Logger; }; export class DepositStatusProcessor implements IEventProcessor { private webhookRequests: WebhookRequestRepository; + private webhookClientsRepository: WebhookClientRepository; private notify: (params: NotificationPayload) => void; - private postgres: DataSource; + private logger: Logger; - constructor(deps: Dependencies) { - this.webhookRequests = deps.webhookRequests; + constructor( + deps: Dependencies, + private type: string = "DepositStatus", + ) { + this.webhookRequests = new WebhookRequestRepository(deps.postgres); + this.webhookClientsRepository = new WebhookClientRepository(deps.postgres); this.notify = deps.notify; - this.postgres = deps.postgres; + this.logger = deps.logger; } private async _write(event: DepositStatusEvent): Promise { - const filter = [event.originChainId, event.depositTxHash].join("!"); - const hooks = await this.webhookRequests.filterWebhooks(filter); + const filter = customId( + this.type, + event.originChainId, + event.depositTxHash, + ); + const webhookRequests = + await this.webhookRequests.findWebhookRequestsByFilter(filter); + const uniqueClientIds = [ + ...new Set(webhookRequests.map((hook) => hook.clientId)), + ]; + const clients = await Promise.all( + uniqueClientIds.map((id) => + this.webhookClientsRepository.getWebhookClientById(id), + ), + ); + const clientsMap = clients + .filter((client) => client !== undefined) + .reduce( + (acc, client) => { + acc[client.id] = client; + return acc; + }, + {} as Record, + ); + //TODO: unregister any hooks where event has reached terminal state - await Promise.all( - hooks.map((hook) => { + webhookRequests.forEach((hook) => { + const client = clientsMap[hook.clientId]; + if (client) { this.notify({ url: hook.url, - data: event, + data: { ...event, webhookRequestId: hook.id }, + apiKey: client.apiKey, }); - }), - ); + } else { + this.logger.error({ + at: "DepositStatusProcessor::_write", + message: `Client not found for webhook request ${hook.id}`, + webhookRequest: hook, + }); + } + }); } + write(e: unknown) { this._write(ss.create(e, DepositStatusEvent)).catch((err) => console.error(err), ); } + private async _register( + id: string, url: string, params: DepositStatusFilter, + clientId: number, ): Promise { - const id = [url, params.originChainId, params.depositTxHash].join("!"); - const filter = [params.originChainId, params.depositTxHash].join("!"); + const filter = customId( + this.type, + params.originChainId, + params.depositTxHash, + ); + const existingFilters = + await this.webhookRequests.findWebhookRequestsByFilterAndClient( + filter, + clientId, + ); assert( - !(await this.webhookRequests.hasWebhook(id)), - "This webhook already exists", + existingFilters.length === 0, + "Webhook already exists for this filter", ); await this.webhookRequests.register({ id, filter, url, + clientId, }); - const relayHashInfoRepository = this.postgres.getRepository( - entities.RelayHashInfo, - ); - const relayHashInfo = await relayHashInfoRepository.findOne({ - where: params, - }); - if (relayHashInfo) - this._write({ - depositId: relayHashInfo.depositId, - status: relayHashInfo.status, - ...params, - }); return id; } - async register(url: string, params: unknown) { - return this._register(url, ss.create(params, DepositStatusFilter)); + async register(id: string, url: string, params: unknown, clientId: number) { + return this._register( + id, + url, + ss.create(params, DepositStatusFilter), + clientId, + ); } async unregister(id: string): Promise { assert( - await this.webhookRequests.hasWebhook(id), + await this.webhookRequests.hasWebhookRequest(id), "This webhook does not exist", ); await this.webhookRequests.unregister(id); diff --git a/packages/webhooks/src/factory.ts b/packages/webhooks/src/factory.ts index d8824d1c..1d5023e9 100644 --- a/packages/webhooks/src/factory.ts +++ b/packages/webhooks/src/factory.ts @@ -1,52 +1,56 @@ import assert from "assert"; -import { EventProcessorManager } from "./eventProcessorManager"; -import { MemoryStore } from "./store"; -import { DataSource } from "@repo/indexer-database"; import { Logger } from "winston"; +import { Redis } from "ioredis"; + +import { DataSource } from "@repo/indexer-database"; +import { EventProcessorManager } from "./eventProcessorManager"; import { WebhookNotifier } from "./notifier"; import { DepositStatusProcessor } from "./eventProcessors"; -import { WebhookRequestRepository } from "./database/webhookRequestRepository"; import { WebhookRouter } from "./router"; +import { WebhooksQueuesService } from "./adapter/messaging/WebhooksQueuesService"; +import { WebhookRequestWorker } from "./adapter/messaging/WebhookRequestWorker"; export enum WebhookTypes { DepositStatus = "DepositStatus", } export type Config = { - requireApiKey: boolean; enabledWebhooks: WebhookTypes[]; + enabledWebhookRequestWorkers: boolean; }; type Dependencies = { postgres: DataSource; + redis: Redis; logger: Logger; }; export function WebhookFactory(config: Config, deps: Dependencies) { - const { logger, postgres } = deps; + const { logger, postgres, redis } = deps; const notifier = new WebhookNotifier({ logger }); assert( config.enabledWebhooks.length, "No webhooks enabled, specify one in config", ); - const eventProcessorManager = new EventProcessorManager( - config ?? { requireApiKey: false }, - { - postgres, - logger, - }, - ); + const webhooksQueuesService = new WebhooksQueuesService(redis); + const eventProcessorManager = new EventProcessorManager({ + postgres, + logger, + webhooksQueuesService, + }); config.enabledWebhooks.forEach((name) => { - const hooks = new WebhookRequestRepository(new MemoryStore()); switch (name) { // add more webhook types here - case "DepositStatus": { + case WebhookTypes.DepositStatus: { eventProcessorManager.registerEventProcessor( name, - new DepositStatusProcessor({ - postgres, - webhookRequests: hooks, - notify: notifier.notify, - }), + new DepositStatusProcessor( + { + postgres, + logger, + notify: notifier.notify, + }, + WebhookTypes.DepositStatus, + ), ); break; } @@ -55,6 +59,14 @@ export function WebhookFactory(config: Config, deps: Dependencies) { } } }); + if (config.enabledWebhookRequestWorkers) { + new WebhookRequestWorker( + redis, + postgres, + logger, + eventProcessorManager.write, + ); + } const router = WebhookRouter({ eventProcessorManager }); return { write: eventProcessorManager.write, diff --git a/packages/webhooks/src/index.ts b/packages/webhooks/src/index.ts index a051aa6d..65b84a31 100644 --- a/packages/webhooks/src/index.ts +++ b/packages/webhooks/src/index.ts @@ -2,6 +2,5 @@ export * from "./factory"; export * as eventProcessors from "./eventProcessors"; export * as eventProcessorManager from "./eventProcessorManager"; export * as router from "./router"; -export * as store from "./store"; export * from "./types"; export * from "./utils"; diff --git a/packages/webhooks/src/notifier.ts b/packages/webhooks/src/notifier.ts index 80b1a232..7c896662 100644 --- a/packages/webhooks/src/notifier.ts +++ b/packages/webhooks/src/notifier.ts @@ -1,6 +1,5 @@ import { post } from "./utils"; import { NotificationPayload } from "./types"; -import { AsyncStore } from "./store"; import { Logger } from "winston"; export type Dependencies = { @@ -11,7 +10,9 @@ export type Dependencies = { export class BaseNotifier { private logger: Logger; - constructor(private deps: Dependencies) {} + constructor(private deps: Dependencies) { + this.logger = deps.logger; + } public notify = (payload: NotificationPayload): void => { this.deps.notify(payload).catch((error) => { diff --git a/packages/webhooks/src/router.ts b/packages/webhooks/src/router.ts index 232a5144..7e4aa4b4 100644 --- a/packages/webhooks/src/router.ts +++ b/packages/webhooks/src/router.ts @@ -2,6 +2,7 @@ import express from "express"; import { EventProcessorManager } from "./eventProcessorManager"; import * as ss from "superstruct"; import bearerToken from "express-bearer-token"; +import { v4 as uuidv4 } from "uuid"; type Dependencies = { eventProcessorManager: EventProcessorManager; @@ -26,16 +27,18 @@ export function WebhookRouter(deps: Dependencies): express.Router { router.post( "/webhook", async ( - req: express.Request & { token?: string }, + req: express.Request, res: express.Response, next: express.NextFunction, ) => { try { const parsedBody = RegistrationParams.create(req.body); - const id = await deps.eventProcessorManager.registerWebhook( - parsedBody, - req.token, - ); + const token = req.token; + if (!token) { + throw new Error("API Key required"); + } + const id = uuidv4(); + await deps.eventProcessorManager.registerWebhook(id, parsedBody, token); res.status(201).send(id); } catch (error) { next(error); diff --git a/packages/webhooks/src/store.ts b/packages/webhooks/src/store.ts deleted file mode 100644 index 6bd5f03c..00000000 --- a/packages/webhooks/src/store.ts +++ /dev/null @@ -1,107 +0,0 @@ -import { createClient, RedisClientType } from "redis"; - -export interface AsyncStore { - get(key: string): Promise; - set(key: string, value: V): Promise; - has(key: string): Promise; - delete(key: string): Promise; - values(): AsyncIterableIterator; - entries(): AsyncIterableIterator<[string, V]>; - keys(): AsyncIterableIterator; -} - -export class MemoryStore implements AsyncStore { - private map: Map; - - constructor(map?: Map) { - this.map = map ?? new Map(); - } - - async get(key: string): Promise { - return this.map.get(key); - } - - async set(key: string, value: V): Promise { - this.map.set(key, value); - } - - async has(key: string): Promise { - return this.map.has(key); - } - - async delete(key: string): Promise { - return this.map.delete(key); - } - - async *values(): AsyncIterableIterator { - for (const value of this.map.values()) { - yield value; - } - } - - async *entries(): AsyncIterableIterator<[string, V]> { - for (const entry of this.map.entries()) { - yield entry; - } - } - - async *keys(): AsyncIterableIterator { - for (const key of this.map.keys()) { - yield key; - } - } -} - -export class RedisStore implements AsyncStore { - private client: RedisClientType; - - constructor(client: RedisClientType) { - this.client = client; - } - - async get(key: string): Promise { - const value = await this.client.get(key); - return value ? JSON.parse(value) : undefined; - } - - async set(key: string, value: V): Promise { - await this.client.set(key, JSON.stringify(value)); - } - - async has(key: string): Promise { - const exists = await this.client.exists(key); - return exists === 1; - } - - async delete(key: string): Promise { - const result = await this.client.del(key); - return result === 1; - } - - async *values(): AsyncIterableIterator { - const keys = await this.client.keys("*"); - for (const key of keys) { - const value = await this.get(key); - if (value !== undefined) { - yield value; - } - } - } - - async *entries(): AsyncIterableIterator<[string, V]> { - const keys = await this.client.keys("*"); - for (const key of keys) { - const value = await this.get(key); - if (value !== undefined) { - yield [key, value]; - } - } - } - - async *keys(): AsyncIterableIterator { - const keys = await this.client.keys("*"); - for (const key of keys) { - yield key; - } - } -} diff --git a/packages/webhooks/src/types.ts b/packages/webhooks/src/types.ts index a6406d7d..cfb74bb8 100644 --- a/packages/webhooks/src/types.ts +++ b/packages/webhooks/src/types.ts @@ -1,14 +1,13 @@ import * as ss from "superstruct"; -export interface WebhookRequest { - id: string; - url: string; - filter: string; -} - export interface IEventProcessor { write(event: JSONValue): void; - register(url: string, params: JSONValue): Promise; + register( + id: string, + url: string, + params: JSONValue, + clientId: number, + ): Promise; unregister(id: string): Promise; } @@ -23,6 +22,7 @@ export type JSONValue = export type NotificationPayload = { url: string; data: JSONValue; + apiKey: string; }; export const RegistrationParams = ss.object({ diff --git a/packages/webhooks/src/utils.ts b/packages/webhooks/src/utils.ts index 8ae0ade3..2bde3ed5 100644 --- a/packages/webhooks/src/utils.ts +++ b/packages/webhooks/src/utils.ts @@ -1,10 +1,11 @@ import { NotificationPayload } from "./types"; export async function post(params: NotificationPayload): Promise { - const { url, data } = params; + const { url, data, apiKey } = params; const response = await fetch(url, { method: "POST", headers: { "Content-Type": "application/json", + Authorization: `Bearer ${apiKey}`, }, body: JSON.stringify(data), }); @@ -34,3 +35,10 @@ export function asyncInterval(fn: () => Promise, delay: number) { isStopped = true; }; } + +export function exists(val: T | null | undefined): val is T { + return val !== null && val !== undefined; +} +export function customId(...args: (string | number)[]): string { + return args.join("!"); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e875de94..3172ff30 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -509,18 +509,27 @@ importers: '@repo/indexer-database': specifier: workspace:* version: link:../indexer-database + bullmq: + specifier: ^5.12.12 + version: 5.12.14 express: specifier: ^4.19.2 version: 4.21.1 express-bearer-token: specifier: ^3.0.0 version: 3.0.0 + ioredis: + specifier: ^5.4.1 + version: 5.4.1 redis: specifier: ^4.7.0 version: 4.7.0 superstruct: specifier: 2.0.3-1 version: 2.0.3-1 + uuid: + specifier: ^11.0.3 + version: 11.0.3 devDependencies: '@istanbuljs/nyc-config-typescript': specifier: ^1.0.2 @@ -7723,6 +7732,10 @@ packages: resolution: {integrity: sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA==} engines: {node: '>= 0.4.0'} + uuid@11.0.3: + resolution: {integrity: sha512-d0z310fCWv5dJwnX1Y/MncBAqGMKEzlBb1AOf7z9K8ALnd0utBX/msg/fA0+sbyN1ihbMsLhrBlnl1ak7Wa0rg==} + hasBin: true + uuid@2.0.1: resolution: {integrity: sha512-nWg9+Oa3qD2CQzHIP4qKUqwNfzKn8P0LtFhotaCTFchsV7ZfDhAybeip/HZVeMIpZi9JgY1E3nUlwaCmZT1sEg==} deprecated: Please upgrade to version 7 or higher. Older versions may use Math.random() in certain circumstances, which is known to be problematic. See https://v8.dev/blog/math-random for details. @@ -18362,6 +18375,8 @@ snapshots: utils-merge@1.0.1: {} + uuid@11.0.3: {} + uuid@2.0.1: {} uuid@3.3.2: {}