From be038cc053e19c44dc4ff8ce9b2eaa2e4c11850c Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Mon, 6 Jan 2025 16:31:24 -0300 Subject: [PATCH 1/3] feat: error retry handling --- .../src/services/processing.service.ts | 11 +- .../services/sharedDependencies.service.ts | 11 +- .../unit/sharedDependencies.service.spec.ts | 4 +- packages/data-flow/src/orchestrator.ts | 90 ++++++++++----- .../data-flow/src/retroactiveProcessor.ts | 13 ++- .../data-flow/test/unit/orchestrator.spec.ts | 88 +++++++------- .../test/unit/retroactiveProcessor.spec.ts | 38 ++++++ packages/shared/src/exceptions/base.ts | 40 +++++++ packages/shared/src/exceptions/common.ts | 17 +++ packages/shared/src/exceptions/index.ts | 4 + .../shared/src/exceptions/nonRetriable.ts | 7 ++ packages/shared/src/exceptions/retriable.ts | 18 +++ packages/shared/src/external.ts | 6 + packages/shared/src/internal.ts | 2 + .../src/retry/exponentialBackoff.strategy.ts | 57 +++++++++ packages/shared/src/retry/index.ts | 3 + packages/shared/src/retry/retry.ts | 65 +++++++++++ .../src/retry/retryStrategy.interface.ts | 23 ++++ .../retry/exponentialBackoff.strategy.spec.ts | 108 ++++++++++++++++++ packages/shared/test/retry/retry.spec.ts | 87 ++++++++++++++ 20 files changed, 610 insertions(+), 82 deletions(-) create mode 100644 packages/shared/src/exceptions/base.ts create mode 100644 packages/shared/src/exceptions/common.ts create mode 100644 packages/shared/src/exceptions/index.ts create mode 100644 packages/shared/src/exceptions/nonRetriable.ts create mode 100644 packages/shared/src/exceptions/retriable.ts create mode 100644 packages/shared/src/retry/exponentialBackoff.strategy.ts create mode 100644 packages/shared/src/retry/index.ts create mode 100644 packages/shared/src/retry/retry.ts create mode 100644 packages/shared/src/retry/retryStrategy.interface.ts create mode 100644 packages/shared/test/retry/exponentialBackoff.strategy.spec.ts create mode 100644 packages/shared/test/retry/retry.spec.ts diff --git a/apps/processing/src/services/processing.service.ts b/apps/processing/src/services/processing.service.ts index 0d8a199..dd41be0 100644 --- a/apps/processing/src/services/processing.service.ts +++ b/apps/processing/src/services/processing.service.ts @@ -45,8 +45,14 @@ export class ProcessingService { static async initialize(env: Environment): Promise { const sharedDependencies = await SharedDependenciesService.initialize(env); const { CHAINS: chains } = env; - const { core, registriesRepositories, indexerClient, kyselyDatabase, logger } = - sharedDependencies; + const { + core, + registriesRepositories, + indexerClient, + kyselyDatabase, + logger, + retryStrategy, + } = sharedDependencies; const { eventRegistryRepository, strategyRegistryRepository, @@ -83,6 +89,7 @@ export class ProcessingService { }, chain.fetchLimit, chain.fetchDelayMs, + retryStrategy, logger, ); const retroactiveProcessor = new RetroactiveProcessor( diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index fc29306..bc91d40 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -17,7 +17,7 @@ import { KyselyStrategyRegistryRepository, KyselyTransactionManager, } from "@grants-stack-indexer/repository"; -import { ILogger, Logger } from "@grants-stack-indexer/shared"; +import { ExponentialBackoff, ILogger, Logger, RetryStrategy } from "@grants-stack-indexer/shared"; import { Environment } from "../config/index.js"; @@ -30,6 +30,7 @@ export type SharedDependencies = { }; indexerClient: EnvioIndexerClient; kyselyDatabase: ReturnType; + retryStrategy: RetryStrategy; logger: ILogger; }; @@ -91,6 +92,13 @@ export class SharedDependenciesService { env.INDEXER_ADMIN_SECRET, ); + const retryStrategy = new ExponentialBackoff({ + maxAttempts: 3, + baseDelay: 1000, + maxDelay: 3 * 60 * 1000, + factor: 2, + }); + return { core: { projectRepository, @@ -109,6 +117,7 @@ export class SharedDependenciesService { }, indexerClient, kyselyDatabase, + retryStrategy, logger, }; } diff --git a/apps/processing/test/unit/sharedDependencies.service.spec.ts b/apps/processing/test/unit/sharedDependencies.service.spec.ts index cdd3c75..ed84920 100644 --- a/apps/processing/test/unit/sharedDependencies.service.spec.ts +++ b/apps/processing/test/unit/sharedDependencies.service.spec.ts @@ -19,8 +19,10 @@ const mocks = vi.hoisted(() => { }; }); -vi.mock("@grants-stack-indexer/shared", () => { +vi.mock("@grants-stack-indexer/shared", async (importActual) => { + const actual = await importActual(); return { + ...actual, Logger: { getInstance: vi.fn().mockReturnValue(mocks.logger), }, diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 0265750..1e732a5 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -16,6 +16,9 @@ import { isAlloEvent, isStrategyEvent, ProcessorEvent, + RetriableError, + RetryHandler, + RetryStrategy, StrategyEvent, stringify, } from "@grants-stack-indexer/shared"; @@ -46,10 +49,11 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from * The Orchestrator provides fault tolerance and performance optimization through: * - Configurable batch sizes for event fetching * - Delayed processing to prevent overwhelming the system - * - Error handling and logging for various failure scenarios + * - Retry handling with exponential backoff for transient failures + * - Comprehensive error handling and logging for various failure scenarios * - Registry tracking of supported/unsupported strategies and events * - * TODO: Enhance the error handling/retries, logging and observability + * TODO: Enhance logging and observability */ export class Orchestrator { private readonly eventsQueue: IQueue>; @@ -58,6 +62,7 @@ export class Orchestrator { private readonly eventsRegistry: IEventsRegistry; private readonly strategyRegistry: IStrategyRegistry; private readonly dataLoader: DataLoader; + private readonly retryHandler: RetryHandler; /** * @param chainId - The chain id @@ -65,6 +70,7 @@ export class Orchestrator { * @param indexerClient - The indexer client * @param registries - The registries * @param fetchLimit - The fetch limit + * @param retryStrategy - The retry strategy * @param fetchDelayInMs - The fetch delay in milliseconds */ constructor( @@ -77,6 +83,7 @@ export class Orchestrator { }, private fetchLimit: number = 1000, private fetchDelayInMs: number = 10000, + private retryStrategy: RetryStrategy, private logger: ILogger, ) { this.eventsFetcher = new EventsFetcher(this.indexerClient); @@ -98,6 +105,7 @@ export class Orchestrator { this.logger, ); this.eventsQueue = new Queue>(fetchLimit); + this.retryHandler = new RetryHandler(retryStrategy, this.logger); } async run(signal: AbortSignal): Promise { @@ -119,46 +127,42 @@ export class Orchestrator { await delay(this.fetchDelayInMs); continue; } + await this.eventsRegistry.saveLastProcessedEvent(this.chainId, { ...event, rawEvent: event, }); - event = await this.enhanceStrategyId(event); - if (this.isPoolCreated(event)) { - const handleable = existsHandler(event.strategyId); - await this.strategyRegistry.saveStrategyId( - this.chainId, - event.params.strategy, - event.strategyId, - handleable, - ); - } else if (event.contractName === "Strategy" && "strategyId" in event) { - if (!existsHandler(event.strategyId)) { - this.logger.debug("Skipping event", { - event, - className: Orchestrator.name, - chainId: this.chainId, - }); - // we skip the event if the strategy id is not handled yet - continue; - } - } - - const changesets = await this.eventsProcessor.processEvent(event); - await this.dataLoader.applyChanges(changesets); + await this.retryHandler.execute( + async () => { + await this.handleEvent(event!); + }, + { abortSignal: signal }, + ); } catch (error: unknown) { - // TODO: improve error handling, retries and notify + // TODO: notify if ( error instanceof UnsupportedStrategy || error instanceof InvalidEvent || error instanceof UnsupportedEventException ) { - // this.logger.error( - // `Current event cannot be handled. ${error.name}: ${error.message}. Event: ${stringify(event)}`, - // ); + this.logger.debug( + `Current event cannot be handled. ${error.name}: ${error.message}.`, + { + className: Orchestrator.name, + chainId: this.chainId, + event, + }, + ); } else { - if (error instanceof Error || isNativeError(error)) { + if (error instanceof RetriableError) { + error.message = `Error processing event after retries. ${error.message}`; + this.logger.error(error, { + event, + className: Orchestrator.name, + chainId: this.chainId, + }); + } else if (error instanceof Error || isNativeError(error)) { this.logger.error(error, { event, className: Orchestrator.name, @@ -201,6 +205,32 @@ export class Orchestrator { this.eventsQueue.push(...events); } + private async handleEvent(event: ProcessorEvent): Promise { + event = await this.enhanceStrategyId(event); + if (this.isPoolCreated(event)) { + const handleable = existsHandler(event.strategyId); + await this.strategyRegistry.saveStrategyId( + this.chainId, + event.params.strategy, + event.strategyId, + handleable, + ); + } else if (event.contractName === "Strategy" && "strategyId" in event) { + if (!existsHandler(event.strategyId)) { + this.logger.debug("Skipping event", { + event, + className: Orchestrator.name, + chainId: this.chainId, + }); + // we skip the event if the strategy id is not handled yet + return; + } + } + + const changesets = await this.eventsProcessor.processEvent(event); + await this.dataLoader.applyChanges(changesets); + } + /** * Enhance the event with the strategy id when required * @param event - The event diff --git a/packages/data-flow/src/retroactiveProcessor.ts b/packages/data-flow/src/retroactiveProcessor.ts index ddb25c6..c47c341 100644 --- a/packages/data-flow/src/retroactiveProcessor.ts +++ b/packages/data-flow/src/retroactiveProcessor.ts @@ -9,6 +9,8 @@ import { Hex, ILogger, ProcessorEvent, + RetryHandler, + RetryStrategy, stringify, } from "@grants-stack-indexer/shared"; @@ -62,6 +64,7 @@ export class RetroactiveProcessor { private readonly strategyRegistry: IStrategyRegistry; private readonly dataLoader: DataLoader; private readonly checkpointRepository: IStrategyProcessingCheckpointRepository; + private readonly retryHandler: RetryHandler; /** * Creates a new instance of RetroactiveProcessor @@ -70,6 +73,7 @@ export class RetroactiveProcessor { * @param indexerClient - Client for fetching blockchain events * @param registries - Event and strategy registries for tracking processing state * @param fetchLimit - Maximum number of events to fetch in a single batch (default: 1000) + * @param retryStrategy - The retry strategy * @param logger - Logger instance for debugging and monitoring */ constructor( @@ -82,6 +86,7 @@ export class RetroactiveProcessor { checkpointRepository: IStrategyProcessingCheckpointRepository; }, private fetchLimit: number = 1000, + private retryStrategy: RetryStrategy, private logger: ILogger, ) { this.eventsFetcher = new EventsFetcher(this.indexerClient); @@ -103,6 +108,7 @@ export class RetroactiveProcessor { this.dependencies.transactionManager, this.logger, ); + this.retryHandler = new RetryHandler(retryStrategy, this.logger); } /** @@ -208,8 +214,11 @@ export class RetroactiveProcessor { if (this.hasReachedLastEvent(currentPointer, lastEventPointer)) break; event.strategyId = strategyId; - const changesets = await this.eventsProcessor.processEvent(event); - await this.dataLoader.applyChanges(changesets); + + await this.retryHandler.execute(async () => { + const changesets = await this.eventsProcessor.processEvent(event!); + await this.dataLoader.applyChanges(changesets); + }); } catch (error) { if (error instanceof InvalidEvent || error instanceof UnsupportedEventException) { // Expected errors that we can safely ignore diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index 057784b..31a2596 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -3,7 +3,6 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { EvmProvider } from "@grants-stack-indexer/chain-providers"; import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; -import { UnsupportedStrategy } from "@grants-stack-indexer/processors"; import { Changeset, IApplicationPayoutRepository, @@ -19,11 +18,12 @@ import { ContractName, ContractToEventName, EventParams, + ExponentialBackoff, Hex, ILogger, ProcessorEvent, + RateLimitError, StrategyEvent, - stringify, } from "@grants-stack-indexer/shared"; import { @@ -119,6 +119,7 @@ describe("Orchestrator", { sequential: true }, () => { }, mockFetchLimit, mockFetchDelay, + new ExponentialBackoff({ baseDelay: 10, maxAttempts: 3, factor: 2 }), logger, ); }); @@ -548,7 +549,33 @@ describe("Orchestrator", { sequential: true }, () => { }); describe("Error Handling", () => { - it.skip("retries error"); + it("retries retriable errors", async () => { + const retriableError = new RateLimitError({ className: "ExternalProvider" }, 10); + const mockEvent = createMockEvent("Allo", "Unknown" as unknown as AlloEvent, 1); + + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + eventsProcessorSpy.mockRejectedValueOnce(retriableError).mockResolvedValueOnce([]); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 2) throw new Error("Not yet called"); + }); + + expect(eventsProcessorSpy).toHaveBeenCalledTimes(2); + expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledTimes(1); + }); it("keeps running when there is an error", async () => { const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); @@ -575,14 +602,13 @@ describe("Orchestrator", { sequential: true }, () => { if (eventsProcessorSpy.mock.calls.length < 2) throw new Error("Not yet called"); }, { - timeout: 1000, + timeout: 2000, }, ); expect(eventsProcessorSpy).toHaveBeenCalledTimes(2); expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledTimes(2); - expect(logger.error).toHaveBeenCalledTimes(1); expect(logger.error).toHaveBeenCalledWith(error, { className: Orchestrator.name, chainId, @@ -590,11 +616,10 @@ describe("Orchestrator", { sequential: true }, () => { }); }); - it.skip("logs error for InvalidEvent", async () => { + it("logs debug for InvalidEvent", async () => { const mockEvent = createMockEvent("Allo", "Unknown" as unknown as AlloEvent, 1); const error = new InvalidEvent(mockEvent); - const consoleSpy = vi.spyOn(console, "error"); const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") @@ -608,48 +633,19 @@ describe("Orchestrator", { sequential: true }, () => { if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); }); - expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining("InvalidEvent")); - expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining(stringify(mockEvent))); - expect(orchestrator["dataLoader"].applyChanges).not.toHaveBeenCalled(); - expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); - }); - - it.skip("logs error for UnsupportedEvent", async () => { - const strategyId = - "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; - const mockEvent = createMockEvent( - "Strategy", - "NotHandled" as unknown as StrategyEvent, + expect(logger.debug).toHaveBeenNthCalledWith( 1, + expect.stringContaining( + `Current event cannot be handled. ${error.name}: ${error.message}.`, + ), + { + className: Orchestrator.name, + chainId, + event: mockEvent, + }, ); - const error = new UnsupportedStrategy(strategyId); - - vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue({ - id: strategyId, - address: mockEvent.srcAddress, - chainId, - handled: true, - }); - vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") - .mockResolvedValueOnce([mockEvent]) - .mockResolvedValue([]); - vi.spyOn(orchestrator["eventsProcessor"], "processEvent").mockRejectedValue(error); - - const consoleSpy = vi.spyOn(console, "error"); - - runPromise = orchestrator.run(abortController.signal); - - await vi.waitFor(() => { - if (consoleSpy.mock.calls.length < 1) throw new Error("Not yet called"); - }); - - expect(consoleSpy).toHaveBeenCalledTimes(1); - expect(consoleSpy).toHaveBeenCalledWith( - expect.stringContaining(`Strategy ${strategyId} unsupported`), - ); - expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining(stringify(mockEvent))); expect(orchestrator["dataLoader"].applyChanges).not.toHaveBeenCalled(); - expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalled(); }); it("logs DataLoader errors", async () => { diff --git a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts index 0dd931c..e9353ef 100644 --- a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts +++ b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts @@ -17,10 +17,12 @@ import { ContractToEventName, DeepPartial, EventParams, + ExponentialBackoff, Hex, ILogger, mergeDeep, ProcessorEvent, + RateLimitError, } from "@grants-stack-indexer/shared"; import { @@ -158,6 +160,7 @@ describe("RetroactiveProcessor", () => { checkpointRepository: mockCheckpointRepository, }, mockFetchLimit, + new ExponentialBackoff({ baseDelay: 100, maxAttempts: 3, factor: 2 }), mockLogger, ); @@ -489,6 +492,41 @@ describe("RetroactiveProcessor", () => { }, ); }); + + it("retries on retriable errors", async () => { + const retriableError = new RateLimitError({ className: "ExternalProvider" }, 100); + vi.spyOn(mockEventsProcessor, "processEvent").mockRejectedValueOnce(retriableError); + + const mockEvent = createMockEvent(eventName, defaultParams, existentStrategyId, { + blockNumber: 50, + logIndex: 0, + }); + + vi.spyOn(mockStrategyRegistry, "getStrategies").mockResolvedValue( + mockValidStrategies, + ); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue({ + blockNumber: 100, + logIndex: 1, + chainId, + blockTimestamp: 1234567890, + }); + + vi.spyOn(mockEventsFetcher, "fetchEvents") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + + const processEventSpy = vi.spyOn(mockEventsProcessor, "processEvent"); + processEventSpy.mockRejectedValueOnce(retriableError).mockResolvedValue([]); + + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); + + await processor.processRetroactiveStrategies(); + + expect(processEventSpy).toHaveBeenCalledTimes(2); // 1st attempt failed, 2nd attempt succeeded + expect(mockLogger.error).not.toHaveBeenCalled(); + expect(mockDataLoader.applyChanges).toHaveBeenCalledTimes(1); + }); }); }); }); diff --git a/packages/shared/src/exceptions/base.ts b/packages/shared/src/exceptions/base.ts new file mode 100644 index 0000000..ee2b749 --- /dev/null +++ b/packages/shared/src/exceptions/base.ts @@ -0,0 +1,40 @@ +export interface ErrorContext { + chainId?: string; + className?: string; + methodName?: string; + timestamp?: Date; + additionalData?: Record; +} + +export abstract class BaseError extends Error { + public readonly context: ErrorContext; + + constructor( + message: string, + context: ErrorContext, + public override readonly cause?: Error, + ) { + super(message); + this.context = { + ...context, + timestamp: new Date(), + }; + + Error.captureStackTrace(this, this.constructor); + } + + /** + * Get the full error chain as a string + */ + getFullStack(): string { + let stack = this.stack || ""; + let currentCause = this.cause; + + while (currentCause) { + stack += "\nCaused by: " + (currentCause.stack || currentCause); + currentCause = (currentCause as Error & { cause?: Error }).cause; + } + + return stack; + } +} diff --git a/packages/shared/src/exceptions/common.ts b/packages/shared/src/exceptions/common.ts new file mode 100644 index 0000000..3546336 --- /dev/null +++ b/packages/shared/src/exceptions/common.ts @@ -0,0 +1,17 @@ +import { ErrorContext, RetriableError, RetryMetadata } from "./index.js"; + +export class NetworkError extends RetriableError { + constructor(context: ErrorContext, metadata?: RetryMetadata, cause?: Error) { + super("Network request failed", context, metadata, cause); + } +} + +export class RateLimitError extends RetriableError { + constructor(context: ErrorContext, retryAfterInMs?: number) { + super("Rate limit exceeded", context, { + retryAfterInMs, + statusCode: 429, + failureReason: "Rate limit exceeded", + }); + } +} diff --git a/packages/shared/src/exceptions/index.ts b/packages/shared/src/exceptions/index.ts new file mode 100644 index 0000000..2bd2dd8 --- /dev/null +++ b/packages/shared/src/exceptions/index.ts @@ -0,0 +1,4 @@ +export * from "./base.js"; +export * from "./nonRetriable.js"; +export * from "./retriable.js"; +export * from "./common.js"; diff --git a/packages/shared/src/exceptions/nonRetriable.ts b/packages/shared/src/exceptions/nonRetriable.ts new file mode 100644 index 0000000..30c8cfd --- /dev/null +++ b/packages/shared/src/exceptions/nonRetriable.ts @@ -0,0 +1,7 @@ +import { BaseError, ErrorContext } from "./index.js"; + +export class NonRetriableError extends BaseError { + constructor(message: string, context: ErrorContext, cause?: Error) { + super(message, context, cause); + } +} diff --git a/packages/shared/src/exceptions/retriable.ts b/packages/shared/src/exceptions/retriable.ts new file mode 100644 index 0000000..0c8a2ef --- /dev/null +++ b/packages/shared/src/exceptions/retriable.ts @@ -0,0 +1,18 @@ +import { BaseError, ErrorContext } from "./index.js"; + +export interface RetryMetadata { + retryAfterInMs?: number; // Optional time in ms specified by service + statusCode?: number; // HTTP status code if applicable + failureReason?: string; // Specific reason for the failure +} + +export class RetriableError extends BaseError { + constructor( + message: string, + context: ErrorContext, + public readonly metadata?: RetryMetadata, + cause?: Error, + ) { + super(message, context, cause); + } +} diff --git a/packages/shared/src/external.ts b/packages/shared/src/external.ts index 01ba50e..4c87e41 100644 --- a/packages/shared/src/external.ts +++ b/packages/shared/src/external.ts @@ -21,3 +21,9 @@ export { TOKENS, getToken, getTokenOrThrow, UnknownToken } from "./internal.js"; export { isAlloEvent, isRegistryEvent, isStrategyEvent } from "./internal.js"; export { stringify } from "./internal.js"; + +export { RetriableError, NonRetriableError, RateLimitError, NetworkError } from "./internal.js"; +export type { RetryMetadata, ErrorContext } from "./internal.js"; + +export { ExponentialBackoff, RetryHandler } from "./internal.js"; +export type { RetryStrategy, RetryStrategyOptions } from "./internal.js"; diff --git a/packages/shared/src/internal.ts b/packages/shared/src/internal.ts index 1130d35..5229d88 100644 --- a/packages/shared/src/internal.ts +++ b/packages/shared/src/internal.ts @@ -6,3 +6,5 @@ export * from "./constants/index.js"; export * from "./utils/testing.js"; export * from "./logger/index.js"; export * from "./tokens/tokens.js"; +export * from "./exceptions/index.js"; +export * from "./retry/index.js"; diff --git a/packages/shared/src/retry/exponentialBackoff.strategy.ts b/packages/shared/src/retry/exponentialBackoff.strategy.ts new file mode 100644 index 0000000..115726e --- /dev/null +++ b/packages/shared/src/retry/exponentialBackoff.strategy.ts @@ -0,0 +1,57 @@ +import { RetryStrategy, RetryStrategyOptions } from "./index.js"; + +type ExponentialBackoffOptions = RetryStrategyOptions & { + factor: number; +}; + +/** + * Implements exponential backoff retry strategy with jitter + * + * Exponentially increases delay between retry attempts by multiplying base delay + * by a factor raised to the attempt count. Also adds random jitter to prevent + * thundering herd problems. + */ +export class ExponentialBackoff implements RetryStrategy { + /** + * @param options - Configuration options + * @param options.baseDelay - Initial delay in milliseconds (default: 5000) + * @param options.factor - Multiplier for exponential increase (default: 2) + * @param options.maxAttempts - Maximum number of retry attempts (default: 3) + * @param options.maxDelay - Optional maximum delay cap in milliseconds + */ + constructor( + private readonly options: ExponentialBackoffOptions = { + baseDelay: 5000, + factor: 2, + maxAttempts: 3, + }, + ) {} + + /** @inheritdoc */ + getDelay(attemptCount: number, retryAfter?: number): number { + const calculatedDelay = + this.options.baseDelay * Math.pow(this.options.factor, attemptCount); + const targetDelay = this.options.maxDelay + ? Math.min(calculatedDelay, this.options.maxDelay) + : calculatedDelay; + + const delay = retryAfter ? Math.max(retryAfter, targetDelay) : targetDelay; + return this.addJitter(delay); + } + + /** @inheritdoc */ + shouldRetry(attemptCount: number): boolean { + return attemptCount < this.options.maxAttempts; + } + + /** + * Adds random jitter to delay value to prevent thundering herd + * @param delay - Base delay value in milliseconds + * @returns Delay with jitter applied (±20% of base delay) + */ + private addJitter(delay: number): number { + // Random value between 0.8 and 1.2 + const jitterFactor = 0.8 + Math.random() * 0.4; + return Math.floor(delay * jitterFactor); + } +} diff --git a/packages/shared/src/retry/index.ts b/packages/shared/src/retry/index.ts new file mode 100644 index 0000000..56602c4 --- /dev/null +++ b/packages/shared/src/retry/index.ts @@ -0,0 +1,3 @@ +export * from "./retryStrategy.interface.js"; +export * from "./exponentialBackoff.strategy.js"; +export * from "./retry.js"; diff --git a/packages/shared/src/retry/retry.ts b/packages/shared/src/retry/retry.ts new file mode 100644 index 0000000..7134e2b --- /dev/null +++ b/packages/shared/src/retry/retry.ts @@ -0,0 +1,65 @@ +import { ILogger, RetriableError } from "../internal.js"; +import { ExponentialBackoff, RetryStrategy } from "./index.js"; + +/** + * Handles retrying operations with configurable retry strategies. + * Supports exponential backoff and other retry patterns through the RetryStrategy interface. + */ +export class RetryHandler { + /** + * Creates a new RetryHandler instance + * @param strategy - The retry strategy to use, defaults to ExponentialBackoff + * @param logger - Logger instance for debug messages + */ + constructor( + private readonly strategy: RetryStrategy = new ExponentialBackoff(), + private readonly logger: ILogger, + ) {} + + /** + * Executes an operation with retry logic + * @param operation - Async operation to execute and potentially retry + * @param params - Optional parameters + * @param params.abortSignal - Optional AbortSignal to cancel retries + * @returns Promise that resolves when operation succeeds or max retries exceeded + * @throws RetriableError if max retries exceeded + * @throws Error if operation aborted or non-retriable error occurs + */ + async execute( + operation: () => Promise, + params: { abortSignal?: AbortSignal } = {}, + ): Promise { + let attemptCount = 0; + while (true && !params.abortSignal?.aborted) { + try { + await operation(); + break; + } catch (error) { + if (!(error instanceof RetriableError)) { + throw error; + } + attemptCount++; + + if (!this.strategy.shouldRetry(attemptCount)) { + throw error; + } else { + const delay = this.strategy.getDelay( + attemptCount, + error.metadata?.retryAfterInMs, + ); + + this.logger.debug(`Retrying in ${delay}ms`, { + className: RetryHandler.name, + delay, + }); + + await new Promise((resolve) => setTimeout(resolve, delay, params.abortSignal)); + } + } + } + + if (params.abortSignal?.aborted) { + throw new Error("Operation aborted"); + } + } +} diff --git a/packages/shared/src/retry/retryStrategy.interface.ts b/packages/shared/src/retry/retryStrategy.interface.ts new file mode 100644 index 0000000..b290913 --- /dev/null +++ b/packages/shared/src/retry/retryStrategy.interface.ts @@ -0,0 +1,23 @@ +export interface RetryStrategy { + /** + * Calculate delay for next retry attempt + * @param attemptCount - Current retry attempt number + * @param retryAfter - Optional minimum delay specified by service + * @returns Delay duration in milliseconds + */ + getDelay(attemptCount: number, retryAfter?: number): number; + + /** + * Determine if another retry should be attempted + * @param attemptCount - Current retry attempt number + * @returns True if retry should be attempted, false otherwise + */ + shouldRetry(attemptCount: number): boolean; +} + +export type RetryStrategyOptions = { + baseDelay: number; + maxDelay?: number; + factor?: number; + maxAttempts: number; +}; diff --git a/packages/shared/test/retry/exponentialBackoff.strategy.spec.ts b/packages/shared/test/retry/exponentialBackoff.strategy.spec.ts new file mode 100644 index 0000000..6f0502c --- /dev/null +++ b/packages/shared/test/retry/exponentialBackoff.strategy.spec.ts @@ -0,0 +1,108 @@ +import { describe, expect, it, vi } from "vitest"; + +import { ExponentialBackoff } from "../../src/retry/exponentialBackoff.strategy.js"; + +describe("ExponentialBackoff", () => { + describe("constructor", () => { + it("uses default options when none provided", () => { + const strategy = new ExponentialBackoff(); + expect(strategy["options"].baseDelay).toBe(5000); + expect(strategy["options"].factor).toBe(2); + expect(strategy["options"].maxAttempts).toBe(3); + }); + + it("applies provided options", () => { + const strategy = new ExponentialBackoff({ + baseDelay: 1000, + factor: 3, + maxAttempts: 5, + maxDelay: 10000, + }); + expect(strategy["options"].baseDelay).toBe(1000); + expect(strategy["options"].factor).toBe(3); + expect(strategy["options"].maxAttempts).toBe(5); + expect(strategy["options"].maxDelay).toBe(10000); + }); + }); + + describe("getDelay", () => { + it("calculates exponential delay correctly", () => { + const strategy = new ExponentialBackoff({ + baseDelay: 1000, + factor: 2, + maxAttempts: 3, + }); + + // Mock Math.random to return 0.5 for consistent jitter testing + vi.spyOn(Math, "random").mockReturnValue(0.5); + + // With jitter factor of 1.0 (0.8 + 0.5 * 0.4): + // Attempt 1: 1000 * (2^1) * 1.0 = 2000 + // Attempt 2: 1000 * (2^2) * 1.0 = 4000 + expect(strategy.getDelay(1)).toBe(2000); + expect(strategy.getDelay(2)).toBe(4000); + }); + + it("respects maxDelay cap", () => { + const strategy = new ExponentialBackoff({ + baseDelay: 1000, + factor: 2, + maxAttempts: 3, + maxDelay: 3000, + }); + + vi.spyOn(Math, "random").mockReturnValue(0.5); + + // Should be capped at 3000 even though calculation would be 4000 + expect(strategy.getDelay(2)).toBe(3000); + }); + + it("uses retryAfter when larger than calculated delay", () => { + const strategy = new ExponentialBackoff({ + baseDelay: 1000, + factor: 2, + maxAttempts: 3, + }); + + vi.spyOn(Math, "random").mockReturnValue(0.5); + + // Calculated delay would be 2000, but retryAfter is 3000 + expect(strategy.getDelay(1, 3000)).toBe(3000); + }); + + it("adds jitter within expected range", () => { + const strategy = new ExponentialBackoff({ + baseDelay: 1000, + factor: 2, + maxAttempts: 3, + }); + + const delay = strategy.getDelay(1); // Base delay would be 2000 + expect(delay).toBeGreaterThanOrEqual(1600); // 2000 * 0.8 + expect(delay).toBeLessThanOrEqual(2400); // 2000 * 1.2 + }); + }); + + describe("shouldRetry", () => { + it("returns true when attempt count is below max attempts", () => { + const strategy = new ExponentialBackoff({ + baseDelay: 1000, + factor: 2, + maxAttempts: 3, + }); + expect(strategy.shouldRetry(0)).toBe(true); + expect(strategy.shouldRetry(1)).toBe(true); + expect(strategy.shouldRetry(2)).toBe(true); + }); + + it("returns false when attempt count reaches or exceeds max attempts", () => { + const strategy = new ExponentialBackoff({ + baseDelay: 1000, + factor: 2, + maxAttempts: 3, + }); + expect(strategy.shouldRetry(3)).toBe(false); + expect(strategy.shouldRetry(4)).toBe(false); + }); + }); +}); diff --git a/packages/shared/test/retry/retry.spec.ts b/packages/shared/test/retry/retry.spec.ts new file mode 100644 index 0000000..7e4ae6f --- /dev/null +++ b/packages/shared/test/retry/retry.spec.ts @@ -0,0 +1,87 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { NonRetriableError, RetriableError } from "../../src/internal.js"; +import { ExponentialBackoff } from "../../src/retry/exponentialBackoff.strategy.js"; +import { RetryHandler } from "../../src/retry/retry.js"; + +describe("RetryHandler", () => { + let retriableError: RetriableError; + const mockLogger = { + debug: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + }; + + beforeEach(() => { + retriableError = new RetriableError( + "Temporary error", + { className: "MyClass" }, + { retryAfterInMs: 5000 }, + ); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it("executes operation successfully on first try", async () => { + const handler = new RetryHandler(new ExponentialBackoff(), mockLogger); + const operation = vi.fn().mockResolvedValue("success"); + + await handler.execute(operation); + + expect(operation).toHaveBeenCalledTimes(1); + expect(mockLogger.debug).not.toHaveBeenCalled(); + }); + + it("retries on RetriableError and succeeds", async () => { + vi.useFakeTimers(); + const handler = new RetryHandler(new ExponentialBackoff(), mockLogger); + const operation = vi + .fn() + .mockRejectedValueOnce(retriableError) + .mockResolvedValueOnce("success"); + + const promise = handler.execute(operation); + + // Fast-forward through the delay + await vi.runAllTimersAsync(); + await promise; + + expect(operation).toHaveBeenCalledTimes(2); + expect(mockLogger.debug).toHaveBeenCalled(); + vi.useRealTimers(); + }); + + it("throws non-RetriableError immediately", async () => { + const handler = new RetryHandler(new ExponentialBackoff(), mockLogger); + const error = new NonRetriableError("Non-retriable error", { className: "MyClass" }); + const operation = vi.fn().mockRejectedValue(error); + + await expect(handler.execute(operation)).rejects.toThrow(error); + expect(operation).toHaveBeenCalledTimes(1); + expect(mockLogger.debug).not.toHaveBeenCalled(); + }); + + it("throws error after max attempts", async () => { + const strategy = new ExponentialBackoff({ maxAttempts: 2, baseDelay: 1, factor: 1 }); + const handler = new RetryHandler(strategy, mockLogger); + const error = new RetriableError("Temporary error", { className: "MyClass" }); + const error2 = new RetriableError("Temporary error 2", { className: "MyClass" }); + const operation = vi + .fn() + .mockImplementation(() => { + throw error; + }) + .mockImplementation(() => { + throw error2; + }); + + const promise = handler.execute(operation); + + await expect(promise).rejects.toThrow(error2); + + expect(operation).toHaveBeenCalledTimes(2); + }); +}); From 20dead2b0b0b6fd29311c87a3b5172eaea76780b Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Mon, 6 Jan 2025 16:41:15 -0300 Subject: [PATCH 2/3] feat: set retry parameters from env variables --- apps/processing/.env.example | 7 ++++++- apps/processing/README.md | 4 ++++ apps/processing/src/config/env.ts | 4 ++++ .../processing/src/services/sharedDependencies.service.ts | 8 ++++---- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/apps/processing/.env.example b/apps/processing/.env.example index 591b483..41b13df 100644 --- a/apps/processing/.env.example +++ b/apps/processing/.env.example @@ -13,4 +13,9 @@ IPFS_GATEWAYS_URL=["https://ipfs.io","https://gateway.pinata.cloud","https://dwe PRICING_SOURCE= # 'coingecko' or 'dummy' COINGECKO_API_KEY={{YOUR_KEY}} -COINGECKO_API_TYPE=demo \ No newline at end of file +COINGECKO_API_TYPE=demo + +RETRY_MAX_ATTEMPTS=3 +RETRY_BASE_DELAY_MS=3000 +RETRY_FACTOR=2 +RETRY_MAX_DELAY_MS=300000 \ No newline at end of file diff --git a/apps/processing/README.md b/apps/processing/README.md index 03e5392..8c2ba1a 100644 --- a/apps/processing/README.md +++ b/apps/processing/README.md @@ -36,6 +36,10 @@ Available options: | `DUMMY_PRICE` | Dummy price | 1 | No | Only if PRICING_SOURCE is dummy | | `COINGECKO_API_KEY` | API key for CoinGecko service | N/A | Yes | | | `COINGECKO_API_TYPE` | CoinGecko API tier (demo or pro) | pro | No | | +| `RETRY_MAX_ATTEMPTS` | Maximum number of retry attempts | 3 | No | | +| `RETRY_BASE_DELAY_MS` | Base delay for retry attempts | 3000 | No | | +| `RETRY_FACTOR` | Delay factor for retry attempts | 2 | No | | +| `RETRY_MAX_DELAY_MS` | Maximum delay for retry attempts | 300000 | No | | ## Available Scripts diff --git a/apps/processing/src/config/env.ts b/apps/processing/src/config/env.ts index 4a201e2..dfde17a 100644 --- a/apps/processing/src/config/env.ts +++ b/apps/processing/src/config/env.ts @@ -36,6 +36,10 @@ const baseSchema = z.object({ IPFS_GATEWAYS_URL: stringToJSONSchema .pipe(z.array(z.string().url())) .default('["https://ipfs.io"]'), + RETRY_MAX_ATTEMPTS: z.coerce.number().int().min(1).default(3), + RETRY_BASE_DELAY_MS: z.coerce.number().int().min(1).default(3000), // 3 seconds + RETRY_FACTOR: z.coerce.number().int().min(1).default(2), + RETRY_MAX_DELAY_MS: z.coerce.number().int().min(1).optional(), // 5 minute }); const dummyPricingSchema = baseSchema.extend({ diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index bc91d40..dde0e77 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -93,10 +93,10 @@ export class SharedDependenciesService { ); const retryStrategy = new ExponentialBackoff({ - maxAttempts: 3, - baseDelay: 1000, - maxDelay: 3 * 60 * 1000, - factor: 2, + maxAttempts: env.RETRY_MAX_ATTEMPTS, + baseDelay: env.RETRY_BASE_DELAY_MS, + maxDelay: env.RETRY_MAX_DELAY_MS, + factor: env.RETRY_FACTOR, }); return { From 391aa0454cc59fe8fe89897c3a9155d689c40e25 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Mon, 6 Jan 2025 17:06:17 -0300 Subject: [PATCH 3/3] fix: pass retry strategy to retroactive processor --- apps/processing/src/services/processing.service.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/processing/src/services/processing.service.ts b/apps/processing/src/services/processing.service.ts index dd41be0..91c0a85 100644 --- a/apps/processing/src/services/processing.service.ts +++ b/apps/processing/src/services/processing.service.ts @@ -102,6 +102,7 @@ export class ProcessingService { checkpointRepository: strategyProcessingCheckpointRepository, }, chain.fetchLimit, + retryStrategy, logger, );