Skip to content

Commit

Permalink
Merge pull request #4361 from connext/4121-cartographer-limit-needs-i…
Browse files Browse the repository at this point in the history
…ncrease-when-assigning-messagestatus

fix: cartographer limit needs increase when assigning messagestatus
  • Loading branch information
liu-zhipeng authored May 30, 2023
2 parents 101c8e2 + 2540428 commit 97be8df
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 33 deletions.
43 changes: 42 additions & 1 deletion packages/adapters/database/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
convertFromDbAggregatedRoot,
convertFromDbPropagatedRoot,
convertFromDbReceivedAggregateRoot,
convertFromDbRootStatus,
AggregatedRoot,
PropagatedRoot,
ReceivedAggregateRoot,
Expand All @@ -25,6 +26,7 @@ import {
AssetPrice,
StableSwapTransfer,
StableSwapLpBalance,
RootMessageStatus,
} from "@connext/nxtp-utils";
import { Pool } from "pg";
import * as db from "zapatos/db";
Expand Down Expand Up @@ -893,7 +895,7 @@ export const getMessageRootsFromIndex = async (
s.root_messages.Selectable[]
>`select * from ${"root_messages"} where ${{
spoke_domain,
}} and ${{ leaf_count: dc.gte(index) }} order by ${"leaf_count"} asc nulls last limit 75`.run(poolToUse);
}} and ${{ leaf_count: dc.gte(index) }} order by ${"leaf_count"} asc nulls last limit 100`.run(poolToUse);
return root.length > 0 ? root.map(convertFromDbRootMessage) : [];
};

Expand All @@ -909,6 +911,45 @@ export const getMessageRootCount = async (
return message ? convertFromDbMessage(message).origin?.index : undefined;
};

export const getMessageRootStatusFromIndex = async (
spoke_domain: string,
index: number,
_pool?: Pool | db.TxnClientForRepeatableRead,
): Promise<RootMessageStatus> => {
const poolToUse = _pool ?? pool;
// Find the processed, unprocessed count, aggregated root count that contains the index, for a given domain
const status = await db.sql<s.root_messages.SQL, s.root_messages.Selectable[]>`with cte as (
select *, aggregated.id as aggregated_id
from
((select * from root_messages where ${{
spoke_domain,
}} and ${{ leaf_count: dc.gte(index) }}) as roots
left join aggregated_roots as aggregated
on roots.root=aggregated.received_root)
)
select
COUNT(CASE WHEN processed=true THEN 1 END) AS processed_count,
COUNT(CASE WHEN processed=false THEN 1 END) AS unprocessed_count,
COUNT(CASE WHEN aggregated_id IS not null then 1 END) aggregated_count,
(
SELECT aggregated_id
FROM cte
ORDER BY domain_index desc nulls last
LIMIT 1
) as last_aggregated_id
from
cte
`.run(poolToUse);
return status.length > 0
? convertFromDbRootStatus(status[0])
: {
processedCount: 0,
unprocessedCount: 0,
aggregatedCount: 0,
lastAggregatedRoot: undefined,
};
};

export const getSpokeNode = async (
domain: string,
index: number,
Expand Down
8 changes: 8 additions & 0 deletions packages/adapters/database/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
AssetPrice,
StableSwapTransfer,
StableSwapLpBalance,
RootMessageStatus,
} from "@connext/nxtp-utils";
import { Pool } from "pg";
import { TxnClientForRepeatableRead } from "zapatos/db";
Expand Down Expand Up @@ -52,6 +53,7 @@ import {
getMessageRootAggregatedFromIndex,
getMessageRootsFromIndex,
getMessageRootCount,
getMessageRootStatusFromIndex,
getSpokeNode,
getSpokeNodes,
getHubNode,
Expand Down Expand Up @@ -206,6 +208,11 @@ export type Database = {
messageRoot: string,
_pool?: Pool | TxnClientForRepeatableRead,
) => Promise<number | undefined>;
getMessageRootStatusFromIndex: (
domain: string,
index: number,
_pool?: Pool | TxnClientForRepeatableRead,
) => Promise<RootMessageStatus>;
getSpokeNode: (
domain: string,
index: number,
Expand Down Expand Up @@ -319,6 +326,7 @@ export const getDatabase = async (databaseUrl: string, logger: Logger): Promise<
getMessageRootAggregatedFromIndex,
getMessageRootsFromIndex,
getMessageRootCount,
getMessageRootStatusFromIndex,
getSpokeNode,
getSpokeNodes,
getHubNode,
Expand Down
50 changes: 50 additions & 0 deletions packages/adapters/database/test/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ import {
saveStableSwapPoolEvent,
saveStableSwapTransfers,
saveStableSwapLpBalances,
getMessageRootStatusFromIndex,
getAggregateRootByRootAndDomain,
} from "../src/client";

describe("Database client", () => {
Expand Down Expand Up @@ -1136,6 +1138,54 @@ describe("Database client", () => {
expect(queryRes.rows[0].processed).to.eq(false);
});

it("should get getAggregateRootByRootAndDomain", async () => {
const roots: ReceivedAggregateRoot[] = [];
for (let _i = 0; _i < 10; _i++) {
const m = mock.entity.receivedAggregateRoot();
m.domain = mock.domain.A;
roots.push(m);
}

await saveReceivedAggregateRoot(roots, pool);

const receivedAggregatedRoot = await getAggregateRootByRootAndDomain(mock.domain.A, roots[1].root, "ASC", pool);

expect(receivedAggregatedRoot).to.deep.eq(roots[1]);
});

it("should get getMessageRootStatusFromIndex", async () => {
const messages: RootMessage[] = [];
const roots: AggregatedRoot[] = [];
const totalCount = 100;
const processedCount = 10;
const aggregatedCount = 20;
for (let _i = 0; _i < totalCount; _i++) {
const rootMessage = mock.entity.rootMessage();
rootMessage.spokeDomain = mock.domain.A;
rootMessage.count = _i;
rootMessage.processed = _i < processedCount;
messages.push(rootMessage);

const m = mock.entity.aggregatedRoot();
m.index = _i;
m.domain = mock.domain.A;
m.receivedRoot = _i < aggregatedCount ? rootMessage.root : getRandomBytes32();
roots.push(m);
}

await saveSentRootMessages(messages, pool);
await saveAggregatedRoots(roots, pool);

const messageStatus = await getMessageRootStatusFromIndex(mock.domain.A, 0, pool);

expect(messageStatus).to.deep.eq({
aggregatedCount: aggregatedCount,
lastAggregatedRoot: roots[aggregatedCount - 1].id,
processedCount: processedCount,
unprocessedCount: totalCount - processedCount,
});
});

it("should save assets", async () => {
const assets = [mock.entity.asset(), mock.entity.asset(), mock.entity.asset()];
await saveAssets(assets, pool);
Expand Down
1 change: 1 addition & 0 deletions packages/adapters/database/test/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export const mockDatabase = (): Database => {
getMessageRootCount: stub().resolves(),
getLatestMessageRoot: stub().resolves(),
getLatestAggregateRoots: stub().resolves(),
getMessageRootStatusFromIndex: stub().resolves(),
getSpokeNode: stub().resolves(),
getSpokeNodes: stub().resolves([]),
getHubNode: stub().resolves(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,33 +45,27 @@ export const getMessageStatus = async (transfer: XTransfer): Promise<XTransferMe
// A message has been proven and processed
return XTransferMessageStatus.Processed;
}
const rootMessages = await database.getMessageRootsFromIndex(transfer.xparams.originDomain, message.origin.index);
if (rootMessages.length == 0) {
const rootMessageStatus = await database.getMessageRootStatusFromIndex(
transfer.xparams.originDomain,
message.origin.index,
);
if (rootMessageStatus.processedCount + rootMessageStatus.unprocessedCount == 0) {
// there are 2 possible reasons
// 1. sendOutboundRoot didn't happen on the spoke domain
// 2. sendOutboundRoot happened but the root_messages table sync might still be in progress.
return XTransferMessageStatus.XCalled;
}
// const processed = rootMessage.processed;
const processed = rootMessages.map((i) => i.processed).includes(true);
if (!processed) {
if (rootMessageStatus.processedCount === 0) {
// A root message exist. this means sendOutboundRoot task executed on the spoke domain
// 1. A spoke root got sent to the hub domain, the root might still be getting processed by AMB/LH processFromRoot task
// 2. A spoke root got arrived at the hub domain, the root_me ssages table sync might still be in progress
// 2. A spoke root got arrived at the hub domain, the root_messages table sync might still be in progress

return XTransferMessageStatus.SpokeRootSent;
}

// A message root from the spoke domain got arrived at the hub domain successfully
let aggregateRoot: string | undefined = undefined;
// iterate through roots with most highes leaf count first (most likely to have aggregate when system
// is under load)
for (const rootMessage of rootMessages.sort((a, b) => b.count - a.count)) {
aggregateRoot = await database.getAggregateRoot(rootMessage.root);
if (aggregateRoot) break;
}
//const aggregateRoot = await database.getAggregateRoot(rootMessage.root);
if (!aggregateRoot) {
if (rootMessageStatus.aggregatedCount === 0) {
// An aggregated root doesn't exist.
// 1. A message root arrived at the hub domain but has been neither dequeued/propagated
// 2. The aggregateRoot got propagated on-chain but either of both subgraph and carto sync might still be in progress
Expand All @@ -84,7 +78,7 @@ export const getMessageStatus = async (transfer: XTransfer): Promise<XTransferMe
let messageStatus: XTransferMessageStatus = XTransferMessageStatus.AggregateRootPropagated;
const receivedAggregateRoot = await database.getAggregateRootByRootAndDomain(
transfer.xparams.destinationDomain,
aggregateRoot,
rootMessageStatus.lastAggregatedRoot!,
);

if (receivedAggregateRoot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ describe("MessageStatus operations", () => {
(mockContext.adapters.database.getMessageByLeaf as SinonStub).resolves(
mock.entity.xMessage({ destination: { processed: false, returnData: getRandomBytes32() } }),
);
(mockContext.adapters.database.getMessageRootsFromIndex as SinonStub).resolves([]);
(mockContext.adapters.database.getMessageRootStatusFromIndex as SinonStub).resolves({
processedCount: 0,
unprocessedCount: 0,
aggregatedCount: 0,
lastAggregatedRoot: undefined,
});
const xTransfer = mock.entity.xtransfer({ messageHash: undefined });
const messageStatus = await getMessageStatus(xTransfer);
expect(messageStatus).to.be.eq(XTransferMessageStatus.XCalled);
Expand All @@ -45,9 +50,12 @@ describe("MessageStatus operations", () => {
(mockContext.adapters.database.getMessageByLeaf as SinonStub).resolves(
mock.entity.xMessage({ destination: { processed: false, returnData: getRandomBytes32() } }),
);
(mockContext.adapters.database.getMessageRootsFromIndex as SinonStub).resolves([
mock.entity.rootMessage({ processed: false }),
]);
(mockContext.adapters.database.getMessageRootStatusFromIndex as SinonStub).resolves({
processedCount: 0,
unprocessedCount: 1,
aggregatedCount: 0,
lastAggregatedRoot: undefined,
});
const xTransfer = mock.entity.xtransfer({ messageHash: undefined });
const messageStatus = await getMessageStatus(xTransfer);
expect(messageStatus).to.be.eq(XTransferMessageStatus.SpokeRootSent);
Expand All @@ -57,10 +65,12 @@ describe("MessageStatus operations", () => {
(mockContext.adapters.database.getMessageByLeaf as SinonStub).resolves(
mock.entity.xMessage({ destination: { processed: false, returnData: getRandomBytes32() } }),
);
(mockContext.adapters.database.getMessageRootsFromIndex as SinonStub).resolves([
mock.entity.rootMessage({ processed: true }),
]);
(mockContext.adapters.database.getAggregateRoot as SinonStub).resolves(undefined);
(mockContext.adapters.database.getMessageRootStatusFromIndex as SinonStub).resolves({
processedCount: 1,
unprocessedCount: 5,
aggregatedCount: 0,
lastAggregatedRoot: undefined,
});
const xTransfer = mock.entity.xtransfer({ messageHash: undefined });
const messageStatus = await getMessageStatus(xTransfer);
expect(messageStatus).to.be.eq(XTransferMessageStatus.SpokeRootArrivedOnHub);
Expand All @@ -70,10 +80,12 @@ describe("MessageStatus operations", () => {
(mockContext.adapters.database.getMessageByLeaf as SinonStub).resolves(
mock.entity.xMessage({ destination: { processed: false, returnData: getRandomBytes32() } }),
);
(mockContext.adapters.database.getMessageRootsFromIndex as SinonStub).resolves([
mock.entity.rootMessage({ processed: true }),
]);
(mockContext.adapters.database.getAggregateRoot as SinonStub).resolves(mock.entity.aggregatedRoot());
(mockContext.adapters.database.getMessageRootStatusFromIndex as SinonStub).resolves({
processedCount: 1,
unprocessedCount: 5,
aggregatedCount: 1,
lastAggregatedRoot: mock.entity.aggregatedRoot(),
});
const xTransfer = mock.entity.xtransfer({ messageHash: undefined });
const messageStatus = await getMessageStatus(xTransfer);
expect(messageStatus).to.be.eq(XTransferMessageStatus.AggregateRootPropagated);
Expand All @@ -83,10 +95,12 @@ describe("MessageStatus operations", () => {
(mockContext.adapters.database.getMessageByLeaf as SinonStub).resolves(
mock.entity.xMessage({ destination: { processed: false, returnData: getRandomBytes32() } }),
);
(mockContext.adapters.database.getMessageRootsFromIndex as SinonStub).resolves([
mock.entity.rootMessage({ processed: true }),
]);
(mockContext.adapters.database.getAggregateRoot as SinonStub).resolves(mock.entity.aggregatedRoot());
(mockContext.adapters.database.getMessageRootStatusFromIndex as SinonStub).resolves({
processedCount: 1,
unprocessedCount: 5,
aggregatedCount: 1,
lastAggregatedRoot: mock.entity.aggregatedRoot(),
});
(mockContext.adapters.database.getAggregateRootByRootAndDomain as SinonStub).resolves(
mock.entity.aggregatedRoot(),
);
Expand Down
8 changes: 8 additions & 0 deletions packages/utils/src/types/amb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,11 @@ export const ReceivedAggregateRootSchema = Type.Object({
blockNumber: Type.Number(),
});
export type ReceivedAggregateRoot = Static<typeof ReceivedAggregateRootSchema>;

export const RootMessageStatusSchema = Type.Object({
processedCount: Type.Number(),
unprocessedCount: Type.Number(),
aggregatedCount: Type.Number(),
lastAggregatedRoot: Type.Optional(Type.String()),
});
export type RootMessageStatus = Static<typeof RootMessageStatusSchema>;
17 changes: 16 additions & 1 deletion packages/utils/src/types/db.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { BigNumber, constants } from "ethers";

import { XMessage, RootMessage, AggregatedRoot, PropagatedRoot, ReceivedAggregateRoot } from "./amb";
import { XMessage, RootMessage, AggregatedRoot, PropagatedRoot, ReceivedAggregateRoot, RootMessageStatus } from "./amb";
import {
PoolActionType,
StableSwapExchange,
Expand Down Expand Up @@ -432,6 +432,21 @@ export const convertFromDbReceivedAggregateRoot = (message: any): ReceivedAggreg
};
};

/**
* Converts a root message status from the cartographer db through either DB queries or Postgrest into the RootMessageStatus type
* @param message - the message from the cartographer db as a JSON object
* @returns an RootMessageStatus object
*/
export const convertFromDbRootStatus = (status: any): RootMessageStatus => {
const obj = {
processedCount: +status.processed_count,
unprocessedCount: +status.unprocessed_count,
aggregatedCount: +status.aggregated_count,
lastAggregatedRoot: status.last_aggregated_id ? status.last_aggregated_id.split("-")[0] : undefined,
};
return sanitizeNull(obj);
};

/**
* Converts a stable swap pool from the cartographer db through
* @param pool - the stable swap pool from the cartographer db as a JSON object
Expand Down

0 comments on commit 97be8df

Please sign in to comment.