Skip to content

Commit

Permalink
[http-server-javascript] Support new-style multipart requests (and so…
Browse files Browse the repository at this point in the history
…me other things) (#5514)

This PR implements handling for new-style multipart requests and heavily
refactors the multipart core to use web streams. While this doesn't
enable streaming multipart yet (i.e. the implementation layer still
buffers the parts before providing them to the business logic), it's a
step towards streaming multipart.

- Add special handling for representation of well-known model
`TypeSpec.Http.HttpPart` (link #5275).
- Added encode/decode support for `bytes` through `NodeJS.Buffer` as
`Uint8Array`.
- Fixed an issue where serialization propagation did not flow through
array types, leading to cases where models were being detected as not
requiring serialization if they contained properties with array types,
where the array value type did require serialization.
- Implemented and pervasively used `gensym`, which generates unique
symbols containing a monotonically incrementing counter. This helps me
prevent collisions between names of variables I generate and names of
variables generated from user typespec code without requiring a
full-blown scope solution. It costs readability of the generated code,
but isn't so bad.
- Implemented rfc5987 header parameter parsing as a shared helper
module, used to extract boundary and other information while processing
multipart requests.
- Implemented streaming multipart chunking via web streams in a shared
helper module. When a multipart/form-data content-type is detected, a
parser is generated that first converts the `http.IncomingMessage` into
a ReadableStream, then the bytes read from the multipart body are split
into parts and streamed individually. Finally, each individual part
stream is consumed by a transform that reads enough of the stream to
parse the headers and then passes the rest of the body through to a
final "body stream."
- Exposed error handlers (onInvalidRequest, onRequestNotFound,
onInternalError) to application logic through `HttpContext`. The
application logic can use these handlers to manually respond with
validation/internal errors in whichever way the router would ordinarily
respond to them.

---------

Co-authored-by: Will Temple <[email protected]>
  • Loading branch information
witemple-msft and willmtemple authored Jan 10, 2025
1 parent fda51ff commit 2edfa93
Show file tree
Hide file tree
Showing 23 changed files with 1,525 additions and 242 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
changeKind: feature
packages:
- "@typespec/http-server-javascript"
---

- Implemented new-style multipart request handling.
- Fixed JSON serialization/deserialization in some cases where models that required serialization occurred within arrays.
83 changes: 83 additions & 0 deletions packages/http-server-javascript/generated-defs/helpers/header.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) Microsoft Corporation
// Licensed under the MIT license.

import { Module } from "../../src/ctx.js";

export let module: Module = undefined as any;

// prettier-ignore
const lines = [
"// Copyright (c) Microsoft Corporation",
"// Licensed under the MIT license.",
"",
"export interface HeaderValueParameters {",
" value: string;",
" verbatim: string;",
" params: { [k: string]: string };",
"}",
"",
"/**",
" * Parses a header value that may contain additional parameters (e.g. `text/html; charset=utf-8`).",
" * @param headerValueText - the text of the header value to parse",
" * @returns an object containing the value and a map of parameters",
" */",
"export function parseHeaderValueParameters<Header extends string | undefined>(",
" headerValueText: Header,",
"): undefined extends Header ? HeaderValueParameters | undefined : HeaderValueParameters {",
" if (headerValueText === undefined) {",
" return undefined as any;",
" }",
"",
" const idx = headerValueText.indexOf(\";\");",
" const [value, _paramsText] =",
" idx === -1",
" ? [headerValueText, \"\"]",
" : [headerValueText.slice(0, idx), headerValueText.slice(idx + 1)];",
"",
" let paramsText = _paramsText;",
"",
" // Parameters are a sequence of key=value pairs separated by semicolons, but the value may be quoted in which case it",
" // may contain semicolons. We use a regular expression to iteratively split the parameters into key=value pairs.",
" const params: { [k: string]: string } = {};",
"",
" let match;",
"",
" // TODO: may need to support ext-parameter (e.g. \"filename*=UTF-8''%e2%82%ac%20rates\" => { filename: \"€ rates\" }).",
" // By default we decoded everything as UTF-8, and non-UTF-8 agents are a dying breed, but we may need to support",
" // this for completeness. If we do support it, we'll prefer an ext-parameter over a regular parameter. Currently, we'll",
" // just treat them as separate keys and put the raw value in the parameter.",
" //",
" // https://datatracker.ietf.org/doc/html/rfc5987#section-3.2.1",
" while ((match = paramsText.match(/\\s*([^=]+)=(?:\"([^\"]+)\"|([^;]+));?/))) {",
" const [, key, quotedValue, unquotedValue] = match;",
"",
" params[key.trim()] = quotedValue ?? unquotedValue;",
"",
" paramsText = paramsText.slice(match[0].length);",
" }",
"",
" return {",
" value: value.trim(),",
" verbatim: headerValueText,",
" params,",
" };",
"}",
"",
];

export async function createModule(parent: Module): Promise<Module> {
if (module) return module;

module = {
name: "header",
cursor: parent.cursor.enter("header"),
imports: [],
declarations: [],
};

module.declarations.push(lines);

parent.declarations.push(module);

return module;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ export async function createModule(parent: Module): Promise<Module> {
};

// Child modules
await import("./header.js").then((m) => m.createModule(module));
await import("./multipart.js").then((m) => m.createModule(module));
await import("./router.js").then((m) => m.createModule(module));

parent.declarations.push(module);
Expand Down
256 changes: 256 additions & 0 deletions packages/http-server-javascript/generated-defs/helpers/multipart.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
// Copyright (c) Microsoft Corporation
// Licensed under the MIT license.

import { Module } from "../../src/ctx.js";

export let module: Module = undefined as any;

// prettier-ignore
const lines = [
"// Copyright (c) Microsoft Corporation",
"// Licensed under the MIT license.",
"",
"import type * as http from \"node:http\";",
"",
"export interface HttpPart {",
" headers: { [k: string]: string | undefined };",
" body: ReadableStream<Buffer>;",
"}",
"",
"/**",
" * Consumes a stream of incoming data and splits it into individual streams for each part of a multipart request, using",
" * the provided `boundary` value.",
" */",
"function MultipartBoundaryTransformStream(",
" boundary: string,",
"): ReadableWritablePair<ReadableStream<Buffer>, Buffer> {",
" let buffer: Buffer = Buffer.alloc(0);",
" // Initialize subcontroller to an object that does nothing. Multipart bodies may contain a preamble before the first",
" // boundary, so this dummy controller will discard it.",
" let subController: { enqueue(chunk: Buffer): void; close(): void } | null = {",
" enqueue() {},",
" close() {},",
" };",
"",
" let boundarySplit = Buffer.from(`--${boundary}`);",
" let initialized = false;",
"",
" // We need to keep at least the length of the boundary split plus room for CRLFCRLF in the buffer to detect the boundaries.",
" // We subtract one from this length because if the whole thing were in the buffer, we would detect it and move past it.",
" const bufferKeepLength = boundarySplit.length + BUF_CRLFCRLF.length - 1;",
" let _readableController: ReadableStreamDefaultController<ReadableStream<Buffer>> = null as any;",
"",
" const readable = new ReadableStream<ReadableStream<Buffer>>({",
" start(controller) {",
" _readableController = controller;",
" },",
" });",
"",
" const readableController = _readableController;",
"",
" const writable = new WritableStream<Buffer>({",
" write: async (chunk) => {",
" buffer = Buffer.concat([buffer, chunk]);",
"",
" let index: number;",
"",
" while ((index = buffer.indexOf(boundarySplit)) !== -1) {",
" // We found a boundary, emit everything before it and initialize a new stream for the next part.",
"",
" // We are initialized if we have found the boundary at least once.",
" //",
" // Cases",
" // 1. If the index is zero and we aren't initialized, there was no preamble.",
" // 2. If the index is zero and we are initialized, then we had to have found \\r\\n--boundary, nothing special to do.",
" // 3. If the index is not zero, and we are initialized, then we found \\r\\n--boundary somewhere in the middle,",
" // nothing special to do.",
" // 4. If the index is not zero and we aren't initialized, then we need to check that boundarySplit was preceded",
" // by \\r\\n for validity, because the preamble must end with \\r\\n.",
"",
" if (index > 0) {",
" if (!initialized) {",
" if (!buffer.subarray(index - 2, index).equals(Buffer.from(\"\\r\\n\"))) {",
" readableController.error(new Error(\"Invalid preamble in multipart body.\"));",
" } else {",
" await enqueueSub(buffer.subarray(0, index - 2));",
" }",
" } else {",
" await enqueueSub(buffer.subarray(0, index));",
" }",
" }",
"",
" // We enqueued everything before the boundary, so we clear the buffer past the boundary",
" buffer = buffer.subarray(index + boundarySplit.length);",
"",
" // We're done with the current part, so close the stream. If this is the opening boundary, there won't be a",
" // subcontroller yet.",
" subController?.close();",
" subController = null;",
"",
" if (!initialized) {",
" initialized = true;",
" boundarySplit = Buffer.from(`\\r\\n${boundarySplit}`);",
" }",
" }",
"",
" if (buffer.length > bufferKeepLength) {",
" await enqueueSub(buffer.subarray(0, -bufferKeepLength));",
" buffer = buffer.subarray(-bufferKeepLength);",
" }",
" },",
" close() {",
" if (!/--(\\r\\n)?/.test(buffer.toString(\"utf-8\"))) {",
" readableController.error(new Error(\"Unexpected characters after final boundary.\"));",
" }",
"",
" subController?.close();",
"",
" readableController.close();",
" },",
" });",
"",
" async function enqueueSub(s: Buffer) {",
" subController ??= await new Promise<ReadableStreamDefaultController>((resolve) => {",
" readableController.enqueue(",
" new ReadableStream<Buffer>({",
" start: (controller) => resolve(controller),",
" }),",
" );",
" });",
"",
" subController.enqueue(s);",
" }",
"",
" return { readable, writable };",
"}",
"",
"const BUF_CRLFCRLF = Buffer.from(\"\\r\\n\\r\\n\");",
"",
"/**",
" * Consumes a stream of the contents of a single part of a multipart request and emits an `HttpPart` object for each part.",
" * This consumes just enough of the stream to read the headers, and then forwards the rest of the stream as the body.",
" */",
"class HttpPartTransform extends TransformStream<ReadableStream<Buffer>, HttpPart> {",
" constructor() {",
" super({",
" transform: async (partRaw, controller) => {",
" const reader = partRaw.getReader();",
"",
" let buf = Buffer.alloc(0);",
" let idx;",
"",
" while ((idx = buf.indexOf(BUF_CRLFCRLF)) === -1) {",
" const { done, value } = await reader.read();",
" if (done) {",
" throw new Error(\"Unexpected end of part.\");",
" }",
" buf = Buffer.concat([buf, value]);",
" }",
"",
" const headerText = buf.subarray(0, idx).toString(\"utf-8\").trim();",
"",
" const headers = Object.fromEntries(",
" headerText.split(\"\\r\\n\").map((line) => {",
" const [name, value] = line.split(\": \", 2);",
"",
" return [name.toLowerCase(), value];",
" }),",
" ) as { [k: string]: string };",
"",
" const body = new ReadableStream<Buffer>({",
" start(controller) {",
" controller.enqueue(buf.subarray(idx + BUF_CRLFCRLF.length));",
" },",
" async pull(controller) {",
" const { done, value } = await reader.read();",
"",
" if (done) {",
" controller.close();",
" } else {",
" controller.enqueue(value);",
" }",
" },",
" });",
"",
" controller.enqueue({ headers, body });",
" },",
" });",
" }",
"}",
"",
"/**",
" * Processes a request as a multipart request, returning a stream of `HttpPart` objects, each representing an individual",
" * part in the multipart request.",
" *",
" * Only call this function if you have already validated the content type of the request and confirmed that it is a",
" * multipart request.",
" *",
" * @throws Error if the content-type header is missing or does not contain a boundary field.",
" *",
" * @param request - the incoming request to parse as multipart",
" * @returns a stream of HttpPart objects, each representing an individual part in the multipart request",
" */",
"export function createMultipartReadable(request: http.IncomingMessage): ReadableStream<HttpPart> {",
" const boundary = request.headers[\"content-type\"]",
" ?.split(\";\")",
" .find((s) => s.includes(\"boundary=\"))",
" ?.split(\"=\", 2)[1];",
" if (!boundary) {",
" throw new Error(\"Invalid request: missing boundary in content-type.\");",
" }",
"",
" const bodyStream = new ReadableStream<Uint8Array>({",
" start(controller) {",
" request.on(\"data\", (chunk: Buffer) => {",
" controller.enqueue(chunk);",
" });",
" request.on(\"end\", () => controller.close());",
" },",
" });",
"",
" return bodyStream",
" .pipeThrough(MultipartBoundaryTransformStream(boundary))",
" .pipeThrough(new HttpPartTransform());",
"}",
"",
"// Gross polyfill because Safari doesn't support this yet.",
"//",
"// https://bugs.webkit.org/show_bug.cgi?id=194379",
"// https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream#browser_compatibility",
"(ReadableStream.prototype as any)[Symbol.asyncIterator] ??= async function* () {",
" const reader = this.getReader();",
" try {",
" while (true) {",
" const { done, value } = await reader.read();",
" if (done) return value;",
" yield value;",
" }",
" } finally {",
" reader.releaseLock();",
" }",
"};",
"",
"declare global {",
" interface ReadableStream<R> {",
" [Symbol.asyncIterator](): AsyncIterableIterator<R>;",
" }",
"}",
"",
];

export async function createModule(parent: Module): Promise<Module> {
if (module) return module;

module = {
name: "multipart",
cursor: parent.cursor.enter("multipart"),
imports: [],
declarations: [],
};

module.declarations.push(lines);

parent.declarations.push(module);

return module;
}
Loading

0 comments on commit 2edfa93

Please sign in to comment.