Skip to content

Commit

Permalink
Merge pull request #118 from lidofinance/feat/provider-rpc-events
Browse files Browse the repository at this point in the history
Add RPC events and middleware data for better monitoring
  • Loading branch information
infloop authored Sep 12, 2024
2 parents 3df4406 + 8a4dd7e commit 14b9d8b
Show file tree
Hide file tree
Showing 11 changed files with 352 additions and 327 deletions.
28 changes: 28 additions & 0 deletions packages/execution/src/common/networks.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import { Network } from '@ethersproject/networks';
import { Networkish } from '../interfaces/networkish';
import { ConnectionInfo } from '@ethersproject/web';

const IP_V4_REGEX = new RegExp(
/^(?<domain>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(?::(?<port>\d+))?/i,
);

const DOMAIN_REGEX = new RegExp(
/^(?<protocol>https?:\/\/)(?=(?<fqdn>[^:/]+))(?:(?<service>www|ww\d|cdn|ftp|mail|pop\d?|ns\d?|git)\.)?(?:(?<subdomain>[^:/]+)\.)*(?<domain>[^:/]+\.[a-z0-9]+)(?::(?<port>\d+))?(?<path>\/[^?]*)?(?:\?(?<query>[^#]*))?(?:#(?<hash>.*))?/i,
);

export const networksEqual = (
networkA: Network,
Expand All @@ -22,3 +31,22 @@ export const networksChainsEqual = (
networkA: Network,
networkB: Networkish,
): boolean => networkA.chainId === getNetworkChain(networkB);

export const getConnectionFQDN = (
connectionInfo: ConnectionInfo | string,
): string => {
const urlLike =
typeof connectionInfo === 'string' ? connectionInfo : connectionInfo.url;

const ipGroups = urlLike.match(IP_V4_REGEX)?.groups;

if (ipGroups) {
/* istanbul ignore next */
return ipGroups.domain ?? '';
}

const groups = urlLike.match(DOMAIN_REGEX)?.groups;

/* istanbul ignore next */
return groups?.fqdn ?? '';
};
62 changes: 62 additions & 0 deletions packages/execution/src/events/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { SimpleFallbackJsonRpcBatchProvider } from '../provider/simple-fallback-json-rpc-batch-provider';
import { AllProvidersFailedError } from '../error';
import {
ExtendedJsonRpcBatchProvider,
JsonRpcRequest,
JsonRpcResponse,
} from '../provider/extended-json-rpc-batch-provider';

export type FallbackProviderRequestFailedAllEvent = {
action: 'fallback-provider:request:failed:all';
provider: SimpleFallbackJsonRpcBatchProvider;
error: AllProvidersFailedError;
};

export type FallbackProviderRequestNonRetryableErrorEvent = {
action: 'fallback-provider:request:non-retryable-error';
provider: SimpleFallbackJsonRpcBatchProvider;
error: Error | unknown;
};

export type FallbackProviderRequestEvent = {
action: 'fallback-provider:request';
provider: SimpleFallbackJsonRpcBatchProvider;
activeFallbackProviderIndex: number;
fallbackProvidersCount: number;
domain: string;
retryAttempt: number;
};

export type ProviderResponseBatchedErrorEvent = {
action: 'provider:response-batched:error';
error: Error;
request: JsonRpcRequest[];
provider: ExtendedJsonRpcBatchProvider;
domain: string;
};

export type ProviderResponseBatchedEvent = {
action: 'provider:response-batched';
request: JsonRpcRequest[];
response: JsonRpcResponse[] | JsonRpcResponse;
provider: ExtendedJsonRpcBatchProvider;
domain: string;
};

export type ProviderRequestBatchedEvent = {
action: 'provider:request-batched';
request: JsonRpcRequest[];
provider: ExtendedJsonRpcBatchProvider;
domain: string;
};

export type ProviderEvents =
| ProviderRequestBatchedEvent
| ProviderResponseBatchedEvent
| ProviderResponseBatchedErrorEvent;

export type FallbackProviderEvents =
| ProviderEvents
| FallbackProviderRequestEvent
| FallbackProviderRequestNonRetryableErrorEvent
| FallbackProviderRequestFailedAllEvent;
1 change: 1 addition & 0 deletions packages/execution/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from './interfaces/module.options';
export * from './interfaces/non-empty-array';
export * from './ethers/fee-history';
export * from './error';
export * from './events';
64 changes: 52 additions & 12 deletions packages/execution/src/provider/extended-json-rpc-batch-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,25 @@ import { FeeHistory, getFeeHistory } from '../ethers/fee-history';
import { ErrorCode } from '../error/codes/error-codes';
import { TraceConfig, TraceResult } from '../interfaces/debug-traces';
import { getDebugTraceBlockByHash } from '../ethers/debug-trace-block-by-hash';
import { getConnectionFQDN } from '../common/networks';
import { EventEmitter } from 'events';
import {
ProviderEvents,
ProviderRequestBatchedEvent,
ProviderResponseBatchedErrorEvent,
ProviderResponseBatchedEvent,
} from '../events';

// this will help with autocomplete
export interface ExtendedJsonRpcBatchProviderEventEmitter
extends NodeJS.EventEmitter {
on(eventName: 'rpc', listener: (event: ProviderEvents) => void): this;
once(eventName: 'rpc', listener: (event: ProviderEvents) => void): this;
addListener(
eventName: 'rpc',
listener: (event: ProviderEvents) => void,
): this;
}

export interface RequestPolicy {
jsonRpcMaxBatchSize: number;
Expand Down Expand Up @@ -104,6 +123,8 @@ export class ExtendedJsonRpcBatchProvider extends JsonRpcProvider {
protected _concurrencyLimiter: LimitFunction;
protected _tickCounter = 0;
protected _fetchMiddlewareService: MiddlewareService<Promise<any>>;
protected _domain: string;
protected _eventEmitter: ExtendedJsonRpcBatchProviderEventEmitter;

public constructor(
url: ConnectionInfo | string,
Expand All @@ -112,6 +133,8 @@ export class ExtendedJsonRpcBatchProvider extends JsonRpcProvider {
fetchMiddlewares: MiddlewareCallback<Promise<any>>[] = [],
) {
super(url, network);
this._eventEmitter = new EventEmitter();
this._domain = getConnectionFQDN(url);
this._requestPolicy = requestPolicy ?? {
jsonRpcMaxBatchSize: 200,
maxConcurrentRequests: 5,
Expand Down Expand Up @@ -152,28 +175,33 @@ export class ExtendedJsonRpcBatchProvider extends JsonRpcProvider {

const batchRequest = batch.map((intent) => intent.request);

this.emit('debug', {
action: 'requestBatch',
const event: ProviderRequestBatchedEvent = {
action: 'provider:request-batched',
request: deepCopy(batchRequest),
provider: this,
});
domain: this._domain,
};
this._eventEmitter.emit('rpc', event);

this._concurrencyLimiter(() => {
return this._fetchMiddlewareService.go(
() => this.fetchJson(this.connection, JSON.stringify(batchRequest)),
{
provider: this,
domain: this._domain,
},
);
})
.then(
(batchResult: JsonRpcResponse[] | JsonRpcResponse) => {
this.emit('debug', {
action: 'response',
const event: ProviderResponseBatchedEvent = {
action: 'provider:response-batched',
request: deepCopy(batchRequest),
response: deepCopy(batchResult),
provider: this,
});
domain: this._domain,
};
this._eventEmitter.emit('rpc', event);

if (!Array.isArray(batchResult)) {
const errMessage = 'Unexpected batch result.';
Expand Down Expand Up @@ -216,12 +244,14 @@ export class ExtendedJsonRpcBatchProvider extends JsonRpcProvider {
});
},
(error: Error) => {
this.emit('debug', {
action: 'response',
const event: ProviderResponseBatchedErrorEvent = {
action: 'provider:response-batched:error',
error: error,
request: deepCopy(batchRequest),
provider: this,
});
domain: this._domain,
};
this._eventEmitter.emit('rpc', event);

batch.forEach((inflightRequest) => {
inflightRequest.reject(error);
Expand All @@ -230,12 +260,14 @@ export class ExtendedJsonRpcBatchProvider extends JsonRpcProvider {
)
.catch((error: Error) => {
// catch errors happening in the 'then' callback
this.emit('debug', {
action: 'response',
const event: ProviderResponseBatchedErrorEvent = {
action: 'provider:response-batched:error',
error: error,
request: deepCopy(batchRequest),
provider: this,
});
domain: this._domain,
};
this._eventEmitter.emit('rpc', event);

batch.forEach((inflightRequest) => {
inflightRequest.reject(error);
Expand Down Expand Up @@ -299,6 +331,14 @@ export class ExtendedJsonRpcBatchProvider extends JsonRpcProvider {
this._fetchMiddlewareService.use(callback);
}

public get domain(): string {
return this._domain;
}

public get eventEmitter() {
return this._eventEmitter;
}

public send(method: string, params: Array<unknown>): Promise<unknown> {
const request: JsonRpcRequest = {
method: method,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ import { AllProvidersFailedError } from '../error/all-providers-failed.error';
import { FeeHistory, getFeeHistory } from '../ethers/fee-history';
import { TraceConfig, TraceResult } from '../interfaces/debug-traces';
import { getDebugTraceBlockByHash } from '../ethers/debug-trace-block-by-hash';
import { EventEmitter } from 'events';
import {
FallbackProviderEvents,
FallbackProviderRequestEvent,
FallbackProviderRequestFailedAllEvent,
FallbackProviderRequestNonRetryableErrorEvent,
} from '../events';

/**
* EIP-1898 support
Expand Down Expand Up @@ -62,6 +69,20 @@ declare module '@ethersproject/providers' {
}
}

// this will help with autocomplete
export interface SimpleFallbackJsonRpcBatchProviderEventEmitter
extends NodeJS.EventEmitter {
on(eventName: 'rpc', listener: (event: FallbackProviderEvents) => void): this;
once(
eventName: 'rpc',
listener: (event: FallbackProviderEvents) => void,
): this;
addListener(
eventName: 'rpc',
listener: (event: FallbackProviderEvents) => void,
): this;
}

@Injectable()
export class SimpleFallbackJsonRpcBatchProvider extends BaseProvider {
protected config: SimpleFallbackProviderConfig;
Expand All @@ -73,12 +94,14 @@ export class SimpleFallbackJsonRpcBatchProvider extends BaseProvider {
// it is crucial not to mix these two errors
protected lastPerformError: Error | null | unknown = null; // last error for 'perform' operations, is batch-oriented
protected lastError: Error | null | unknown = null; // last error for whole provider
protected _eventEmitter: SimpleFallbackJsonRpcBatchProviderEventEmitter;

public constructor(
config: SimpleFallbackProviderConfig,
logger: LoggerService,
) {
super(config.network);
this._eventEmitter = new EventEmitter();
this.config = {
maxRetries: 3,
minBackoffMs: 500,
Expand Down Expand Up @@ -113,6 +136,12 @@ export class SimpleFallbackJsonRpcBatchProvider extends BaseProvider {
config.requestPolicy,
config.fetchMiddlewares ?? [],
);

// re-emitting events from fallback-providers
provider.eventEmitter.on('rpc', (event) => {
this._eventEmitter.emit('rpc', event);
});

return {
network: null,
provider,
Expand Down Expand Up @@ -245,16 +274,37 @@ export class SimpleFallbackJsonRpcBatchProvider extends BaseProvider {
// maximum number of switching is limited to total fallback provider count
while (attempt < this.fallbackProviders.length) {
try {
let performRetryAttempt = 0;
attempt++;
// awaiting is extremely important here
// without it, the error will not be caught in current try-catch scope
return await retry(() =>
this.provider.provider.perform(method, params),
);
return await retry(() => {
const provider = this.provider;

const event: FallbackProviderRequestEvent = {
action: 'fallback-provider:request',
provider: this,
activeFallbackProviderIndex: this.activeFallbackProviderIndex,
fallbackProvidersCount: this.fallbackProviders.length,
domain: provider.provider.domain,
retryAttempt: performRetryAttempt,
};
this._eventEmitter.emit('rpc', event);

performRetryAttempt++;
return provider.provider.perform(method, params);
});
} catch (e) {
this.lastError = e;

// checking that error should not be retried on another provider
if (this.isNonRetryableError(e)) {
const event: FallbackProviderRequestNonRetryableErrorEvent = {
action: 'fallback-provider:request:non-retryable-error',
provider: this,
error: e,
};
this._eventEmitter.emit('rpc', event);
throw e;
}

Expand All @@ -279,6 +329,14 @@ export class SimpleFallbackJsonRpcBatchProvider extends BaseProvider {
'All attempts to do ETH1 RPC request failed',
);
allProvidersFailedError.cause = this.lastError;

const event: FallbackProviderRequestFailedAllEvent = {
action: 'fallback-provider:request:failed:all',
provider: this,
error: allProvidersFailedError,
};
this._eventEmitter.emit('rpc', event);

throw allProvidersFailedError;
}

Expand Down Expand Up @@ -382,4 +440,8 @@ export class SimpleFallbackJsonRpcBatchProvider extends BaseProvider {
public get activeProviderIndex() {
return this.activeFallbackProviderIndex;
}

public get eventEmitter() {
return this._eventEmitter;
}
}
18 changes: 17 additions & 1 deletion packages/execution/test/fallback-provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { MiddlewareCallback } from '@lido-nestjs/middleware';
import { Network } from '@ethersproject/networks';
import { nonRetryableErrors } from '../src/common/errors';
import { ErrorCode, Logger } from '@ethersproject/logger';
import { AllProvidersFailedError } from '../src';
import { AllProvidersFailedError, FallbackProviderEvents } from '../src';

export type MockedExtendedJsonRpcBatchProvider =
ExtendedJsonRpcBatchProvider & {
Expand Down Expand Up @@ -797,5 +797,21 @@ describe('Execution module. ', () => {
// second 'getBlock' fetch call to second provider that does not fail
expect(mockedFallbackProviderFetch[1]).toBeCalledTimes(2);
});

test('should emit `fallback-provider:request` events', async () => {
await createMocks(2);

let retryAttempt = NaN;
mockedProvider.eventEmitter.on('rpc', (event: FallbackProviderEvents) => {
if (event.action === 'fallback-provider:request') {
retryAttempt = event.retryAttempt;
}
});

const block = await mockedProvider.getBlock(42);

expect(retryAttempt).toBe(0);
expect(block.hash).toBe(fixtures.eth_getBlockByNumber.default.hash);
});
});
});
Loading

0 comments on commit 14b9d8b

Please sign in to comment.