Skip to content

Commit

Permalink
✨ Sharding with Redis Pub/Sub
Browse files Browse the repository at this point in the history
  • Loading branch information
cfpwastaken committed Apr 4, 2024
1 parent c292ebd commit cb8f0e1
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 11 deletions.
Binary file modified bun.lockb
Binary file not shown.
82 changes: 78 additions & 4 deletions devices.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { ServerWebSocket, Socket } from "bun";
import { redis } from ".";
import { SERVERNAME, redis } from "./redis/redis";
import { z } from "zod";
import { randomUUID } from "crypto";

type Device = {
socket: Socket,
Expand All @@ -8,24 +10,44 @@ type Device = {

type Client = {
socket: ServerWebSocket<unknown>,
serials: string[]
serials: string[],
uuid: string
}

const RedisSocket = z.object({
server: z.string()
})

const RedisClients = z.array(z.object({
serials: z.array(z.string()),
server: z.string(),
uuid: z.string()
}))

const devices: {[serial: string]: Device} = {};
const clients: Client[] = [];

export async function connectDevice(serial: string, socket: Socket) {
devices[serial] = { socket, lastPacket: new Date() };
await redis.json.set("socket:" + serial, "$", { server: SERVERNAME });
}

export async function isDeviceConnected(serial: string) {
return serial in devices;
return serial in devices || await redis.exists("socket:" + serial);
}

export async function sendPacketToDevice(serial: string, packet: any) {
if(serial in devices) {
devices[serial].socket.write(JSON.stringify(packet));
return;
} else if(await redis.exists("socket:" + serial)) {
const rSock = await redis.json.get("socket:" + serial);
if(!RedisSocket.safeParse(rSock).success) {
throw new Error("Invalid socket");
}
const sock = RedisSocket.parse(rSock);
await redis.publish(sock.server, JSON.stringify({ type: "toDevice", serial, packet }));
return;
}

throw new Error("Device not found");
Expand All @@ -35,6 +57,18 @@ export async function sendPacketToClients(serial: string, packet: any) {
for(const client of clients.filter(client => client.serials.includes(serial))) {
client.socket.send(JSON.stringify(packet));
}

{
const rClients = await redis.json.get("clients");
if(!RedisClients.safeParse(rClients).success) {
return;
}
const clients = RedisClients.parse(rClients);
for(const socket of clients.filter(socket => socket.serials.includes(serial))) {
if(socket.server === SERVERNAME) continue;
await redis.publish(socket.server, JSON.stringify({ type: "toClient", serial, packet }));
}
}
}

export async function getRegisteredDevice(serial: string) {
Expand All @@ -46,10 +80,24 @@ export async function disconnectClients(serial: string) {
for(const client of clients.filter(client => client.serials.includes(serial))) {
client.socket.close();
}

if(await redis.exists("clients")) {
const rClients = await redis.json.get("clients");
if(!RedisClients.safeParse(rClients).success) {
return;
}
const clients = RedisClients.parse(rClients);
for(const socket of clients.filter(socket => socket.serials.includes(serial))) {
if(socket.server === SERVERNAME) continue;
await redis.publish(socket.server, JSON.stringify({ type: "disconnectClients", serial }));
}
await redis.json.set("clients", "$", clients.filter(socket => !socket.serials.includes(serial)));
}
}

export async function deleteDevice(serial: string) {
delete devices[serial];
await redis.json.del("socket:" + serial);
}

export async function resetLastPacket(serial: string) {
Expand All @@ -63,14 +111,26 @@ export async function getSerialFromSocket(socket: Socket) {
}

export async function registerClient(socket: ServerWebSocket<unknown>) {
clients.push({ socket, serials: [] });
const uuid = randomUUID();
clients.push({ socket, serials: [], uuid });
await redis.json.arrAppend("clients", "$", { serials: [], server: SERVERNAME, uuid });
}

export async function addSerialToClient(socket: ServerWebSocket<unknown>, serial: string) {
const client = clients.find(client => client.socket === socket);
if(!client) throw new Error("Client not found");
if(!await isDeviceConnected(serial)) throw new Error("Device not connected");
client.serials.push(serial);
const uuid = client.uuid;
const rClients = await redis.json.get("clients");
if(!RedisClients.safeParse(rClients).success) {
throw new Error("Invalid clients");
}
const dbClients = RedisClients.parse(rClients);
const dbClient = dbClients.find(client => client.uuid === uuid);
if(!dbClient) throw new Error("Client not found");
dbClient.serials.push(serial);
await redis.json.set("clients", "$", dbClients);
}

export async function getSerialsFromClient(socket: ServerWebSocket<unknown>) {
Expand All @@ -91,3 +151,17 @@ setInterval(() => {
}
}
});

process.on("beforeexit", async () => {
await redis.del("server:" + SERVERNAME);

for(const socket of Object.keys(devices)) {
await redis.json.del("socket:" + socket);
}
let rClients = await redis.json.get("clients");
if(!RedisClients.safeParse(rClients).success) {
return;
}
const clients = RedisClients.parse(rClients);
await redis.json.set("clients", "$", clients.filter((client) => client.server !== SERVERNAME));
});
10 changes: 3 additions & 7 deletions index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import { createClient as createRedisClient } from "redis";
import { z } from "zod";
import { addSerialToClient, connectDevice, deleteDevice, disconnectClients, getRegisteredDevice, getSerialFromSocket, getSerialsFromClient, isDeviceConnected, registerClient, resetLastPacket, sendPacketToClients, sendPacketToDevice } from "./devices";

export const redis = createRedisClient({
url: "redis://redis:6379"
});
await redis.connect();
import { redis } from "./redis/redis";

Bun.listen({
hostname: "0.0.0.0",
port: 2737,
port: parseInt(process.env.SERVER_PORT!) || 2737,
socket: {
async data(socket, data) {
try {
Expand Down Expand Up @@ -65,7 +61,7 @@ Bun.listen({
})

Bun.serve({
port: 8080,
port: parseInt(process.env.WS_PORT!) || 8080,
async fetch(req, server) {
const url = new URL(req.url);
if(url.pathname === "/") {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"dependencies": {
"@total-typescript/ts-reset": "^0.5.1",
"redis": "^4.6.12",
"unique-names-generator": "^4.7.1",
"zod": "^3.22.4"
}
}
37 changes: 37 additions & 0 deletions redis/pubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import type { RedisClientType, RedisModules, RedisFunctions, RedisScripts } from "redis";
import z from "zod";

export async function pubSub(redis: RedisClientType<RedisModules, RedisFunctions, RedisScripts>) {
const sub = redis.duplicate();
await sub.connect();
const pub = redis.duplicate();
await pub.connect();
// return { sub, pub };
return {
subscribe(channel: string, callback: (message: string, channel: string) => void) {
sub.subscribe(channel, callback);
},
publish(channel: string, message: string) {
pub.publish(channel, message);
}
};
}

export const PubSubToClientPacket = z.object({
type: z.literal("toClient"),
serial: z.string(),
data: z.any()
});

export const PubSubToDevicePacket = z.object({
type: z.literal("toDevice"),
serial: z.string(),
data: z.any()
});

export const PubSubDisconnectClientsPacket = z.object({
type: z.literal("disconnectClients"),
serial: z.string()
});

export const PubSubPacket = z.union([PubSubToClientPacket, PubSubToDevicePacket, PubSubDisconnectClientsPacket]);
76 changes: 76 additions & 0 deletions redis/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { adjectives, animals, uniqueNamesGenerator } from "unique-names-generator";
import { createClient } from "redis";
import { pubSub } from "./pubsub";
import { z } from "zod";
import { deleteDevice, disconnectClients, sendPacketToClients, sendPacketToDevice } from "../devices";

export const SERVERNAME = uniqueNamesGenerator({
dictionaries: [
adjectives, animals
],
separator: "-"
});
console.log("I am " + SERVERNAME);

export const redis = createClient({
url: "redis://redis:6379"
});
await redis.connect();
await ensureKeys();
await ping();

export const { subscribe, publish } = await pubSub(redis);

async function ensureKeys() {
if(!await redis.exists("newsletter")) {
await redis.json.set("newsletter", "$", []);
}
if(!await redis.exists("clients")) {
await redis.json.set("clients", "$", []);
}
}

async function ping() {
await redis.json.set("server:" + SERVERNAME, "$", { lastPing: Date.now() });
await redis.expire("server:" + SERVERNAME, 70);
}

setInterval(() => {
ping();
}, 60000)

const PubSubToDevicePacket = z.object({
type: z.literal("toDevice"),
serial: z.string(),
packet: z.any()
});

const PubSubToClientPacket = z.object({
type: z.literal("toClient"),
serial: z.string(),
packet: z.any()
});

const PubSubDisconnectClientsPacket = z.object({
type: z.literal("disconnectClients"),
serial: z.string()
});

const PubSubPacket = z.union([PubSubToDevicePacket, PubSubToClientPacket, PubSubDisconnectClientsPacket]);

subscribe(SERVERNAME, async (message, channel) => {
console.log("[PUBSUB] " + message);

const packet = PubSubPacket.parse(JSON.parse(message));
if(PubSubToDevicePacket.safeParse(packet).success) {
const { serial, packet: data } = PubSubToDevicePacket.parse(packet);
sendPacketToDevice(serial, data);
} else if(PubSubToClientPacket.safeParse(packet).success) {
const { serial, packet: data } = PubSubToClientPacket.parse(packet);
sendPacketToClients(serial, data);
} else if(PubSubDisconnectClientsPacket.safeParse(packet).success) {
const { serial } = PubSubDisconnectClientsPacket.parse(packet);
disconnectClients(serial);
deleteDevice(serial);
}
});

0 comments on commit cb8f0e1

Please sign in to comment.