From f0ca94a1e81bf48cb7cd19677d16948bfe228272 Mon Sep 17 00:00:00 2001 From: David A <4429761+daywiss@users.noreply.github.com> Date: Wed, 27 Nov 2024 13:21:24 -0500 Subject: [PATCH] feat(webhooks): allow client registration through env (#118) Signed-off-by: david <david@umaproject.org> --- packages/indexer-api/src/main.ts | 4 +- .../src/entities/WebhookClient.ts | 1 + .../src/migrations/1732730161339-Webhook.ts | 17 ++++++++ packages/indexer/src/main.ts | 12 +++--- packages/indexer/src/parseEnv.ts | 12 ++++++ .../src/database/webhookClientRepository.ts | 43 +++++++++++++++---- .../webhooks/src/eventProcessorManager.ts | 22 ++++------ packages/webhooks/src/factory.ts | 19 +++++++- packages/webhooks/src/types.ts | 10 +++++ packages/webhooks/src/utils.ts | 10 ++++- 10 files changed, 117 insertions(+), 33 deletions(-) create mode 100644 packages/indexer-database/src/migrations/1732730161339-Webhook.ts diff --git a/packages/indexer-api/src/main.ts b/packages/indexer-api/src/main.ts index 475e495b..dea0471d 100644 --- a/packages/indexer-api/src/main.ts +++ b/packages/indexer-api/src/main.ts @@ -87,10 +87,12 @@ export async function Main( const postgres = await connectToDatabase(postgresConfig, logger); const redisConfig = Indexer.parseRedisConfig(env); const redis = await initializeRedis(redisConfig, logger); - const webhooks = Webhooks.WebhookFactory( + const webhooks = await Webhooks.WebhookFactory( { enabledWebhooks: [Webhooks.WebhookTypes.DepositStatus], enabledWebhookRequestWorkers: false, + // indexer will register clients + clients: [], }, { postgres, logger, redis }, ); diff --git a/packages/indexer-database/src/entities/WebhookClient.ts b/packages/indexer-database/src/entities/WebhookClient.ts index ecae8fa5..f71451fb 100644 --- a/packages/indexer-database/src/entities/WebhookClient.ts +++ b/packages/indexer-database/src/entities/WebhookClient.ts @@ -2,6 +2,7 @@ import { Entity, Column, PrimaryGeneratedColumn, Unique } from "typeorm"; @Entity() @Unique("UK_webhook_client_api_key", ["apiKey"]) +@Unique("UK_webhook_client_name", ["name"]) export class WebhookClient { @PrimaryGeneratedColumn() id: number; diff --git a/packages/indexer-database/src/migrations/1732730161339-Webhook.ts b/packages/indexer-database/src/migrations/1732730161339-Webhook.ts new file mode 100644 index 00000000..a7d7eb73 --- /dev/null +++ b/packages/indexer-database/src/migrations/1732730161339-Webhook.ts @@ -0,0 +1,17 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class Webhook1732730161339 implements MigrationInterface { + name = "Webhook1732730161339"; + + public async up(queryRunner: QueryRunner): Promise<void> { + await queryRunner.query( + `ALTER TABLE "webhook_client" ADD CONSTRAINT "UQ_a08bea1c3eba7711301141ae001" UNIQUE ("name")`, + ); + } + + public async down(queryRunner: QueryRunner): Promise<void> { + await queryRunner.query( + `ALTER TABLE "webhook_client" DROP CONSTRAINT "UQ_a08bea1c3eba7711301141ae001"`, + ); + } +} diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts index 8c18a9bb..b896f931 100644 --- a/packages/indexer/src/main.ts +++ b/packages/indexer/src/main.ts @@ -56,13 +56,11 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { const redisCache = new RedisCache(redis); const postgres = await connectToDatabase(postgresConfig, logger); // Call write to kick off webhook calls - const { write } = WebhookFactory( - { - enabledWebhooks: [WebhookTypes.DepositStatus], - enabledWebhookRequestWorkers: true, - }, - { postgres, logger, redis }, - ); + const { write } = await WebhookFactory(config.webhookConfig, { + postgres, + logger, + redis, + }); // Retry providers factory const retryProvidersFactory = new RetryProvidersFactory( redisCache, diff --git a/packages/indexer/src/parseEnv.ts b/packages/indexer/src/parseEnv.ts index 57a8b9bb..2b2a750a 100644 --- a/packages/indexer/src/parseEnv.ts +++ b/packages/indexer/src/parseEnv.ts @@ -2,6 +2,11 @@ import * as s from "superstruct"; import { DatabaseConfig } from "@repo/indexer-database"; import { getNoTtlBlockDistance } from "./web3/constants"; import { assert } from "@repo/error-handling"; +import { + Config as WebhooksConfig, + WebhookTypes, + parseWebhookClientsFromString, +} from "@repo/webhooks"; export type Config = { redisConfig: RedisConfig; @@ -12,6 +17,7 @@ export type Config = { enableBundleEventsProcessor: boolean; enableBundleIncludedEventsService: boolean; enableBundleBuilder: boolean; + webhookConfig: WebhooksConfig; }; export type RedisConfig = { host: string; @@ -182,6 +188,11 @@ export function envToConfig(env: Env): Config { `SPOKEPOOL_CHAINS_ENABLED=${chainId} but did not find any corresponding RPC_PROVIDER_URLS_${chainId}`, ); }); + const webhookConfig = { + enabledWebhooks: [WebhookTypes.DepositStatus], + enabledWebhookRequestWorkers: true, + clients: parseWebhookClientsFromString(env.WEBHOOK_CLIENTS ?? "[]"), + }; return { redisConfig, postgresConfig, @@ -191,5 +202,6 @@ export function envToConfig(env: Env): Config { enableBundleEventsProcessor, enableBundleIncludedEventsService, enableBundleBuilder, + webhookConfig, }; } diff --git a/packages/webhooks/src/database/webhookClientRepository.ts b/packages/webhooks/src/database/webhookClientRepository.ts index 32035aea..33bfdf09 100644 --- a/packages/webhooks/src/database/webhookClientRepository.ts +++ b/packages/webhooks/src/database/webhookClientRepository.ts @@ -1,4 +1,5 @@ import { entities, DataSource } from "@repo/indexer-database"; +import { exists } from "../utils"; import assert from "assert"; // This class is intended to store integration clients allowed to use the webhook service. @@ -9,14 +10,30 @@ export class WebhookClientRepository { this.repository = this.dataSource.getRepository(entities.WebhookClient); } - public async registerClient(client: entities.WebhookClient): Promise<void> { - const existingClient = await this.repository.findOne({ - where: { id: client.id }, - }); - if (existingClient) { - throw new Error(`Client with id ${client.id} already exists.`); + public async registerClient( + client: Omit<entities.WebhookClient, "id">, + ): Promise<entities.WebhookClient> { + assert( + !(await this.hasClientByName(client.name)), + "Client with that name already exists", + ); + const result = await this.repository.insert(client); + return result.raw[0]; + } + public async upsertClient( + client: Omit<entities.WebhookClient, "id">, + ): Promise<entities.WebhookClient> { + if (await this.hasClientByName(client.name)) { + return this.updateClientByName(client); + } else { + return this.registerClient(client); } - await this.repository.insert(client); + } + public async updateClientByName( + client: Omit<entities.WebhookClient, "id">, + ): Promise<entities.WebhookClient> { + const result = await this.repository.update({ name: client.name }, client); + return result.raw[0]; } public async unregisterClient(clientId: number): Promise<void> { @@ -40,12 +57,20 @@ export class WebhookClientRepository { public async listClients(): Promise<entities.WebhookClient[]> { return this.repository.find(); } - + public async hasClientByName(name: string): Promise<boolean> { + const result = await this.repository.findOne({ where: { name } }); + return exists(result); + } + public async getClientByName(name: string): Promise<entities.WebhookClient> { + const result = await this.repository.findOne({ where: { name } }); + assert(result, `Client by name: ${name} does not exist`); + return result; + } public async getClientByApiKey( apiKey: string, ): Promise<entities.WebhookClient> { const result = await this.repository.findOne({ where: { apiKey } }); - assert(result, "Invalid api key"); + assert(result, `Client by apiKey: ${apiKey} does not exist`); return result; } diff --git a/packages/webhooks/src/eventProcessorManager.ts b/packages/webhooks/src/eventProcessorManager.ts index afa1787f..c5d75f1b 100644 --- a/packages/webhooks/src/eventProcessorManager.ts +++ b/packages/webhooks/src/eventProcessorManager.ts @@ -29,6 +29,7 @@ export type Dependencies = { postgres: DataSource; logger: Logger; webhooksQueuesService: WebhooksQueuesService; + clientRepository: WebhookClientRepository; }; export class EventProcessorManager { private logger: Logger; @@ -38,8 +39,8 @@ export class EventProcessorManager { constructor(deps: Dependencies) { this.logger = deps.logger; - this.clientRepository = new WebhookClientRepository(deps.postgres); // Initialize the client manager this.webhooksQueuesService = deps.webhooksQueuesService; + this.clientRepository = deps.clientRepository; } // Register a new type of webhook processor able to be written to @@ -76,12 +77,13 @@ export class EventProcessorManager { `Attempting to register webhook of type: ${params.type} with URL: ${params.url}`, ); const client = await this.clientRepository.getClientByApiKey(apiKey); - const urlDomain = new URL(params.url).hostname; - const isDomainValid = client.domains.includes(urlDomain); - assert( - isDomainValid, - "The base URL of the provided webhook does not match any of the client domains", - ); + // TODO: Reinable this potentially when we need it, but not great for testing + // const urlDomain = new URL(params.url).hostname; + // const isDomainValid = client.domains.includes(urlDomain); + // assert( + // isDomainValid, + // "The base URL of the provided webhook does not match any of the client domains", + // ); assert((params.filter as any).depositTxHash, "depositTxHash is required"); assert((params.filter as any).originChainId, "originChainId is required"); const webhook = this.getEventProcessor(params.type as WebhookTypes); @@ -119,10 +121,4 @@ export class EventProcessorManager { `Successfully unregistered webhook with ID: ${params.id}`, ); } - - async registerClient(client: entities.WebhookClient) { - this.logger.debug(`Attempting to register client with ID: ${client.id}`); - await this.clientRepository.registerClient(client); - this.logger.debug(`Successfully registered client with ID: ${client.id}`); - } } diff --git a/packages/webhooks/src/factory.ts b/packages/webhooks/src/factory.ts index 6f02657f..67081c3f 100644 --- a/packages/webhooks/src/factory.ts +++ b/packages/webhooks/src/factory.ts @@ -1,7 +1,7 @@ import { Logger } from "winston"; import { Redis } from "ioredis"; -import { DataSource } from "@repo/indexer-database"; +import { DataSource, entities } from "@repo/indexer-database"; import { assert } from "@repo/error-handling"; import { EventProcessorManager } from "./eventProcessorManager"; @@ -10,6 +10,8 @@ import { DepositStatusProcessor } from "./eventProcessors"; import { WebhookRouter } from "./router"; import { WebhooksQueuesService } from "./adapter/messaging/WebhooksQueuesService"; import { WebhookRequestWorker } from "./adapter/messaging/WebhookRequestWorker"; +import { WebhookClientRepository } from "./database/webhookClientRepository"; +import { PartialWebhookClients } from "./types"; export enum WebhookTypes { DepositStatus = "DepositStatus", @@ -18,6 +20,7 @@ export enum WebhookTypes { export type Config = { enabledWebhooks: WebhookTypes[]; enabledWebhookRequestWorkers: boolean; + clients: PartialWebhookClients; }; type Dependencies = { postgres: DataSource; @@ -25,7 +28,7 @@ type Dependencies = { logger: Logger; }; -export function WebhookFactory(config: Config, deps: Dependencies) { +export async function WebhookFactory(config: Config, deps: Dependencies) { const { logger, postgres, redis } = deps; const notifier = new WebhookNotifier({ logger }); assert( @@ -33,10 +36,22 @@ export function WebhookFactory(config: Config, deps: Dependencies) { "No webhooks enabled, specify one in config", ); const webhooksQueuesService = new WebhooksQueuesService(redis); + const clientRepository = new WebhookClientRepository(postgres); const eventProcessorManager = new EventProcessorManager({ postgres, logger, webhooksQueuesService, + clientRepository, + }); + const clientRegistrations = await Promise.all( + config.clients.map((client) => { + return clientRepository.upsertClient(client); + }), + ); + logger.info({ + message: "Registered webhook api clients", + at: "Webhooks package factory", + clientRegistrations, }); config.enabledWebhooks.forEach((name) => { switch (name) { diff --git a/packages/webhooks/src/types.ts b/packages/webhooks/src/types.ts index cfb74bb8..89209a4c 100644 --- a/packages/webhooks/src/types.ts +++ b/packages/webhooks/src/types.ts @@ -37,3 +37,13 @@ export const UnregisterParams = ss.object({ id: ss.string(), }); export type UnregisterParams = ss.Infer<typeof UnregisterParams>; + +export const PartialWebhookClient = ss.type({ + name: ss.string(), + apiKey: ss.string(), + domains: ss.array(ss.string()), +}); +export type PartialWebhookClient = ss.Infer<typeof PartialWebhookClient>; + +export const PartialWebhookClients = ss.array(PartialWebhookClient); +export type PartialWebhookClients = ss.Infer<typeof PartialWebhookClients>; diff --git a/packages/webhooks/src/utils.ts b/packages/webhooks/src/utils.ts index 2bde3ed5..2feb84de 100644 --- a/packages/webhooks/src/utils.ts +++ b/packages/webhooks/src/utils.ts @@ -1,4 +1,5 @@ -import { NotificationPayload } from "./types"; +import { NotificationPayload, PartialWebhookClients } from "./types"; +import * as ss from "superstruct"; export async function post(params: NotificationPayload): Promise<void> { const { url, data, apiKey } = params; const response = await fetch(url, { @@ -42,3 +43,10 @@ export function exists<T>(val: T | null | undefined): val is T { export function customId(...args: (string | number)[]): string { return args.join("!"); } + +export function parseWebhookClientsFromString( + envStr: string, +): PartialWebhookClients { + const clients = JSON.parse(envStr); + return ss.create(clients, PartialWebhookClients); +}