diff --git a/packages/agents/lighthouse/src/tasks/prover/operations/process.ts b/packages/agents/lighthouse/src/tasks/prover/operations/process.ts index 4370e11553..b24f3e9133 100644 --- a/packages/agents/lighthouse/src/tasks/prover/operations/process.ts +++ b/packages/agents/lighthouse/src/tasks/prover/operations/process.ts @@ -7,6 +7,7 @@ import { RequestContext, XMessage, ExecStatus, + DBHelper, } from "@connext/nxtp-utils"; import { @@ -16,7 +17,7 @@ import { MessageRootVerificationFailed, } from "../../../errors"; import { sendWithRelayerWithBackup } from "../../../mockable"; -import { HubDBHelper, SpokeDBHelper } from "../adapters"; +import { HubDBHelper, SpokeDBHelper, OptimisticHubDBHelper } from "../adapters"; import { getContext } from "../prover"; import { BrokerMessage, ProofStruct } from "./types"; @@ -38,6 +39,7 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont messageRootCount, aggregateRoot, aggregateRootCount, + snapshotRoots, } = brokerMessage; // Dedup the batch @@ -65,17 +67,27 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont ); const spokeSMT = new SparseMerkleTree(spokeStore); - const hubStore = new HubDBHelper( - "hub", - aggregateRootCount, - { - reader: database, - writer: databaseWriter, - }, - cache.messages, - ); - const hubSMT = new SparseMerkleTree(hubStore); + let hubStore: DBHelper; + if (snapshotRoots.length == 0) { + hubStore = new HubDBHelper( + "hub", + aggregateRootCount, + { + reader: database, + writer: databaseWriter, + }, + cache.messages, + ); + } else { + const baseAggregateRootCount = aggregateRootCount - snapshotRoots.length; + const baseAggregateRoots: string[] = await database.getAggregateRoots(baseAggregateRootCount); + const opRoots = baseAggregateRoots.concat(snapshotRoots); + // Count of leafs in aggregate tree at targetAggregateRoot. + hubStore = new OptimisticHubDBHelper(opRoots, aggregateRootCount); + } + + const hubSMT = new SparseMerkleTree(hubStore); const destinationSpokeConnector = config.chains[destinationDomain]?.deployments.spokeConnector; if (!destinationSpokeConnector) { throw new NoDestinationDomainForProof(destinationDomain); diff --git a/packages/agents/lighthouse/src/tasks/prover/operations/publisher.ts b/packages/agents/lighthouse/src/tasks/prover/operations/publisher.ts index 225b18362e..d6cd5ee062 100644 --- a/packages/agents/lighthouse/src/tasks/prover/operations/publisher.ts +++ b/packages/agents/lighthouse/src/tasks/prover/operations/publisher.ts @@ -11,6 +11,7 @@ import { ExecStatus, RelayerTaskStatus, ModeType, + Snapshot, } from "@connext/nxtp-utils"; import { @@ -176,10 +177,15 @@ export const enqueue = async () => { .filter((domain) => domain != destinationDomain) .map(async (originDomain) => { let latestMessageRoot: RootMessage | undefined = undefined; + let aggregateRootCount: number | undefined; + let targetMessageRoot: string; + let messageRootCount: number | undefined; + let messageRootIndex: number | undefined; + let snapshot: Snapshot | undefined; const targetAggregateRoot: ReceivedAggregateRoot = curDestAggRoots[0]; try { // Slowmode - if (mode == ModeType.SlowMode) { + if (mode === ModeType.SlowMode) { // Slowmode for (const destAggregateRoot of curDestAggRoots) { latestMessageRoot = await database.getLatestMessageRoot(originDomain, destAggregateRoot.root); @@ -188,9 +194,22 @@ export const enqueue = async () => { if (!latestMessageRoot) { throw new NoTargetMessageRoot(originDomain); } + // Count of leafs in aggregate tree at targetAggregateRoot. + aggregateRootCount = await database.getAggregateRootCount(targetAggregateRoot.root); + if (!aggregateRootCount) { + throw new NoAggregateRootCount(targetAggregateRoot.root); + } + + targetMessageRoot = latestMessageRoot.root; + + // Index of messageRoot leaf node in aggregate tree. + messageRootIndex = await database.getMessageRootIndex(config.hubDomain, targetMessageRoot); + if (messageRootIndex === undefined) { + throw new NoMessageRootIndex(originDomain, targetMessageRoot); + } } else { // Optimistic mode - const snapshot = await database.getPendingAggregateRoot(targetAggregateRoot.root); + snapshot = await database.getPendingAggregateRoot(targetAggregateRoot.root); if (snapshot) { logger.debug("Got pending snapshot", requestContext, methodContext, { snapshot, @@ -207,36 +226,29 @@ export const enqueue = async () => { if (domainIndex === -1) { throw new NoDomainInSnapshot(originDomain, snapshot); } - const messageRoot = snapshot.roots[domainIndex]; - latestMessageRoot = await database.getRootMessage(originDomain, messageRoot); - if (!latestMessageRoot) { + targetMessageRoot = snapshot.roots[domainIndex]; + if (!targetMessageRoot) { throw new NoTargetMessageRoot(originDomain); } + // Count of leafs in aggregate tree at snapshot baseAggregateRoot. + const _baseAggregateRootCount = await database.getAggregateRootCount(snapshot.baseAggregateRoot); + if (!_baseAggregateRootCount) { + throw new NoAggregateRootCount(snapshot.baseAggregateRoot); + } + aggregateRootCount = snapshot.roots.length + _baseAggregateRootCount!; + messageRootIndex = _baseAggregateRootCount + domainIndex; } - const targetMessageRoot = latestMessageRoot.root; // Count of leaf nodes in origin domain`s outbound tree with the targetMessageRoot as root - const messageRootCount = await database.getMessageRootCount(originDomain, targetMessageRoot); + messageRootCount = await database.getMessageRootCount(originDomain, targetMessageRoot); if (messageRootCount === undefined) { throw new NoMessageRootCount(originDomain, targetMessageRoot); } - // Index of messageRoot leaf node in aggregate tree. - const messageRootIndex = await database.getMessageRootIndex(config.hubDomain, targetMessageRoot); - if (messageRootIndex === undefined) { - throw new NoMessageRootIndex(originDomain, targetMessageRoot); - } - - // Count of leafs in aggregate tree at targetAggregateRoot. - const aggregateRootCount = await database.getAggregateRootCount(targetAggregateRoot.root); - if (!aggregateRootCount) { - throw new NoAggregateRootCount(targetAggregateRoot.root); - } - const batchSize = config.proverBatchSize[destinationDomain] ?? DEFAULT_PROVER_BATCH_SIZE; const pendingMessages = await getUnProcessedMessagesByIndex( originDomain, destinationDomain, - latestMessageRoot.count, + messageRootCount, ); // Paginate through all unprocessed messages from the domain @@ -246,14 +258,14 @@ export const enqueue = async () => { const unprocessed = pendingMessages.slice(offset, offset + batchSize); const subContext = createRequestContext( "processUnprocessedMessages", - `${originDomain}-${destinationDomain}-${offset}-${latestMessageRoot.root}`, + `${originDomain}-${destinationDomain}-${offset}-${targetMessageRoot}`, ); if (unprocessed.length > 0) { logger.info("Got unprocessed messages for origin and destination pair", subContext, methodContext, { unprocessed: unprocessed.map((message) => message.leaf), originDomain, destinationDomain, - endIndex: latestMessageRoot.count, + endIndex: messageRootCount, offset, }); @@ -266,6 +278,7 @@ export const enqueue = async () => { messageRootCount, targetAggregateRoot.root, aggregateRootCount, + snapshot ? snapshot.roots : [], subContext, ); if (brokerMessage) { @@ -287,7 +300,7 @@ export const enqueue = async () => { { originDomain, destinationDomain, - endIndex: latestMessageRoot.count, + endIndex: messageRootCount, offset, brokerMessage, }, @@ -357,6 +370,7 @@ export const createBrokerMessage = async ( messageRootCount: number, targetAggregateRoot: string, aggregateRootCount: number, + snapshotRoots: string[], _requestContext: RequestContext, ): Promise => { const { @@ -425,6 +439,7 @@ export const createBrokerMessage = async ( messageRootCount, aggregateRoot: targetAggregateRoot, aggregateRootCount, + snapshotRoots, }; }; diff --git a/packages/agents/lighthouse/src/tasks/prover/operations/types.ts b/packages/agents/lighthouse/src/tasks/prover/operations/types.ts index 977779584f..0fc09f05a1 100644 --- a/packages/agents/lighthouse/src/tasks/prover/operations/types.ts +++ b/packages/agents/lighthouse/src/tasks/prover/operations/types.ts @@ -15,4 +15,5 @@ export type BrokerMessage = { messageRootCount: number; aggregateRoot: string; aggregateRootCount: number; + snapshotRoots: string[]; };