Skip to content

Commit

Permalink
feat(child-process,workerd,web-worker): use new module transport (con…
Browse files Browse the repository at this point in the history
…nect, send) (#140)
  • Loading branch information
hi-ogawa authored Nov 8, 2024
1 parent feaa3d2 commit 81f30c9
Show file tree
Hide file tree
Showing 20 changed files with 322 additions and 223 deletions.
24 changes: 2 additions & 22 deletions examples/browser-cli/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
createServer,
parseAstAsync,
} from "vite";
import type { FetchFunction, ModuleRunner } from "vite/module-runner";
import type { ModuleRunner } from "vite/module-runner";
import { vitePluginFetchModuleServer } from "../../web-worker/src/lib/fetch-module-server";

const headless = !process.env["CLI_HEADED"];
const extension = process.env["CLI_EXTENSION"] ?? "tsx";
Expand Down Expand Up @@ -185,25 +186,4 @@ function vitePluginBrowserRunner(): Plugin {
};
}

// https://github.com/vitejs/vite/discussions/18191
function vitePluginFetchModuleServer(): Plugin {
return {
name: vitePluginFetchModuleServer.name,
configureServer(server) {
server.middlewares.use(async (req, res, next) => {
const url = new URL(req.url ?? "/", "https://any.local");
if (url.pathname === "/@vite/fetchModule") {
const [name, ...args] = JSON.parse(url.searchParams.get("payload")!);
const result = await server.environments[name]!.fetchModule(
...(args as Parameters<FetchFunction>),
);
res.end(JSON.stringify(result));
return;
}
next();
});
},
};
}

main();
21 changes: 3 additions & 18 deletions examples/browser-cli/src/runner.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import {
ESModulesEvaluator,
type FetchFunction,
ModuleRunner,
} from "vite/module-runner";
import { ESModulesEvaluator, ModuleRunner } from "vite/module-runner";
import { fetchClientFetchModule } from "../../web-worker/src/lib/fetch-module-client";

export async function start(options: { root: string }) {
const runner = new ModuleRunner(
{
root: options.root,
sourcemapInterceptor: false,
transport: {
fetchModule: fetchModuleFetchClient("custom"),
invoke: fetchClientFetchModule("custom"),
},
hmr: false,
},
Expand All @@ -19,15 +16,3 @@ export async function start(options: { root: string }) {

return runner;
}

// https://github.com/vitejs/vite/discussions/18191
function fetchModuleFetchClient(environmentName: string): FetchFunction {
return async (...args) => {
const payload = JSON.stringify([environmentName, ...args]);
const response = await fetch(
"/@vite/fetchModule?" + new URLSearchParams({ payload }),
);
const result = response.json();
return result as any;
};
}
25 changes: 5 additions & 20 deletions examples/child-process/src/lib/vite/bridge-client.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,19 @@
// @ts-check

import assert from "node:assert";
import { ESModulesEvaluator, ModuleRunner } from "vite/module-runner";
import { createSSEClientTransport } from "./sse-client.ts";

/**
* @param {import("./types").BridgeClientOptions} options
*/
export function createBridgeClient(options) {
/**
* @param {string} method
* @param {...any} args
* @returns {Promise<any>}
*/
async function rpc(method, ...args) {
const response = await fetch(options.bridgeUrl + "/rpc", {
method: "POST",
body: JSON.stringify({ method, args, key: options.key }),
});
assert(response.ok);
const result = response.json();
return result;
}

const runner = new ModuleRunner(
{
root: options.root,
sourcemapInterceptor: "prepareStackTrace",
transport: {
fetchModule: (...args) => rpc("fetchModule", ...args),
},
transport: createSSEClientTransport(
options.bridgeUrl + "/sse?" + new URLSearchParams({ key: options.key }),
),
hmr: false,
},
new ESModulesEvaluator(),
Expand Down Expand Up @@ -56,5 +41,5 @@ export function createBridgeClient(options) {
}
}

return { runner, rpc, handler };
return { runner, handler };
}
31 changes: 13 additions & 18 deletions examples/child-process/src/lib/vite/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import readline from "node:readline";
import { Readable } from "node:stream";
import { webToNodeHandler } from "@hiogawa/utils-node";
import { DevEnvironment, type DevEnvironmentOptions } from "vite";
import { createSSEServerTransport } from "./sse-server";
import type { BridgeClientOptions } from "./types";

// TODO
Expand All @@ -23,23 +24,24 @@ export class ChildProcessFetchDevEnvironment extends DevEnvironment {
}): NonNullable<DevEnvironmentOptions["createEnvironment"]> {
return (name, config) => {
const command = [
options.runtime === "node" ? ["node"] : [],
options.runtime === "node" ? ["node", "--import", "tsx/esm"] : [],
options.runtime === "bun" ? ["bun", "run"] : [],
options.conditions ? ["--conditions", ...options.conditions] : [],
join(import.meta.dirname, `./runtime/${options.runtime}.js`),
].flat();
return new ChildProcessFetchDevEnvironment({ command }, name, config, {
// TODO
hot: false,
});
return new ChildProcessFetchDevEnvironment({ command }, name, config);
};
}

constructor(
public extraOptions: { command: string[] },
...args: ConstructorParameters<typeof DevEnvironment>
name: ConstructorParameters<typeof DevEnvironment>[0],
config: ConstructorParameters<typeof DevEnvironment>[1],
) {
super(...args);
super(name, config, {
hot: false,
transport: createSSEServerTransport(),
});
}

override init: DevEnvironment["init"] = async (...args) => {
Expand All @@ -49,18 +51,11 @@ export class ChildProcessFetchDevEnvironment extends DevEnvironment {
const key = Math.random().toString(36).slice(2);

const listener = webToNodeHandler(async (request) => {
const url = new URL(request.url);
// TODO: other than json?
if (url.pathname === "/rpc") {
const { method, args, key: reqKey } = await request.json();
if (reqKey !== key) {
return Response.json({ message: "invalid key" }, { status: 400 });
}
assert(method in this);
const result = await (this as any)[method]!(...args);
return Response.json(result);
const reqKey = new URL(request.url).searchParams.get("key");
if (reqKey !== key) {
return Response.json({ message: "invalid key" }, { status: 400 });
}
return undefined;
return this.hot.api.handler(request);
});

const bridge = http.createServer((req, res) => {
Expand Down
92 changes: 92 additions & 0 deletions examples/child-process/src/lib/vite/sse-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import assert from "node:assert";
import type { ModuleRunnerTransport } from "vite/module-runner";

export function createSSEClientTransport(url: string): ModuleRunnerTransport {
let sseClient: SSEClient;

return {
async connect(handlers) {
sseClient = await createSSEClient(url, handlers);
},
async send(payload) {
assert(sseClient);
sseClient.send(payload);
},
timeout: 2000,
};
}

type SSEClient = Awaited<ReturnType<typeof createSSEClient>>;

async function createSSEClient(
url: string,
handlers: {
onMessage: (payload: any) => void;
onDisconnection: () => void;
},
) {
const response = await fetch(url);
assert(response.ok);
const clientId = response.headers.get("x-client-id");
assert(clientId);
assert(response.body);
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(splitTransform("\n\n"))
.pipeTo(
new WritableStream({
write(chunk) {
// console.log("[client.response]", chunk);
if (chunk.startsWith("data: ")) {
const payload = JSON.parse(chunk.slice("data: ".length));
handlers.onMessage(payload);
}
},
abort(e) {
console.log("[client.abort]", e);
},
close() {
console.log("[client.close]");
handlers.onDisconnection();
},
}),
)
.catch((e) => {
console.log("[client.pipeTo.catch]", e);
});

return {
send: async (payload: unknown) => {
const response = await fetch(url, {
method: "POST",
body: JSON.stringify(payload),
headers: {
"x-client-id": clientId,
},
});
assert(response.ok);
const result = await response.json();
return result;
},
};
}

export function splitTransform(sep: string): TransformStream<string, string> {
let pending = "";
return new TransformStream({
transform(chunk, controller) {
while (true) {
const i = chunk.indexOf(sep);
if (i >= 0) {
pending += chunk.slice(0, i);
controller.enqueue(pending);
pending = "";
chunk = chunk.slice(i + sep.length);
continue;
}
pending += chunk;
break;
}
},
});
}
119 changes: 119 additions & 0 deletions examples/child-process/src/lib/vite/sse-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import assert from "node:assert";
import type { HotChannel, HotChannelListener, HotPayload } from "vite";

export function createSSEServerTransport(): HotChannel {
interface SSEClientProxy {
send(payload: HotPayload): void;
close(): void;
}

const clientMap = new Map<string, SSEClientProxy>();
const listenerManager = createListenerManager();

async function handler(request: Request): Promise<Response | undefined> {
const url = new URL(request.url);
if (url.pathname === "/sse") {
// handle `send`
const senderId = request.headers.get("x-client-id");
if (senderId) {
const client = clientMap.get(senderId);
assert(client);
const payload = await request.json();
listenerManager.dispatch(payload, client);
return Response.json({ ok: true });
}
// otherwise handle `connect`
let controller: ReadableStreamDefaultController<string>;
const stream = new ReadableStream<string>({
start: (controller_) => {
controller = controller_;
controller.enqueue(`:ping\n\n`);
},
cancel() {
clientMap.delete(clientId);
},
});
const pingInterval = setInterval(() => {
controller.enqueue(`:ping\n\n`);
}, 10_000);
const clientId = Math.random().toString(36).slice(2);
const client: SSEClientProxy = {
send(payload) {
controller.enqueue(`data: ${JSON.stringify(payload)}\n\n`);
},
close() {
clearInterval(pingInterval);
controller.close();
},
};
clientMap.set(clientId, client);
return new Response(stream, {
headers: {
"x-client-id": clientId,
"content-type": "text/event-stream",
"cache-control": "no-cache",
connection: "keep-alive",
},
});
}
return undefined;
}

const channel: HotChannel = {
listen() {},
close() {
for (const client of clientMap.values()) {
client.close();
}
},
on: listenerManager.on,
off: listenerManager.off,
send: (payload) => {
for (const client of clientMap.values()) {
client.send(payload);
}
},
// expose SSE handler via hot.api
api: {
type: "sse",
handler,
},
};

return channel;
}

// wrapper to simplify listener management
function createListenerManager(): Pick<HotChannel, "on" | "off"> & {
dispatch: (
payload: HotPayload,
client: { send: (payload: HotPayload) => void },
) => void;
} {
const listerMap: Record<string, Set<HotChannelListener>> = {};
const getListerMap = (e: string) => (listerMap[e] ??= new Set());

return {
on(event: string, listener: HotChannelListener) {
// console.log("[channel.on]", event, listener);
if (event === "connection") {
return;
}
getListerMap(event).add(listener);
},
off(event, listener: any) {
// console.log("[channel.off]", event, listener);
if (event === "connection") {
return;
}
getListerMap(event).delete(listener);
},
dispatch(payload, client) {
if (payload.type === "custom") {
for (const lister of getListerMap(payload.event)) {
lister(payload.data, client);
}
}
},
};
}
Loading

0 comments on commit 81f30c9

Please sign in to comment.