Skip to content

Commit

Permalink
Merge pull request #5014 from connext/prover_op_mode
Browse files Browse the repository at this point in the history
Prover op mode fixes
  • Loading branch information
wanglonghong authored Oct 16, 2023
2 parents 51294c4 + bea7274 commit 951667e
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 36 deletions.
34 changes: 23 additions & 11 deletions packages/agents/lighthouse/src/tasks/prover/operations/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
RequestContext,
XMessage,
ExecStatus,
DBHelper,
} from "@connext/nxtp-utils";

import {
Expand All @@ -16,7 +17,7 @@ import {
MessageRootVerificationFailed,
} from "../../../errors";
import { sendWithRelayerWithBackup } from "../../../mockable";
import { HubDBHelper, SpokeDBHelper } from "../adapters";
import { HubDBHelper, SpokeDBHelper, OptimisticHubDBHelper } from "../adapters";
import { getContext } from "../prover";

import { BrokerMessage, ProofStruct } from "./types";
Expand All @@ -38,6 +39,7 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
messageRootCount,
aggregateRoot,
aggregateRootCount,
snapshotRoots,
} = brokerMessage;

// Dedup the batch
Expand Down Expand Up @@ -65,17 +67,27 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
);
const spokeSMT = new SparseMerkleTree(spokeStore);

const hubStore = new HubDBHelper(
"hub",
aggregateRootCount,
{
reader: database,
writer: databaseWriter,
},
cache.messages,
);
const hubSMT = new SparseMerkleTree(hubStore);
let hubStore: DBHelper;
if (snapshotRoots.length == 0) {
hubStore = new HubDBHelper(
"hub",
aggregateRootCount,
{
reader: database,
writer: databaseWriter,
},
cache.messages,
);
} else {
const baseAggregateRootCount = aggregateRootCount - snapshotRoots.length;
const baseAggregateRoots: string[] = await database.getAggregateRoots(baseAggregateRootCount);
const opRoots = baseAggregateRoots.concat(snapshotRoots);

// Count of leafs in aggregate tree at targetAggregateRoot.
hubStore = new OptimisticHubDBHelper(opRoots, aggregateRootCount);
}

const hubSMT = new SparseMerkleTree(hubStore);
const destinationSpokeConnector = config.chains[destinationDomain]?.deployments.spokeConnector;
if (!destinationSpokeConnector) {
throw new NoDestinationDomainForProof(destinationDomain);
Expand Down
61 changes: 38 additions & 23 deletions packages/agents/lighthouse/src/tasks/prover/operations/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
ExecStatus,
RelayerTaskStatus,
ModeType,
Snapshot,
} from "@connext/nxtp-utils";

import {
Expand Down Expand Up @@ -176,10 +177,15 @@ export const enqueue = async () => {
.filter((domain) => domain != destinationDomain)
.map(async (originDomain) => {
let latestMessageRoot: RootMessage | undefined = undefined;
let aggregateRootCount: number | undefined;
let targetMessageRoot: string;
let messageRootCount: number | undefined;
let messageRootIndex: number | undefined;
let snapshot: Snapshot | undefined;
const targetAggregateRoot: ReceivedAggregateRoot = curDestAggRoots[0];
try {
// Slowmode
if (mode == ModeType.SlowMode) {
if (mode === ModeType.SlowMode) {
// Slowmode
for (const destAggregateRoot of curDestAggRoots) {
latestMessageRoot = await database.getLatestMessageRoot(originDomain, destAggregateRoot.root);
Expand All @@ -188,9 +194,22 @@ export const enqueue = async () => {
if (!latestMessageRoot) {
throw new NoTargetMessageRoot(originDomain);
}
// Count of leafs in aggregate tree at targetAggregateRoot.
aggregateRootCount = await database.getAggregateRootCount(targetAggregateRoot.root);
if (!aggregateRootCount) {
throw new NoAggregateRootCount(targetAggregateRoot.root);
}

targetMessageRoot = latestMessageRoot.root;

// Index of messageRoot leaf node in aggregate tree.
messageRootIndex = await database.getMessageRootIndex(config.hubDomain, targetMessageRoot);
if (messageRootIndex === undefined) {
throw new NoMessageRootIndex(originDomain, targetMessageRoot);
}
} else {
// Optimistic mode
const snapshot = await database.getPendingAggregateRoot(targetAggregateRoot.root);
snapshot = await database.getPendingAggregateRoot(targetAggregateRoot.root);
if (snapshot) {
logger.debug("Got pending snapshot", requestContext, methodContext, {
snapshot,
Expand All @@ -207,36 +226,29 @@ export const enqueue = async () => {
if (domainIndex === -1) {
throw new NoDomainInSnapshot(originDomain, snapshot);
}
const messageRoot = snapshot.roots[domainIndex];
latestMessageRoot = await database.getRootMessage(originDomain, messageRoot);
if (!latestMessageRoot) {
targetMessageRoot = snapshot.roots[domainIndex];
if (!targetMessageRoot) {
throw new NoTargetMessageRoot(originDomain);
}
// Count of leafs in aggregate tree at snapshot baseAggregateRoot.
const _baseAggregateRootCount = await database.getAggregateRootCount(snapshot.baseAggregateRoot);
if (!_baseAggregateRootCount) {
throw new NoAggregateRootCount(snapshot.baseAggregateRoot);
}
aggregateRootCount = snapshot.roots.length + _baseAggregateRootCount;
messageRootIndex = _baseAggregateRootCount + domainIndex;
}

const targetMessageRoot = latestMessageRoot.root;
// Count of leaf nodes in origin domain`s outbound tree with the targetMessageRoot as root
const messageRootCount = await database.getMessageRootCount(originDomain, targetMessageRoot);
messageRootCount = await database.getMessageRootCount(originDomain, targetMessageRoot);
if (messageRootCount === undefined) {
throw new NoMessageRootCount(originDomain, targetMessageRoot);
}
// Index of messageRoot leaf node in aggregate tree.
const messageRootIndex = await database.getMessageRootIndex(config.hubDomain, targetMessageRoot);
if (messageRootIndex === undefined) {
throw new NoMessageRootIndex(originDomain, targetMessageRoot);
}

// Count of leafs in aggregate tree at targetAggregateRoot.
const aggregateRootCount = await database.getAggregateRootCount(targetAggregateRoot.root);
if (!aggregateRootCount) {
throw new NoAggregateRootCount(targetAggregateRoot.root);
}

const batchSize = config.proverBatchSize[destinationDomain] ?? DEFAULT_PROVER_BATCH_SIZE;
const pendingMessages = await getUnProcessedMessagesByIndex(
originDomain,
destinationDomain,
latestMessageRoot.count,
messageRootCount,
);

// Paginate through all unprocessed messages from the domain
Expand All @@ -246,14 +258,14 @@ export const enqueue = async () => {
const unprocessed = pendingMessages.slice(offset, offset + batchSize);
const subContext = createRequestContext(
"processUnprocessedMessages",
`${originDomain}-${destinationDomain}-${offset}-${latestMessageRoot.root}`,
`${originDomain}-${destinationDomain}-${offset}-${targetMessageRoot}`,
);
if (unprocessed.length > 0) {
logger.info("Got unprocessed messages for origin and destination pair", subContext, methodContext, {
unprocessed: unprocessed.map((message) => message.leaf),
originDomain,
destinationDomain,
endIndex: latestMessageRoot.count,
endIndex: messageRootCount,
offset,
});

Expand All @@ -266,6 +278,7 @@ export const enqueue = async () => {
messageRootCount,
targetAggregateRoot.root,
aggregateRootCount,
snapshot ? snapshot.roots : [],
subContext,
);
if (brokerMessage) {
Expand All @@ -287,7 +300,7 @@ export const enqueue = async () => {
{
originDomain,
destinationDomain,
endIndex: latestMessageRoot.count,
endIndex: messageRootCount,
offset,
brokerMessage,
},
Expand Down Expand Up @@ -357,6 +370,7 @@ export const createBrokerMessage = async (
messageRootCount: number,
targetAggregateRoot: string,
aggregateRootCount: number,
snapshotRoots: string[],
_requestContext: RequestContext,
): Promise<BrokerMessage | undefined> => {
const {
Expand Down Expand Up @@ -425,6 +439,7 @@ export const createBrokerMessage = async (
messageRootCount,
aggregateRoot: targetAggregateRoot,
aggregateRootCount,
snapshotRoots,
};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ export type BrokerMessage = {
messageRootCount: number;
aggregateRoot: string;
aggregateRootCount: number;
snapshotRoots: string[];
};
2 changes: 2 additions & 0 deletions packages/agents/lighthouse/test/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
RootMessage,
RelayerType,
ReceivedAggregateRoot,
ModeType,
} from "@connext/nxtp-utils";
import { Relayer } from "@connext/nxtp-adapters-relayer";
import { mockRelayer } from "@connext/nxtp-adapters-relayer/test/mock";
Expand Down Expand Up @@ -102,6 +103,7 @@ export const mock = {
},
config: mock.config(),
chainData: mock.chainData(),
mode: ModeType.SlowMode,
};
},
processFromRootCtx: (): ProcessFromRootContext => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const mockBrokerMesage: BrokerMessage = {
messageRootCount: 2,
aggregateRoot: mkBytes32("0x222"),
aggregateRootCount: 2,
snapshotRoots: [],
};

const requestContext = createRequestContext("Publisher");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { expect, createRequestContext, mkBytes32 } from "@connext/nxtp-utils";
import { expect, createRequestContext, mkBytes32, ModeType } from "@connext/nxtp-utils";
import { SinonStub, stub } from "sinon";

import {
enqueue,
prefetch,
createBrokerMessage,
getUnProcessedMessagesByIndex,
acquireLock,
releaseLock,
} from "../../../../src/tasks/prover/operations/publisher";
import * as PublisherFns from "../../../../src/tasks/prover/operations/publisher";
import { mock, mockXMessage1, mockXMessage2, mockRootMessage, mockReceivedRoot } from "../../../mock";
Expand All @@ -22,6 +24,7 @@ const mockBrokerMesage: BrokerMessage = {
messageRootCount: 2,
aggregateRoot: mkBytes32("0x222"),
aggregateRootCount: 2,
snapshotRoots: [],
};

const requestContext = createRequestContext("Publisher");
Expand Down Expand Up @@ -70,6 +73,44 @@ describe("Operations: Publisher", () => {
expect(createBrokerMessageStub.callCount).to.be.eq(4);
});

it("happy case should enqueue a broker messages in op mode", async () => {
acquireLock();
const channel = await proverCtxMock.adapters.mqClient.createChannel();
const publishStub = (channel.publish as SinonStub).resolves();
proverCtxMock.mode = ModeType.OptimisticMode;

(proverCtxMock.adapters.database.getLatestAggregateRoots as SinonStub).resolves([mockReceivedRoot]);
(proverCtxMock.adapters.database.getLatestMessageRoot as SinonStub).resolves(mockRootMessage);
(proverCtxMock.adapters.database.getPendingAggregateRoot as SinonStub).resolves(mock.entity.snapshot());
(proverCtxMock.adapters.database.getRootMessage as SinonStub).resolves(mockRootMessage);
(proverCtxMock.adapters.database.getMessageRootCount as SinonStub).resolves(1);
(proverCtxMock.adapters.database.getMessageRootIndex as SinonStub).resolves(1);
(proverCtxMock.adapters.database.getAggregateRootCount as SinonStub).resolves(1);
(proverCtxMock.adapters.cache.messages.setStatus as SinonStub).resolves();
createBrokerMessageStub.resolves(mockBrokerMesage);
await enqueue();
releaseLock();
expect(createBrokerMessageStub.callCount).to.be.eq(4);
});

it("should not enqueue if no snapshot root in op mode", async () => {
const channel = await proverCtxMock.adapters.mqClient.createChannel();
const publishStub = (channel.publish as SinonStub).resolves();
proverCtxMock.mode = ModeType.OptimisticMode;

(proverCtxMock.adapters.database.getLatestAggregateRoots as SinonStub).resolves([mockReceivedRoot]);
(proverCtxMock.adapters.database.getLatestMessageRoot as SinonStub).resolves(mockRootMessage);
(proverCtxMock.adapters.database.getPendingAggregateRoot as SinonStub).resolves(undefined);
(proverCtxMock.adapters.database.getRootMessage as SinonStub).resolves(mockRootMessage);
(proverCtxMock.adapters.database.getMessageRootCount as SinonStub).resolves(1);
(proverCtxMock.adapters.database.getMessageRootIndex as SinonStub).resolves(1);
(proverCtxMock.adapters.database.getAggregateRootCount as SinonStub).resolves(1);
(proverCtxMock.adapters.cache.messages.setStatus as SinonStub).resolves();
createBrokerMessageStub.resolves(mockBrokerMesage);
await enqueue();
expect(createBrokerMessageStub.callCount).to.be.eq(0);
});

it("should catch error if no pending aggregate root", async () => {
(proverCtxMock.adapters.database.getLatestAggregateRoots as SinonStub).resolves([mockReceivedRoot]);
(proverCtxMock.adapters.database.getLatestMessageRoot as SinonStub).resolves(mockRootMessage);
Expand All @@ -80,7 +121,7 @@ describe("Operations: Publisher", () => {
(proverCtxMock.adapters.cache.messages.setNonce as SinonStub).resolves();
createBrokerMessageStub.resolves(mockBrokerMesage);
expect(await enqueue()).to.throw;
expect(createBrokerMessageStub.callCount).to.be.eq(0);
expect(createBrokerMessageStub.callCount).to.be.eq(4);
});
it("should catch error if no received aggregate root", async () => {
(proverCtxMock.adapters.database.getLatestAggregateRoots as SinonStub).resolves([]);
Expand Down Expand Up @@ -142,6 +183,7 @@ describe("Operations: Publisher", () => {
mockBrokerMesage.messageRootCount,
mockBrokerMesage.aggregateRoot,
mockBrokerMesage.aggregateRootCount,
mockBrokerMesage.snapshotRoots,
requestContext,
);
expect(brokerMessage).to.be.deep.eq(mockBrokerMesage);
Expand All @@ -157,6 +199,7 @@ describe("Operations: Publisher", () => {
mockBrokerMesage.messageRootCount,
mockBrokerMesage.aggregateRoot,
mockBrokerMesage.aggregateRootCount,
mockBrokerMesage.snapshotRoots,
requestContext,
),
).to.eventually.be.rejectedWith(NoDestinationDomainForProof);
Expand All @@ -175,6 +218,7 @@ describe("Operations: Publisher", () => {
mockBrokerMesage.messageRootCount,
mockBrokerMesage.aggregateRoot,
mockBrokerMesage.aggregateRootCount,
mockBrokerMesage.snapshotRoots,
requestContext,
);
expect(brokerMessage).to.be.undefined;
Expand Down

0 comments on commit 951667e

Please sign in to comment.