Skip to content

Commit

Permalink
feat: error retry handling (#49)
Browse files Browse the repository at this point in the history
# 🤖 Linear

Closes GIT-223 GIT-224 GIT-110

## Description
We implement a RetryHandler with ExponentialBackoff for handling error
retries (centralized on the Orchestrator) and define two base classes
of errors:
- Retriable
- NonRetriable

In the next PR, we will adjust the different errors of the system and
packages to extend from these classes accordingly.

## Checklist before requesting a review

-   [x] I have conducted a self-review of my code.
-   [x] I have conducted a QA.
-   [x] If it is a core feature, I have included comprehensive tests.
  • Loading branch information
0xnigir1 authored Jan 7, 2025
1 parent 0a645ba commit 36f8638
Show file tree
Hide file tree
Showing 23 changed files with 625 additions and 83 deletions.
7 changes: 6 additions & 1 deletion apps/processing/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ IPFS_GATEWAYS_URL=["https://ipfs.io","https://gateway.pinata.cloud","https://dwe
PRICING_SOURCE= # 'coingecko' or 'dummy'

COINGECKO_API_KEY={{YOUR_KEY}}
COINGECKO_API_TYPE=demo
COINGECKO_API_TYPE=demo

RETRY_MAX_ATTEMPTS=3
RETRY_BASE_DELAY_MS=3000
RETRY_FACTOR=2
RETRY_MAX_DELAY_MS=300000
4 changes: 4 additions & 0 deletions apps/processing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ Available options:
| `DUMMY_PRICE` | Dummy price | 1 | No | Only if PRICING_SOURCE is dummy |
| `COINGECKO_API_KEY` | API key for CoinGecko service | N/A | Yes | |
| `COINGECKO_API_TYPE` | CoinGecko API tier (demo or pro) | pro | No | |
| `RETRY_MAX_ATTEMPTS` | Maximum number of retry attempts | 3 | No | |
| `RETRY_BASE_DELAY_MS` | Base delay between retry attempts, in milliseconds | 3000 | No | |
| `RETRY_FACTOR` | Exponential backoff factor applied to the retry delay | 2 | No | |
| `RETRY_MAX_DELAY_MS` | Maximum delay between retry attempts, in milliseconds | 300000 | No | |

## Available Scripts

Expand Down
4 changes: 4 additions & 0 deletions apps/processing/src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const baseSchema = z.object({
IPFS_GATEWAYS_URL: stringToJSONSchema
.pipe(z.array(z.string().url()))
.default('["https://ipfs.io"]'),
RETRY_MAX_ATTEMPTS: z.coerce.number().int().min(1).default(3),
RETRY_BASE_DELAY_MS: z.coerce.number().int().min(1).default(3000), // 3 seconds
RETRY_FACTOR: z.coerce.number().int().min(1).default(2),
RETRY_MAX_DELAY_MS: z.coerce.number().int().min(1).optional(), // 5 minute
});

const dummyPricingSchema = baseSchema.extend({
Expand Down
12 changes: 10 additions & 2 deletions apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,14 @@ export class ProcessingService {
static async initialize(env: Environment): Promise<ProcessingService> {
const sharedDependencies = await SharedDependenciesService.initialize(env);
const { CHAINS: chains } = env;
const { core, registriesRepositories, indexerClient, kyselyDatabase, logger } =
sharedDependencies;
const {
core,
registriesRepositories,
indexerClient,
kyselyDatabase,
logger,
retryStrategy,
} = sharedDependencies;
const {
eventRegistryRepository,
strategyRegistryRepository,
Expand Down Expand Up @@ -83,6 +89,7 @@ export class ProcessingService {
},
chain.fetchLimit,
chain.fetchDelayMs,
retryStrategy,
logger,
);
const retroactiveProcessor = new RetroactiveProcessor(
Expand All @@ -95,6 +102,7 @@ export class ProcessingService {
checkpointRepository: strategyProcessingCheckpointRepository,
},
chain.fetchLimit,
retryStrategy,
logger,
);

Expand Down
11 changes: 10 additions & 1 deletion apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
KyselyStrategyRegistryRepository,
KyselyTransactionManager,
} from "@grants-stack-indexer/repository";
import { ILogger, Logger } from "@grants-stack-indexer/shared";
import { ExponentialBackoff, ILogger, Logger, RetryStrategy } from "@grants-stack-indexer/shared";

import { Environment } from "../config/index.js";

Expand All @@ -30,6 +30,7 @@ export type SharedDependencies = {
};
indexerClient: EnvioIndexerClient;
kyselyDatabase: ReturnType<typeof createKyselyDatabase>;
retryStrategy: RetryStrategy;
logger: ILogger;
};

Expand Down Expand Up @@ -91,6 +92,13 @@ export class SharedDependenciesService {
env.INDEXER_ADMIN_SECRET,
);

const retryStrategy = new ExponentialBackoff({
maxAttempts: env.RETRY_MAX_ATTEMPTS,
baseDelay: env.RETRY_BASE_DELAY_MS,
maxDelay: env.RETRY_MAX_DELAY_MS,
factor: env.RETRY_FACTOR,
});

return {
core: {
projectRepository,
Expand All @@ -109,6 +117,7 @@ export class SharedDependenciesService {
},
indexerClient,
kyselyDatabase,
retryStrategy,
logger,
};
}
Expand Down
4 changes: 3 additions & 1 deletion apps/processing/test/unit/sharedDependencies.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ const mocks = vi.hoisted(() => {
};
});

vi.mock("@grants-stack-indexer/shared", () => {
vi.mock("@grants-stack-indexer/shared", async (importActual) => {
const actual = await importActual<typeof import("@grants-stack-indexer/shared")>();
return {
...actual,
Logger: {
getInstance: vi.fn().mockReturnValue(mocks.logger),
},
Expand Down
90 changes: 60 additions & 30 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import {
isAlloEvent,
isStrategyEvent,
ProcessorEvent,
RetriableError,
RetryHandler,
RetryStrategy,
StrategyEvent,
stringify,
} from "@grants-stack-indexer/shared";
Expand Down Expand Up @@ -46,10 +49,11 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from
* The Orchestrator provides fault tolerance and performance optimization through:
* - Configurable batch sizes for event fetching
* - Delayed processing to prevent overwhelming the system
* - Error handling and logging for various failure scenarios
* - Retry handling with exponential backoff for transient failures
* - Comprehensive error handling and logging for various failure scenarios
* - Registry tracking of supported/unsupported strategies and events
*
* TODO: Enhance the error handling/retries, logging and observability
* TODO: Enhance logging and observability
*/
export class Orchestrator {
private readonly eventsQueue: IQueue<ProcessorEvent<ContractName, AnyEvent>>;
Expand All @@ -58,13 +62,15 @@ export class Orchestrator {
private readonly eventsRegistry: IEventsRegistry;
private readonly strategyRegistry: IStrategyRegistry;
private readonly dataLoader: DataLoader;
private readonly retryHandler: RetryHandler;

/**
* @param chainId - The chain id
* @param dependencies - The core dependencies
* @param indexerClient - The indexer client
* @param registries - The registries
* @param fetchLimit - The fetch limit
* @param fetchDelayInMs - The fetch delay in milliseconds
* @param retryStrategy - The retry strategy
* @param logger - The logger
*/
constructor(
Expand All @@ -77,6 +83,7 @@ export class Orchestrator {
},
private fetchLimit: number = 1000,
private fetchDelayInMs: number = 10000,
private retryStrategy: RetryStrategy,
private logger: ILogger,
) {
this.eventsFetcher = new EventsFetcher(this.indexerClient);
Expand All @@ -98,6 +105,7 @@ export class Orchestrator {
this.logger,
);
this.eventsQueue = new Queue<ProcessorEvent<ContractName, AnyEvent>>(fetchLimit);
this.retryHandler = new RetryHandler(retryStrategy, this.logger);
}

async run(signal: AbortSignal): Promise<void> {
Expand All @@ -119,46 +127,42 @@ export class Orchestrator {
await delay(this.fetchDelayInMs);
continue;
}

await this.eventsRegistry.saveLastProcessedEvent(this.chainId, {
...event,
rawEvent: event,
});

event = await this.enhanceStrategyId(event);
if (this.isPoolCreated(event)) {
const handleable = existsHandler(event.strategyId);
await this.strategyRegistry.saveStrategyId(
this.chainId,
event.params.strategy,
event.strategyId,
handleable,
);
} else if (event.contractName === "Strategy" && "strategyId" in event) {
if (!existsHandler(event.strategyId)) {
this.logger.debug("Skipping event", {
event,
className: Orchestrator.name,
chainId: this.chainId,
});
// we skip the event if the strategy id is not handled yet
continue;
}
}

const changesets = await this.eventsProcessor.processEvent(event);
await this.dataLoader.applyChanges(changesets);
await this.retryHandler.execute(
async () => {
await this.handleEvent(event!);
},
{ abortSignal: signal },
);
} catch (error: unknown) {
// TODO: improve error handling, retries and notify
// TODO: notify
if (
error instanceof UnsupportedStrategy ||
error instanceof InvalidEvent ||
error instanceof UnsupportedEventException
) {
// this.logger.error(
// `Current event cannot be handled. ${error.name}: ${error.message}. Event: ${stringify(event)}`,
// );
this.logger.debug(
`Current event cannot be handled. ${error.name}: ${error.message}.`,
{
className: Orchestrator.name,
chainId: this.chainId,
event,
},
);
} else {
if (error instanceof Error || isNativeError(error)) {
if (error instanceof RetriableError) {
error.message = `Error processing event after retries. ${error.message}`;
this.logger.error(error, {
event,
className: Orchestrator.name,
chainId: this.chainId,
});
} else if (error instanceof Error || isNativeError(error)) {
this.logger.error(error, {
event,
className: Orchestrator.name,
Expand Down Expand Up @@ -201,6 +205,32 @@ export class Orchestrator {
this.eventsQueue.push(...events);
}

/**
 * Handles a single event end to end.
 *
 * The event is first passed through `enhanceStrategyId`. Pool-creation events
 * then have their strategy id saved in the strategy registry together with
 * whether a handler exists for it. Strategy events whose strategy id has no
 * handler are logged at debug level and skipped. Every other event is run
 * through the events processor and the resulting changesets are applied via
 * the data loader.
 *
 * @param event - The event to handle
 */
private async handleEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> {
    const enrichedEvent: ProcessorEvent<ContractName, AnyEvent> =
        await this.enhanceStrategyId(event);

    if (this.isPoolCreated(enrichedEvent)) {
        // Record the pool's strategy and whether we are able to handle it.
        const hasHandler = existsHandler(enrichedEvent.strategyId);
        await this.strategyRegistry.saveStrategyId(
            this.chainId,
            enrichedEvent.params.strategy,
            enrichedEvent.strategyId,
            hasHandler,
        );
    } else {
        const isUnhandledStrategyEvent =
            enrichedEvent.contractName === "Strategy" &&
            "strategyId" in enrichedEvent &&
            !existsHandler(enrichedEvent.strategyId);

        if (isUnhandledStrategyEvent) {
            // The strategy id is not handled yet, so there is nothing to process.
            this.logger.debug("Skipping event", {
                event: enrichedEvent,
                className: Orchestrator.name,
                chainId: this.chainId,
            });
            return;
        }
    }

    await this.dataLoader.applyChanges(await this.eventsProcessor.processEvent(enrichedEvent));
}

/**
* Enhance the event with the strategy id when required
* @param event - The event
Expand Down
13 changes: 11 additions & 2 deletions packages/data-flow/src/retroactiveProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {
Hex,
ILogger,
ProcessorEvent,
RetryHandler,
RetryStrategy,
stringify,
} from "@grants-stack-indexer/shared";

Expand Down Expand Up @@ -62,6 +64,7 @@ export class RetroactiveProcessor {
private readonly strategyRegistry: IStrategyRegistry;
private readonly dataLoader: DataLoader;
private readonly checkpointRepository: IStrategyProcessingCheckpointRepository;
private readonly retryHandler: RetryHandler;

/**
* Creates a new instance of RetroactiveProcessor
Expand All @@ -70,6 +73,7 @@ export class RetroactiveProcessor {
* @param indexerClient - Client for fetching blockchain events
* @param registries - Event and strategy registries for tracking processing state
* @param fetchLimit - Maximum number of events to fetch in a single batch (default: 1000)
* @param retryStrategy - The retry strategy
* @param logger - Logger instance for debugging and monitoring
*/
constructor(
Expand All @@ -82,6 +86,7 @@ export class RetroactiveProcessor {
checkpointRepository: IStrategyProcessingCheckpointRepository;
},
private fetchLimit: number = 1000,
private retryStrategy: RetryStrategy,
private logger: ILogger,
) {
this.eventsFetcher = new EventsFetcher(this.indexerClient);
Expand All @@ -103,6 +108,7 @@ export class RetroactiveProcessor {
this.dependencies.transactionManager,
this.logger,
);
this.retryHandler = new RetryHandler(retryStrategy, this.logger);
}

/**
Expand Down Expand Up @@ -208,8 +214,11 @@ export class RetroactiveProcessor {
if (this.hasReachedLastEvent(currentPointer, lastEventPointer)) break;

event.strategyId = strategyId;
const changesets = await this.eventsProcessor.processEvent(event);
await this.dataLoader.applyChanges(changesets);

await this.retryHandler.execute(async () => {
const changesets = await this.eventsProcessor.processEvent(event!);
await this.dataLoader.applyChanges(changesets);
});
} catch (error) {
if (error instanceof InvalidEvent || error instanceof UnsupportedEventException) {
// Expected errors that we can safely ignore
Expand Down
Loading

0 comments on commit 36f8638

Please sign in to comment.