diff --git a/src/lib/queue.ts b/src/lib/queue.ts new file mode 100644 index 0000000..674fbaa --- /dev/null +++ b/src/lib/queue.ts @@ -0,0 +1,24 @@ +const queue: (() => Promise<unknown>)[] = []; + +export async function consumeQueue(batchSize = 2) { + console.log('consume queue'); + + const ts = queue.splice(0, batchSize); + if (ts.length > 0) { + console.log('got from queue', ts.length); + + await Promise.all(ts.map((t) => t())); + } + + setTimeout(() => consumeQueue(batchSize), 1000); +} + +export function queueTask(task: () => Promise<unknown>) { + console.log('queue task'); + return new Promise((resolve) => { + queue.push(async () => { + await task(); + resolve(true); + }); + }); +} diff --git a/src/profile.ts b/src/profile.ts index 4b130cd..ef09148 100644 --- a/src/profile.ts +++ b/src/profile.ts @@ -8,6 +8,11 @@ import { getRandomInt, rand } from './lib/rand'; import { getClients } from './macros/getClients'; // import { joinRooms } from './macros/joinRooms'; import { populateDatabase, isFullPopulation } from './populate'; +import { consumeQueue, queueTask } from './lib/queue'; + +const queueLogin = async (client: Client) => { + await queueTask(async () => client.login()); +}; export default (): void => { let clients: Client[]; @@ -15,7 +20,8 @@ export default (): void => { async function getLoggedInClient() { const client = rand(clients); if (!client.loggedIn) { - await client.login(); + // await client.login(); + await queueLogin(client); } return client; } @@ -86,6 +92,8 @@ export default (): void => { b.on('ready', async () => { console.log('Starting sending messages'); + + await consumeQueue(config.LOGIN_BATCH); }); b.on('message', async () => {