Skip to content

Commit

Permalink
chore: adjust runner options for config files
Browse files Browse the repository at this point in the history
  • Loading branch information
mfw78 committed Jan 17, 2024
1 parent 6637318 commit c0b8f79
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 74 deletions.
30 changes: 23 additions & 7 deletions src/commands/run.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { RunSingleOptions } from "../types";
import { RunOptions } from "../types";
import { getLogger, DBService } from "../utils";
import { ChainContext } from "../domain";
import { ApiService } from "../utils/api";
Expand All @@ -7,9 +7,9 @@ import { ApiService } from "../utils/api";
* Run the watch-tower 👀🐮
* @param options Specified by the CLI / environment for running the watch-tower
*/
export async function run(options: RunSingleOptions) {
export async function run(options: RunOptions) {
const log = getLogger("commands:run");
const { oneShot, disableApi, apiPort, databasePath } = options;
const { oneShot, disableApi, apiPort, databasePath, networks } = options;

// Open the database
const storage = DBService.getInstance(databasePath);
Expand All @@ -33,11 +33,27 @@ export async function run(options: RunSingleOptions) {

let exitCode = 0;
try {
const chainContext = await ChainContext.init(options, storage);
const runPromise = chainContext.warmUp(oneShot);
const chainContexts = await Promise.all(
networks.map((network) => {
const { name } = network;
log.info(`Starting chain ${name}...`);
return ChainContext.init(
{
...options,
...network,
},
storage
);
})
);

// Run the block watcher after warm up for the chain
await runPromise;
// Run the block watcher after warm up for each chain
const runPromises = chainContexts.map(async (context) => {
return context.warmUp(oneShot);
});

// Run all the chain contexts
await Promise.all(runPromises);
} catch (error) {
log.error("Unexpected error thrown when running watchtower", error);
exitCode = 1;
Expand Down
43 changes: 12 additions & 31 deletions src/domain/chainContext.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {
RunSingleOptions,
Registry,
ReplayPlan,
ConditionalOrderCreatedEvent,
Expand All @@ -8,6 +7,7 @@ import {
Multicall3__factory,
RegistryBlock,
blockToRegistryBlock,
ContextOptions,
} from "../types";
import {
SupportedChainId,
Expand All @@ -34,10 +34,11 @@ import {
import { hexZeroPad } from "ethers/lib/utils";
import { FilterPolicy } from "../utils/filterPolicy";

const WATCHDOG_FREQUENCY = 5 * 1000; // 5 seconds
const WATCHDOG_FREQUENCY_SECS = 5; // 5 seconds
const WATCHDOG_TIMEOUT_DEFAULT_SECS = 30;

const MULTICALL3 = "0xcA11bde05977b3631167028862bE2a173976CA11";
const FILTER_FREQUENCY_SECS = 60 * 60; // 1 hour
const PAGE_SIZE_DEFAULT = 5000;

export const SDK_BACKOFF_NUM_OF_ATTEMPTS = 5;

Expand Down Expand Up @@ -97,7 +98,7 @@ export class ChainContext {
multicall: Multicall3;

protected constructor(
options: RunSingleOptions,
options: ContextOptions,
provider: providers.Provider,
chainId: SupportedChainId,
registry: Registry
Expand All @@ -109,12 +110,12 @@ export class ChainContext {
watchdogTimeout,
owners,
orderBookApi,
filterPolicyConfig,
filterPolicy,
} = options;
this.deploymentBlock = deploymentBlock;
this.pageSize = pageSize;
this.pageSize = pageSize ?? PAGE_SIZE_DEFAULT;
this.dryRun = dryRun;
this.watchdogTimeout = watchdogTimeout;
this.watchdogTimeout = watchdogTimeout ?? WATCHDOG_TIMEOUT_DEFAULT_SECS;
this.addresses = owners;

this.provider = provider;
Expand All @@ -135,12 +136,7 @@ export class ChainContext {
},
});

this.filterPolicy = filterPolicyConfig
? new FilterPolicy({
configBaseUrl: filterPolicyConfig,
// configAuthToken: filterPolicyConfigAuthToken, // TODO: Implement authToken
})
: undefined;
this.filterPolicy = new FilterPolicy(filterPolicy);
this.contract = composableCowContract(this.provider, this.chainId);
this.multicall = Multicall3__factory.connect(MULTICALL3, this.provider);
}
Expand All @@ -153,7 +149,7 @@ export class ChainContext {
* @returns A chain context that is monitoring for orders on the chain.
*/
public static async init(
options: RunSingleOptions,
options: ContextOptions,
storage: DBService
): Promise<ChainContext> {
const { rpc, deploymentBlock } = options;
Expand Down Expand Up @@ -384,7 +380,7 @@ export class ChainContext {
// pod, we don't exit, but we do log an error and set the sync status to unknown.
while (true) {
// sleep for 5 seconds
await asyncSleep(WATCHDOG_FREQUENCY);
await asyncSleep(WATCHDOG_FREQUENCY_SECS * 1000);
const now = Math.floor(new Date().getTime() / 1000);
const timeElapsed = now - lastBlockReceived.timestamp;

Expand Down Expand Up @@ -460,7 +456,7 @@ async function processBlock(
blockNumberOverride?: number,
blockTimestampOverride?: number
) {
const { provider, chainId, filterPolicy } = context;
const { provider, chainId } = context;
const timer = processBlockDurationSeconds
.labels(context.chainId.toString())
.startTimer();
Expand All @@ -470,21 +466,6 @@ async function processBlock(
block.number.toString()
);

// Refresh the policy every hour
// NOTE: This is a temporary solution until we have a better way to update the filter policy
const blocksPerFilterFrequency =
FILTER_FREQUENCY_SECS /
(context.chainId === SupportedChainId.GNOSIS_CHAIN ? 5 : 12); // 5 seconds for gnosis, 12 seconds for mainnet
if (
filterPolicy &&
block.number % (FILTER_FREQUENCY_SECS / blocksPerFilterFrequency) == 0
) {
filterPolicy.reloadPolicies().catch((error) => {
console.log(`Error fetching the filter policy config for chain `, error);
return null;
});
}

// Transaction watcher for adding new contracts
let hasErrors = false;
for (const event of events) {
Expand Down
2 changes: 2 additions & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { Config } from "./types";

export type LogOptions = {
logLevel: string;
databasePath: string;
Expand Down
6 changes: 3 additions & 3 deletions src/utils/context.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Slack = require("node-slack");
import { DBService } from "./db";

import { ExecutionContext, Registry, RunSingleOptions } from "../types";
import { ContextOptions, ExecutionContext, Registry } from "../types";
import { SupportedChainId } from "@cowprotocol/cow-sdk";
import { getLogger } from "./logging";

Expand All @@ -12,7 +12,7 @@ let executionContext: ExecutionContext | undefined;
export async function initContext(
transactionName: string,
chainId: SupportedChainId,
options: RunSingleOptions
options: ContextOptions
): Promise<ExecutionContext> {
// Init storage
const storage = DBService.getInstance();
Expand All @@ -37,7 +37,7 @@ export async function initContext(
return executionContext;
}

function _getSlack(options: RunSingleOptions): Slack | undefined {
function _getSlack(options: ContextOptions): Slack | undefined {
if (executionContext) {
return executionContext?.slack;
}
Expand Down
51 changes: 18 additions & 33 deletions src/utils/filterPolicy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ConditionalOrderParams } from "@cowprotocol/cow-sdk";
import { Config } from "../types";

export enum FilterAction {
DROP = "DROP",
Expand All @@ -22,11 +23,14 @@ export interface FilterPolicyParams {
// configAuthToken: string; // TODO: Implement authToken
}
export class FilterPolicy {
protected configUrl: string;
protected config: PolicyConfig | undefined;

constructor({ configBaseUrl }: FilterPolicyParams) {
this.configUrl = configBaseUrl;
constructor(config: Config["networks"][number]["filterPolicy"]) {
this.config = {
defaultAction: FilterAction[config.defaultAction],
owners: this.convertToMap(config.owners),
handlers: this.convertToMap(config.handlers),
};
}

/**
Expand All @@ -52,35 +56,16 @@ export class FilterPolicy {
return this.config.defaultAction;
}

/**
* Reloads the policies with their latest version
*/
async reloadPolicies() {
const policyConfig = await this.getConfig();

if (policyConfig) {
this.config = policyConfig;
}
}

protected async getConfig(): Promise<PolicyConfig> {
if (!this.configUrl) {
throw new Error("configUrl must be defined");
}
const configResponse = await fetch(this.configUrl); // TODO: Implement authToken

if (!configResponse.ok) {
throw new Error(
`Failed to fetch policy. Error ${
configResponse.status
}: ${await configResponse.text().catch(() => "")}`
);
}
const config = await configResponse.json();
return {
defaultAction: config.defaultAction,
owners: new Map(Object.entries(config.owners)),
handlers: new Map(Object.entries(config.handlers)),
};
private convertToMap(object?: {
[k: string]: "ACCEPT" | "DROP" | "SKIP";
}): Map<string, FilterAction> {
return object
? new Map(
Object.entries(object).map(([key, value]) => [
key,
FilterAction[value],
])
)
: new Map<string, FilterAction>();
}
}

0 comments on commit c0b8f79

Please sign in to comment.