diff --git a/README.md b/README.md index a779c75..65c8e3b 100644 --- a/README.md +++ b/README.md @@ -33,14 +33,22 @@ As an example, to run the latest version of the watch-tower via `docker`: docker run --rm -it \ ghcr.io/cowprotocol/watch-tower:latest \ run \ - --rpc \ - --deployment-block \ + --chain-config , \ --page-size 5000 ``` -### Dappnode +**NOTE**: There are multiple optional arguments on the `--chain-config` parameter. For a full explanation of the optional arguments, use the `--help` flag: -**TODO**: Add instructions for deploying to Dappnode. + ```bash + docker run --rm -it \ + ghcr.io/cowprotocol/watch-tower:latest \ + run \ + --help + ``` + +### DAppNode + +For [DAppNode](https://dappnode.com), the watch-tower is available as a package. This package is held in a [separate repository](https://github.com/cowprotocol/dappnodepackage-cow-watch-tower). ### Running locally @@ -55,7 +63,7 @@ docker run --rm -it \ # Install dependencies yarn # Run watch-tower -yarn cli run --rpc --deployment-block --page-size 5000 +yarn cli run --chain-config , --page-size 5000 ``` ## Architecture @@ -178,7 +186,7 @@ It is recommended to test against the Goerli testnet. To run the watch-tower: # Install dependencies yarn # Run watch-tower -yarn cli run --rpc --deployment-block --page-size 5000 +yarn cli run --chain-config , --page-size 5000 ``` ### Testing diff --git a/src/commands/run.ts b/src/commands/run.ts index 6ae9e0c..6e0aea8 100644 --- a/src/commands/run.ts +++ b/src/commands/run.ts @@ -9,8 +9,7 @@ import { ApiService } from "../utils/api"; */ export async function run(options: RunSingleOptions) { const log = getLogger("commands:run"); - const { oneShot, disableApi, apiPort, databasePath, watchdogTimeout } = - options; + const { oneShot, disableApi, apiPort, databasePath } = options; // Open the database const storage = DBService.getInstance(databasePath); @@ -35,7 +34,7 @@ export async function run(options: RunSingleOptions) { let exitCode = 0; try { const chainContext = await ChainContext.init(options, storage); - const runPromise = chainContext.warmUp(watchdogTimeout, oneShot); + const runPromise = chainContext.warmUp(oneShot); // Run the block watcher after warm up for the chain await runPromise; diff --git a/src/commands/runMulti.ts b/src/commands/runMulti.ts index 9cd83ea..9b1c353 100644 --- a/src/commands/runMulti.ts +++ b/src/commands/runMulti.ts @@ -12,11 +12,12 @@ export async function runMulti(options: RunMultiOptions) { const { rpcs, deploymentBlocks, + watchdogTimeouts, + orderBookApis, oneShot, disableApi, apiPort, databasePath, - watchdogTimeout, } = options; // Open the database @@ -48,6 +49,8 @@ export async function runMulti(options: RunMultiOptions) { ...options, rpc, deploymentBlock: deploymentBlocks[index], + watchdogTimeout: watchdogTimeouts[index], + orderBookApi: orderBookApis[index], }, storage ); @@ -56,7 +59,7 @@ export async function runMulti(options: RunMultiOptions) { // Run the block watcher after warm up for each chain const runPromises = chainContexts.map(async (context) => { - return context.warmUp(watchdogTimeout, oneShot); + return context.warmUp(oneShot); }); // Run all the chain contexts diff --git a/src/domain/chainContext.ts b/src/domain/chainContext.ts index dfbe831..71d3010 100644 --- a/src/domain/chainContext.ts +++ b/src/domain/chainContext.ts @@ -16,7 +16,7 @@ import { } from "@cowprotocol/cow-sdk"; import { addContract } from "./addContract"; import { checkForAndPlaceOrder } from "./checkForAndPlaceOrder"; -import { ethers } from "ethers"; +import { EventFilter, providers } from "ethers"; import { composableCowContract, DBService, @@ -31,6 +31,7 @@ import { reorgDepth, reorgsTotal, } from "../utils/metrics"; +import { hexZeroPad } from "ethers/lib/utils"; const WATCHDOG_FREQUENCY = 5 * 1000; // 5 seconds @@ -74,32 +75,43 @@ export class ChainContext { readonly deploymentBlock: number; readonly pageSize: number; readonly dryRun: boolean; + readonly watchdogTimeout: number; + readonly addresses?: string[]; + readonly orderBookApiBaseUrls?: ApiBaseUrls; private sync: ChainSync = ChainSync.SYNCING; static chains: Chains = {}; - provider: ethers.providers.Provider; + provider: providers.Provider; chainId: SupportedChainId; registry: Registry; orderBook: OrderBookApi; contract: ComposableCoW; multicall: Multicall3; - orderBookApiBaseUrls?: ApiBaseUrls; protected constructor( options: RunSingleOptions, - provider: ethers.providers.Provider, + provider: providers.Provider, chainId: SupportedChainId, - registry: Registry, - orderBookApi?: string + registry: Registry ) { - const { deploymentBlock, pageSize, dryRun } = options; + const { + deploymentBlock, + pageSize, + dryRun, + watchdogTimeout, + owners, + orderBookApi, + } = options; this.deploymentBlock = deploymentBlock; this.pageSize = pageSize; this.dryRun = dryRun; + this.watchdogTimeout = watchdogTimeout; + this.addresses = owners; this.provider = provider; this.chainId = chainId; this.registry = registry; + this.orderBookApiBaseUrls = orderBookApi ? ({ [this.chainId]: orderBookApi, @@ -107,7 +119,7 @@ export class ChainContext { : undefined; this.orderBook = new OrderBookApi({ - chainId: this.chainId, + chainId, baseUrls: this.orderBookApiBaseUrls, backoffOpts: { numOfAttempts: SDK_BACKOFF_NUM_OF_ATTEMPTS, @@ -128,9 +140,9 @@ export class ChainContext { options: RunSingleOptions, storage: DBService ): Promise { - const { rpc, orderBookApi, deploymentBlock } = options; + const { rpc, deploymentBlock } = options; - const provider = new ethers.providers.JsonRpcProvider(rpc); + const provider = new providers.JsonRpcProvider(rpc); const chainId = (await provider.getNetwork()).chainId; const registry = await Registry.load( @@ -140,13 +152,7 @@ export class ChainContext { ); // Save the context to the static map to be used by the API - const context = new ChainContext( - options, - provider, - chainId, - registry, - orderBookApi - ); + const context = new ChainContext(options, provider, chainId, registry); ChainContext.chains[chainId] = context; return context; @@ -155,11 +161,10 @@ export class ChainContext { /** * Warm up the chain watcher by fetching the latest block number and * checking if the chain is in sync. - * @param watchdogTimeout the timeout for the watchdog * @param oneShot if true, only warm up the chain watcher and return * @returns the run promises for what needs to be watched */ - public async warmUp(watchdogTimeout: number, oneShot?: boolean) { + public async warmUp(oneShot?: boolean) { const { provider, chainId } = this; const log = getLogger("chainContext:warmUp", chainId.toString()); const { lastProcessedBlock } = this.registry; @@ -186,6 +191,14 @@ export class ChainContext { toBlock = toBlock > currentBlock.number ? currentBlock.number : toBlock; + // This happens when the watch-tower has restarted and the last processed block is + // the current block. Therefore the `fromBlock` is the current block + 1, which is + // greater than the current block number. In this case, we are in sync. + if (fromBlock > currentBlock.number) { + this.sync = ChainSync.IN_SYNC; + break; + } + log.debug( `Reaching tip of chain, current block number: ${currentBlock.number}` ); @@ -277,7 +290,9 @@ export class ChainContext { oneShot ? "Chain watcher is in sync" : "Chain watcher is warmed up" }` ); - log.debug(`Last processed block: ${this.registry.lastProcessedBlock}`); + log.debug( + `Last processed block: ${this.registry.lastProcessedBlock.number}` + ); // If one-shot, return if (oneShot) { @@ -285,7 +300,7 @@ export class ChainContext { } // Otherwise, run the block watcher - return await this.runBlockWatcher(watchdogTimeout, currentBlock); + return await this.runBlockWatcher(currentBlock); } /** @@ -293,11 +308,8 @@ export class ChainContext { * 1. Check if there are any `ConditionalOrderCreated` events, and index these. * 2. Check if any orders want to create discrete orders. */ - private async runBlockWatcher( - watchdogTimeout: number, - lastProcessedBlock: ethers.providers.Block - ) { - const { provider, registry, chainId } = this; + private async runBlockWatcher(lastProcessedBlock: providers.Block) { + const { provider, registry, chainId, watchdogTimeout } = this; const log = getLogger("chainContext:runBlockWatcher", chainId.toString()); // Watch for new blocks log.info(`👀 Start block watcher`); @@ -360,14 +372,13 @@ export class ChainContext { const now = Math.floor(new Date().getTime() / 1000); const timeElapsed = now - lastBlockReceived.timestamp; - log.debug(`Time since last block processed: ${timeElapsed}ms`); + log.debug(`Time since last block processed: ${timeElapsed}s`); // If we haven't received a block within `watchdogTimeout` seconds, either signal // an error or exit if not running in a kubernetes pod - if (timeElapsed >= watchdogTimeout * 1000) { - const formattedElapsedTime = Math.floor(timeElapsed / 1000); + if (timeElapsed >= watchdogTimeout) { log.error( - `Chain watcher last processed a block ${formattedElapsedTime}s ago (${watchdogTimeout}s timeout configured). Check the RPC.` + `Chain watcher last processed a block ${timeElapsed}s ago (${watchdogTimeout}s timeout configured). Check the RPC.` ); if (isRunningInKubernetesPod()) { this.sync = ChainSync.UNKNOWN; @@ -428,7 +439,7 @@ export class ChainContext { */ async function processBlock( context: ChainContext, - block: ethers.providers.Block, + block: providers.Block, events: ConditionalOrderCreatedEvent[], blockNumberOverride?: number, blockTimestampOverride?: number @@ -492,9 +503,16 @@ function pollContractForEvents( toBlock: number | "latest", context: ChainContext ): Promise { - const { provider, chainId } = context; + const { provider, chainId, addresses } = context; const composableCow = composableCowContract(provider, chainId); - const filter = composableCow.filters.ConditionalOrderCreated(); + const filter = composableCow.filters.ConditionalOrderCreated() as EventFilter; + + if (addresses) { + filter.topics?.push( + addresses.map((address) => hexZeroPad(address.toLowerCase(), 32)) + ); + } + return composableCow.queryFilter(filter, fromBlock, toBlock); } diff --git a/src/domain/checkForAndPlaceOrder.ts b/src/domain/checkForAndPlaceOrder.ts index 872bc00..69cbe04 100644 --- a/src/domain/checkForAndPlaceOrder.ts +++ b/src/domain/checkForAndPlaceOrder.ts @@ -271,6 +271,8 @@ async function _processConditionalOrder( blockNumber, }, provider, + // TODO: This should be DRY'ed. Upstream should take just an `orderBook` object that + // is already configured. orderbookApiConfig: { baseUrls: orderBookApiBaseUrls, backoffOpts: { diff --git a/src/index.ts b/src/index.ts index b078d57..4ef39be 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,7 +5,8 @@ import { Option, InvalidArgumentError, } from "@commander-js/extra-typings"; -import { ReplayTxOptions } from "./types"; +import { MultiChainConfigOptions, ChainConfigOptions } from "./types"; +import { getAddress, isHexString } from "ethers/lib/utils"; import { dumpDb, replayBlock, replayTx, run, runMulti } from "./commands"; import { initLogging } from "./utils"; import { version, description } from "../package.json"; @@ -63,14 +64,6 @@ const slackWebhookOption = new Option( "Slack webhook URL" ).env("SLACK_WEBHOOK"); -const watchdogTimeoutOption = new Option( - "--watchdog-timeout ", - "Watchdog timeout (in seconds)" -) - .default("30") - .env("WATCHDOG_TIMEOUT") - .argParser(parseIntOption); - const databasePathOption = new Option( "--database-path ", "Path to the database" @@ -78,37 +71,26 @@ const databasePathOption = new Option( .default(DEFAULT_DATABASE_PATH) .env("DATABASE_PATH"); -const multiRpcOption = new Option( - "--rpc ", - "Chain RPC endpoints to monitor" -).makeOptionMandatory(true); - -const rpcOption = new Option("--rpc ", "Chain RPC endpoint to monitor") +const chainConfigHelp = `Chain configuration in the format of ,[,,], e.g. http://erigon.dappnode:8545,12345678,30,https://api.cow.fi/mainnet`; +const multiChainConfigOption = new Option( + "--chain-config ", + chainConfigHelp +) .makeOptionMandatory(true) - .env("RPC"); - -const multiDeploymentBlockOption = new Option( - "--deployment-block ", - "Block number at which the ComposableCoW contract was deployed on the respective chains" -).makeOptionMandatory(true); + .argParser(parseChainConfigOptions); -const deploymentBlockOption = new Option( - "--deployment-block ", - "Block number at which the ComposableCoW was deployed" +const chainConfigOption = new Option( + "--chain-config ", + chainConfigHelp ) .makeOptionMandatory(true) - .env("DEPLOYMENT_BLOCK") - .argParser(parseIntOption); - -const multiOrderBookApiOption = new Option( - "--orderBookApi ", - "Orderbook API base URLs (i.e. https://api.cow.fi/mainnet, https://api.cow.fi/xdai, etc.)" -).default([]); + .env("CHAIN_CONFIG") + .argParser(parseChainConfigOption); -const orderBookApiOption = new Option( - "--orderBookApi ", - "Orderbook API base URL (i.e. https://api.cow.fi/mainnet)" -).default(undefined); +const onlyOwnerOption = new Option( + "--only-owner ", + "Addresses of contracts / safes to monitor conditional orders for" +).argParser(parseAddressOption); async function main() { program.name("watch-tower").description(description).version(version); @@ -116,12 +98,10 @@ async function main() { program .command("run") .description("Run the watch-tower, monitoring only a single chain") - .addOption(rpcOption) - .addOption(deploymentBlockOption) - .addOption(orderBookApiOption) + .addOption(chainConfigOption) + .addOption(onlyOwnerOption) .addOption(databasePathOption) .addOption(logLevelOption) - .addOption(watchdogTimeoutOption) .addOption(pageSizeOption) .addOption(dryRunOnlyOption) .addOption(oneShotOption) @@ -130,41 +110,24 @@ async function main() { .addOption(disableNotificationsOption) .addOption(slackWebhookOption) .action((options) => { - const { logLevel, orderBookApi } = options; - - const [pageSize, apiPort, watchdogTimeout, deploymentBlock] = [ - options.pageSize, - options.apiPort, - options.watchdogTimeout, - options.deploymentBlock, - ].map((value) => Number(value)); + const { logLevel, chainConfig, onlyOwner: owners } = options; + const [pageSize, apiPort] = [options.pageSize, options.apiPort].map( + (value) => Number(value) + ); initLogging({ logLevel }); // Run the watch-tower - run({ - ...options, - deploymentBlock, - orderBookApi, - pageSize, - apiPort, - watchdogTimeout, - }); + run({ ...options, ...chainConfig, owners, pageSize, apiPort }); }); program .command("run-multi") .description("Run the watch-tower monitoring multiple blockchains") - .addHelpText( - "before", - "RPC and deployment blocks must be the same length, and in the same order" - ) - .addOption(multiRpcOption) - .addOption(multiDeploymentBlockOption) - .addOption(multiOrderBookApiOption) + .addOption(multiChainConfigOption) + .addOption(onlyOwnerOption) .addOption(databasePathOption) .addOption(logLevelOption) - .addOption(watchdogTimeoutOption) .addOption(pageSizeOption) .addOption(dryRunOnlyOption) .addOption(oneShotOption) @@ -173,47 +136,24 @@ async function main() { .addOption(disableNotificationsOption) .addOption(slackWebhookOption) .action((options) => { - const { logLevel } = options; - const [pageSize, apiPort, watchdogTimeout] = [ - options.pageSize, - options.apiPort, - options.watchdogTimeout, - ].map((value) => Number(value)); - - initLogging({ logLevel }); const { - rpc: rpcs, - orderBookApi: orderBookApis, - deploymentBlock: deploymentBlocksEnv, + logLevel, + chainConfig: chainConfigs, + onlyOwner: owners, } = options; - - // Ensure that the deployment blocks are all numbers - const deploymentBlocks = deploymentBlocksEnv.map((block) => - Number(block) + const [pageSize, apiPort] = [options.pageSize, options.apiPort].map( + (value) => Number(value) ); - if (deploymentBlocks.some((block) => isNaN(block))) { - throw new Error("Deployment blocks must be numbers"); - } - - // Ensure that the RPCs and deployment blocks are the same length - if (rpcs.length !== deploymentBlocks.length) { - throw new Error("RPC and deployment blocks must be the same length"); - } - // Ensure that the orderBookApis and RPCs are the same length - if (orderBookApis.length > 0 && rpcs.length !== orderBookApis.length) { - throw new Error("orderBookApi and RPC urls must be the same length"); - } + initLogging({ logLevel }); // Run the watch-tower runMulti({ ...options, - rpcs, - deploymentBlocks, - orderBookApis, + ...chainConfigs, + owners, pageSize, apiPort, - watchdogTimeout, }); }); @@ -261,16 +201,16 @@ async function main() { program .command("replay-tx") .description("Reply a transaction") - .addOption(rpcOption) + .addOption(chainConfigOption) .addOption(dryRunOnlyOption) .addOption(logLevelOption) .addOption(databasePathOption) .requiredOption("--tx ", "Transaction hash to replay") - .action((options: ReplayTxOptions) => { - const { logLevel } = options; + .action((options) => { + const { logLevel, chainConfig } = options; initLogging({ logLevel }); - replayTx(options); + replayTx({ ...options, ...chainConfig }); }); await program.parseAsync(); @@ -284,6 +224,100 @@ function parseIntOption(option: string) { return parsed.toString(); } +function parseChainConfigOption(option: string): ChainConfigOptions { + // Split the option using ',' as the delimiter + const parts = option.split(","); + + // Ensure there are at least two parts (rpc and deploymentBlock) + if (parts.length < 2) { + throw new InvalidArgumentError( + `Chain configuration must be in the format of ,[,,], e.g. http://erigon.dappnode:8545,12345678,30,https://api.cow.fi/mainnet` + ); + } + + // Extract rpc and deploymentBlock from the parts + const rpc = parts[0]; + // Ensure that the RPC is a valid URL + if (!isValidUrl(rpc)) { + throw new InvalidArgumentError( + `${rpc} must be a valid URL (RPC) (chainConfig)` + ); + } + + const rawDeploymentBlock = parts[1]; + // Ensure that the deployment block is a positive number + const deploymentBlock = Number(rawDeploymentBlock); + if (isNaN(deploymentBlock) || deploymentBlock < 0) { + throw new InvalidArgumentError( + `${rawDeploymentBlock} must be a positive number (deployment block)` + ); + } + + const rawWatchdogTimeout = parts[2]; + // If there is a third part, it is the watchdogTimeout + const watchdogTimeout = parts.length > 2 ? Number(rawWatchdogTimeout) : 30; + // Ensure that the watchdogTimeout is a positive number + if (isNaN(watchdogTimeout) || watchdogTimeout < 0) { + throw new InvalidArgumentError( + `${rawWatchdogTimeout} must be a positive number (watchdogTimeout)` + ); + } + + // If there is a fourth part, it is the orderBookApi + const orderBookApi = parts.length > 3 ? parts[3] : undefined; + // Ensure that the orderBookApi is a valid URL + if (orderBookApi && !isValidUrl(orderBookApi)) { + throw new InvalidArgumentError( + `${orderBookApi} must be a valid URL (orderBookApi)` + ); + } + + return { rpc, deploymentBlock, watchdogTimeout, orderBookApi }; +} + +function parseChainConfigOptions( + option: string, + previous: MultiChainConfigOptions = { + rpcs: [], + deploymentBlocks: [], + watchdogTimeouts: [], + orderBookApis: [], + } +): MultiChainConfigOptions { + const parsedOption = parseChainConfigOption(option); + const { rpc, deploymentBlock, watchdogTimeout, orderBookApi } = parsedOption; + + previous.rpcs.push(rpc); + previous.deploymentBlocks.push(deploymentBlock); + previous.watchdogTimeouts.push(watchdogTimeout); + previous.orderBookApis.push(orderBookApi); + return previous; +} + +function isValidUrl(url: string): boolean { + try { + new URL(url); + return true; + } catch (error) { + return false; + } +} + +function parseAddressOption(option: string, previous: string[] = []): string[] { + // Use ethers to validate the address + try { + if (!isHexString(option)) { + throw new Error(); + } + getAddress(option); + } catch (error) { + throw new InvalidArgumentError( + `${option} must be a valid '0x' prefixed address` + ); + } + return [...previous, option]; +} + main().catch((error) => { console.error(error); process.exit(1); diff --git a/src/types/index.ts b/src/types/index.ts index be918ad..99e1cf6 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -1,49 +1,56 @@ -export interface LogOptions { +export type LogOptions = { logLevel: string; databasePath: string; -} +}; -export interface WatchtowerOptions extends LogOptions { +export type WatchtowerOptions = LogOptions & { dryRun: boolean; -} +}; -export interface WatchtowerReplayOptions extends WatchtowerOptions { +export type WatchtowerReplayOptions = WatchtowerOptions & { rpc: string; -} +}; -export interface RunOptions extends WatchtowerOptions { +export type RunOptions = WatchtowerOptions & { pageSize: number; silent: boolean; slackWebhook?: string; oneShot: boolean; disableApi: boolean; apiPort: number; - watchdogTimeout: number; -} + owners?: string[]; +}; + +export type OrderBookApi = string | undefined; -export interface RunSingleOptions extends RunOptions { +export type ChainConfigOptions = { rpc: string; - orderBookApi?: string; deploymentBlock: number; -} + watchdogTimeout: number; + orderBookApi: OrderBookApi; +}; -export interface RunMultiOptions extends RunOptions { +export type MultiChainConfigOptions = { rpcs: string[]; deploymentBlocks: number[]; - orderBookApis: string[]; -} + watchdogTimeouts: number[]; + orderBookApis: OrderBookApi[]; +}; + +export type RunSingleOptions = RunOptions & ChainConfigOptions; +export type RunMultiOptions = RunOptions & MultiChainConfigOptions; -export interface ReplayBlockOptions extends WatchtowerReplayOptions { +export type ReplayBlockOptions = WatchtowerReplayOptions & { block: number; -} +}; -export interface ReplayTxOptions extends WatchtowerReplayOptions { +export type ReplayTxOptions = WatchtowerReplayOptions & { tx: string; -} +}; -export interface DumpDbOptions extends LogOptions { +export type DumpDbOptions = LogOptions & { chainId: number; -} +}; export type ToBlock = "latest" | number;