Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(webhooks): add postgres persistence to clients and requests (feature branch) #108

Merged
merged 7 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/indexer-api/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ export async function Main(
const redis = await initializeRedis(redisConfig, logger);
const webhooks = Webhooks.WebhookFactory(
{
requireApiKey: false,
enabledWebhooks: [Webhooks.WebhookTypes.DepositStatus],
enabledWebhookRequestWorkers: false,
},
{ postgres, logger },
{ postgres, logger, redis },
);

const allRouters: Record<string, Router> = {
Expand Down
17 changes: 17 additions & 0 deletions packages/indexer-database/src/entities/WebhookClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Entity, Column, PrimaryGeneratedColumn, Unique } from "typeorm";

@Entity()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should enforce uniqueness of the apiKey

Suggested change
@Entity()
@Entity()
@Unique(`UK_webhook_client_apiKey`, ["apiKey"])

@Unique("UK_webhook_client_api_key", ["apiKey"])
export class WebhookClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also have the clientId as FK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this complicates things, but i can make it optional. right now we allow no client if configured that way

@PrimaryGeneratedColumn()
id: number;

@Column()
name: string;

@Column()
apiKey: string;

@Column("jsonb")
domains: string[];
}
28 changes: 28 additions & 0 deletions packages/indexer-database/src/entities/WebhookRequest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import {
Entity,
PrimaryColumn,
Column,
Unique,
CreateDateColumn,
Index,
} from "typeorm";

@Entity()
@Unique("UK_webhook_request_clientId_filter", ["clientId", "filter"])
@Index("IX_webhook_request_filter", ["filter"])
export class WebhookRequest {
@PrimaryColumn()
id: string;

@Column({ type: "integer" })
clientId: number;

@Column()
url: string;

@Column()
filter: string;

@CreateDateColumn()
createdAt: Date;
}
3 changes: 3 additions & 0 deletions packages/indexer-database/src/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ export * from "./BundleEvent";
export * from "./BundleBlockRange";
export * from "./RootBundleExecutedJoinTable";
export * from "./RelayHashInfo";

export * from "./WebhookRequest";
export * from "./WebhookClient";
3 changes: 3 additions & 0 deletions packages/indexer-database/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ export const createDataSource = (config: DatabaseConfig): DataSource => {
entities.RootBundleExecutedJoinTable,
// Others
entities.RelayHashInfo,
// Webhooks
entities.WebhookRequest,
entities.WebhookClient,
],
migrationsTableName: "_migrations",
migrations: ["migrations/*.ts"],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class WebhookClient1732297474910 implements MigrationInterface {
name = "WebhookClient1732297474910";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE "webhook_client" (
"id" SERIAL NOT NULL,
"name" character varying NOT NULL,
"apiKey" character varying NOT NULL,
"domains" jsonb NOT NULL,
CONSTRAINT "UK_webhook_client_api_key" UNIQUE ("apiKey"),
CONSTRAINT "PK_f7330fb3bdb2e19534eae691d44" PRIMARY KEY ("id")
)`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE "webhook_client"`);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class WebhookRequest1732297948190 implements MigrationInterface {
name = "WebhookRequest1732297948190";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE "webhook_request" (
"id" character varying NOT NULL,
"clientId" integer NOT NULL,
"url" character varying NOT NULL,
"filter" character varying NOT NULL,
"createdAt" TIMESTAMP NOT NULL DEFAULT now(),
CONSTRAINT "UK_webhook_request_clientId_filter" UNIQUE ("clientId", "filter"),
CONSTRAINT "PK_67a7784045de2d1b7139b611b93" PRIMARY KEY ("id")
)`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE "webhook_request"`);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class WebhookRequest1732310112989 implements MigrationInterface {
name = "WebhookRequest1732310112989";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`CREATE INDEX "IX_webhook_request_filter" ON "webhook_request" ("filter") `,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX "public"."IX_webhook_request_filter"`);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Logger } from "winston";

import { DataSource } from "@repo/indexer-database";
import { eventProcessorManager } from "@repo/webhooks";

import { Config } from "../../parseEnv";
import { HubPoolRepository } from "../../database/HubPoolRepository";
Expand Down Expand Up @@ -39,6 +40,7 @@ export class AcrossIndexerManager {
private spokePoolRepository: SpokePoolRepository,
private redisCache: RedisCache,
private indexerQueuesService: IndexerQueuesService,
private webhookWriteFn?: eventProcessorManager.WebhookWriteFn,
) {}

public async start() {
Expand Down Expand Up @@ -93,7 +95,12 @@ export class AcrossIndexerManager {
this.hubPoolClientFactory,
this.spokePoolClientFactory,
this.spokePoolRepository,
new SpokePoolProcessor(this.postgres, this.logger, chainId),
new SpokePoolProcessor(
this.postgres,
this.logger,
chainId,
this.webhookWriteFn,
),
this.indexerQueuesService,
);
const spokePoolIndexer = new Indexer(
Expand Down
5 changes: 3 additions & 2 deletions packages/indexer/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
// Call write to kick off webhook calls
const { write } = WebhookFactory(
{
requireApiKey: false,
enabledWebhooks: [WebhookTypes.DepositStatus],
enabledWebhookRequestWorkers: true,
},
{ postgres, logger },
{ postgres, logger, redis },
);
// Retry providers factory
const retryProvidersFactory = new RetryProvidersFactory(
Expand Down Expand Up @@ -96,6 +96,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
new SpokePoolRepository(postgres, logger),
redisCache,
indexerQueuesService,
write,
);
const bundleServicesManager = new BundleServicesManager(
config,
Expand Down
79 changes: 68 additions & 11 deletions packages/indexer/src/services/spokePoolProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { utils } from "@across-protocol/sdk";
import winston from "winston";

import {
DataSource,
entities,
utils as dbUtils,
SaveQueryResultType,
} from "@repo/indexer-database";
import winston from "winston";
import { WebhookTypes, eventProcessorManager } from "@repo/webhooks";

import { RelayStatus } from "../../../indexer-database/dist/src/entities";
import { StoreEventsResult } from "../data-indexing/service/SpokePoolIndexerDataHandler";

Expand All @@ -22,6 +25,7 @@ export class SpokePoolProcessor {
private readonly postgres: DataSource,
private readonly logger: winston.Logger,
private readonly chainId: number,
private readonly webhookWriteFn?: eventProcessorManager.WebhookWriteFn,
) {}

public async process(events: StoreEventsResult) {
Expand All @@ -37,9 +41,19 @@ export class SpokePoolProcessor {
SpokePoolEvents.V3FundsDeposited,
[...newDeposits, ...updatedDeposits],
);
// TODO: for new deposits, notify status change to unfilled
// here...

// Notify webhook of new deposits
newDeposits.forEach((deposit) => {
this.webhookWriteFn?.({
type: WebhookTypes.DepositStatus,
event: {
depositId: deposit.id,
originChainId: deposit.originChainId,
depositTxHash: deposit.transactionHash,
status: RelayStatus.Unfilled,
},
});
});
const newSlowFillRequests = dbUtils.filterSaveQueryResults(
events.slowFillRequests,
SaveQueryResultType.Inserted,
Expand All @@ -52,8 +66,19 @@ export class SpokePoolProcessor {
SpokePoolEvents.RequestedV3SlowFill,
[...newSlowFillRequests, ...updatedSlowFillRequests],
);
// TODO: for new slow fill requests, notify status change to slow fill requested
// here...

// Notify webhook of new slow fill requests
newSlowFillRequests.forEach((deposit) => {
this.webhookWriteFn?.({
type: WebhookTypes.DepositStatus,
event: {
depositId: deposit.id,
originChainId: deposit.originChainId,
depositTxHash: deposit.transactionHash,
status: RelayStatus.SlowFillRequested,
},
});
});

const newFills = dbUtils.filterSaveQueryResults(
events.fills,
Expand All @@ -67,16 +92,48 @@ export class SpokePoolProcessor {
...newFills,
...updatedFills,
]);
// TODO: for new fills, notify status change to filled
// here...

// Notify webhook of new fills
newFills.forEach((fill) => {
this.webhookWriteFn?.({
type: WebhookTypes.DepositStatus,
event: {
depositId: fill.depositId,
originChainId: fill.originChainId,
depositTxHash: fill.transactionHash,
status: RelayStatus.Filled,
},
});
});

const expiredDeposits = await this.updateExpiredRelays();
// TODO: for expired deposits, notify status change to expired
// here...
// Notify webhook of expired deposits
expiredDeposits.forEach((deposit) => {
this.webhookWriteFn?.({
type: WebhookTypes.DepositStatus,
event: {
depositId: deposit.depositId,
originChainId: deposit.originChainId,
depositTxHash: deposit.depositTxHash,
status: RelayStatus.Expired,
},
});
});

const refundedDeposits = await this.updateRefundedDepositsStatus();
// TODO: for refunded deposits, notify status change to refunded
// here...

// Notify webhook of refunded deposits
refundedDeposits.forEach((deposit) => {
this.webhookWriteFn?.({
type: WebhookTypes.DepositStatus,
event: {
depositId: deposit.depositId,
originChainId: deposit.originChainId,
depositTxHash: deposit.depositTxHash,
status: RelayStatus.Refunded,
},
});
});
}

/**
Expand Down
5 changes: 2 additions & 3 deletions packages/webhooks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ The `factory.ts` file provides a `WebhookFactory` function that sets up the webh
To use the `WebhookFactory`, you need to provide a configuration object and dependencies:

- **Config**: This object should include:
- requireApiKey: boolean; - Should registration of new webhooks require an api key
- enabledWebhooks: WebhookTypes[]; - What event processors should be enabled, like 'DepositStatus'

- **Dependencies**: This object should include:
Expand All @@ -27,8 +26,8 @@ import { DataSource } from "@repo/indexer-database";

const webhooks = WebhookFactory(
{
requireApiKey: false,
enableWebhooks: [WebhookTypes.DepositStatus],
enabledWebhooks: [WebhookTypes.DepositStatus],
enabledWebhookRequestWorkers: false,
},
{ postgres, logger },
);
Expand Down
5 changes: 4 additions & 1 deletion packages/webhooks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
"license": "ISC",
"dependencies": {
"@repo/indexer-database": "workspace:*",
"bullmq": "^5.12.12",
"express": "^4.19.2",
"express-bearer-token": "^3.0.0",
"ioredis": "^5.4.1",
"redis": "^4.7.0",
"superstruct": "2.0.3-1"
"superstruct": "2.0.3-1",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooc is this a beta version?

Copy link
Contributor

@melisaguevara melisaguevara Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, their last official release has some import/export errors that are fixed in this beta version.

"uuid": "^11.0.3"
},
"exports": {
".": "./dist/index.js"
Expand Down
Loading