Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: save orders more often #159

Merged
merged 5 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 49 additions & 28 deletions src/domain/polling/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ import {
import { ethers } from "ethers";
import { BytesLike } from "ethers/lib/utils";

import { ConditionalOrder, OrderStatus } from "../../types";
import {
formatStatus,
getLogger,
handleOnChainCustomError,
metrics,
} from "../../utils";
import {
ConditionalOrder as ConditionalOrderSDK,
OrderBookApi,
Expand All @@ -30,6 +23,14 @@ import {
formatEpoch,
} from "@cowprotocol/cow-sdk";
import { ChainContext } from "../../services";
import { ConditionalOrder, OrderStatus } from "../../types";
import {
LoggerWithMethods,
formatStatus,
getLogger,
handleOnChainCustomError,
metrics,
} from "../../utils";
import { badOrder, policy } from "./filtering";
import { pollConditionalOrder } from "./poll";

Expand Down Expand Up @@ -91,6 +92,8 @@ const API_ERRORS_DROP: DropApiErrorsArray = [
// ApiErrors.IncompatibleSigningScheme - we control this in the watch-tower
// ApiErrors.AppDataHashMismatch - we never submit full appData

const CHUNK_SIZE = 20; // How many orders to process before saving

/**
* Watch for new blocks and check for orders to place
*
Expand Down Expand Up @@ -128,19 +131,29 @@ export async function checkForAndPlaceOrder(
blockNumber.toString(),
ownerCounter.toString()
);
const ordersPendingDelete = [];
// enumerate all the `ConditionalOrder`s for a given owner

let ordersPendingDelete = [];

log.debug(`Process owner ${owner} (${conditionalOrders.size} orders)`);

for (const conditionalOrder of conditionalOrders) {
orderCounter++;

// Check if we reached the chunk size
if (orderCounter % CHUNK_SIZE === 1 && orderCounter > 1) {
// Delete orders pending delete, if any
_deleteOrders(ordersPendingDelete, conditionalOrders, log, chainId);
// Reset tracker
ordersPendingDelete = [];

log.debug(`Processed ${orderCounter}, saving registry`);

// Save the registry after processing each chunk
await registry.write();
}

const ownerRef = `${ownerCounter}.${orderCounter}`;
const orderRef = `${chainId}:${ownerRef}@${blockNumber}`;
const log = getLogger(
"checkForAndPlaceOrder:checkForAndPlaceOrder",
chainId.toString(),
blockNumber.toString(),
ownerRef
);
const orderRef = `${chainId}:${blockNumber}:${ownerRef}`;
const logOrderDetails = `Processing order ${conditionalOrder.id} from TX ${conditionalOrder.tx} with params:`;

const { result: lastHint } = conditionalOrder.pollResult || {};
Expand All @@ -158,7 +171,6 @@ export async function checkForAndPlaceOrder(
case policy.FilterAction.DROP:
log.info("Dropping conditional order. Reason: AcceptPolicy: DROP");
ordersPendingDelete.push(conditionalOrder);

continue;
case policy.FilterAction.SKIP:
log.debug("Skipping conditional order. Reason: AcceptPolicy: SKIP");
Expand Down Expand Up @@ -217,7 +229,6 @@ export async function checkForAndPlaceOrder(
conditionalOrder.pollResult = {
lastExecutionTimestamp: blockTimestamp,
blockNumber: blockNumber,

result: pollResult,
};

Expand Down Expand Up @@ -247,15 +258,7 @@ export async function checkForAndPlaceOrder(
}

// Delete orders we don't want to keep watching
for (const conditionalOrder of ordersPendingDelete) {
const deleted = conditionalOrders.delete(conditionalOrder);
const action = deleted ? "Stop Watching" : "Failed to stop watching";

log.debug(
`${action} conditional order ${conditionalOrder.id} from TX ${conditionalOrder.tx}`
);
metrics.activeOrdersTotal.labels(chainId.toString()).dec();
}
_deleteOrders(ordersPendingDelete, conditionalOrders, log, chainId);
}

// It may be handy in other versions of the watch tower implemented in other languages
Expand All @@ -280,6 +283,24 @@ export async function checkForAndPlaceOrder(
throw Error(`At least one unexpected error processing conditional orders`);
}
}

function _deleteOrders(
ordersPendingDelete: ConditionalOrder[],
conditionalOrders: Set<ConditionalOrder>,
log: LoggerWithMethods,
chainId: SupportedChainId
) {
for (const conditionalOrder of ordersPendingDelete) {
const deleted = conditionalOrders.delete(conditionalOrder);
const action = deleted ? "Stop Watching" : "Failed to stop watching";

log.debug(
`${action} conditional order ${conditionalOrder.id} from TX ${conditionalOrder.tx}`
);
metrics.activeOrdersTotal.labels(chainId.toString()).dec();
}
}

async function _processConditionalOrder(
owner: string,
conditionalOrder: ConditionalOrder,
Expand Down Expand Up @@ -513,7 +534,7 @@ async function _placeOrder(params: {

// If the operation is a dry run, don't post to the API
log.info(`Post order ${orderUid} to OrderBook on chain ${chainId}`);
log.debug(`Post order details`, postOrder);
log.debug(`Post order ${orderUid} details`, postOrder);
if (!dryRun) {
const orderUid = await orderBookApi.sendOrder(postOrder);
metrics.orderBookDiscreteOrdersTotal.labels(...metricLabels).inc();
Expand Down
32 changes: 18 additions & 14 deletions src/services/chain.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
import {
Registry,
ApiBaseUrls,
OrderBookApi,
SupportedChainId,
} from "@cowprotocol/cow-sdk";
import { ethers, providers } from "ethers";
import { DBService } from ".";
import { addContract } from "../domain/events";
import { checkForAndPlaceOrder } from "../domain/polling";
import { policy } from "../domain/polling/filtering";
import {
ComposableCoW,
ConditionalOrderCreatedEvent,
ContextOptions,
Multicall3,
ComposableCoW,
Multicall3__factory,
Registry,
RegistryBlock,
blockToRegistryBlock,
ContextOptions,
} from "../types";
import {
SupportedChainId,
OrderBookApi,
ApiBaseUrls,
} from "@cowprotocol/cow-sdk";
import { addContract } from "../domain/events";
import { checkForAndPlaceOrder } from "../domain/polling";
import { ethers, providers } from "ethers";
import {
LoggerWithMethods,
composableCowContract,
getLogger,
isRunningInKubernetesPod,
metrics,
} from "../utils";
import { DBService } from ".";
import { policy } from "../domain/polling/filtering";

const WATCHDOG_FREQUENCY_SECS = 5; // 5 seconds
const WATCHDOG_TIMEOUT_DEFAULT_SECS = 30;
Expand Down Expand Up @@ -295,7 +295,7 @@ export class ChainContext {
oneShot ? "Chain watcher is in sync" : "Chain watcher is warmed up"
}`
);
log.debug(`Last processed block: ${lastProcessedBlock}`);
log.debug(`Last processed block: ${JSON.stringify(lastProcessedBlock)}`);

// If one-shot, return
if (oneShot) {
Expand Down Expand Up @@ -591,11 +591,15 @@ function _formatResult(result: boolean) {
}

function getProvider(rpcUrl: string): providers.Provider {
const log = getLogger("getProvider", rpcUrl);
// if the rpcUrl is a websocket url, use the WebSocketProvider
if (rpcUrl.startsWith("ws")) {
log.debug("Instantiating WS");
return new providers.WebSocketProvider(rpcUrl);
}

log.debug("Instantiating HTTP");

// otherwise, use the JsonRpcProvider
return new providers.JsonRpcProvider(rpcUrl);
}
Expand Down
2 changes: 2 additions & 0 deletions src/services/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ export class DBService {
}

public async open() {
const log = getLogger("dbService:open");
log.info("Opening database...");
await this.db.open();
}

Expand Down
17 changes: 14 additions & 3 deletions src/types/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ import Slack = require("node-slack");

import { BytesLike, ethers } from "ethers";

import { ConditionalOrderParams, PollResult } from "@cowprotocol/cow-sdk";
import {
ConditionalOrderParams,
ConditionalOrder as ConditionalOrderSdk,
PollResult,
} from "@cowprotocol/cow-sdk";
import { DBService } from "../services";
import { metrics } from "../utils";
import { ConditionalOrder as ConditionalOrderSdk } from "@cowprotocol/cow-sdk";
import { getLogger, metrics } from "../utils";

// Standardise the storage key
const LAST_NOTIFIED_ERROR_STORAGE_KEY = "LAST_NOTIFIED_ERROR";
Expand Down Expand Up @@ -237,6 +240,14 @@ export class Registry {

// Write all atomically
await batch.write();

const log = getLogger(
`Registry:write:${this.version}:${this.network}:${
this.lastProcessedBlock?.number
}:${this.lastNotifiedError || ""}`
);

log.debug("batch written 📝");
}

public stringifyOrders(): string {
Expand Down
2 changes: 1 addition & 1 deletion src/types/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export interface Config {
deploymentBlock: number;
watchdogTimeout?: number;
/**
* Throttle block processing to only process blocks every N blocks. Set to 1 to process every block, 2 to process every other block, etc.
* Throttle block processing to only process blocks every N blocks. Set to 1 to process every block (default), 2 to process every other block, etc.
*/
processEveryNumBlocks?: number;
orderBookApi?: string;
Expand Down
Loading