Skip to content

Commit

Permalink
fix: resolve lh prover for op move v1
Browse files Browse the repository at this point in the history
  • Loading branch information
preethamr committed Oct 16, 2023
1 parent 51294c4 commit 8727df9
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 34 deletions.
34 changes: 23 additions & 11 deletions packages/agents/lighthouse/src/tasks/prover/operations/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
RequestContext,
XMessage,
ExecStatus,
DBHelper,
} from "@connext/nxtp-utils";

import {
Expand All @@ -16,7 +17,7 @@ import {
MessageRootVerificationFailed,
} from "../../../errors";
import { sendWithRelayerWithBackup } from "../../../mockable";
import { HubDBHelper, SpokeDBHelper } from "../adapters";
import { HubDBHelper, SpokeDBHelper, OptimisticHubDBHelper } from "../adapters";
import { getContext } from "../prover";

import { BrokerMessage, ProofStruct } from "./types";
Expand All @@ -38,6 +39,7 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
messageRootCount,
aggregateRoot,
aggregateRootCount,
snapshotRoots,
} = brokerMessage;

// Dedup the batch
Expand Down Expand Up @@ -65,17 +67,27 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
);
const spokeSMT = new SparseMerkleTree(spokeStore);

const hubStore = new HubDBHelper(
"hub",
aggregateRootCount,
{
reader: database,
writer: databaseWriter,
},
cache.messages,
);
const hubSMT = new SparseMerkleTree(hubStore);
let hubStore: DBHelper;
if (snapshotRoots.length == 0) {
hubStore = new HubDBHelper(
"hub",
aggregateRootCount,
{
reader: database,
writer: databaseWriter,
},
cache.messages,
);
} else {
const baseAggregateRootCount = aggregateRootCount - snapshotRoots.length;
const baseAggregateRoots: string[] = await database.getAggregateRoots(baseAggregateRootCount);
const opRoots = baseAggregateRoots.concat(snapshotRoots);

// Count of leafs in aggregate tree at targetAggregateRoot.
hubStore = new OptimisticHubDBHelper(opRoots, aggregateRootCount);
}

const hubSMT = new SparseMerkleTree(hubStore);
const destinationSpokeConnector = config.chains[destinationDomain]?.deployments.spokeConnector;
if (!destinationSpokeConnector) {
throw new NoDestinationDomainForProof(destinationDomain);
Expand Down
61 changes: 38 additions & 23 deletions packages/agents/lighthouse/src/tasks/prover/operations/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
ExecStatus,
RelayerTaskStatus,
ModeType,
Snapshot,
} from "@connext/nxtp-utils";

import {
Expand Down Expand Up @@ -176,10 +177,15 @@ export const enqueue = async () => {
.filter((domain) => domain != destinationDomain)
.map(async (originDomain) => {
let latestMessageRoot: RootMessage | undefined = undefined;
let aggregateRootCount: number | undefined;
let targetMessageRoot: string;
let messageRootCount: number | undefined;
let messageRootIndex: number | undefined;
let snapshot: Snapshot | undefined;
const targetAggregateRoot: ReceivedAggregateRoot = curDestAggRoots[0];
try {
// Slowmode
if (mode == ModeType.SlowMode) {
if (mode === ModeType.SlowMode) {
// Slowmode
for (const destAggregateRoot of curDestAggRoots) {
latestMessageRoot = await database.getLatestMessageRoot(originDomain, destAggregateRoot.root);
Expand All @@ -188,9 +194,22 @@ export const enqueue = async () => {
if (!latestMessageRoot) {
throw new NoTargetMessageRoot(originDomain);
}
// Count of leafs in aggregate tree at targetAggregateRoot.
aggregateRootCount = await database.getAggregateRootCount(targetAggregateRoot.root);
if (!aggregateRootCount) {
throw new NoAggregateRootCount(targetAggregateRoot.root);
}

targetMessageRoot = latestMessageRoot.root;

// Index of messageRoot leaf node in aggregate tree.
messageRootIndex = await database.getMessageRootIndex(config.hubDomain, targetMessageRoot);
if (messageRootIndex === undefined) {
throw new NoMessageRootIndex(originDomain, targetMessageRoot);
}
} else {
// Optimistic mode
const snapshot = await database.getPendingAggregateRoot(targetAggregateRoot.root);
snapshot = await database.getPendingAggregateRoot(targetAggregateRoot.root);
if (snapshot) {
logger.debug("Got pending snapshot", requestContext, methodContext, {
snapshot,
Expand All @@ -207,36 +226,29 @@ export const enqueue = async () => {
if (domainIndex === -1) {
throw new NoDomainInSnapshot(originDomain, snapshot);
}
const messageRoot = snapshot.roots[domainIndex];
latestMessageRoot = await database.getRootMessage(originDomain, messageRoot);
if (!latestMessageRoot) {
targetMessageRoot = snapshot.roots[domainIndex];
if (!targetMessageRoot) {
throw new NoTargetMessageRoot(originDomain);
}
// Count of leafs in aggregate tree at snapshot baseAggregateRoot.
const _baseAggregateRootCount = await database.getAggregateRootCount(snapshot.baseAggregateRoot);
if (!_baseAggregateRootCount) {
throw new NoAggregateRootCount(snapshot.baseAggregateRoot);
}
aggregateRootCount = snapshot.roots.length + _baseAggregateRootCount!;
messageRootIndex = _baseAggregateRootCount + domainIndex;
}

const targetMessageRoot = latestMessageRoot.root;
// Count of leaf nodes in origin domain`s outbound tree with the targetMessageRoot as root
const messageRootCount = await database.getMessageRootCount(originDomain, targetMessageRoot);
messageRootCount = await database.getMessageRootCount(originDomain, targetMessageRoot);
if (messageRootCount === undefined) {
throw new NoMessageRootCount(originDomain, targetMessageRoot);
}
// Index of messageRoot leaf node in aggregate tree.
const messageRootIndex = await database.getMessageRootIndex(config.hubDomain, targetMessageRoot);
if (messageRootIndex === undefined) {
throw new NoMessageRootIndex(originDomain, targetMessageRoot);
}

// Count of leafs in aggregate tree at targetAggregateRoot.
const aggregateRootCount = await database.getAggregateRootCount(targetAggregateRoot.root);
if (!aggregateRootCount) {
throw new NoAggregateRootCount(targetAggregateRoot.root);
}

const batchSize = config.proverBatchSize[destinationDomain] ?? DEFAULT_PROVER_BATCH_SIZE;
const pendingMessages = await getUnProcessedMessagesByIndex(
originDomain,
destinationDomain,
latestMessageRoot.count,
messageRootCount,
);

// Paginate through all unprocessed messages from the domain
Expand All @@ -246,14 +258,14 @@ export const enqueue = async () => {
const unprocessed = pendingMessages.slice(offset, offset + batchSize);
const subContext = createRequestContext(
"processUnprocessedMessages",
`${originDomain}-${destinationDomain}-${offset}-${latestMessageRoot.root}`,
`${originDomain}-${destinationDomain}-${offset}-${targetMessageRoot}`,
);
if (unprocessed.length > 0) {
logger.info("Got unprocessed messages for origin and destination pair", subContext, methodContext, {
unprocessed: unprocessed.map((message) => message.leaf),
originDomain,
destinationDomain,
endIndex: latestMessageRoot.count,
endIndex: messageRootCount,
offset,
});

Expand All @@ -266,6 +278,7 @@ export const enqueue = async () => {
messageRootCount,
targetAggregateRoot.root,
aggregateRootCount,
snapshot ? snapshot.roots : [],
subContext,
);
if (brokerMessage) {
Expand All @@ -287,7 +300,7 @@ export const enqueue = async () => {
{
originDomain,
destinationDomain,
endIndex: latestMessageRoot.count,
endIndex: messageRootCount,
offset,
brokerMessage,
},
Expand Down Expand Up @@ -357,6 +370,7 @@ export const createBrokerMessage = async (
messageRootCount: number,
targetAggregateRoot: string,
aggregateRootCount: number,
snapshotRoots: string[],
_requestContext: RequestContext,
): Promise<BrokerMessage | undefined> => {
const {
Expand Down Expand Up @@ -425,6 +439,7 @@ export const createBrokerMessage = async (
messageRootCount,
aggregateRoot: targetAggregateRoot,
aggregateRootCount,
snapshotRoots,
};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ export type BrokerMessage = {
messageRootCount: number;
aggregateRoot: string;
aggregateRootCount: number;
snapshotRoots: string[];
};

0 comments on commit 8727df9

Please sign in to comment.