Skip to content

Commit

Permalink
feat(benchmark): add very basic benchmarking
Browse files Browse the repository at this point in the history
Signed-off-by: david <david@umaproject.org>
daywiss committed Dec 3, 2024
1 parent fdfe6d3 commit bfeb627
Showing 20 changed files with 527 additions and 41 deletions.
10 changes: 10 additions & 0 deletions packages/benchmark/.mocharc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"extension": [
"ts"
],
"spec": "**/*.test.ts",
"require": [
"ts-node/register"
],
"recursive": true
}
2 changes: 2 additions & 0 deletions packages/benchmark/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Benchmark Package
General benchmarking library to return the time in milleseconds between start and stop. Includes stats package to get various analytics.
6 changes: 6 additions & 0 deletions packages/benchmark/eslint.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// .eslintrc.js in the new package
module.exports = {
root:true,
extends: ['@repo/eslint-config/library.js'],
};

43 changes: 43 additions & 0 deletions packages/benchmark/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"name": "@repo/benchmark",
"version": "0.0.1",
"description": "",
"main": "index.js",
"scripts": {
"build": "tsc -b",
"build:check": "tsc --noEmit",
"watch": "tsc -b --watch",
"fix": "pnpm format && pnpm lint",
"format": "prettier --write src",
"format:check": "prettier src --check",
"lint": "eslint --fix",
"lint:check": "eslint",
"check": "pnpm format:check && pnpm lint:check && pnpm build:check",
"test": "mocha",
"coverage": "nyc mocha",
"test:watch": "mocha --watch"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
},
"exports": {
".": "./dist/index.js"
},
"devDependencies": {
"@istanbuljs/nyc-config-typescript": "^1.0.2",
"@repo/eslint-config": "workspace:*",
"@repo/typescript-config": "workspace:*",
"@types/chai": "^4.3.17",
"@types/mocha": "^10.0.7",
"chai": "^4.5.0",
"eslint": "^8.57.0",
"mocha": "^10.7.0",
"nyc": "^17.0.0",
"prettier": "^3.3.3",
"source-map-support": "^0.5.21",
"ts-node": "^10.9.2",
"typescript": "^5.5.4"
}
}
96 changes: 96 additions & 0 deletions packages/benchmark/src/benchmark.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { expect } from "chai";
import { Benchmark } from "./benchmark";
import { BenchmarkStats } from "./stats";

describe("Benchmark", () => {
let benchmark: Benchmark;

beforeEach(() => {
benchmark = new Benchmark();
});

it("should start and end a benchmark event correctly", async () => {
benchmark.start("testEvent", 0);
const duration = benchmark.end("testEvent", 1);
expect(duration).to.be.a("number");
expect(duration).to.be.greaterThan(0);
});

it("should throw an error if end is called without start", () => {
expect(() => benchmark.end("nonExistentEvent")).to.throw(
Error,
'Benchmark for event "nonExistentEvent" not started. Call start() before end().',
);
});

it("should handle multiple events independently", () => {
benchmark.start("event1", 0);
benchmark.start("event2", 0);

const duration1 = benchmark.end("event1", 1);
expect(duration1).to.be.a("number");
expect(duration1).to.be.greaterThan(0);

const duration2 = benchmark.end("event2", 1);
expect(duration2).to.be.a("number");
expect(duration2).to.be.greaterThan(0);
});

it("should throw an error if the same event is started twice without ending", () => {
benchmark.start("duplicateEvent");
expect(() => benchmark.start("duplicateEvent")).to.not.throw();
expect(() => benchmark.end("duplicateEvent")).to.not.throw();
});
});

describe("BenchmarkStats", () => {
let benchmarkStats: BenchmarkStats;

beforeEach(() => {
benchmarkStats = new BenchmarkStats();
});

it("should start and end a benchmark event correctly", () => {
benchmarkStats.start("testEvent", 0);
const duration = benchmarkStats.end("testEvent", 1);
expect(duration).to.be.a("number");
expect(duration).to.be.greaterThan(0);
});

it("should return correct stats for events", () => {
benchmarkStats.start("event1");
benchmarkStats.end("event1");
benchmarkStats.start("event2");
benchmarkStats.end("event2");

const stats = benchmarkStats.getStats();
expect(stats.total).to.equal(2);
expect(stats.oldest).to.be.a("number");
expect(stats.newest).to.be.a("number");
expect(stats.average).to.be.a("number");
expect(stats.fastest).to.be.a("number");
expect(stats.slowest).to.be.a("number");
});

it("should handle events with specific integer timestamps correctly", () => {
const startTime1 = 1000;
const endTime1 = 2000;
const startTime2 = 3000;
const endTime2 = 4000;

benchmarkStats.start("event1", startTime1);
benchmarkStats.end("event1", endTime1);
benchmarkStats.start("event2", startTime2);
benchmarkStats.end("event2", endTime2);

const stats = benchmarkStats.getStats();
expect(stats.total).to.equal(2);
expect(stats.oldest).to.equal(endTime1 - startTime1);
expect(stats.newest).to.equal(endTime2 - startTime2);
expect(stats.average).to.equal(
(endTime1 - startTime1 + endTime2 - startTime2) / 2,
);
expect(stats.fastest).to.equal(endTime1 - startTime1);
expect(stats.slowest).to.equal(endTime2 - startTime2);
});
});
46 changes: 46 additions & 0 deletions packages/benchmark/src/benchmark.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* A class to benchmark events by tracking their start and end times.
*/
import { IBenchmark } from "./types";

export class Benchmark implements IBenchmark {
private events: Map<string, number>;

/**
* Initializes a new instance of the Benchmark class.
*/
constructor() {
this.events = new Map();
}

/**
* Starts tracking an event by storing its start time.
*
* @param {string} eventName - The name of the event to start tracking.
* @param {number} [now=Date.now()] - The current time in milliseconds. Defaults to the current time.
*/
start(eventName: string, now: number = Date.now()): void {
this.events.set(eventName, now);
}

/**
* Ends tracking an event and calculates its duration.
*
* @param {string} eventName - The name of the event to end tracking.
* @param {number} [now=Date.now()] - The current time in milliseconds. Defaults to the current time.
* @returns {number | undefined} The duration of the event in milliseconds, or undefined if the event was not started.
* @throws Will throw an error if the event was not started before calling this method.
*/
end(eventName: string, now: number = Date.now()): number | undefined {
const startTime = this.events.get(eventName);
if (startTime === undefined) {
throw new Error(
`Benchmark for event "${eventName}" not started. Call start() before end().`,
);
}
const endTime = now;
const duration = endTime - startTime;
this.events.delete(eventName);
return duration;
}
}
3 changes: 3 additions & 0 deletions packages/benchmark/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from "./benchmark";
export * from "./stats";
export * from "./types";
86 changes: 86 additions & 0 deletions packages/benchmark/src/stats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { Benchmark } from "./benchmark";

import { IBenchmark } from "./types";

export class BenchmarkStats implements IBenchmark {
private benchmark: Benchmark;
private eventDurations: Map<string, number>;

constructor(benchmark: Benchmark = new Benchmark()) {
this.benchmark = benchmark;
this.eventDurations = new Map();
}

/**
* Starts a new benchmark event.
* @param {string} eventName - The name of the event to start.
*/
start(eventName: string, now: number = Date.now()): void {
this.benchmark.start(eventName, now);
}

/**
* Ends a benchmark event and records its duration.
* @param {string} eventName - The name of the event to stop.
* @returns {number | undefined} The duration of the event in milliseconds, or undefined if the event was not started.
*/
end(eventName: string, now: number = Date.now()): number | undefined {
const duration = this.benchmark.end(eventName, now);
if (duration !== undefined) {
this.eventDurations.set(eventName, duration);
}
return duration;
}

/**
* Provides statistics about the currently tracked events.
*
* @returns {object} An object containing statistics about the events.
*/
getStats(): {
total: number;
oldest: number | null;
newest: number | null;
average: number | null;
fastest: number | null;
slowest: number | null;
} {
const total = this.eventDurations.size;

if (total === 0) {
return {
total,
oldest: null,
newest: null,
average: null,
fastest: null,
slowest: null,
};
}

let oldest = Number.MAX_VALUE;
let newest = Number.MIN_VALUE;
let totalDuration = 0;
let fastest = Number.MAX_VALUE;
let slowest = Number.MIN_VALUE;

for (const duration of this.eventDurations.values()) {
totalDuration += duration;
if (duration < fastest) fastest = duration;
if (duration > slowest) slowest = duration;
if (duration < oldest) oldest = duration;
if (duration > newest) newest = duration;
}

const average = totalDuration / total;

return {
total,
oldest,
newest,
average,
fastest,
slowest,
};
}
}
19 changes: 19 additions & 0 deletions packages/benchmark/src/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
export interface IBenchmark {
/**
* Starts tracking an event by storing its start time.
*
* @param {string} eventName - The name of the event to start tracking.
* @param {number} [now=Date.now()] - The current time in milliseconds. Defaults to the current time.
*/
start(eventName: string, now?: number): void;

/**
* Ends tracking an event and calculates its duration.
*
* @param {string} eventName - The name of the event to end tracking.
* @param {number} [now=Date.now()] - The current time in milliseconds. Defaults to the current time.
* @returns {number | undefined} The duration of the event in milliseconds, or undefined if the event was not started.
* @throws Will throw an error if the event was not started before calling this method.
*/
end(eventName: string, now?: number): number | undefined;
}
6 changes: 6 additions & 0 deletions packages/benchmark/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"extends":"@repo/typescript-config/base.json",
"compilerOptions": {
"outDir": "./dist" /* Specify an output folder for all emitted files. */
}
}
1 change: 1 addition & 0 deletions packages/indexer/package.json
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
"@across-protocol/constants": "^3.1.20",
"@across-protocol/contracts": "^3.0.16",
"@across-protocol/sdk": "^3.3.23",
"@repo/benchmark": "workspace:*",
"@repo/error-handling": "workspace:*",
"@repo/webhooks": "workspace:*",
"@types/express": "^4.17.21",
52 changes: 50 additions & 2 deletions packages/indexer/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import winston from "winston";
import Redis from "ioredis";
import * as across from "@across-protocol/sdk";
import { WebhookFactory, WebhookTypes } from "@repo/webhooks";
import {
JSONValue,
WebhookFactory,
WebhookTypes,
eventProcessors,
} from "@repo/webhooks";
import { providers } from "ethers";

import { connectToDatabase } from "./database/database.provider";
import * as parseEnv from "./parseEnv";
@@ -19,6 +25,8 @@ import { IndexerQueuesService } from "./messaging/service";
import { IntegratorIdWorker } from "./messaging/IntegratorIdWorker";
import { AcrossIndexerManager } from "./data-indexing/service/AcrossIndexerManager";
import { BundleServicesManager } from "./services/BundleServicesManager";
import { BenchmarkStats } from "@repo/benchmark";
import { listenForDeposits } from "./utils/benchmarks";

async function initializeRedis(
config: parseEnv.RedisConfig,
@@ -55,6 +63,14 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
const redis = await initializeRedis(redisConfig, logger);
const redisCache = new RedisCache(redis);
const postgres = await connectToDatabase(postgresConfig, logger);
const depositBenchmark = new BenchmarkStats();
const providerChainIds = config.allProviderConfigs
.filter(([_, chainId]) => config.spokePoolChainsEnabled.includes(chainId))
.map(([providerUrl, chainId]) => ({
provider: new providers.JsonRpcProvider(providerUrl),
chainId: Number(chainId),
}));

// Call write to kick off webhook calls
const { write } = await WebhookFactory(config.webhookConfig, {
postgres,
@@ -95,7 +111,32 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
new SpokePoolRepository(postgres, logger),
redisCache,
indexerQueuesService,
write,
(params: { type: WebhookTypes; event: JSONValue }) => {
// stop any benchmarks based on origin and deposit it
if (params.type === WebhookTypes.DepositStatus) {
const depositStatusEvent =
params.event as eventProcessors.DepositStatusEvent;
const uniqueId = `${depositStatusEvent.originChainId}-${depositStatusEvent.depositId}`;
try {
const duration = depositBenchmark.end(uniqueId);
logger.debug({
message: "Profiled deposit",
duration,
uniqueId,
...depositStatusEvent,
});
} catch (err) {
logger.debug({
message: "Error profiling deposit",
uniqueId,
...depositStatusEvent,
err,
});
// ignore errors, but it can happen if we are ending before starting
}
}
write(params);
},
);
const bundleServicesManager = new BundleServicesManager(
config,
@@ -117,6 +158,11 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
retryProvidersFactory,
);

const stopDepositListener = listenForDeposits(
depositBenchmark,
providerChainIds,
logger,
);
let exitRequested = false;
process.on("SIGINT", () => {
if (!exitRequested) {
@@ -127,6 +173,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
integratorIdWorker.close();
acrossIndexerManager.stopGracefully();
bundleServicesManager.stop();
stopDepositListener();
} else {
integratorIdWorker.close();
logger.info({ at: "Indexer#Main", message: "Forcing exit..." });
@@ -141,6 +188,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
at: "Indexer#Main",
message: "Running indexers",
});

// start all indexers in parallel, will wait for them to complete, but they all loop independently
const [bundleServicesManagerResults, acrossIndexerManagerResult] =
await Promise.allSettled([
41 changes: 21 additions & 20 deletions packages/indexer/src/parseEnv.ts
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ export type Config = {
enableBundleIncludedEventsService: boolean;
enableBundleBuilder: boolean;
webhookConfig: WebhooksConfig;
allProviderConfigs: ProviderConfig[];
};
export type RedisConfig = {
host: string;
@@ -73,9 +74,9 @@ function parsePostgresConfig(
};
}

function parseProviderConfigs(env: Env): ProviderConfig[] {
export function parseProviderConfigs(env: Env = process.env): ProviderConfig[] {
const results: ProviderConfig[] = [];
for (const [key, value] of Object.entries(process.env)) {
for (const [key, value] of Object.entries(env)) {
const match = key.match(/^RPC_PROVIDER_URLS_(\d+)$/);
if (match) {
const chainId = match[1] ? parseNumber(match[1]) : undefined;
@@ -90,9 +91,9 @@ function parseProviderConfigs(env: Env): ProviderConfig[] {
return results;
}

export function parseProvidersUrls() {
export function parseProvidersUrls(env: Env = process.env) {
const results: Map<number, string[]> = new Map();
for (const [key, value] of Object.entries(process.env)) {
for (const [key, value] of Object.entries(env)) {
const match = key.match(/^RPC_PROVIDER_URLS_(\d+)$/);
if (match) {
const chainId = match[1] ? parseNumber(match[1]) : undefined;
@@ -105,39 +106,38 @@ export function parseProvidersUrls() {
return results;
}

export function parseRetryProviderEnvs(chainId: number) {
export function parseRetryProviderEnvs(
chainId: number,
env: Env = process.env,
) {
const providerCacheNamespace =
process.env.PROVIDER_CACHE_NAMESPACE || "indexer_provider_cache";
env.PROVIDER_CACHE_NAMESPACE || "indexer_provider_cache";
const maxConcurrency = Number(
process.env[`NODE_MAX_CONCURRENCY_${chainId}`] ||
process.env.NODE_MAX_CONCURRENCY ||
"25",
env[`NODE_MAX_CONCURRENCY_${chainId}`] || env.NODE_MAX_CONCURRENCY || "25",
);
const pctRpcCallsLogged = Number(
process.env[`NODE_PCT_RPC_CALLS_LOGGED_${chainId}`] ||
process.env.NODE_PCT_RPC_CALLS_LOGGED ||
env[`NODE_PCT_RPC_CALLS_LOGGED_${chainId}`] ||
env.NODE_PCT_RPC_CALLS_LOGGED ||
"0",
);
const providerCacheTtl = process.env.PROVIDER_CACHE_TTL
? Number(process.env.PROVIDER_CACHE_TTL)
const providerCacheTtl = env.PROVIDER_CACHE_TTL
? Number(env.PROVIDER_CACHE_TTL)
: undefined;
const nodeQuorumThreshold = Number(
process.env[`NODE_QUORUM_${chainId}`] || process.env.NODE_QUORUM || "1",
env[`NODE_QUORUM_${chainId}`] || env.NODE_QUORUM || "1",
);
const retries = Number(
process.env[`NODE_RETRIES_${chainId}`] || process.env.NODE_RETRIES || "0",
env[`NODE_RETRIES_${chainId}`] || env.NODE_RETRIES || "0",
);
const retryDelay = Number(
process.env[`NODE_RETRY_DELAY_${chainId}`] ||
process.env.NODE_RETRY_DELAY ||
"1",
env[`NODE_RETRY_DELAY_${chainId}`] || env.NODE_RETRY_DELAY || "1",
);
// Note: if there is no env var override _and_ no default, this will remain undefined and
// effectively disable indefinite caching of old blocks/keys.
const noTtlBlockDistance: number | undefined = process.env[
const noTtlBlockDistance: number | undefined = env[
`NO_TTL_BLOCK_DISTANCE_${chainId}`
]
? Number(process.env[`NO_TTL_BLOCK_DISTANCE_${chainId}`])
? Number(env[`NO_TTL_BLOCK_DISTANCE_${chainId}`])
: getNoTtlBlockDistance(chainId);

return {
@@ -203,5 +203,6 @@ export function envToConfig(env: Env): Config {
enableBundleIncludedEventsService,
enableBundleBuilder,
webhookConfig,
allProviderConfigs,
};
}
1 change: 1 addition & 0 deletions packages/indexer/src/services/spokePoolProcessor.ts
Original file line number Diff line number Diff line change
@@ -138,6 +138,7 @@ export class SpokePoolProcessor {

/**
* Updates relayHashInfo table to include recently stored events
* @param eventType The type of event being processed.
* @param events An array of already stored deposits, fills or slow fill requests
* @returns A void promise
*/
71 changes: 71 additions & 0 deletions packages/indexer/src/utils/benchmarks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { Contract, ethers, providers } from "ethers";
import { IBenchmark } from "@repo/benchmark";
import winston from "winston";
import {
getDeployedAddress,
getDeployedBlockNumber,
SpokePool,
SpokePool__factory as SpokePoolFactory,
} from "@across-protocol/contracts";
import { getAddress } from "./contractUtils";
import { Logger } from "ethers/lib/utils";

export type GetSpokeClientParams = {
provider: providers.Provider;
address: string;
};

export function getSpokepoolContract(params: GetSpokeClientParams) {
return SpokePoolFactory.connect(params.address, params.provider);
}
export type ProviderChainId = {
provider: providers.Provider;
chainId: number;
};

export function listenForDeposits(
benchmark: IBenchmark,
chains: ProviderChainId[],
logger: winston.Logger,
): () => void {
const spokeClients: [SpokePool, number][] = chains.map(
({ provider, chainId }) => {
const address = getAddress("SpokePool", chainId);
return [getSpokepoolContract({ provider, address }), chainId];
},
);

const unlistenFunctions = spokeClients.map(([spokeClient, chainId]) => {
const onV3FundsDeposited = (depositId: string) => {
const uniqueId = `${chainId}-${depositId}`;
logger.debug({
at: "Indexer.Benchmarks",
uniqueId,
chainId,
message: "Saw V3 Funds deposited",
});
benchmark.start(uniqueId);
};
logger.info({
at: "Indexer.Benchmarks",
chainId,
message: `Registering V3 Funds Deposited benchmarks for chain ${chainId}`,
});
spokeClient.on("V3FundsDeposited", onV3FundsDeposited);

// Return a function to unlisten and clean up events for this client
return () => {
spokeClient.off("V3FundsDeposited", onV3FundsDeposited);
logger.info({
at: "Indexer.Benchmarks",
chainId,
message: `Unlistened from V3FundsDeposited for SpokePool at chainId ${chainId}`,
});
};
});

// Return a single function to unlisten from all events
return () => {
unlistenFunctions.forEach((unlisten) => unlisten());
};
}
2 changes: 1 addition & 1 deletion packages/indexer/src/utils/contractUtils.ts
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ export type GetSpokeClientParams = {
hubPoolClient: across.clients.HubPoolClient;
};

function getAddress(contractName: string, chainId: number): string {
export function getAddress(contractName: string, chainId: number): string {
const address = getDeployedAddress(contractName, chainId);
if (!address) {
throw new Error(
1 change: 0 additions & 1 deletion packages/indexer/src/web3/RetryProvidersFactory.ts
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@ export class RetryProvidersFactory {

public initializeProviders(): RetryProvidersFactory {
const providersUrls = parseProvidersUrls();

for (const [chainId, providerUrls] of providersUrls.entries()) {
const retryProviderEnvs = parseRetryProviderEnvs(chainId);
if (!providerUrls || providerUrls.length === 0) {
2 changes: 1 addition & 1 deletion packages/webhooks/src/eventProcessorManager.ts
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ export class EventProcessorManager {
`Attempting to register webhook of type: ${params.type} with URL: ${params.url}`,
);
const client = await this.clientRepository.getClientByApiKey(apiKey);
// TODO: Reinable this potentially when we need it, but not great for testing
// TODO: Re-enable this potentially when we need it, but not great for testing
// const urlDomain = new URL(params.url).hostname;
// const isDomainValid = client.domains.includes(urlDomain);
// assert(
6 changes: 5 additions & 1 deletion packages/webhooks/src/factory.ts
Original file line number Diff line number Diff line change
@@ -76,12 +76,16 @@ export async function WebhookFactory(config: Config, deps: Dependencies) {
}
});
if (config.enabledWebhookRequestWorkers) {
new WebhookRequestWorker(
const worker = new WebhookRequestWorker(
redis,
postgres,
logger,
eventProcessorManager.write,
);
process.on("SIGINT", () => {
// Shutdown worker on exit
worker.close();
});
}
const router = WebhookRouter({ eventProcessorManager });
return {
74 changes: 59 additions & 15 deletions pnpm-lock.yaml

0 comments on commit bfeb627

Please sign in to comment.