From cb8f0e1569e2f7c9f022168bd92b3af0a1609a7d Mon Sep 17 00:00:00 2001 From: Cfp Date: Thu, 4 Apr 2024 17:49:20 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=E2=80=87Sharding=20with=20Redis=20Pub?= =?UTF-8?q?/Sub?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bun.lockb | Bin 6496 -> 6890 bytes devices.ts | 82 +++++++++++++++++++++++++++++++++++++++++++++--- index.ts | 10 ++---- package.json | 1 + redis/pubsub.ts | 37 ++++++++++++++++++++++ redis/redis.ts | 76 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 195 insertions(+), 11 deletions(-) create mode 100644 redis/pubsub.ts create mode 100644 redis/redis.ts diff --git a/bun.lockb b/bun.lockb index 38a97da809e61e0bb57f127304233ff00459440a..b96cdf44bf237aeaf9d407f0770c696581717ab1 100755 GIT binary patch delta 1364 zcmbtUT}%{L6ux(6mfe|kV0Kwxy9fqtRK#6immdvrR{~a-EsYZ6R#Ym;T4^ap780bf zY!llC?E~icAT~xLV34$G<;4~f{HX>E4``akXk#?KfW`+jn))F2oZXp(CiKZOx%a!@ zoHOUloS%8*eOlb($z7E-e8bs0@u23nsV_z&`*J#u{#DVoJa%HpH5$CtIJB(R#o}T+ z7GBa-ZA*~Ep21kGbAMx;Y`{God26JxDT?P$IAeC?S%@m)!TRRrrf3_RMZO66V?+(H zT-TQ%T9L2P^)?;z&=Oz?nq1WmW1DL%=}xFE(?!c}hJ4?z|J!fp;#f}amSw-DfOVFGXY1W-gj zk3vuk@Db?7{WVO80l_VTBKd{AB2-C!ej2)^fIK5IR*ddB+!0EJ6%r^WKW~AcsjNH+ zqtPOnk95W?F#Jf$4nq%=jkx>=;ihRx7g;%UEg|tVof}B>y`!NIicbEi(1ch+cwmyL zjQ{@*a4>1|&IjqO7Y$}1HPt61%oc`h2`;5Nl{NYVbfm%E)JkwV z3S14CM*7DbUQ;~DJCf%Hb(0zwG&sIYZ*OTj&>nHO)PEI;x<8M!L|W_H_P4^8^h}s` Z0cmX7kU5y delta 1033 zcmb_bO-NKx6u$SpH{-k+pU%%yXVmg1NyHkRk(mT@#-KtSf~3XMp&ulr3?oQG(}lB; zbYY;JO$rI&!h(`uTIfQQLKwlKT4WP05?uAO>IV&fI%nP!TDa^yzWd#C?!Di=_uTWo z1U~ymy=6CwhW}i=P!S8-76zC4mX0;mzx=S(dT#H^_PSP@xc~mGo2AS?rWFjFx9g7H z<}eoPyVaI*Znw8}bYX7;;ly|xSwX(r+S%FB)y;-DW9jJkAfrfZ)fitvwxPd>OykoA zmm}w*UxaK$ejzd@kL~0^A)n=EAj!kL8(M_0@SDMe5aKnE6v7<9BZh@#4pA}0Wf&L3 zya|%nj(|rB3$Fx-N+DrefC(wY1CW%$@&F14a03oS;t641gs3URB^Wo=oHF6w1TtbX z0?*3u3UpS6D_JIj>>%dw&|JV795?4Q=(BX4|D*|P=4IGPi*OMdEE?2X6nJKda1(5y zw~5{*Il|N6iLAk6Spmfw!IZREHE6afFpr)MWSa&Sn*uj&5w5@-db8*qN{{f&ct?6a zk8dg^qV<9C>QC*UIcf`1SJ~auZU62cjXA0VW*kAk3*-AKOT)eeAtUf9Q>nw!U>S%y z2R3M?EAsyZ$uAo62^UN{k3zzE9U5J^ z`TIpyC;z%SGgU4M+~@UsD{(=bLCob2E@GFean1AN-n+`O?=%5FjIJQ4UWZ0%8|{Ji zbsDMk1~IzI;El_jzkyx)0j7MPdXHSLTF@t8g^~88>gGL=r+WAh1k@@)4;@G)sftIT z{%_Y_4?DjW>q5S2#@s5Zp*5@UQ#}pAEPv@HW`qtrSRGj5y&1d1#*~Ed_gYWhsSXwT X;JMQc(^+R>%##O)vn%3{vKyqoz%0mx diff --git a/devices.ts b/devices.ts index 04da535..dbd6443 100644 --- a/devices.ts +++ b/devices.ts @@ -1,5 +1,7 @@ import type { ServerWebSocket, Socket } from "bun"; -import { redis } from "."; +import { SERVERNAME, redis } from "./redis/redis"; +import { z } from "zod"; +import { randomUUID } from "crypto"; type Device = { socket: Socket, @@ -8,24 +10,44 @@ type Device = { type Client = { socket: ServerWebSocket, - serials: string[] + serials: string[], + uuid: string } +const RedisSocket = z.object({ + server: z.string() +}) + +const RedisClients = z.array(z.object({ + serials: z.array(z.string()), + server: z.string(), + uuid: z.string() +})) + const devices: {[serial: string]: Device} = {}; const clients: Client[] = []; export async function connectDevice(serial: string, socket: Socket) { devices[serial] = { socket, lastPacket: new Date() }; + await redis.json.set("socket:" + serial, "$", { server: SERVERNAME }); } export async function isDeviceConnected(serial: string) { - return serial in devices; + return serial in devices || await redis.exists("socket:" + serial); } export async function sendPacketToDevice(serial: string, packet: any) { if(serial in devices) { devices[serial].socket.write(JSON.stringify(packet)); return; + } else if(await redis.exists("socket:" + serial)) { + const rSock = await redis.json.get("socket:" + serial); + if(!RedisSocket.safeParse(rSock).success) { + throw new Error("Invalid socket"); + } + const sock = RedisSocket.parse(rSock); + await redis.publish(sock.server, JSON.stringify({ type: "toDevice", serial, packet })); + return; } throw new Error("Device not found"); @@ -35,6 +57,18 @@ export async function sendPacketToClients(serial: string, packet: any) { for(const client of clients.filter(client => client.serials.includes(serial))) { client.socket.send(JSON.stringify(packet)); } + + { + const rClients = await redis.json.get("clients"); + if(!RedisClients.safeParse(rClients).success) { + return; + } + const clients = RedisClients.parse(rClients); + for(const socket of clients.filter(socket => socket.serials.includes(serial))) { + if(socket.server === SERVERNAME) continue; + await redis.publish(socket.server, JSON.stringify({ type: "toClient", serial, packet })); + } + } } export async function getRegisteredDevice(serial: string) { @@ -46,10 +80,24 @@ export async function disconnectClients(serial: string) { for(const client of clients.filter(client => client.serials.includes(serial))) { client.socket.close(); } + + if(await redis.exists("clients")) { + const rClients = await redis.json.get("clients"); + if(!RedisClients.safeParse(rClients).success) { + return; + } + const clients = RedisClients.parse(rClients); + for(const socket of clients.filter(socket => socket.serials.includes(serial))) { + if(socket.server === SERVERNAME) continue; + await redis.publish(socket.server, JSON.stringify({ type: "disconnectClients", serial })); + } + await redis.json.set("clients", "$", clients.filter(socket => !socket.serials.includes(serial))); + } } export async function deleteDevice(serial: string) { delete devices[serial]; + await redis.json.del("socket:" + serial); } export async function resetLastPacket(serial: string) { @@ -63,7 +111,9 @@ export async function getSerialFromSocket(socket: Socket) { } export async function registerClient(socket: ServerWebSocket) { - clients.push({ socket, serials: [] }); + const uuid = randomUUID(); + clients.push({ socket, serials: [], uuid }); + await redis.json.arrAppend("clients", "$", { serials: [], server: SERVERNAME, uuid }); } export async function addSerialToClient(socket: ServerWebSocket, serial: string) { @@ -71,6 +121,16 @@ export async function addSerialToClient(socket: ServerWebSocket, serial if(!client) throw new Error("Client not found"); if(!await isDeviceConnected(serial)) throw new Error("Device not connected"); client.serials.push(serial); + const uuid = client.uuid; + const rClients = await redis.json.get("clients"); + if(!RedisClients.safeParse(rClients).success) { + throw new Error("Invalid clients"); + } + const dbClients = RedisClients.parse(rClients); + const dbClient = dbClients.find(client => client.uuid === uuid); + if(!dbClient) throw new Error("Client not found"); + dbClient.serials.push(serial); + await redis.json.set("clients", "$", dbClients); } export async function getSerialsFromClient(socket: ServerWebSocket) { @@ -91,3 +151,17 @@ setInterval(() => { } } }); + +process.on("beforeexit", async () => { + await redis.del("server:" + SERVERNAME); + + for(const socket of Object.keys(devices)) { + await redis.json.del("socket:" + socket); + } + let rClients = await redis.json.get("clients"); + if(!RedisClients.safeParse(rClients).success) { + return; + } + const clients = RedisClients.parse(rClients); + await redis.json.set("clients", "$", clients.filter((client) => client.server !== SERVERNAME)); +}); diff --git a/index.ts b/index.ts index 03c7553..b83bfc7 100644 --- a/index.ts +++ b/index.ts @@ -1,15 +1,11 @@ import { createClient as createRedisClient } from "redis"; import { z } from "zod"; import { addSerialToClient, connectDevice, deleteDevice, disconnectClients, getRegisteredDevice, getSerialFromSocket, getSerialsFromClient, isDeviceConnected, registerClient, resetLastPacket, sendPacketToClients, sendPacketToDevice } from "./devices"; - -export const redis = createRedisClient({ - url: "redis://redis:6379" -}); -await redis.connect(); +import { redis } from "./redis/redis"; Bun.listen({ hostname: "0.0.0.0", - port: 2737, + port: parseInt(process.env.SERVER_PORT!) || 2737, socket: { async data(socket, data) { try { @@ -65,7 +61,7 @@ Bun.listen({ }) Bun.serve({ - port: 8080, + port: parseInt(process.env.WS_PORT!) || 8080, async fetch(req, server) { const url = new URL(req.url); if(url.pathname === "/") { diff --git a/package.json b/package.json index 88947d6..8aae8f0 100644 --- a/package.json +++ b/package.json @@ -11,6 +11,7 @@ "dependencies": { "@total-typescript/ts-reset": "^0.5.1", "redis": "^4.6.12", + "unique-names-generator": "^4.7.1", "zod": "^3.22.4" } } \ No newline at end of file diff --git a/redis/pubsub.ts b/redis/pubsub.ts new file mode 100644 index 0000000..bb46a66 --- /dev/null +++ b/redis/pubsub.ts @@ -0,0 +1,37 @@ +import type { RedisClientType, RedisModules, RedisFunctions, RedisScripts } from "redis"; +import z from "zod"; + +export async function pubSub(redis: RedisClientType) { + const sub = redis.duplicate(); + await sub.connect(); + const pub = redis.duplicate(); + await pub.connect(); + // return { sub, pub }; + return { + subscribe(channel: string, callback: (message: string, channel: string) => void) { + sub.subscribe(channel, callback); + }, + publish(channel: string, message: string) { + pub.publish(channel, message); + } + }; +} + +export const PubSubToClientPacket = z.object({ + type: z.literal("toClient"), + serial: z.string(), + data: z.any() +}); + +export const PubSubToDevicePacket = z.object({ + type: z.literal("toDevice"), + serial: z.string(), + data: z.any() +}); + +export const PubSubDisconnectClientsPacket = z.object({ + type: z.literal("disconnectClients"), + serial: z.string() +}); + +export const PubSubPacket = z.union([PubSubToClientPacket, PubSubToDevicePacket, PubSubDisconnectClientsPacket]); \ No newline at end of file diff --git a/redis/redis.ts b/redis/redis.ts new file mode 100644 index 0000000..d04120c --- /dev/null +++ b/redis/redis.ts @@ -0,0 +1,76 @@ +import { adjectives, animals, uniqueNamesGenerator } from "unique-names-generator"; +import { createClient } from "redis"; +import { pubSub } from "./pubsub"; +import { z } from "zod"; +import { deleteDevice, disconnectClients, sendPacketToClients, sendPacketToDevice } from "../devices"; + +export const SERVERNAME = uniqueNamesGenerator({ + dictionaries: [ + adjectives, animals + ], + separator: "-" +}); +console.log("I am " + SERVERNAME); + +export const redis = createClient({ + url: "redis://redis:6379" +}); +await redis.connect(); +await ensureKeys(); +await ping(); + +export const { subscribe, publish } = await pubSub(redis); + +async function ensureKeys() { + if(!await redis.exists("newsletter")) { + await redis.json.set("newsletter", "$", []); + } + if(!await redis.exists("clients")) { + await redis.json.set("clients", "$", []); + } +} + +async function ping() { + await redis.json.set("server:" + SERVERNAME, "$", { lastPing: Date.now() }); + await redis.expire("server:" + SERVERNAME, 70); +} + +setInterval(() => { + ping(); +}, 60000) + +const PubSubToDevicePacket = z.object({ + type: z.literal("toDevice"), + serial: z.string(), + packet: z.any() +}); + +const PubSubToClientPacket = z.object({ + type: z.literal("toClient"), + serial: z.string(), + packet: z.any() +}); + +const PubSubDisconnectClientsPacket = z.object({ + type: z.literal("disconnectClients"), + serial: z.string() +}); + +const PubSubPacket = z.union([PubSubToDevicePacket, PubSubToClientPacket, PubSubDisconnectClientsPacket]); + +subscribe(SERVERNAME, async (message, channel) => { + console.log("[PUBSUB] " + message); + + const packet = PubSubPacket.parse(JSON.parse(message)); + if(PubSubToDevicePacket.safeParse(packet).success) { + const { serial, packet: data } = PubSubToDevicePacket.parse(packet); + sendPacketToDevice(serial, data); + } else if(PubSubToClientPacket.safeParse(packet).success) { + const { serial, packet: data } = PubSubToClientPacket.parse(packet); + sendPacketToClients(serial, data); + } else if(PubSubDisconnectClientsPacket.safeParse(packet).success) { + const { serial } = PubSubDisconnectClientsPacket.parse(packet); + disconnectClients(serial); + deleteDevice(serial); + } +});