diff --git a/.env.prod b/.env.prod index 1588f8c..f9c1840 100644 --- a/.env.prod +++ b/.env.prod @@ -3,3 +3,4 @@ CUSTOMER_SERVER_URL="http://localhost:5000/api" CUSTOMER_SERVER_AUTHORIZATION="" CUSTOMER_SERVER_PULL_CADENCE_MS=30000 AGENT_REQUESTS_CACHE_SIZE=1024 +BROADCAST_BATCH_SIZE=30 diff --git a/src/constants.ts b/src/constants.ts index a39529f..bbf620c 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -6,6 +6,7 @@ export const MOBILE_GATEWAY_URL = process.env.MOBILE_GATEWAY_URL; export const CUSTOMER_SERVER_URL = process.env.CUSTOMER_SERVER_URL; export const CUSTOMER_SERVER_PULL_CADENCE_MS = Number(process.env.CUSTOMER_SERVER_PULL_CADENCE_MS ?? 30000); export const AGENT_REQUESTS_CACHE_SIZE = Number(process.env.AGENT_REQUESTS_CACHE_SIZE ?? 2048); +export const BROADCAST_BATCH_SIZE = Number(process.env.BROADCAST_BATCH_SIZE ?? 30); export const CUSTOMER_SERVER_AUTHORIZATION = process.env.CUSTOMER_SERVER_AUTHORIZATION; export const TOKEN_PATH = `${__dirname}/.fireblocks-refresh-token`; export const SSL_CERT_PATH = process.env.SSL_CERT_PATH; diff --git a/src/services/messages.service.ts b/src/services/messages.service.ts index 5c390e1..bde7027 100644 --- a/src/services/messages.service.ts +++ b/src/services/messages.service.ts @@ -1,5 +1,5 @@ import https from 'https'; -import { AGENT_REQUESTS_CACHE_SIZE } from '../constants'; +import { AGENT_REQUESTS_CACHE_SIZE, BROADCAST_BATCH_SIZE } from '../constants'; import { DecodedMessage, ExtendedMessageStatusCache, FBMessageEnvelope, RequestType } from '../types'; import { decodeAndVerifyMessage } from '../utils/messages-utils'; import customerServerApi from './customer-server.api'; @@ -121,7 +121,8 @@ class MessageService implements IMessageService { } async updateStatus(messagesStatus: ExtendedMessageStatusCache[]) { - let promises = []; + let broadcastPromises = []; + let ackPromises = []; for (const msgStatus of messagesStatus) { try { const { msgId, request, messageStatus } = msgStatus; @@ -150,13 +151,13 @@ class MessageService implements IMessageService { } message with final status: ${status}, msgId ${latestMsgId}, cacheId: ${requestId}`, ); // broadcast always and ack only if we have a valid msgId - promises.push( + broadcastPromises.push( fbServerApi .broadcastResponse(messageStatus, request) .then(() => (this.msgCache[messageStatus.requestId].messageStatus = messageStatus)), ); if (latestMsgId) { - promises.push(fbServerApi.ackMessage(latestMsgId)); + ackPromises.push(fbServerApi.ackMessage(latestMsgId)); } } } catch (e) { @@ -166,8 +167,13 @@ class MessageService implements IMessageService { )}. Error: ${e.message}`, ); } + if (broadcastPromises.length >= BROADCAST_BATCH_SIZE) { + logger.info(`Awaiting ack for broadcasting ${broadcastPromises.length} messages`); + await Promise.all([...broadcastPromises, ...ackPromises]); + broadcastPromises = ackPromises = []; + } } - await Promise.all(promises); + await Promise.all([...broadcastPromises, ...ackPromises]); } async ackMessages(messagesIds: number[]) { diff --git a/src/version.ts b/src/version.ts index 5b4ee79..44ecc96 100644 --- a/src/version.ts +++ b/src/version.ts @@ -1 +1 @@ -export const AGENT_VERSION = '2.1.1'; +export const AGENT_VERSION = '2.1.2';