diff --git a/bun.lockb b/bun.lockb index 38a97da..b96cdf4 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/devices.ts b/devices.ts index 04da535..dbd6443 100644 --- a/devices.ts +++ b/devices.ts @@ -1,5 +1,7 @@ import type { ServerWebSocket, Socket } from "bun"; -import { redis } from "."; +import { SERVERNAME, redis } from "./redis/redis"; +import { z } from "zod"; +import { randomUUID } from "crypto"; type Device = { socket: Socket, @@ -8,24 +10,44 @@ type Device = { type Client = { socket: ServerWebSocket, - serials: string[] + serials: string[], + uuid: string } +const RedisSocket = z.object({ + server: z.string() +}) + +const RedisClients = z.array(z.object({ + serials: z.array(z.string()), + server: z.string(), + uuid: z.string() +})) + const devices: {[serial: string]: Device} = {}; const clients: Client[] = []; export async function connectDevice(serial: string, socket: Socket) { devices[serial] = { socket, lastPacket: new Date() }; + await redis.json.set("socket:" + serial, "$", { server: SERVERNAME }); } export async function isDeviceConnected(serial: string) { - return serial in devices; + return serial in devices || await redis.exists("socket:" + serial); } export async function sendPacketToDevice(serial: string, packet: any) { if(serial in devices) { devices[serial].socket.write(JSON.stringify(packet)); return; + } else if(await redis.exists("socket:" + serial)) { + const rSock = await redis.json.get("socket:" + serial); + if(!RedisSocket.safeParse(rSock).success) { + throw new Error("Invalid socket"); + } + const sock = RedisSocket.parse(rSock); + await redis.publish(sock.server, JSON.stringify({ type: "toDevice", serial, packet })); + return; } throw new Error("Device not found"); @@ -35,6 +57,18 @@ export async function sendPacketToClients(serial: string, packet: any) { for(const client of clients.filter(client => client.serials.includes(serial))) { client.socket.send(JSON.stringify(packet)); } + + { + const rClients = await redis.json.get("clients"); + if(!RedisClients.safeParse(rClients).success) { + return; + } + const clients = RedisClients.parse(rClients); + for(const socket of clients.filter(socket => socket.serials.includes(serial))) { + if(socket.server === SERVERNAME) continue; + await redis.publish(socket.server, JSON.stringify({ type: "toClient", serial, packet })); + } + } } export async function getRegisteredDevice(serial: string) { @@ -46,10 +80,24 @@ export async function disconnectClients(serial: string) { for(const client of clients.filter(client => client.serials.includes(serial))) { client.socket.close(); } + + if(await redis.exists("clients")) { + const rClients = await redis.json.get("clients"); + if(!RedisClients.safeParse(rClients).success) { + return; + } + const clients = RedisClients.parse(rClients); + for(const socket of clients.filter(socket => socket.serials.includes(serial))) { + if(socket.server === SERVERNAME) continue; + await redis.publish(socket.server, JSON.stringify({ type: "disconnectClients", serial })); + } + await redis.json.set("clients", "$", clients.filter(socket => !socket.serials.includes(serial))); + } } export async function deleteDevice(serial: string) { delete devices[serial]; + await redis.json.del("socket:" + serial); } export async function resetLastPacket(serial: string) { @@ -63,7 +111,9 @@ export async function getSerialFromSocket(socket: Socket) { } export async function registerClient(socket: ServerWebSocket) { - clients.push({ socket, serials: [] }); + const uuid = randomUUID(); + clients.push({ socket, serials: [], uuid }); + await redis.json.arrAppend("clients", "$", { serials: [], server: SERVERNAME, uuid }); } export async function addSerialToClient(socket: ServerWebSocket, serial: string) { @@ -71,6 +121,16 @@ export async function addSerialToClient(socket: ServerWebSocket, serial if(!client) throw new Error("Client not found"); if(!await isDeviceConnected(serial)) throw new Error("Device not connected"); client.serials.push(serial); + const uuid = client.uuid; + const rClients = await redis.json.get("clients"); + if(!RedisClients.safeParse(rClients).success) { + throw new Error("Invalid clients"); + } + const dbClients = RedisClients.parse(rClients); + const dbClient = dbClients.find(client => client.uuid === uuid); + if(!dbClient) throw new Error("Client not found"); + dbClient.serials.push(serial); + await redis.json.set("clients", "$", dbClients); } export async function getSerialsFromClient(socket: ServerWebSocket) { @@ -91,3 +151,17 @@ setInterval(() => { } } }); + +process.on("beforeexit", async () => { + await redis.del("server:" + SERVERNAME); + + for(const socket of Object.keys(devices)) { + await redis.json.del("socket:" + socket); + } + let rClients = await redis.json.get("clients"); + if(!RedisClients.safeParse(rClients).success) { + return; + } + const clients = RedisClients.parse(rClients); + await redis.json.set("clients", "$", clients.filter((client) => client.server !== SERVERNAME)); +}); diff --git a/index.ts b/index.ts index 03c7553..b83bfc7 100644 --- a/index.ts +++ b/index.ts @@ -1,15 +1,11 @@ import { createClient as createRedisClient } from "redis"; import { z } from "zod"; import { addSerialToClient, connectDevice, deleteDevice, disconnectClients, getRegisteredDevice, getSerialFromSocket, getSerialsFromClient, isDeviceConnected, registerClient, resetLastPacket, sendPacketToClients, sendPacketToDevice } from "./devices"; - -export const redis = createRedisClient({ - url: "redis://redis:6379" -}); -await redis.connect(); +import { redis } from "./redis/redis"; Bun.listen({ hostname: "0.0.0.0", - port: 2737, + port: parseInt(process.env.SERVER_PORT!) || 2737, socket: { async data(socket, data) { try { @@ -65,7 +61,7 @@ Bun.listen({ }) Bun.serve({ - port: 8080, + port: parseInt(process.env.WS_PORT!) || 8080, async fetch(req, server) { const url = new URL(req.url); if(url.pathname === "/") { diff --git a/package.json b/package.json index 88947d6..8aae8f0 100644 --- a/package.json +++ b/package.json @@ -11,6 +11,7 @@ "dependencies": { "@total-typescript/ts-reset": "^0.5.1", "redis": "^4.6.12", + "unique-names-generator": "^4.7.1", "zod": "^3.22.4" } } \ No newline at end of file diff --git a/redis/pubsub.ts b/redis/pubsub.ts new file mode 100644 index 0000000..bb46a66 --- /dev/null +++ b/redis/pubsub.ts @@ -0,0 +1,37 @@ +import type { RedisClientType, RedisModules, RedisFunctions, RedisScripts } from "redis"; +import z from "zod"; + +export async function pubSub(redis: RedisClientType) { + const sub = redis.duplicate(); + await sub.connect(); + const pub = redis.duplicate(); + await pub.connect(); + // return { sub, pub }; + return { + subscribe(channel: string, callback: (message: string, channel: string) => void) { + sub.subscribe(channel, callback); + }, + publish(channel: string, message: string) { + pub.publish(channel, message); + } + }; +} + +export const PubSubToClientPacket = z.object({ + type: z.literal("toClient"), + serial: z.string(), + data: z.any() +}); + +export const PubSubToDevicePacket = z.object({ + type: z.literal("toDevice"), + serial: z.string(), + data: z.any() +}); + +export const PubSubDisconnectClientsPacket = z.object({ + type: z.literal("disconnectClients"), + serial: z.string() +}); + +export const PubSubPacket = z.union([PubSubToClientPacket, PubSubToDevicePacket, PubSubDisconnectClientsPacket]); \ No newline at end of file diff --git a/redis/redis.ts b/redis/redis.ts new file mode 100644 index 0000000..d04120c --- /dev/null +++ b/redis/redis.ts @@ -0,0 +1,76 @@ +import { adjectives, animals, uniqueNamesGenerator } from "unique-names-generator"; +import { createClient } from "redis"; +import { pubSub } from "./pubsub"; +import { z } from "zod"; +import { deleteDevice, disconnectClients, sendPacketToClients, sendPacketToDevice } from "../devices"; + +export const SERVERNAME = uniqueNamesGenerator({ + dictionaries: [ + adjectives, animals + ], + separator: "-" +}); +console.log("I am " + SERVERNAME); + +export const redis = createClient({ + url: "redis://redis:6379" +}); +await redis.connect(); +await ensureKeys(); +await ping(); + +export const { subscribe, publish } = await pubSub(redis); + +async function ensureKeys() { + if(!await redis.exists("newsletter")) { + await redis.json.set("newsletter", "$", []); + } + if(!await redis.exists("clients")) { + await redis.json.set("clients", "$", []); + } +} + +async function ping() { + await redis.json.set("server:" + SERVERNAME, "$", { lastPing: Date.now() }); + await redis.expire("server:" + SERVERNAME, 70); +} + +setInterval(() => { + ping(); +}, 60000) + +const PubSubToDevicePacket = z.object({ + type: z.literal("toDevice"), + serial: z.string(), + packet: z.any() +}); + +const PubSubToClientPacket = z.object({ + type: z.literal("toClient"), + serial: z.string(), + packet: z.any() +}); + +const PubSubDisconnectClientsPacket = z.object({ + type: z.literal("disconnectClients"), + serial: z.string() +}); + +const PubSubPacket = z.union([PubSubToDevicePacket, PubSubToClientPacket, PubSubDisconnectClientsPacket]); + +subscribe(SERVERNAME, async (message, channel) => { + console.log("[PUBSUB] " + message); + + const packet = PubSubPacket.parse(JSON.parse(message)); + if(PubSubToDevicePacket.safeParse(packet).success) { + const { serial, packet: data } = PubSubToDevicePacket.parse(packet); + sendPacketToDevice(serial, data); + } else if(PubSubToClientPacket.safeParse(packet).success) { + const { serial, packet: data } = PubSubToClientPacket.parse(packet); + sendPacketToClients(serial, data); + } else if(PubSubDisconnectClientsPacket.safeParse(packet).success) { + const { serial } = PubSubDisconnectClientsPacket.parse(packet); + disconnectClients(serial); + deleteDevice(serial); + } +});