Skip to content

Commit

Permalink
fix: save orders more often (#159)
Browse files Browse the repository at this point in the history
# Description

This PR introduces more often saving of orders after processing.
For now, after every 20 orders processed, it'll trigger a db save.

There are also some logging improvements to make debugging easier.

## How to test

Run the service.
It should start and process orders as usual.

## Related Issues

There have been performance issues which I'm not able to tell what's
causing it so far.
This change is an attempt to alleviate the in memory load by saving it
more often.
  • Loading branch information
alfetopito authored Oct 4, 2024
1 parent 90ecbf5 commit 4978b50
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 46 deletions.
77 changes: 49 additions & 28 deletions src/domain/polling/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ import {
import { ethers } from "ethers";
import { BytesLike } from "ethers/lib/utils";

import { ConditionalOrder, OrderStatus } from "../../types";
import {
formatStatus,
getLogger,
handleOnChainCustomError,
metrics,
} from "../../utils";
import {
ConditionalOrder as ConditionalOrderSDK,
OrderBookApi,
Expand All @@ -30,6 +23,14 @@ import {
formatEpoch,
} from "@cowprotocol/cow-sdk";
import { ChainContext } from "../../services";
import { ConditionalOrder, OrderStatus } from "../../types";
import {
LoggerWithMethods,
formatStatus,
getLogger,
handleOnChainCustomError,
metrics,
} from "../../utils";
import { badOrder, policy } from "./filtering";
import { pollConditionalOrder } from "./poll";

Expand Down Expand Up @@ -91,6 +92,8 @@ const API_ERRORS_DROP: DropApiErrorsArray = [
// ApiErrors.IncompatibleSigningScheme - we control this in the watch-tower
// ApiErrors.AppDataHashMismatch - we never submit full appData

const CHUNK_SIZE = 20; // How many orders to process before saving

/**
* Watch for new blocks and check for orders to place
*
Expand Down Expand Up @@ -128,19 +131,29 @@ export async function checkForAndPlaceOrder(
blockNumber.toString(),
ownerCounter.toString()
);
const ordersPendingDelete = [];
// enumerate all the `ConditionalOrder`s for a given owner

let ordersPendingDelete = [];

log.debug(`Process owner ${owner} (${conditionalOrders.size} orders)`);

for (const conditionalOrder of conditionalOrders) {
orderCounter++;

// Check if we reached the chunk size
if (orderCounter % CHUNK_SIZE === 1 && orderCounter > 1) {
// Delete orders pending delete, if any
_deleteOrders(ordersPendingDelete, conditionalOrders, log, chainId);
// Reset tracker
ordersPendingDelete = [];

log.debug(`Processed ${orderCounter}, saving registry`);

// Save the registry after processing each chunk
await registry.write();
}

const ownerRef = `${ownerCounter}.${orderCounter}`;
const orderRef = `${chainId}:${ownerRef}@${blockNumber}`;
const log = getLogger(
"checkForAndPlaceOrder:checkForAndPlaceOrder",
chainId.toString(),
blockNumber.toString(),
ownerRef
);
const orderRef = `${chainId}:${blockNumber}:${ownerRef}`;
const logOrderDetails = `Processing order ${conditionalOrder.id} from TX ${conditionalOrder.tx} with params:`;

const { result: lastHint } = conditionalOrder.pollResult || {};
Expand All @@ -158,7 +171,6 @@ export async function checkForAndPlaceOrder(
case policy.FilterAction.DROP:
log.info("Dropping conditional order. Reason: AcceptPolicy: DROP");
ordersPendingDelete.push(conditionalOrder);

continue;
case policy.FilterAction.SKIP:
log.debug("Skipping conditional order. Reason: AcceptPolicy: SKIP");
Expand Down Expand Up @@ -217,7 +229,6 @@ export async function checkForAndPlaceOrder(
conditionalOrder.pollResult = {
lastExecutionTimestamp: blockTimestamp,
blockNumber: blockNumber,

result: pollResult,
};

Expand Down Expand Up @@ -247,15 +258,7 @@ export async function checkForAndPlaceOrder(
}

// Delete orders we don't want to keep watching
for (const conditionalOrder of ordersPendingDelete) {
const deleted = conditionalOrders.delete(conditionalOrder);
const action = deleted ? "Stop Watching" : "Failed to stop watching";

log.debug(
`${action} conditional order ${conditionalOrder.id} from TX ${conditionalOrder.tx}`
);
metrics.activeOrdersTotal.labels(chainId.toString()).dec();
}
_deleteOrders(ordersPendingDelete, conditionalOrders, log, chainId);
}

// It may be handy in other versions of the watch tower implemented in other languages
Expand All @@ -280,6 +283,24 @@ export async function checkForAndPlaceOrder(
throw Error(`At least one unexpected error processing conditional orders`);
}
}

function _deleteOrders(
ordersPendingDelete: ConditionalOrder[],
conditionalOrders: Set<ConditionalOrder>,
log: LoggerWithMethods,
chainId: SupportedChainId
) {
for (const conditionalOrder of ordersPendingDelete) {
const deleted = conditionalOrders.delete(conditionalOrder);
const action = deleted ? "Stop Watching" : "Failed to stop watching";

log.debug(
`${action} conditional order ${conditionalOrder.id} from TX ${conditionalOrder.tx}`
);
metrics.activeOrdersTotal.labels(chainId.toString()).dec();
}
}

async function _processConditionalOrder(
owner: string,
conditionalOrder: ConditionalOrder,
Expand Down Expand Up @@ -513,7 +534,7 @@ async function _placeOrder(params: {

// If the operation is a dry run, don't post to the API
log.info(`Post order ${orderUid} to OrderBook on chain ${chainId}`);
log.debug(`Post order details`, postOrder);
log.debug(`Post order ${orderUid} details`, postOrder);
if (!dryRun) {
const orderUid = await orderBookApi.sendOrder(postOrder);
metrics.orderBookDiscreteOrdersTotal.labels(...metricLabels).inc();
Expand Down
32 changes: 18 additions & 14 deletions src/services/chain.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
import {
Registry,
ApiBaseUrls,
OrderBookApi,
SupportedChainId,
} from "@cowprotocol/cow-sdk";
import { ethers, providers } from "ethers";
import { DBService } from ".";
import { addContract } from "../domain/events";
import { checkForAndPlaceOrder } from "../domain/polling";
import { policy } from "../domain/polling/filtering";
import {
ComposableCoW,
ConditionalOrderCreatedEvent,
ContextOptions,
Multicall3,
ComposableCoW,
Multicall3__factory,
Registry,
RegistryBlock,
blockToRegistryBlock,
ContextOptions,
} from "../types";
import {
SupportedChainId,
OrderBookApi,
ApiBaseUrls,
} from "@cowprotocol/cow-sdk";
import { addContract } from "../domain/events";
import { checkForAndPlaceOrder } from "../domain/polling";
import { ethers, providers } from "ethers";
import {
LoggerWithMethods,
composableCowContract,
getLogger,
isRunningInKubernetesPod,
metrics,
} from "../utils";
import { DBService } from ".";
import { policy } from "../domain/polling/filtering";

const WATCHDOG_FREQUENCY_SECS = 5; // 5 seconds
const WATCHDOG_TIMEOUT_DEFAULT_SECS = 30;
Expand Down Expand Up @@ -295,7 +295,7 @@ export class ChainContext {
oneShot ? "Chain watcher is in sync" : "Chain watcher is warmed up"
}`
);
log.debug(`Last processed block: ${lastProcessedBlock}`);
log.debug(`Last processed block: ${JSON.stringify(lastProcessedBlock)}`);

// If one-shot, return
if (oneShot) {
Expand Down Expand Up @@ -591,11 +591,15 @@ function _formatResult(result: boolean) {
}

function getProvider(rpcUrl: string): providers.Provider {
const log = getLogger("getProvider", rpcUrl);
// if the rpcUrl is a websocket url, use the WebSocketProvider
if (rpcUrl.startsWith("ws")) {
log.debug("Instantiating WS");
return new providers.WebSocketProvider(rpcUrl);
}

log.debug("Instantiating HTTP");

// otherwise, use the JsonRpcProvider
return new providers.JsonRpcProvider(rpcUrl);
}
Expand Down
2 changes: 2 additions & 0 deletions src/services/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ export class DBService {
}

public async open() {
const log = getLogger("dbService:open");
log.info("Opening database...");
await this.db.open();
}

Expand Down
17 changes: 14 additions & 3 deletions src/types/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ import Slack = require("node-slack");

import { BytesLike, ethers } from "ethers";

import { ConditionalOrderParams, PollResult } from "@cowprotocol/cow-sdk";
import {
ConditionalOrderParams,
ConditionalOrder as ConditionalOrderSdk,
PollResult,
} from "@cowprotocol/cow-sdk";
import { DBService } from "../services";
import { metrics } from "../utils";
import { ConditionalOrder as ConditionalOrderSdk } from "@cowprotocol/cow-sdk";
import { getLogger, metrics } from "../utils";

// Standardise the storage key
const LAST_NOTIFIED_ERROR_STORAGE_KEY = "LAST_NOTIFIED_ERROR";
Expand Down Expand Up @@ -237,6 +240,14 @@ export class Registry {

// Write all atomically
await batch.write();

const log = getLogger(
`Registry:write:${this.version}:${this.network}:${
this.lastProcessedBlock?.number
}:${this.lastNotifiedError || ""}`
);

log.debug("batch written 📝");
}

public stringifyOrders(): string {
Expand Down
2 changes: 1 addition & 1 deletion src/types/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export interface Config {
deploymentBlock: number;
watchdogTimeout?: number;
/**
* Throttle block processing to only process blocks every N blocks. Set to 1 to process every block, 2 to process every other block, etc.
* Throttle block processing to only process blocks every N blocks. Set to 1 to process every block (default), 2 to process every other block, etc.
*/
processEveryNumBlocks?: number;
orderBookApi?: string;
Expand Down

0 comments on commit 4978b50

Please sign in to comment.