Skip to content

Commit

Permalink
chore: address filter and chain config (#112)
Browse files Browse the repository at this point in the history
# Description
This PR tweaks some configuration options for easing deployment on
dappnode.

# Changes

- [x] `rpc`, `deploymentBlock`, `watchdogTimeout`, and `orderBookApi`
collapsed down into `chain-config`
- [x] Added `ChainConfig` option parser
- [x] Added user-defined `address` filter (only process conditional
orders from specific addresses)
- [x] Fixes a calculation issue with the watchdog
 
## How to test

1. Follow the readme instructions for quickly running
2. Observe no errors

## Related Issues

Fixes: #72
  • Loading branch information
mfw78 authored Nov 20, 2023
1 parent cc446f3 commit 4f91712
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 163 deletions.
20 changes: 14 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,22 @@ As an example, to run the latest version of the watch-tower via `docker`:
docker run --rm -it \
ghcr.io/cowprotocol/watch-tower:latest \
run \
--rpc <rpc-url> \
--deployment-block <deployment-block> \
--chain-config <rpc>,<deployment-block> \
--page-size 5000
```

### Dappnode
**NOTE**: There are multiple optional arguments on the `--chain-config` parameter. For a full explanation of the optional arguments, use the `--help` flag:

**TODO**: Add instructions for deploying to Dappnode.
```bash
docker run --rm -it \
ghcr.io/cowprotocol/watch-tower:latest \
run \
--help
```

### DAppNode

For [DAppNode](https://dappnode.com), the watch-tower is available as a package. This package is held in a [separate repository](https://github.com/cowprotocol/dappnodepackage-cow-watch-tower).

### Running locally

Expand All @@ -55,7 +63,7 @@ docker run --rm -it \
# Install dependencies
yarn
# Run watch-tower
yarn cli run --rpc <rpc-url> --deployment-block <deployment-block> --page-size 5000
yarn cli run --chain-config <rpc>,<deployment-block> --page-size 5000
```

## Architecture
Expand Down Expand Up @@ -178,7 +186,7 @@ It is recommended to test against the Goerli testnet. To run the watch-tower:
# Install dependencies
yarn
# Run watch-tower
yarn cli run --rpc <rpc-url> --deployment-block <deployment-block> --page-size 5000
yarn cli run --chain-config <rpc>,<deployment-block> --page-size 5000
```

### Testing
Expand Down
5 changes: 2 additions & 3 deletions src/commands/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import { ApiService } from "../utils/api";
*/
export async function run(options: RunSingleOptions) {
const log = getLogger("commands:run");
const { oneShot, disableApi, apiPort, databasePath, watchdogTimeout } =
options;
const { oneShot, disableApi, apiPort, databasePath } = options;

// Open the database
const storage = DBService.getInstance(databasePath);
Expand All @@ -35,7 +34,7 @@ export async function run(options: RunSingleOptions) {
let exitCode = 0;
try {
const chainContext = await ChainContext.init(options, storage);
const runPromise = chainContext.warmUp(watchdogTimeout, oneShot);
const runPromise = chainContext.warmUp(oneShot);

// Run the block watcher after warm up for the chain
await runPromise;
Expand Down
7 changes: 5 additions & 2 deletions src/commands/runMulti.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ export async function runMulti(options: RunMultiOptions) {
const {
rpcs,
deploymentBlocks,
watchdogTimeouts,
orderBookApis,
oneShot,
disableApi,
apiPort,
databasePath,
watchdogTimeout,
} = options;

// Open the database
Expand Down Expand Up @@ -48,6 +49,8 @@ export async function runMulti(options: RunMultiOptions) {
...options,
rpc,
deploymentBlock: deploymentBlocks[index],
watchdogTimeout: watchdogTimeouts[index],
orderBookApi: orderBookApis[index],
},
storage
);
Expand All @@ -56,7 +59,7 @@ export async function runMulti(options: RunMultiOptions) {

// Run the block watcher after warm up for each chain
const runPromises = chainContexts.map(async (context) => {
return context.warmUp(watchdogTimeout, oneShot);
return context.warmUp(oneShot);
});

// Run all the chain contexts
Expand Down
84 changes: 51 additions & 33 deletions src/domain/chainContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
} from "@cowprotocol/cow-sdk";
import { addContract } from "./addContract";
import { checkForAndPlaceOrder } from "./checkForAndPlaceOrder";
import { ethers } from "ethers";
import { EventFilter, providers } from "ethers";
import {
composableCowContract,
DBService,
Expand All @@ -31,6 +31,7 @@ import {
reorgDepth,
reorgsTotal,
} from "../utils/metrics";
import { hexZeroPad } from "ethers/lib/utils";

const WATCHDOG_FREQUENCY = 5 * 1000; // 5 seconds

Expand Down Expand Up @@ -74,40 +75,51 @@ export class ChainContext {
readonly deploymentBlock: number;
readonly pageSize: number;
readonly dryRun: boolean;
readonly watchdogTimeout: number;
readonly addresses?: string[];
readonly orderBookApiBaseUrls?: ApiBaseUrls;
private sync: ChainSync = ChainSync.SYNCING;
static chains: Chains = {};

provider: ethers.providers.Provider;
provider: providers.Provider;
chainId: SupportedChainId;
registry: Registry;
orderBook: OrderBookApi;
contract: ComposableCoW;
multicall: Multicall3;
orderBookApiBaseUrls?: ApiBaseUrls;

protected constructor(
options: RunSingleOptions,
provider: ethers.providers.Provider,
provider: providers.Provider,
chainId: SupportedChainId,
registry: Registry,
orderBookApi?: string
registry: Registry
) {
const { deploymentBlock, pageSize, dryRun } = options;
const {
deploymentBlock,
pageSize,
dryRun,
watchdogTimeout,
owners,
orderBookApi,
} = options;
this.deploymentBlock = deploymentBlock;
this.pageSize = pageSize;
this.dryRun = dryRun;
this.watchdogTimeout = watchdogTimeout;
this.addresses = owners;

this.provider = provider;
this.chainId = chainId;
this.registry = registry;

this.orderBookApiBaseUrls = orderBookApi
? ({
[this.chainId]: orderBookApi,
} as ApiBaseUrls) // FIXME: do not do this casting once this is fixed https://github.com/cowprotocol/cow-sdk/issues/176
: undefined;

this.orderBook = new OrderBookApi({
chainId: this.chainId,
chainId,
baseUrls: this.orderBookApiBaseUrls,
backoffOpts: {
numOfAttempts: SDK_BACKOFF_NUM_OF_ATTEMPTS,
Expand All @@ -128,9 +140,9 @@ export class ChainContext {
options: RunSingleOptions,
storage: DBService
): Promise<ChainContext> {
const { rpc, orderBookApi, deploymentBlock } = options;
const { rpc, deploymentBlock } = options;

const provider = new ethers.providers.JsonRpcProvider(rpc);
const provider = new providers.JsonRpcProvider(rpc);
const chainId = (await provider.getNetwork()).chainId;

const registry = await Registry.load(
Expand All @@ -140,13 +152,7 @@ export class ChainContext {
);

// Save the context to the static map to be used by the API
const context = new ChainContext(
options,
provider,
chainId,
registry,
orderBookApi
);
const context = new ChainContext(options, provider, chainId, registry);
ChainContext.chains[chainId] = context;

return context;
Expand All @@ -155,11 +161,10 @@ export class ChainContext {
/**
* Warm up the chain watcher by fetching the latest block number and
* checking if the chain is in sync.
* @param watchdogTimeout the timeout for the watchdog
* @param oneShot if true, only warm up the chain watcher and return
* @returns the run promises for what needs to be watched
*/
public async warmUp(watchdogTimeout: number, oneShot?: boolean) {
public async warmUp(oneShot?: boolean) {
const { provider, chainId } = this;
const log = getLogger("chainContext:warmUp", chainId.toString());
const { lastProcessedBlock } = this.registry;
Expand All @@ -186,6 +191,14 @@ export class ChainContext {
toBlock =
toBlock > currentBlock.number ? currentBlock.number : toBlock;

// This happens when the watch-tower has restarted and the last processed block is
// the current block. Therefore the `fromBlock` is the current block + 1, which is
// greater than the current block number. In this case, we are in sync.
if (fromBlock > currentBlock.number) {
this.sync = ChainSync.IN_SYNC;
break;
}

log.debug(
`Reaching tip of chain, current block number: ${currentBlock.number}`
);
Expand Down Expand Up @@ -277,27 +290,26 @@ export class ChainContext {
oneShot ? "Chain watcher is in sync" : "Chain watcher is warmed up"
}`
);
log.debug(`Last processed block: ${this.registry.lastProcessedBlock}`);
log.debug(
`Last processed block: ${this.registry.lastProcessedBlock.number}`
);

// If one-shot, return
if (oneShot) {
return;
}

// Otherwise, run the block watcher
return await this.runBlockWatcher(watchdogTimeout, currentBlock);
return await this.runBlockWatcher(currentBlock);
}

/**
* Run the block watcher for the chain. As new blocks come in:
* 1. Check if there are any `ConditionalOrderCreated` events, and index these.
* 2. Check if any orders want to create discrete orders.
*/
private async runBlockWatcher(
watchdogTimeout: number,
lastProcessedBlock: ethers.providers.Block
) {
const { provider, registry, chainId } = this;
private async runBlockWatcher(lastProcessedBlock: providers.Block) {
const { provider, registry, chainId, watchdogTimeout } = this;
const log = getLogger("chainContext:runBlockWatcher", chainId.toString());
// Watch for new blocks
log.info(`👀 Start block watcher`);
Expand Down Expand Up @@ -360,14 +372,13 @@ export class ChainContext {
const now = Math.floor(new Date().getTime() / 1000);
const timeElapsed = now - lastBlockReceived.timestamp;

log.debug(`Time since last block processed: ${timeElapsed}ms`);
log.debug(`Time since last block processed: ${timeElapsed}s`);

// If we haven't received a block within `watchdogTimeout` seconds, either signal
// an error or exit if not running in a kubernetes pod
if (timeElapsed >= watchdogTimeout * 1000) {
const formattedElapsedTime = Math.floor(timeElapsed / 1000);
if (timeElapsed >= watchdogTimeout) {
log.error(
`Chain watcher last processed a block ${formattedElapsedTime}s ago (${watchdogTimeout}s timeout configured). Check the RPC.`
`Chain watcher last processed a block ${timeElapsed}s ago (${watchdogTimeout}s timeout configured). Check the RPC.`
);
if (isRunningInKubernetesPod()) {
this.sync = ChainSync.UNKNOWN;
Expand Down Expand Up @@ -428,7 +439,7 @@ export class ChainContext {
*/
async function processBlock(
context: ChainContext,
block: ethers.providers.Block,
block: providers.Block,
events: ConditionalOrderCreatedEvent[],
blockNumberOverride?: number,
blockTimestampOverride?: number
Expand Down Expand Up @@ -492,9 +503,16 @@ function pollContractForEvents(
toBlock: number | "latest",
context: ChainContext
): Promise<ConditionalOrderCreatedEvent[]> {
const { provider, chainId } = context;
const { provider, chainId, addresses } = context;
const composableCow = composableCowContract(provider, chainId);
const filter = composableCow.filters.ConditionalOrderCreated();
const filter = composableCow.filters.ConditionalOrderCreated() as EventFilter;

if (addresses) {
filter.topics?.push(
addresses.map((address) => hexZeroPad(address.toLowerCase(), 32))
);
}

return composableCow.queryFilter(filter, fromBlock, toBlock);
}

Expand Down
2 changes: 2 additions & 0 deletions src/domain/checkForAndPlaceOrder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ async function _processConditionalOrder(
blockNumber,
},
provider,
// TODO: This should be DRY'ed. Upstream should take just an `orderBook` object that
// is already configured.
orderbookApiConfig: {
baseUrls: orderBookApiBaseUrls,
backoffOpts: {
Expand Down
Loading

0 comments on commit 4f91712

Please sign in to comment.