Skip to content

Commit

Permalink
refactor(workerd): replace websocket with worker rpc (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
hi-ogawa authored Oct 27, 2024
1 parent 4b928fd commit 99cff0a
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 130 deletions.
Binary file added examples/react-ssr-workerd/public/favicon.ico
Binary file not shown.
122 changes: 55 additions & 67 deletions packages/workerd/src/plugin.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { readFileSync } from "node:fs";
import { fileURLToPath } from "node:url";
import { DefaultMap, tinyassert } from "@hiogawa/utils";
import type { Fetcher } from "@cloudflare/workers-types/experimental";
import { webToNodeHandler } from "@hiogawa/utils-node";
import {
Miniflare,
Expand All @@ -10,14 +10,14 @@ import {
mergeWorkerOptions,
} from "miniflare";
import {
type CustomPayload,
DevEnvironment,
type HotChannel,
type HotPayload,
type Plugin,
type ResolvedConfig,
} from "vite";
import type { SourcelessWorkerOptions } from "wrangler";
import { ANY_URL, type FetchMetadata, RUNNER_INIT_PATH } from "./shared";
import { type FetchMetadata, type RunnerRpc } from "./shared";

interface WorkerdPluginOptions extends WorkerdEnvironmentOptions {
entry?: string;
Expand Down Expand Up @@ -119,12 +119,17 @@ export async function createWorkerdDevEnvironment(
const args = await request.json();
try {
const result = await devEnv.fetchModule(...(args as [any, any]));
return new MiniflareResponse(JSON.stringify(result));
return MiniflareResponse.json(result);
} catch (error) {
console.error("[fetchModule]", args, error);
throw error;
}
},
__viteRunnerSend: async (request) => {
const payload = (await request.json()) as HotPayload;
hotListener.dispatch(payload);
return MiniflareResponse.json(null);
},
},
bindings: {
__viteRoot: config.root,
Expand Down Expand Up @@ -159,30 +164,23 @@ export async function createWorkerdDevEnvironment(

// get durable object singleton
const ns = await miniflare.getDurableObjectNamespace("__viteRunner");
const runnerObject = ns.get(ns.idFromName(""));
const runnerObject = ns.get(ns.idFromName("")) as any as Fetcher & RunnerRpc;

// initial request to setup websocket
const initResponse = await runnerObject.fetch(ANY_URL + RUNNER_INIT_PATH, {
headers: {
Upgrade: "websocket",
},
});
tinyassert(initResponse.webSocket);
const { webSocket } = initResponse;
webSocket.accept();
// init via rpc
await runnerObject.__viteInit();

// websocket hmr channgel
const hot = createSimpleHMRChannel({
post: (data) => webSocket.send(data),
on: (listener) => {
webSocket.addEventListener("message", listener);
return () => {
webSocket.removeEventListener("message", listener);
};
// hmr channel
const hotListener = createHotListenerManager();
const hot: HotChannel = {
listen: () => {},
close: () => {},
on: hotListener.on,
off: hotListener.off,
send(...args: any[]) {
const payload = normalizeServerSendPayload(...args);
runnerObject.__viteServerSend(payload);
},
serialize: (v) => JSON.stringify(v),
deserialize: (v) => JSON.parse(v.data),
});
};

// TODO: move initialization code to `init`?
// inheritance to extend dispose
Expand All @@ -204,69 +202,59 @@ export async function createWorkerdDevEnvironment(
"x-vite-fetch",
JSON.stringify({ entry } satisfies FetchMetadata),
);
const fetch_ = runnerObject.fetch as any as typeof fetch; // fix web/undici types
const res = await fetch_(request.url, {
const res = await runnerObject.fetch(request.url, {
method: request.method,
headers,
body: request.body as any,
redirect: "manual",
// @ts-ignore undici
duplex: "half",
});
return new Response(res.body, {
return new Response(res.body as any, {
status: res.status,
statusText: res.statusText,
headers: res.headers,
headers: res.headers as any,
});
},
};

return Object.assign(devEnv, { api }) as WorkerdDevEnvironment;
}

// cf.
// https://github.com/vitejs/vite/blob/feat/environment-api/packages/vite/src/node/server/hmr.ts/#L909-L910
// https://github.com/vitejs/vite/blob/feat/environment-api/packages/vite/src/node/ssr/runtime/serverHmrConnector.ts/#L33-L34
function createSimpleHMRChannel(options: {
post: (data: any) => any;
on: (listener: (data: any) => void) => () => void;
serialize: (v: any) => any;
deserialize: (v: any) => any;
}): HotChannel {
const listerMap = new DefaultMap<string, Set<Function>>(() => new Set());
let dispose: (() => void) | undefined;
// wrapper to simplify listener management
function createHotListenerManager(): Pick<HotChannel, "on" | "off"> & {
dispatch: (payload: HotPayload) => void;
} {
const listerMap: Record<string, Set<Function>> = {};
const getListerMap = (e: string) => (listerMap[e] ??= new Set());

return {
listen() {
dispose = options.on((data) => {
const payload = options.deserialize(data) as CustomPayload;
for (const f of listerMap.get(payload.event)) {
f(payload.data);
}
});
},
close() {
dispose?.();
dispose = undefined;
on(event: string, listener: Function) {
getListerMap(event).add(listener);
},
on(event: string, listener: (...args: any[]) => any) {
listerMap.get(event).add(listener);
off(event, listener: any) {
getListerMap(event).delete(listener);
},
off(event: string, listener: (...args: any[]) => any) {
listerMap.get(event).delete(listener);
},
send(...args: any[]) {
let payload: any;
if (typeof args[0] === "string") {
payload = {
type: "custom",
event: args[0],
data: args[1],
};
} else {
payload = args[0];
dispatch(payload) {
if (payload.type === "custom") {
for (const lister of getListerMap(payload.event)) {
lister(payload.data);
}
}
options.post(options.serialize(payload));
},
};
}

function normalizeServerSendPayload(...args: any[]) {
let payload: HotPayload;
if (typeof args[0] === "string") {
payload = {
type: "custom",
event: args[0],
data: args[1],
};
} else {
payload = args[0];
}
return payload;
}
18 changes: 17 additions & 1 deletion packages/workerd/src/shared.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export const RUNNER_INIT_PATH = "/__viteInit";
import type { HotPayload } from "vite";

export const ANY_URL = "https://any.local";

export type RunnerEnv = {
Expand All @@ -9,9 +10,24 @@ export type RunnerEnv = {
__viteFetchModule: {
fetch: (request: Request) => Promise<Response>;
};
__viteRunnerSend: {
fetch: (request: Request) => Promise<Response>;
};
__viteRunner: DurableObject;
};

export type RunnerRpc = {
__viteInit: () => Promise<void>;
__viteServerSend: (payload: HotPayload) => Promise<void>;
};

export type FetchMetadata = {
entry: string;
};

export function requestJson(data: unknown) {
return new Request(ANY_URL, {
method: "POST",
body: JSON.stringify(data),
});
}
119 changes: 57 additions & 62 deletions packages/workerd/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
import { DurableObject } from "cloudflare:workers";
import { objectPickBy, tinyassert } from "@hiogawa/utils";
import {
ModuleRunner,
ssrImportMetaKey,
ssrModuleExportsKey,
} from "vite/module-runner";
import {
ANY_URL,
type FetchMetadata,
RUNNER_INIT_PATH,
type RunnerEnv,
type RunnerRpc,
requestJson,
} from "./shared";

export class RunnerObject implements DurableObject {
export class RunnerObject extends DurableObject implements RunnerRpc {
#env: RunnerEnv;
#runner?: ModuleRunner;

constructor(_state: DurableObjectState, env: RunnerEnv) {
this.#env = env;
constructor(...args: ConstructorParameters<typeof DurableObject>) {
super(...args);
this.#env = args[1] as RunnerEnv;
}

async fetch(request: Request) {
override async fetch(request: Request) {
try {
return await this.#fetch(request);
} catch (e) {
Expand All @@ -33,16 +35,6 @@ export class RunnerObject implements DurableObject {
}

async #fetch(request: Request) {
const url = new URL(request.url);

if (url.pathname === RUNNER_INIT_PATH) {
const pair = new WebSocketPair();
(pair[0] as any).accept();
tinyassert(!this.#runner);
this.#runner = createRunner(this.#env, pair[0]);
return new Response(null, { status: 101, webSocket: pair[1] });
}

tinyassert(this.#runner);
const options = JSON.parse(
request.headers.get("x-vite-fetch")!,
Expand All @@ -58,56 +50,59 @@ export class RunnerObject implements DurableObject {
abort(_reason?: any) {},
});
}
}

function createRunner(env: RunnerEnv, webSocket: WebSocket) {
return new ModuleRunner(
{
root: env.__viteRoot,
sourcemapInterceptor: "prepareStackTrace",
transport: {
fetchModule: async (...args) => {
const response = await env.__viteFetchModule.fetch(
new Request(ANY_URL, {
method: "POST",
body: JSON.stringify(args),
}),
);
tinyassert(response.ok);
const result = response.json();
return result as any;
},
},
hmr: {
connection: {
isReady: () => true,
onUpdate(callback) {
webSocket.addEventListener("message", (event) => {
callback(JSON.parse(event.data));
});
async __viteInit() {
const env = this.#env;
this.#runner = new ModuleRunner(
{
root: env.__viteRoot,
sourcemapInterceptor: "prepareStackTrace",
transport: {
fetchModule: async (...args) => {
const response = await env.__viteFetchModule.fetch(
requestJson(args),
);
tinyassert(response.ok);
return response.json();
},
send(payload) {
webSocket.send(JSON.stringify(payload));
},
hmr: {
connection: {
isReady: () => true,
onUpdate: (callback) => {
this.#viteServerSendHandler = callback;
},
send: async (payload) => {
const response = await env.__viteRunnerSend.fetch(
requestJson(payload),
);
tinyassert(response.ok);
},
},
},
},
},
{
runInlinedModule: async (context, transformed) => {
const codeDefinition = `'use strict';async (${Object.keys(context).join(
",",
)})=>{{`;
const code = `${codeDefinition}${transformed}\n}}`;
const fn = env.__viteUnsafeEval.eval(
code,
context[ssrImportMetaKey].filename,
);
await fn(...Object.values(context));
Object.freeze(context[ssrModuleExportsKey]);
},
async runExternalModule(filepath) {
return import(filepath);
{
runInlinedModule: async (context, transformed) => {
const codeDefinition = `'use strict';async (${Object.keys(
context,
).join(",")})=>{{`;
const code = `${codeDefinition}${transformed}\n}}`;
const fn = env.__viteUnsafeEval.eval(
code,
context[ssrImportMetaKey].filename,
);
await fn(...Object.values(context));
Object.freeze(context[ssrModuleExportsKey]);
},
async runExternalModule(filepath) {
return import(filepath);
},
},
},
);
);
}

#viteServerSendHandler!: (payload: any) => void;
async __viteServerSend(payload: any) {
this.#viteServerSendHandler(payload);
}
}

0 comments on commit 99cff0a

Please sign in to comment.