Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: error retry handling #49

Merged
merged 3 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion apps/processing/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ IPFS_GATEWAYS_URL=["https://ipfs.io","https://gateway.pinata.cloud","https://dwe
PRICING_SOURCE= # 'coingecko' or 'dummy'

COINGECKO_API_KEY={{YOUR_KEY}}
COINGECKO_API_TYPE=demo
COINGECKO_API_TYPE=demo

RETRY_MAX_ATTEMPTS=3
RETRY_BASE_DELAY_MS=3000
RETRY_FACTOR=2
RETRY_MAX_DELAY_MS=300000
4 changes: 4 additions & 0 deletions apps/processing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ Available options:
| `DUMMY_PRICE` | Dummy price | 1 | No | Only if PRICING_SOURCE is dummy |
| `COINGECKO_API_KEY` | API key for CoinGecko service | N/A | Yes | |
| `COINGECKO_API_TYPE` | CoinGecko API tier (demo or pro) | pro | No | |
| `RETRY_MAX_ATTEMPTS` | Maximum number of retry attempts | 3 | No | |
| `RETRY_BASE_DELAY_MS` | Base delay for retry attempts | 3000 | No | |
| `RETRY_FACTOR` | Delay factor for retry attempts | 2 | No | |
| `RETRY_MAX_DELAY_MS` | Maximum delay for retry attempts | 300000 | No | |

## Available Scripts

Expand Down
4 changes: 4 additions & 0 deletions apps/processing/src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const baseSchema = z.object({
IPFS_GATEWAYS_URL: stringToJSONSchema
.pipe(z.array(z.string().url()))
.default('["https://ipfs.io"]'),
RETRY_MAX_ATTEMPTS: z.coerce.number().int().min(1).default(3),
RETRY_BASE_DELAY_MS: z.coerce.number().int().min(1).default(3000), // 3 seconds
RETRY_FACTOR: z.coerce.number().int().min(1).default(2),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to mark these three as optional within schema chaining since they're technically not required?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with the default() they are optional, you are not forced to pass them

RETRY_MAX_DELAY_MS: z.coerce.number().int().min(1).optional(), // 5 minute
});

const dummyPricingSchema = baseSchema.extend({
Expand Down
12 changes: 10 additions & 2 deletions apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,14 @@ export class ProcessingService {
static async initialize(env: Environment): Promise<ProcessingService> {
const sharedDependencies = await SharedDependenciesService.initialize(env);
const { CHAINS: chains } = env;
const { core, registriesRepositories, indexerClient, kyselyDatabase, logger } =
sharedDependencies;
const {
core,
registriesRepositories,
indexerClient,
kyselyDatabase,
logger,
retryStrategy,
} = sharedDependencies;
const {
eventRegistryRepository,
strategyRegistryRepository,
Expand Down Expand Up @@ -83,6 +89,7 @@ export class ProcessingService {
},
chain.fetchLimit,
chain.fetchDelayMs,
retryStrategy,
logger,
);
const retroactiveProcessor = new RetroactiveProcessor(
Expand All @@ -95,6 +102,7 @@ export class ProcessingService {
checkpointRepository: strategyProcessingCheckpointRepository,
},
chain.fetchLimit,
retryStrategy,
logger,
);

Expand Down
11 changes: 10 additions & 1 deletion apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
KyselyStrategyRegistryRepository,
KyselyTransactionManager,
} from "@grants-stack-indexer/repository";
import { ILogger, Logger } from "@grants-stack-indexer/shared";
import { ExponentialBackoff, ILogger, Logger, RetryStrategy } from "@grants-stack-indexer/shared";

import { Environment } from "../config/index.js";

Expand All @@ -30,6 +30,7 @@ export type SharedDependencies = {
};
indexerClient: EnvioIndexerClient;
kyselyDatabase: ReturnType<typeof createKyselyDatabase>;
retryStrategy: RetryStrategy;
logger: ILogger;
};

Expand Down Expand Up @@ -91,6 +92,13 @@ export class SharedDependenciesService {
env.INDEXER_ADMIN_SECRET,
);

const retryStrategy = new ExponentialBackoff({
maxAttempts: env.RETRY_MAX_ATTEMPTS,
baseDelay: env.RETRY_BASE_DELAY_MS,
maxDelay: env.RETRY_MAX_DELAY_MS,
factor: env.RETRY_FACTOR,
});

return {
core: {
projectRepository,
Expand All @@ -109,6 +117,7 @@ export class SharedDependenciesService {
},
indexerClient,
kyselyDatabase,
retryStrategy,
logger,
};
}
Expand Down
4 changes: 3 additions & 1 deletion apps/processing/test/unit/sharedDependencies.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ const mocks = vi.hoisted(() => {
};
});

vi.mock("@grants-stack-indexer/shared", () => {
vi.mock("@grants-stack-indexer/shared", async (importActual) => {
const actual = await importActual<typeof import("@grants-stack-indexer/shared")>();
return {
...actual,
Logger: {
getInstance: vi.fn().mockReturnValue(mocks.logger),
},
Expand Down
90 changes: 60 additions & 30 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import {
isAlloEvent,
isStrategyEvent,
ProcessorEvent,
RetriableError,
RetryHandler,
RetryStrategy,
StrategyEvent,
stringify,
} from "@grants-stack-indexer/shared";
Expand Down Expand Up @@ -46,10 +49,11 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from
* The Orchestrator provides fault tolerance and performance optimization through:
* - Configurable batch sizes for event fetching
* - Delayed processing to prevent overwhelming the system
* - Error handling and logging for various failure scenarios
* - Retry handling with exponential backoff for transient failures
* - Comprehensive error handling and logging for various failure scenarios
* - Registry tracking of supported/unsupported strategies and events
*
* TODO: Enhance the error handling/retries, logging and observability
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* TODO: Enhance logging and observability
*/
export class Orchestrator {
private readonly eventsQueue: IQueue<ProcessorEvent<ContractName, AnyEvent>>;
Expand All @@ -58,13 +62,15 @@ export class Orchestrator {
private readonly eventsRegistry: IEventsRegistry;
private readonly strategyRegistry: IStrategyRegistry;
private readonly dataLoader: DataLoader;
private readonly retryHandler: RetryHandler;

/**
* @param chainId - The chain id
* @param dependencies - The core dependencies
* @param indexerClient - The indexer client
* @param registries - The registries
* @param fetchLimit - The fetch limit
* @param retryStrategy - The retry strategy
* @param fetchDelayInMs - The fetch delay in milliseconds
*/
constructor(
Expand All @@ -77,6 +83,7 @@ export class Orchestrator {
},
private fetchLimit: number = 1000,
private fetchDelayInMs: number = 10000,
private retryStrategy: RetryStrategy,
private logger: ILogger,
) {
this.eventsFetcher = new EventsFetcher(this.indexerClient);
Expand All @@ -98,6 +105,7 @@ export class Orchestrator {
this.logger,
);
this.eventsQueue = new Queue<ProcessorEvent<ContractName, AnyEvent>>(fetchLimit);
this.retryHandler = new RetryHandler(retryStrategy, this.logger);
}

async run(signal: AbortSignal): Promise<void> {
Expand All @@ -119,46 +127,42 @@ export class Orchestrator {
await delay(this.fetchDelayInMs);
continue;
}

await this.eventsRegistry.saveLastProcessedEvent(this.chainId, {
...event,
rawEvent: event,
});

event = await this.enhanceStrategyId(event);
if (this.isPoolCreated(event)) {
const handleable = existsHandler(event.strategyId);
await this.strategyRegistry.saveStrategyId(
this.chainId,
event.params.strategy,
event.strategyId,
handleable,
);
} else if (event.contractName === "Strategy" && "strategyId" in event) {
if (!existsHandler(event.strategyId)) {
this.logger.debug("Skipping event", {
event,
className: Orchestrator.name,
chainId: this.chainId,
});
// we skip the event if the strategy id is not handled yet
continue;
}
}

const changesets = await this.eventsProcessor.processEvent(event);
await this.dataLoader.applyChanges(changesets);
await this.retryHandler.execute(
async () => {
await this.handleEvent(event!);
},
{ abortSignal: signal },
);
} catch (error: unknown) {
// TODO: improve error handling, retries and notify
// TODO: notify
if (
error instanceof UnsupportedStrategy ||
error instanceof InvalidEvent ||
error instanceof UnsupportedEventException
) {
// this.logger.error(
// `Current event cannot be handled. ${error.name}: ${error.message}. Event: ${stringify(event)}`,
// );
this.logger.debug(
`Current event cannot be handled. ${error.name}: ${error.message}.`,
{
className: Orchestrator.name,
chainId: this.chainId,
event,
},
);
} else {
if (error instanceof Error || isNativeError(error)) {
if (error instanceof RetriableError) {
error.message = `Error processing event after retries. ${error.message}`;
this.logger.error(error, {
event,
className: Orchestrator.name,
chainId: this.chainId,
});
} else if (error instanceof Error || isNativeError(error)) {
this.logger.error(error, {
event,
className: Orchestrator.name,
Expand Down Expand Up @@ -201,6 +205,32 @@ export class Orchestrator {
this.eventsQueue.push(...events);
}

private async handleEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> {
event = await this.enhanceStrategyId(event);
if (this.isPoolCreated(event)) {
const handleable = existsHandler(event.strategyId);
await this.strategyRegistry.saveStrategyId(
this.chainId,
event.params.strategy,
event.strategyId,
handleable,
);
} else if (event.contractName === "Strategy" && "strategyId" in event) {
if (!existsHandler(event.strategyId)) {
this.logger.debug("Skipping event", {
event,
className: Orchestrator.name,
chainId: this.chainId,
});
// we skip the event if the strategy id is not handled yet
return;
}
}

const changesets = await this.eventsProcessor.processEvent(event);
await this.dataLoader.applyChanges(changesets);
}

/**
* Enhance the event with the strategy id when required
* @param event - The event
Expand Down
13 changes: 11 additions & 2 deletions packages/data-flow/src/retroactiveProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {
Hex,
ILogger,
ProcessorEvent,
RetryHandler,
RetryStrategy,
stringify,
} from "@grants-stack-indexer/shared";

Expand Down Expand Up @@ -62,6 +64,7 @@ export class RetroactiveProcessor {
private readonly strategyRegistry: IStrategyRegistry;
private readonly dataLoader: DataLoader;
private readonly checkpointRepository: IStrategyProcessingCheckpointRepository;
private readonly retryHandler: RetryHandler;

/**
* Creates a new instance of RetroactiveProcessor
Expand All @@ -70,6 +73,7 @@ export class RetroactiveProcessor {
* @param indexerClient - Client for fetching blockchain events
* @param registries - Event and strategy registries for tracking processing state
* @param fetchLimit - Maximum number of events to fetch in a single batch (default: 1000)
* @param retryStrategy - The retry strategy
* @param logger - Logger instance for debugging and monitoring
*/
constructor(
Expand All @@ -82,6 +86,7 @@ export class RetroactiveProcessor {
checkpointRepository: IStrategyProcessingCheckpointRepository;
},
private fetchLimit: number = 1000,
private retryStrategy: RetryStrategy,
private logger: ILogger,
) {
this.eventsFetcher = new EventsFetcher(this.indexerClient);
Expand All @@ -103,6 +108,7 @@ export class RetroactiveProcessor {
this.dependencies.transactionManager,
this.logger,
);
this.retryHandler = new RetryHandler(retryStrategy, this.logger);
}

/**
Expand Down Expand Up @@ -208,8 +214,11 @@ export class RetroactiveProcessor {
if (this.hasReachedLastEvent(currentPointer, lastEventPointer)) break;

event.strategyId = strategyId;
const changesets = await this.eventsProcessor.processEvent(event);
await this.dataLoader.applyChanges(changesets);

await this.retryHandler.execute(async () => {
const changesets = await this.eventsProcessor.processEvent(event!);
await this.dataLoader.applyChanges(changesets);
});
} catch (error) {
if (error instanceof InvalidEvent || error instanceof UnsupportedEventException) {
// Expected errors that we can safely ignore
Expand Down
Loading
Loading