diff --git a/packages/adapters/database/src/client.ts b/packages/adapters/database/src/client.ts index 3e3035d9b0..744398ba9c 100644 --- a/packages/adapters/database/src/client.ts +++ b/packages/adapters/database/src/client.ts @@ -11,6 +11,7 @@ import { convertFromDbAggregatedRoot, convertFromDbPropagatedRoot, convertFromDbReceivedAggregateRoot, + convertFromDbRootStatus, AggregatedRoot, PropagatedRoot, ReceivedAggregateRoot, @@ -25,6 +26,7 @@ import { AssetPrice, StableSwapTransfer, StableSwapLpBalance, + RootMessageStatus, } from "@connext/nxtp-utils"; import { Pool } from "pg"; import * as db from "zapatos/db"; @@ -893,7 +895,7 @@ export const getMessageRootsFromIndex = async ( s.root_messages.Selectable[] >`select * from ${"root_messages"} where ${{ spoke_domain, - }} and ${{ leaf_count: dc.gte(index) }} order by ${"leaf_count"} asc nulls last limit 75`.run(poolToUse); + }} and ${{ leaf_count: dc.gte(index) }} order by ${"leaf_count"} asc nulls last limit 100`.run(poolToUse); return root.length > 0 ? root.map(convertFromDbRootMessage) : []; }; @@ -909,6 +911,45 @@ export const getMessageRootCount = async ( return message ? convertFromDbMessage(message).origin?.index : undefined; }; +export const getMessageRootStatusFromIndex = async ( + spoke_domain: string, + index: number, + _pool?: Pool | db.TxnClientForRepeatableRead, +): Promise => { + const poolToUse = _pool ?? pool; + // Find the processed, unprocessed count, aggregated root count that contains the index, for a given domain + const status = await db.sql`with cte as ( + select *, aggregated.id as aggregated_id + from + ((select * from root_messages where ${{ + spoke_domain, + }} and ${{ leaf_count: dc.gte(index) }}) as roots + left join aggregated_roots as aggregated + on roots.root=aggregated.received_root) + ) + select + COUNT(CASE WHEN processed=true THEN 1 END) AS processed_count, + COUNT(CASE WHEN processed=false THEN 1 END) AS unprocessed_count, + COUNT(CASE WHEN aggregated_id IS not null then 1 END) aggregated_count, + ( + SELECT aggregated_id + FROM cte + ORDER BY domain_index desc nulls last + LIMIT 1 + ) as last_aggregated_id + from + cte + `.run(poolToUse); + return status.length > 0 + ? convertFromDbRootStatus(status[0]) + : { + processedCount: 0, + unprocessedCount: 0, + aggregatedCount: 0, + lastAggregatedRoot: undefined, + }; +}; + export const getSpokeNode = async ( domain: string, index: number, diff --git a/packages/adapters/database/src/index.ts b/packages/adapters/database/src/index.ts index f22b091017..c76bcca478 100644 --- a/packages/adapters/database/src/index.ts +++ b/packages/adapters/database/src/index.ts @@ -19,6 +19,7 @@ import { AssetPrice, StableSwapTransfer, StableSwapLpBalance, + RootMessageStatus, } from "@connext/nxtp-utils"; import { Pool } from "pg"; import { TxnClientForRepeatableRead } from "zapatos/db"; @@ -52,6 +53,7 @@ import { getMessageRootAggregatedFromIndex, getMessageRootsFromIndex, getMessageRootCount, + getMessageRootStatusFromIndex, getSpokeNode, getSpokeNodes, getHubNode, @@ -206,6 +208,11 @@ export type Database = { messageRoot: string, _pool?: Pool | TxnClientForRepeatableRead, ) => Promise; + getMessageRootStatusFromIndex: ( + domain: string, + index: number, + _pool?: Pool | TxnClientForRepeatableRead, + ) => Promise; getSpokeNode: ( domain: string, index: number, @@ -319,6 +326,7 @@ export const getDatabase = async (databaseUrl: string, logger: Logger): Promise< getMessageRootAggregatedFromIndex, getMessageRootsFromIndex, getMessageRootCount, + getMessageRootStatusFromIndex, getSpokeNode, getSpokeNodes, getHubNode, diff --git a/packages/adapters/database/test/client.spec.ts b/packages/adapters/database/test/client.spec.ts index 0eafa5a39f..6c7e7082be 100644 --- a/packages/adapters/database/test/client.spec.ts +++ b/packages/adapters/database/test/client.spec.ts @@ -74,6 +74,8 @@ import { saveStableSwapPoolEvent, saveStableSwapTransfers, saveStableSwapLpBalances, + getMessageRootStatusFromIndex, + getAggregateRootByRootAndDomain, } from "../src/client"; describe("Database client", () => { @@ -1136,6 +1138,54 @@ describe("Database client", () => { expect(queryRes.rows[0].processed).to.eq(false); }); + it("should get getAggregateRootByRootAndDomain", async () => { + const roots: ReceivedAggregateRoot[] = []; + for (let _i = 0; _i < 10; _i++) { + const m = mock.entity.receivedAggregateRoot(); + m.domain = mock.domain.A; + roots.push(m); + } + + await saveReceivedAggregateRoot(roots, pool); + + const receivedAggregatedRoot = await getAggregateRootByRootAndDomain(mock.domain.A, roots[1].root, "ASC", pool); + + expect(receivedAggregatedRoot).to.deep.eq(roots[1]); + }); + + it("should get getMessageRootStatusFromIndex", async () => { + const messages: RootMessage[] = []; + const roots: AggregatedRoot[] = []; + const totalCount = 100; + const processedCount = 10; + const aggregatedCount = 20; + for (let _i = 0; _i < totalCount; _i++) { + const rootMessage = mock.entity.rootMessage(); + rootMessage.spokeDomain = mock.domain.A; + rootMessage.count = _i; + rootMessage.processed = _i < processedCount; + messages.push(rootMessage); + + const m = mock.entity.aggregatedRoot(); + m.index = _i; + m.domain = mock.domain.A; + m.receivedRoot = _i < aggregatedCount ? rootMessage.root : getRandomBytes32(); + roots.push(m); + } + + await saveSentRootMessages(messages, pool); + await saveAggregatedRoots(roots, pool); + + const messageStatus = await getMessageRootStatusFromIndex(mock.domain.A, 0, pool); + + expect(messageStatus).to.deep.eq({ + aggregatedCount: aggregatedCount, + lastAggregatedRoot: roots[aggregatedCount - 1].id, + processedCount: processedCount, + unprocessedCount: totalCount - processedCount, + }); + }); + it("should save assets", async () => { const assets = [mock.entity.asset(), mock.entity.asset(), mock.entity.asset()]; await saveAssets(assets, pool); diff --git a/packages/adapters/database/test/mock.ts b/packages/adapters/database/test/mock.ts index 5e6364639e..8157e9e8f6 100644 --- a/packages/adapters/database/test/mock.ts +++ b/packages/adapters/database/test/mock.ts @@ -29,6 +29,7 @@ export const mockDatabase = (): Database => { getMessageRootCount: stub().resolves(), getLatestMessageRoot: stub().resolves(), getLatestAggregateRoots: stub().resolves(), + getMessageRootStatusFromIndex: stub().resolves(), getSpokeNode: stub().resolves(), getSpokeNodes: stub().resolves([]), getHubNode: stub().resolves(), diff --git a/packages/agents/cartographer/poller/src/lib/operations/messagestatus.ts b/packages/agents/cartographer/poller/src/lib/operations/messagestatus.ts index 5fa2838058..af42f63608 100644 --- a/packages/agents/cartographer/poller/src/lib/operations/messagestatus.ts +++ b/packages/agents/cartographer/poller/src/lib/operations/messagestatus.ts @@ -45,33 +45,27 @@ export const getMessageStatus = async (transfer: XTransfer): Promise i.processed).includes(true); - if (!processed) { + if (rootMessageStatus.processedCount === 0) { // A root message exist. this means sendOutboundRoot task executed on the spoke domain // 1. A spoke root got sent to the hub domain, the root might still be getting processed by AMB/LH processFromRoot task - // 2. A spoke root got arrived at the hub domain, the root_me ssages table sync might still be in progress + // 2. A spoke root got arrived at the hub domain, the root_messages table sync might still be in progress return XTransferMessageStatus.SpokeRootSent; } // A message root from the spoke domain got arrived at the hub domain successfully - let aggregateRoot: string | undefined = undefined; - // iterate through roots with most highes leaf count first (most likely to have aggregate when system - // is under load) - for (const rootMessage of rootMessages.sort((a, b) => b.count - a.count)) { - aggregateRoot = await database.getAggregateRoot(rootMessage.root); - if (aggregateRoot) break; - } - //const aggregateRoot = await database.getAggregateRoot(rootMessage.root); - if (!aggregateRoot) { + if (rootMessageStatus.aggregatedCount === 0) { // An aggregated root doesn't exist. // 1. A message root arrived at the hub domain but has been neither dequeued/propagated // 2. The aggregateRoot got propagated on-chain but either of both subgraph and carto sync might still be in progress @@ -84,7 +78,7 @@ export const getMessageStatus = async (transfer: XTransfer): Promise { (mockContext.adapters.database.getMessageByLeaf as SinonStub).resolves( mock.entity.xMessage({ destination: { processed: false, returnData: getRandomBytes32() } }), ); - (mockContext.adapters.database.getMessageRootsFromIndex as SinonStub).resolves([]); + (mockContext.adapters.database.getMessageRootStatusFromIndex as SinonStub).resolves({ + processedCount: 0, + unprocessedCount: 0, + aggregatedCount: 0, + lastAggregatedRoot: undefined, + }); const xTransfer = mock.entity.xtransfer({ messageHash: undefined }); const messageStatus = await getMessageStatus(xTransfer); expect(messageStatus).to.be.eq(XTransferMessageStatus.XCalled); @@ -45,9 +50,12 @@ describe("MessageStatus operations", () => { (mockContext.adapters.database.getMessageByLeaf as SinonStub).resolves( mock.entity.xMessage({ destination: { processed: false, returnData: getRandomBytes32() } }), ); - (mockContext.adapters.database.getMessageRootsFromIndex as SinonStub).resolves([ - mock.entity.rootMessage({ processed: false }), - ]); + (mockContext.adapters.database.getMessageRootStatusFromIndex as SinonStub).resolves({ + processedCount: 0, + unprocessedCount: 1, + aggregatedCount: 0, + lastAggregatedRoot: undefined, + }); const xTransfer = mock.entity.xtransfer({ messageHash: undefined }); const messageStatus = await getMessageStatus(xTransfer); expect(messageStatus).to.be.eq(XTransferMessageStatus.SpokeRootSent); @@ -57,10 +65,12 @@ describe("MessageStatus operations", () => { (mockContext.adapters.database.getMessageByLeaf as SinonStub).resolves( mock.entity.xMessage({ destination: { processed: false, returnData: getRandomBytes32() } }), ); - (mockContext.adapters.database.getMessageRootsFromIndex as SinonStub).resolves([ - mock.entity.rootMessage({ processed: true }), - ]); - (mockContext.adapters.database.getAggregateRoot as SinonStub).resolves(undefined); + (mockContext.adapters.database.getMessageRootStatusFromIndex as SinonStub).resolves({ + processedCount: 1, + unprocessedCount: 5, + aggregatedCount: 0, + lastAggregatedRoot: undefined, + }); const xTransfer = mock.entity.xtransfer({ messageHash: undefined }); const messageStatus = await getMessageStatus(xTransfer); expect(messageStatus).to.be.eq(XTransferMessageStatus.SpokeRootArrivedOnHub); @@ -70,10 +80,12 @@ describe("MessageStatus operations", () => { (mockContext.adapters.database.getMessageByLeaf as SinonStub).resolves( mock.entity.xMessage({ destination: { processed: false, returnData: getRandomBytes32() } }), ); - (mockContext.adapters.database.getMessageRootsFromIndex as SinonStub).resolves([ - mock.entity.rootMessage({ processed: true }), - ]); - (mockContext.adapters.database.getAggregateRoot as SinonStub).resolves(mock.entity.aggregatedRoot()); + (mockContext.adapters.database.getMessageRootStatusFromIndex as SinonStub).resolves({ + processedCount: 1, + unprocessedCount: 5, + aggregatedCount: 1, + lastAggregatedRoot: mock.entity.aggregatedRoot(), + }); const xTransfer = mock.entity.xtransfer({ messageHash: undefined }); const messageStatus = await getMessageStatus(xTransfer); expect(messageStatus).to.be.eq(XTransferMessageStatus.AggregateRootPropagated); @@ -83,10 +95,12 @@ describe("MessageStatus operations", () => { (mockContext.adapters.database.getMessageByLeaf as SinonStub).resolves( mock.entity.xMessage({ destination: { processed: false, returnData: getRandomBytes32() } }), ); - (mockContext.adapters.database.getMessageRootsFromIndex as SinonStub).resolves([ - mock.entity.rootMessage({ processed: true }), - ]); - (mockContext.adapters.database.getAggregateRoot as SinonStub).resolves(mock.entity.aggregatedRoot()); + (mockContext.adapters.database.getMessageRootStatusFromIndex as SinonStub).resolves({ + processedCount: 1, + unprocessedCount: 5, + aggregatedCount: 1, + lastAggregatedRoot: mock.entity.aggregatedRoot(), + }); (mockContext.adapters.database.getAggregateRootByRootAndDomain as SinonStub).resolves( mock.entity.aggregatedRoot(), ); diff --git a/packages/utils/src/types/amb.ts b/packages/utils/src/types/amb.ts index e4f9df59ee..4ec1fc94e7 100644 --- a/packages/utils/src/types/amb.ts +++ b/packages/utils/src/types/amb.ts @@ -91,3 +91,11 @@ export const ReceivedAggregateRootSchema = Type.Object({ blockNumber: Type.Number(), }); export type ReceivedAggregateRoot = Static; + +export const RootMessageStatusSchema = Type.Object({ + processedCount: Type.Number(), + unprocessedCount: Type.Number(), + aggregatedCount: Type.Number(), + lastAggregatedRoot: Type.Optional(Type.String()), +}); +export type RootMessageStatus = Static; diff --git a/packages/utils/src/types/db.ts b/packages/utils/src/types/db.ts index 17d6e627e5..5537f69e28 100644 --- a/packages/utils/src/types/db.ts +++ b/packages/utils/src/types/db.ts @@ -1,6 +1,6 @@ import { BigNumber, constants } from "ethers"; -import { XMessage, RootMessage, AggregatedRoot, PropagatedRoot, ReceivedAggregateRoot } from "./amb"; +import { XMessage, RootMessage, AggregatedRoot, PropagatedRoot, ReceivedAggregateRoot, RootMessageStatus } from "./amb"; import { PoolActionType, StableSwapExchange, @@ -432,6 +432,21 @@ export const convertFromDbReceivedAggregateRoot = (message: any): ReceivedAggreg }; }; +/** + * Converts a root message status from the cartographer db through either DB queries or Postgrest into the RootMessageStatus type + * @param message - the message from the cartographer db as a JSON object + * @returns an RootMessageStatus object + */ +export const convertFromDbRootStatus = (status: any): RootMessageStatus => { + const obj = { + processedCount: +status.processed_count, + unprocessedCount: +status.unprocessed_count, + aggregatedCount: +status.aggregated_count, + lastAggregatedRoot: status.last_aggregated_id ? status.last_aggregated_id.split("-")[0] : undefined, + }; + return sanitizeNull(obj); +}; + /** * Converts a stable swap pool from the cartographer db through * @param pool - the stable swap pool from the cartographer db as a JSON object