Skip to content

Commit

Permalink
feat(arweave): reintroduce arweave code (#1281)
Browse files Browse the repository at this point in the history
* feat(arweave): reintroduce arweave code

* improve: include explorer url for arweave

* improve: execute calls in parallel

* nit: EOL

* nit: add timing

* fix: re-run circle-ci

Signed-off-by: james-a-morris <[email protected]>

* fix: use debug

* chore: bump sdk version

* chore: parallelize arweave set/balance

---------

Signed-off-by: james-a-morris <[email protected]>
  • Loading branch information
james-a-morris authored Mar 13, 2024
1 parent 2b54331 commit 1215fe1
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 11 deletions.
16 changes: 16 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,22 @@ RELAYER_TOKENS='["0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", "0xA0b86991c6218b
# },
#}'

# The dataworker can be configured to store tertiary bundle data on Arweave
# for debugging and auditing purposes. This is disabled by default and must be
# explicitly enabled for the dataworker to store bundle data on Arweave. When
# PERSIST_DATA_TO_ARWEAVE is set to "true", the variables ARWEAVE_WALLET_JWK
# and ARWEAVE_GATEWAY must be set to valid values.
PERSIST_DATA_TO_ARWEAVE=false

# This wallet JWK is used to sign transactions intended for permenant storage of bundle
# data on Arweave. Ensure that the wallet has enough AR to cover the cost of storage.
ARWEAVE_WALLET_JWK=$({"kty":"", "e":"", "n":"", "d":"", "p":"", "q":"", "dp":"", "dq":"", "qi":""})

# The Arweave gateway to use for storing bundle data. This is used to connect to the
# Arweave network and store bundle data. The default gateway is the official Arweave
# gateway. This can be changed to a custom gateway if desired.
ARWEAVE_GATEWAY=$({"url":"", "port": 443, "protocol":"https"})

################################################################################
########################### Testing Configuration ##############################
################################################################################
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"dependencies": {
"@across-protocol/constants-v2": "1.0.11",
"@across-protocol/contracts-v2": "2.5.0-beta.7",
"@across-protocol/sdk-v2": "0.22.11",
"@across-protocol/sdk-v2": "0.22.12",
"@arbitrum/sdk": "^3.1.3",
"@defi-wonderland/smock": "^2.3.5",
"@eth-optimism/sdk": "^3.2.1",
Expand All @@ -36,6 +36,7 @@
"lodash.get": "^4.4.2",
"minimist": "^1.2.8",
"redis4": "npm:redis@^4.1.0",
"superstruct": "^1.0.3",
"ts-node": "^10.9.1",
"winston": "^3.10.0",
"zksync-web3": "^0.14.3"
Expand Down
26 changes: 25 additions & 1 deletion src/dataworker/Dataworker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,23 @@ type RootBundle = {
tree: MerkleTree<SlowFillLeaf>;
};

export type BundleDataToPersistToDALayerType = {
bundleBlockRanges: number[][];
bundleDepositsV3: BundleDepositsV3;
expiredDepositsToRefundV3: ExpiredDepositsToRefundV3;
bundleFillsV3: BundleFillsV3;
unexecutableSlowFills: BundleExcessSlowFills;
bundleSlowFillsV3: BundleSlowFills;
};

type ProposeRootBundleReturnType = {
poolRebalanceLeaves: PoolRebalanceLeaf[];
poolRebalanceTree: MerkleTree<PoolRebalanceLeaf>;
relayerRefundLeaves: RelayerRefundLeaf[];
relayerRefundTree: MerkleTree<RelayerRefundLeaf>;
slowFillLeaves: SlowFillLeaf[];
slowFillTree: MerkleTree<SlowFillLeaf>;
dataToPersistToDALayer: BundleDataToPersistToDALayerType;
};

export type PoolRebalanceRoot = {
Expand Down Expand Up @@ -377,7 +387,7 @@ export class Dataworker {
usdThresholdToSubmitNewBundle?: BigNumber,
submitProposals = true,
earliestBlocksInSpokePoolClients: { [chainId: number]: number } = {}
): Promise<void> {
): Promise<BundleDataToPersistToDALayerType> {
// TODO: Handle the case where we can't get event data or even blockchain data from any chain. This will require
// some changes to override the bundle block range here, and loadData to skip chains with zero block ranges.
// For now, we assume that if one blockchain fails to return data, then this entire function will fail. This is a
Expand Down Expand Up @@ -489,6 +499,7 @@ export class Dataworker {
rootBundleData.slowFillTree.getHexRoot()
);
}
return rootBundleData.dataToPersistToDALayer;
}

async _proposeRootBundle(
Expand All @@ -510,6 +521,18 @@ export class Dataworker {
unexecutableSlowFills,
expiredDepositsToRefundV3,
} = await this.clients.bundleDataClient.loadData(blockRangesForProposal, spokePoolClients, logData);
// Prepare information about what we need to store to
// Arweave for the bundle. We will be doing this at a
// later point so that we can confirm that this data is
// worth storing.
const dataToPersistToDALayer = {
bundleBlockRanges: blockRangesForProposal,
bundleDepositsV3,
expiredDepositsToRefundV3,
bundleFillsV3,
unexecutableSlowFills,
bundleSlowFillsV3,
};
const allValidFillsInRange = getFillsInRange(
allValidFills,
blockRangesForProposal,
Expand Down Expand Up @@ -583,6 +606,7 @@ export class Dataworker {
relayerRefundTree: relayerRefundRoot.tree,
slowFillLeaves: slowRelayRoot.leaves,
slowFillTree: slowRelayRoot.tree,
dataToPersistToDALayer,
};
}

Expand Down
12 changes: 12 additions & 0 deletions src/dataworker/DataworkerClientHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import { BundleDataClient, HubPoolClient, TokenClient } from "../clients";
import { getBlockForChain } from "./DataworkerUtils";
import { Dataworker } from "./Dataworker";
import { ProposedRootBundle, SpokePoolClientsByChain } from "../interfaces";
import { caching } from "@across-protocol/sdk-v2";

export interface DataworkerClients extends Clients {
tokenClient: TokenClient;
bundleDataClient: BundleDataClient;
arweaveClient: caching.ArweaveClient;
priceClient?: PriceClient;
}

Expand Down Expand Up @@ -56,11 +58,21 @@ export async function constructDataworkerClients(
new defiLlama.PriceFeed(),
]);

// Define the Arweave client.
const arweaveClient = new caching.ArweaveClient(
config.arweaveWalletJWK,
logger,
config.arweaveGateway?.url,
config.arweaveGateway?.protocol,
config.arweaveGateway?.port
);

return {
...commonClients,
bundleDataClient,
tokenClient,
priceClient,
arweaveClient,
};
}

Expand Down
27 changes: 27 additions & 0 deletions src/dataworker/DataworkerConfig.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { CommonConfig, ProcessEnv } from "../common";
import {
ArweaveGatewayInterface,
ArweaveGatewayInterfaceSS,
ArweaveWalletJWKInterface,
ArweaveWalletJWKInterfaceSS,
} from "../interfaces";
import { BigNumber, assert, toBNWei } from "../utils";

export class DataworkerConfig extends CommonConfig {
Expand Down Expand Up @@ -32,13 +38,19 @@ export class DataworkerConfig extends CommonConfig {
readonly sendingExecutionsEnabled: boolean;
readonly sendingFinalizationsEnabled: boolean;

// This variable should be set if the user wants to persist bundle data to Arweave.
readonly persistingBundleData: boolean;

// These variables allow the user to optimize dataworker run-time, which can slow down drastically because of all the
// historical events it needs to fetch and parse.
readonly dataworkerFastLookbackCount: number;
readonly dataworkerFastStartBundle: number | string;

readonly bufferToPropose: number;

readonly arweaveWalletJWK: ArweaveWalletJWKInterface;
readonly arweaveGateway: ArweaveGatewayInterface;

constructor(env: ProcessEnv) {
const {
ROOT_BUNDLE_EXECUTION_THRESHOLD,
Expand All @@ -58,6 +70,9 @@ export class DataworkerConfig extends CommonConfig {
DATAWORKER_FAST_START_BUNDLE,
FORCE_PROPOSAL,
FORCE_PROPOSAL_BUNDLE_RANGE,
PERSIST_BUNDLES_TO_ARWEAVE,
ARWEAVE_WALLET_JWK,
ARWEAVE_GATEWAY,
} = env;
super(env);

Expand Down Expand Up @@ -156,5 +171,17 @@ export class DataworkerConfig extends CommonConfig {
`dataworkerFastStartBundle=${this.dataworkerFastStartBundle} should be >= dataworkerFastLookbackCount=${this.dataworkerFastLookbackCount}`
);
}

this.persistingBundleData = PERSIST_BUNDLES_TO_ARWEAVE === "true";
if (this.persistingBundleData) {
// Load the Arweave wallet JWK from the environment.
const _arweaveWalletJWK = JSON.parse(ARWEAVE_WALLET_JWK ?? "{}");
assert(ArweaveWalletJWKInterfaceSS.is(_arweaveWalletJWK), "Invalid Arweave wallet JWK");
this.arweaveWalletJWK = _arweaveWalletJWK;
// Load the Arweave gateway from the environment.
const _arweaveGateway = JSON.parse(ARWEAVE_GATEWAY ?? "{}");
assert(ArweaveGatewayInterfaceSS.is(_arweaveGateway), "Invalid Arweave gateway");
this.arweaveGateway = _arweaveGateway;
}
}
}
49 changes: 48 additions & 1 deletion src/dataworker/DataworkerUtils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import assert from "assert";
import { utils, typechain, interfaces } from "@across-protocol/sdk-v2";
import { utils, typechain, interfaces, caching } from "@across-protocol/sdk-v2";
import { SpokePoolClient } from "../clients";
import { spokesThatHoldEthAndWeth } from "../common/Constants";
import { CONTRACT_ADDRESSES } from "../common/ContractAddresses";
Expand Down Expand Up @@ -61,6 +61,7 @@ import {
ExpiredDepositsToRefundV3,
} from "../interfaces/BundleData";
export const { getImpliedBundleBlockRanges, getBlockRangeForChain, getBlockForChain } = utils;
import { any } from "superstruct";

export function getEndBlockBuffers(
chainIdListForBundleEvaluationBlockNumbers: number[],
Expand Down Expand Up @@ -716,3 +717,49 @@ export function l2TokensToCountTowardsSpokePoolLeafExecutionCapital(
const ethAndWeth = getWethAndEth(l2ChainId);
return ethAndWeth.includes(l2TokenAddress) ? ethAndWeth : [l2TokenAddress];
}

/**
* Persists data to Arweave with a given tag, given that the data doesn't
* already exist on Arweave with the tag.
* @param client The Arweave client to use for persistence.
* @param data The data to persist to Arweave.
* @param logger A winston logger
* @param tag The tag to use for the data.
*/
export async function persistDataToArweave(
client: caching.ArweaveClient,
data: Record<string, unknown>,
logger: winston.Logger,
tag?: string
): Promise<void> {
const startTime = Date.now();
// Check if data already exists on Arweave with the given tag.
// If so, we don't need to persist it again.
const matchingTxns = await client.getByTopic(tag, any());
if (matchingTxns.length > 0) {
logger.info({
at: "Dataworker#index",
message: `Data already exists on Arweave with tag: ${tag}`,
hash: matchingTxns.map((txn) => txn.hash),
});
} else {
const [hashTxn, address, balance] = await Promise.all([
client.set(data, tag),
client.getAddress(),
client.getBalance(),
]);
logger.info({
at: "Dataworker#index",
message: `Persisted data to Arweave with transaction hash: ${hashTxn}`,
hash: hashTxn,
explorerUrl: `https://arweave.net/${hashTxn}`,
address,
balance,
});
}
const endTime = Date.now();
logger.debug({
at: "Dataworker#index",
message: `Time to persist data to Arweave: ${endTime - startTime}ms`,
});
}
47 changes: 43 additions & 4 deletions src/dataworker/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import { processEndPollingLoop, winston, config, startupLogLevel, Signer, disconnectRedisClients } from "../utils";
import {
processEndPollingLoop,
winston,
config,
startupLogLevel,
Signer,
disconnectRedisClients,
isDefined,
} from "../utils";
import { spokePoolClientsToProviders } from "../common";
import { Dataworker } from "./Dataworker";
import { BundleDataToPersistToDALayerType, Dataworker } from "./Dataworker";
import { DataworkerConfig } from "./DataworkerConfig";
import {
constructDataworkerClients,
Expand All @@ -9,6 +17,7 @@ import {
DataworkerClients,
} from "./DataworkerClientHelper";
import { BalanceAllocator } from "../clients/BalanceAllocator";
import { persistDataToArweave } from "./DataworkerUtils";

config();
let logger: winston.Logger;
Expand Down Expand Up @@ -53,6 +62,7 @@ export async function runDataworker(_logger: winston.Logger, baseSigner: Signer)
});
loopStart = Date.now();

let bundleDataToPersist: BundleDataToPersistToDALayerType | undefined = undefined;
try {
logger[startupLogLevel(config)]({ at: "Dataworker#index", message: "Dataworker started 👩‍🔬", config });

Expand Down Expand Up @@ -107,7 +117,7 @@ export async function runDataworker(_logger: winston.Logger, baseSigner: Signer)
}

if (config.proposerEnabled) {
await dataworker.proposeRootBundle(
bundleDataToPersist = await dataworker.proposeRootBundle(
spokePoolClients,
config.rootBundleExecutionThreshold,
config.sendingProposalsEnabled,
Expand Down Expand Up @@ -144,7 +154,36 @@ export async function runDataworker(_logger: winston.Logger, baseSigner: Signer)
logger[startupLogLevel(config)]({ at: "Dataworker#index", message: "Executor disabled" });
}

await clients.multiCallerClient.executeTransactionQueue();
// Define a helper function to persist the bundle data to the DALayer.
const persistBundle = async () => {
// Submit the bundle data to persist to the DALayer if persistingBundleData is enabled.
// Note: The check for `bundleDataToPersist` is necessary for TSC to be happy.
if (config.persistingBundleData && isDefined(bundleDataToPersist)) {
await persistDataToArweave(
clients.arweaveClient,
bundleDataToPersist,
logger,
`bundles-${bundleDataToPersist.bundleBlockRanges}`
);
}
};

// We want to persist the bundle data to the DALayer *AND* execute the multiCall transaction queue
// in parallel. We want to have both of these operations complete, even if one of them fails.
const [persistResult, multiCallResult] = await Promise.allSettled([
persistBundle(),
clients.multiCallerClient.executeTransactionQueue(),
]);

// If either of the operations failed, log the error.
if (persistResult.status === "rejected" || multiCallResult.status === "rejected") {
logger.error({
at: "Dataworker#index",
message: "Failed to persist bundle data to the DALayer or execute the multiCall transaction queue",
persistResult: persistResult.status === "rejected" ? persistResult.reason : undefined,
multiCallResult: multiCallResult.status === "rejected" ? multiCallResult.reason : undefined,
});
}

logger.debug({
at: "Dataworker#index",
Expand Down
37 changes: 37 additions & 0 deletions src/interfaces/Arweave.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { number, object, optional, string } from "superstruct";

export type ArweaveWalletJWKInterface = {
kty: string;
e: string;
n: string;
d?: string;
p?: string;
q?: string;
dp?: string;
dq?: string;
qi?: string;
};

export type ArweaveGatewayInterface = {
url: string;
protocol: string;
port: number;
};

export const ArweaveWalletJWKInterfaceSS = object({
kty: string(),
e: string(),
n: string(),
d: optional(string()),
p: optional(string()),
q: optional(string()),
dp: optional(string()),
dq: optional(string()),
qi: optional(string()),
});

export const ArweaveGatewayInterfaceSS = object({
url: string(),
protocol: string(),
port: number(),
});
1 change: 1 addition & 0 deletions src/interfaces/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from "./SpokePool";
export * from "./Token";
export * from "./Error";
export * from "./Report";
export * from "./Arweave";

// Bridge interfaces
export type OutstandingTransfers = interfaces.OutstandingTransfers;
Expand Down
Loading

0 comments on commit 1215fe1

Please sign in to comment.