Skip to content

Commit

Permalink
WLT-651 batched broadcasting to mag
Browse files Browse the repository at this point in the history
  • Loading branch information
nirfireblocks authored and nadav-fireblocks committed Aug 12, 2024
1 parent 30e60b8 commit 21c7b5f
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 6 deletions.
1 change: 1 addition & 0 deletions .env.prod
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ CUSTOMER_SERVER_URL="http://localhost:5000/api"
CUSTOMER_SERVER_AUTHORIZATION=""
CUSTOMER_SERVER_PULL_CADENCE_MS=30000
AGENT_REQUESTS_CACHE_SIZE=1024
BROADCAST_BATCH_SIZE=30
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export const MOBILE_GATEWAY_URL = process.env.MOBILE_GATEWAY_URL;
export const CUSTOMER_SERVER_URL = process.env.CUSTOMER_SERVER_URL;
export const CUSTOMER_SERVER_PULL_CADENCE_MS = Number(process.env.CUSTOMER_SERVER_PULL_CADENCE_MS ?? 30000);
export const AGENT_REQUESTS_CACHE_SIZE = Number(process.env.AGENT_REQUESTS_CACHE_SIZE ?? 2048);
export const BROADCAST_BATCH_SIZE = Number(process.env.BROADCAST_BATCH_SIZE ?? 30);
export const CUSTOMER_SERVER_AUTHORIZATION = process.env.CUSTOMER_SERVER_AUTHORIZATION;
export const TOKEN_PATH = `${__dirname}/.fireblocks-refresh-token`;
export const SSL_CERT_PATH = process.env.SSL_CERT_PATH;
16 changes: 11 additions & 5 deletions src/services/messages.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import https from 'https';
import { AGENT_REQUESTS_CACHE_SIZE } from '../constants';
import { AGENT_REQUESTS_CACHE_SIZE, BROADCAST_BATCH_SIZE } from '../constants';
import { DecodedMessage, ExtendedMessageStatusCache, FBMessageEnvelope, RequestType } from '../types';
import { decodeAndVerifyMessage } from '../utils/messages-utils';
import customerServerApi from './customer-server.api';
Expand Down Expand Up @@ -121,7 +121,8 @@ class MessageService implements IMessageService {
}

async updateStatus(messagesStatus: ExtendedMessageStatusCache[]) {
let promises = [];
let broadcastPromises = [];
let ackPromises = [];
for (const msgStatus of messagesStatus) {
try {
const { msgId, request, messageStatus } = msgStatus;
Expand Down Expand Up @@ -150,13 +151,13 @@ class MessageService implements IMessageService {
} message with final status: ${status}, msgId ${latestMsgId}, cacheId: ${requestId}`,
);
// broadcast always and ack only if we have a valid msgId
promises.push(
broadcastPromises.push(
fbServerApi
.broadcastResponse(messageStatus, request)
.then(() => (this.msgCache[messageStatus.requestId].messageStatus = messageStatus)),
);
if (latestMsgId) {
promises.push(fbServerApi.ackMessage(latestMsgId));
ackPromises.push(fbServerApi.ackMessage(latestMsgId));
}
}
} catch (e) {
Expand All @@ -166,8 +167,13 @@ class MessageService implements IMessageService {
)}. Error: ${e.message}`,
);
}
if (broadcastPromises.length >= BROADCAST_BATCH_SIZE) {
logger.info(`Awaiting ack for broadcasting ${broadcastPromises.length} messages`);
await Promise.all([...broadcastPromises, ...ackPromises]);
broadcastPromises = ackPromises = [];
}
}
await Promise.all(promises);
await Promise.all([...broadcastPromises, ...ackPromises]);
}

async ackMessages(messagesIds: number[]) {
Expand Down
2 changes: 1 addition & 1 deletion src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export const AGENT_VERSION = '2.1.1';
export const AGENT_VERSION = '2.1.2';

0 comments on commit 21c7b5f

Please sign in to comment.