From 5d53ed5b3f5268a4b8f390dfdce1e4adfc12f2b0 Mon Sep 17 00:00:00 2001 From: Muki Kiboigo Date: Tue, 12 Nov 2024 11:55:47 -0800 Subject: [PATCH 1/8] fix(router): ensure real path is correct on serve --- src/http/router.zig | 47 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/src/http/router.zig b/src/http/router.zig index c1f59da..c5a0ceb 100644 --- a/src/http/router.zig +++ b/src/http/router.zig @@ -25,6 +25,7 @@ pub fn Router(comptime Server: type) type { const Route = _Route(Server); const Context = _Context(Server); allocator: std.mem.Allocator, + arena: std.heap.ArenaAllocator, routes: RoutingTrie, /// This makes the router immutable, also making it /// thread-safe when shared. @@ -32,11 +33,17 @@ pub fn Router(comptime Server: type) type { pub fn init(allocator: std.mem.Allocator) Self { const routes = RoutingTrie.init(allocator) catch unreachable; - return Self{ .allocator = allocator, .routes = routes, .locked = false }; + return Self{ + .allocator = allocator, + .arena = std.heap.ArenaAllocator.init(allocator), + .routes = routes, + .locked = false, + }; } pub fn deinit(self: *Self) void { self.routes.deinit(); + self.arena.deinit(); } const FileProvision = struct { @@ -127,15 +134,43 @@ pub fn Router(comptime Server: type) type { pub fn serve_fs_dir(self: *Self, comptime url_path: []const u8, comptime dir_path: []const u8) !void { assert(!self.locked); + const arena = self.arena.allocator(); + + const slice = try arena.create([]const u8); + // Gets the real path of the directory being served. + slice.* = try std.fs.realpathAlloc(arena, dir_path); + + const route = Route.init().get(slice, struct { + pub fn handler_fn(ctx: *Context, real_dir: *const []const u8) !void { + if (ctx.captures.len == 0) { + try ctx.respond(.{ + .status = .@"Not Found", + .mime = Mime.HTML, + .body = "", + }); + return; + } - const route = Route.init().get({}, struct { - pub fn handler_fn(ctx: *Context, _: void) !void { const search_path = ctx.captures[0].remaining; - const file_path = try std.fmt.allocPrintZ(ctx.allocator, "{s}/{s}", .{ dir_path, search_path }); + const file_path: [:0]u8 = try std.fmt.allocPrintZ(ctx.allocator, "{s}/{s}", .{ dir_path, search_path }); + const real_path = std.fs.realpathAlloc(ctx.allocator, file_path) catch { + try ctx.respond(.{ + .status = .@"Not Found", + .mime = Mime.HTML, + .body = "", + }); + return; + }; - // TODO: Ensure that paths cannot go out of scope and reference data that they shouldn't be allowed to. - // Very important. + if (!std.mem.startsWith(u8, real_path, real_dir.*)) { + try ctx.respond(.{ + .status = .Forbidden, + .mime = Mime.HTML, + .body = "", + }); + return; + } const extension_start = std.mem.lastIndexOfScalar(u8, search_path, '.'); const mime: Mime = blk: { From 5fdd98a4d2f0f8afa6c267cb767507de3f30536b Mon Sep 17 00:00:00 2001 From: Muki Kiboigo Date: Tue, 12 Nov 2024 11:56:02 -0800 Subject: [PATCH 2/8] chore: add more comments to the router --- examples/fs/main.zig | 4 +++- src/http/router.zig | 18 ++++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/examples/fs/main.zig b/examples/fs/main.zig index 1e34706..c13b72f 100644 --- a/examples/fs/main.zig +++ b/examples/fs/main.zig @@ -63,7 +63,9 @@ pub fn main() !void { &router, struct { fn entry(rt: *Runtime, r: *const Router) !void { - var server = Server.init(.{ .allocator = rt.allocator }); + var server = Server.init(.{ + .allocator = rt.allocator, + }); try server.bind(host, port); try server.serve(r, rt); } diff --git a/src/http/router.zig b/src/http/router.zig index c5a0ceb..b521e5d 100644 --- a/src/http/router.zig +++ b/src/http/router.zig @@ -72,6 +72,12 @@ pub fn Router(comptime Server: type) type { } provision.fd = fd; + // TODO: If we have a If-None-Match by this point, we should fire off a stat request + // that way we can check the last modified time and compare that with our ETag. + // We generally avoid using the HTTP Date stuff since it can be so slow. + // + // If we have a matching etag, we can just respond with Not Modified. + // If we don't then we continue doing what we normally do. try rt.fs.read( provision, read_file_task, @@ -141,7 +147,11 @@ pub fn Router(comptime Server: type) type { slice.* = try std.fs.realpathAlloc(arena, dir_path); const route = Route.init().get(slice, struct { - pub fn handler_fn(ctx: *Context, real_dir: *const []const u8) !void { + fn handler_fn(ctx: *Context, real_dir: *const []const u8) !void { + // TODO: Add caching support. We shouldn't need to resend files + // all the time, especially if the user has gotten them before + // and has an ETag. + if (ctx.captures.len == 0) { try ctx.respond(.{ .status = .@"Not Found", @@ -153,7 +163,7 @@ pub fn Router(comptime Server: type) type { const search_path = ctx.captures[0].remaining; - const file_path: [:0]u8 = try std.fmt.allocPrintZ(ctx.allocator, "{s}/{s}", .{ dir_path, search_path }); + const file_path = try std.fmt.allocPrintZ(ctx.allocator, "{s}/{s}", .{ dir_path, search_path }); const real_path = std.fs.realpathAlloc(ctx.allocator, file_path) catch { try ctx.respond(.{ .status = .@"Not Found", @@ -204,7 +214,7 @@ pub fn Router(comptime Server: type) type { const url_with_match_all = comptime std.fmt.comptimePrint( "{s}/%r", - .{std.mem.trimRight(u8, url_path, &.{'/'})}, + .{std.mem.trimRight(u8, url_path, "/")}, ); try self.serve_route(url_with_match_all, route); @@ -218,7 +228,7 @@ pub fn Router(comptime Server: type) type { ) !void { assert(!self.locked); const route = Route.init().get({}, struct { - pub fn handler_fn(ctx: *Context, _: void) !void { + fn handler_fn(ctx: *Context, _: void) !void { if (comptime builtin.mode == .Debug) { // Don't Cache in Debug. try ctx.response.headers.add( From 09c4da9e42864e39b5778607700c27ce44f3c863 Mon Sep 17 00:00:00 2001 From: Muki Kiboigo Date: Tue, 12 Nov 2024 13:03:27 -0800 Subject: [PATCH 3/8] feat(router): add serve_not_found fn to router --- examples/basic/main.zig | 12 ++++- src/http/route.zig | 2 +- src/http/router.zig | 26 ++++++++-- src/http/server.zig | 112 ++++++++++++++++++---------------------- 4 files changed, 86 insertions(+), 66 deletions(-) diff --git a/examples/basic/main.zig b/examples/basic/main.zig index 2089e66..c862813 100644 --- a/examples/basic/main.zig +++ b/examples/basic/main.zig @@ -35,7 +35,7 @@ pub fn main() !void { const num: i8 = 12; try router.serve_route("/", Route.init().get(&num, struct { - pub fn handler_fn(ctx: *Context, id: *const i8) !void { + fn handler_fn(ctx: *Context, id: *const i8) !void { const body_fmt = \\ \\ @@ -59,6 +59,16 @@ pub fn main() !void { } }.handler_fn)); + router.serve_not_found(Route.init().get({}, struct { + fn handler_fn(ctx: *Context, _: void) !void { + try ctx.respond(.{ + .status = .@"Not Found", + .mime = http.Mime.HTML, + .body = "Not Found Handler!", + }); + } + }.handler_fn)); + // This provides the entry function into the Tardy runtime. This will run // exactly once inside of each runtime (each thread gets a single runtime). try t.entry( diff --git a/src/http/route.zig b/src/http/route.zig index 26f41bc..e55bc46 100644 --- a/src/http/route.zig +++ b/src/http/route.zig @@ -80,7 +80,7 @@ pub fn Route(comptime Server: type) type { // You can either give a void (if you don't want to pass data through) or a pointer. comptime assert(@typeInfo(@TypeOf(data)) == .Pointer or @typeInfo(@TypeOf(data)) == .Void); const inner_data = switch (comptime @typeInfo(@TypeOf(data))) { - .Void => @intFromPtr(&data), + .Void => undefined, .Pointer => @intFromPtr(data), else => unreachable, }; diff --git a/src/http/router.zig b/src/http/router.zig index b521e5d..9b15d79 100644 --- a/src/http/router.zig +++ b/src/http/router.zig @@ -27,6 +27,7 @@ pub fn Router(comptime Server: type) type { allocator: std.mem.Allocator, arena: std.heap.ArenaAllocator, routes: RoutingTrie, + not_found_route: ?Route = null, /// This makes the router immutable, also making it /// thread-safe when shared. locked: bool = false, @@ -78,6 +79,7 @@ pub fn Router(comptime Server: type) type { // // If we have a matching etag, we can just respond with Not Modified. // If we don't then we continue doing what we normally do. + try rt.fs.read( provision, read_file_task, @@ -105,7 +107,6 @@ pub fn Router(comptime Server: type) type { } const length: usize = @intCast(result); - try provision.list.appendSlice(provision.buffer[0..length]); // TODO: This needs to be a setting you pass in to the router. @@ -274,13 +275,32 @@ pub fn Router(comptime Server: type) type { try self.serve_route(path, route); } + pub fn serve_not_found(self: *Self, route: Route) void { + self.not_found_route = route; + } + pub fn serve_route(self: *Self, path: []const u8, route: Route) !void { assert(!self.locked); try self.routes.add_route(path, route); } - pub fn get_route_from_host(self: Self, host: []const u8, captures: []Capture, queries: *QueryMap) ?FoundRoute { - return self.routes.get_route(host, captures, queries); + fn not_found_handler(ctx: *Context, _: void) !void { + try ctx.respond(.{ + .status = .@"Not Found", + .mime = Mime.HTML, + .body = "", + }); + } + + pub fn get_route_from_host(self: Self, path: []const u8, captures: []Capture, queries: *QueryMap) FoundRoute { + const base_404_route = comptime Route.init().get({}, not_found_handler); + + return self.routes.get_route(path, captures, queries) orelse { + if (self.not_found_route) |not_found| { + queries.clearRetainingCapacity(); + return FoundRoute{ .route = not_found, .captures = captures[0..0], .queries = queries }; + } else return FoundRoute{ .route = base_404_route, .captures = captures[0..], .queries = queries }; + }; } }; } diff --git a/src/http/server.zig b/src/http/server.zig index 97bffcf..595b6d8 100644 --- a/src/http/server.zig +++ b/src/http/server.zig @@ -745,78 +745,68 @@ pub fn Server(comptime security: Security) type { fn route_and_respond(runtime: *Runtime, p: *Provision, router: *const Router) !RecvStatus { route: { const found = router.get_route_from_host(p.request.uri, p.captures, &p.queries); - if (found) |f| { - const handler = f.route.get_handler(p.request.method); - - if (handler) |h_with_data| { - const context: *Context = try p.arena.allocator().create(Context); - context.* = .{ - .allocator = p.arena.allocator(), - .runtime = runtime, - .request = &p.request, - .response = &p.response, - .path = p.request.uri, - .captures = f.captures, - .queries = f.queries, - .provision = p, - }; - - @call(.auto, h_with_data.handler, .{ - context, - @as(*anyopaque, @ptrFromInt(h_with_data.data)), - }) catch |e| { - log.err("\"{s}\" handler failed with error: {}", .{ p.request.uri, e }); - p.response.set(.{ - .status = .@"Internal Server Error", - .mime = Mime.HTML, - .body = "", - }); - - return try raw_respond(p); - }; + const handler = found.route.get_handler(p.request.method); + + if (handler) |h_with_data| { + const context: *Context = try p.arena.allocator().create(Context); + context.* = .{ + .allocator = p.arena.allocator(), + .runtime = runtime, + .request = &p.request, + .response = &p.response, + .path = p.request.uri, + .captures = found.captures, + .queries = found.queries, + .provision = p, + }; - return .spawned; - } else { - // If we match the route but not the method. + @call(.auto, h_with_data.handler, .{ + context, + @as(*anyopaque, @ptrFromInt(h_with_data.data)), + }) catch |e| { + log.err("\"{s}\" handler failed with error: {}", .{ p.request.uri, e }); p.response.set(.{ - .status = .@"Method Not Allowed", + .status = .@"Internal Server Error", .mime = Mime.HTML, - .body = "405 Method Not Allowed", + .body = "", }); - // We also need to add to Allow header. - // This uses the connection's arena to allocate 64 bytes. - const allowed = f.route.get_allowed(p.arena.allocator()) catch { - p.response.set(.{ - .status = .@"Internal Server Error", - .mime = Mime.HTML, - .body = "", - }); + return try raw_respond(p); + }; - break :route; - }; + return .spawned; + } else { + // If we match the route but not the method. + p.response.set(.{ + .status = .@"Method Not Allowed", + .mime = Mime.HTML, + .body = "405 Method Not Allowed", + }); - p.response.headers.add("Allow", allowed) catch { - p.response.set(.{ - .status = .@"Internal Server Error", - .mime = Mime.HTML, - .body = "", - }); + // We also need to add to Allow header. + // This uses the connection's arena to allocate 64 bytes. + const allowed = found.route.get_allowed(p.arena.allocator()) catch { + p.response.set(.{ + .status = .@"Internal Server Error", + .mime = Mime.HTML, + .body = "", + }); - break :route; - }; + break :route; + }; + + p.response.headers.add("Allow", allowed) catch { + p.response.set(.{ + .status = .@"Internal Server Error", + .mime = Mime.HTML, + .body = "", + }); break :route; - } - } + }; - // Didn't match any route. - p.response.set(.{ - .status = .@"Not Found", - .mime = Mime.HTML, - .body = "404 Not Found", - }); - break :route; + break :route; + } } if (p.response.status == .Kill) { From 07977c17f421e791177ac69ff21cadaa09206408 Mon Sep 17 00:00:00 2001 From: Muki Kiboigo Date: Tue, 12 Nov 2024 22:24:54 -0800 Subject: [PATCH 4/8] fix(router): various bugs with not found handler --- src/http/route.zig | 2 +- src/http/router.zig | 20 ++++++++++---------- src/http/server.zig | 6 ++---- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/http/route.zig b/src/http/route.zig index e55bc46..86889f6 100644 --- a/src/http/route.zig +++ b/src/http/route.zig @@ -80,7 +80,7 @@ pub fn Route(comptime Server: type) type { // You can either give a void (if you don't want to pass data through) or a pointer. comptime assert(@typeInfo(@TypeOf(data)) == .Pointer or @typeInfo(@TypeOf(data)) == .Void); const inner_data = switch (comptime @typeInfo(@TypeOf(data))) { - .Void => undefined, + .Void => 1, // Needs to not be 0. .Pointer => @intFromPtr(data), else => unreachable, }; diff --git a/src/http/router.zig b/src/http/router.zig index 9b15d79..f10ff62 100644 --- a/src/http/router.zig +++ b/src/http/router.zig @@ -284,20 +284,20 @@ pub fn Router(comptime Server: type) type { try self.routes.add_route(path, route); } - fn not_found_handler(ctx: *Context, _: void) !void { - try ctx.respond(.{ - .status = .@"Not Found", - .mime = Mime.HTML, - .body = "", - }); - } - pub fn get_route_from_host(self: Self, path: []const u8, captures: []Capture, queries: *QueryMap) FoundRoute { - const base_404_route = comptime Route.init().get({}, not_found_handler); + const base_404_route = comptime Route.init().get({}, struct { + fn not_found_handler(ctx: *Context, _: void) !void { + try ctx.respond(.{ + .status = .@"Not Found", + .mime = Mime.HTML, + .body = "", + }); + } + }.not_found_handler); return self.routes.get_route(path, captures, queries) orelse { + queries.clearRetainingCapacity(); if (self.not_found_route) |not_found| { - queries.clearRetainingCapacity(); return FoundRoute{ .route = not_found, .captures = captures[0..0], .queries = queries }; } else return FoundRoute{ .route = base_404_route, .captures = captures[0..], .queries = queries }; }; diff --git a/src/http/server.zig b/src/http/server.zig index 595b6d8..b02ecfd 100644 --- a/src/http/server.zig +++ b/src/http/server.zig @@ -670,10 +670,8 @@ pub fn Server(comptime security: Security) type { TLSType, self.config.size_connections_max, ); - if (comptime security == .tls) { - for (tls_slice) |*tls| { - tls.* = null; - } + for (tls_slice) |*tls| { + tls.* = null; } // since slices are fat pointers... From fd16c0e2e555080abf722ad2207484b59906e7cf Mon Sep 17 00:00:00 2001 From: Muki Kiboigo Date: Tue, 12 Nov 2024 22:53:39 -0800 Subject: [PATCH 5/8] fix(router): fix some minor consistency issues --- src/http/router.zig | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/http/router.zig b/src/http/router.zig index f10ff62..ab12aab 100644 --- a/src/http/router.zig +++ b/src/http/router.zig @@ -276,6 +276,7 @@ pub fn Router(comptime Server: type) type { } pub fn serve_not_found(self: *Self, route: Route) void { + assert(!self.locked); self.not_found_route = route; } @@ -299,7 +300,7 @@ pub fn Router(comptime Server: type) type { queries.clearRetainingCapacity(); if (self.not_found_route) |not_found| { return FoundRoute{ .route = not_found, .captures = captures[0..0], .queries = queries }; - } else return FoundRoute{ .route = base_404_route, .captures = captures[0..], .queries = queries }; + } else return FoundRoute{ .route = base_404_route, .captures = captures[0..0], .queries = queries }; }; } }; From 51296f258932fe7248b9e4286803132983de8120 Mon Sep 17 00:00:00 2001 From: Muki Kiboigo Date: Fri, 15 Nov 2024 16:48:25 -0800 Subject: [PATCH 6/8] feat(router): stream files to socket --- src/core/job.zig | 2 +- src/http/context.zig | 105 +++++++++++++++++++-------- src/http/response.zig | 2 +- src/http/router.zig | 163 ++++++++++++++++++++++++++++-------------- src/http/server.zig | 20 +++--- src/http/sse.zig | 19 +---- 6 files changed, 203 insertions(+), 108 deletions(-) diff --git a/src/core/job.zig b/src/core/job.zig index 378b7cb..ff8abd5 100644 --- a/src/core/job.zig +++ b/src/core/job.zig @@ -5,7 +5,7 @@ const TaskFn = @import("tardy").TaskFn; pub const AfterType = union(enum) { recv, - sse: struct { + other: struct { func: *const anyopaque, ctx: *anyopaque, }, diff --git a/src/http/context.zig b/src/http/context.zig index b4a9fa1..82fc467 100644 --- a/src/http/context.zig +++ b/src/http/context.zig @@ -38,24 +38,6 @@ pub fn Context(comptime Server: type) type { triggered: bool = false, pub fn to_sse(self: *Self, then: TaskFn(bool, *SSE)) !void { - assert(!self.triggered); - self.triggered = true; - - self.response.set(.{ - .status = .OK, - .body = "", - .mime = Mime{ - .extension = ".sse", - .description = "Server-Sent Events", - .content_type = "text/event-stream", - }, - }); - - const headers = try self.provision.response.headers_into_buffer( - self.provision.buffer, - null, - ); - const sse = try self.allocator.create(SSE); sse.* = .{ .context = self, @@ -63,35 +45,102 @@ pub fn Context(comptime Server: type) type { .allocator = self.allocator, }; - const pslice = Pseudoslice.init(headers, "", self.provision.buffer); + try self.respond_headers_only( + .{ + .status = .OK, + .body = "", + .mime = Mime{ + .extension = ".sse", + .description = "Server-Sent Events", + .content_type = "text/event-stream", + }, + }, + null, + sse, + then, + ); + } + + pub fn close(self: *Self) !void { + self.provision.job = .close; + try self.runtime.net.close( + self.provision, + Server.close_task, + self.provision.socket, + ); + } + + pub fn send_then( + self: *Self, + data: []const u8, + ctx: anytype, + then: TaskFn(bool, @TypeOf(ctx)), + ) !void { + const pslice = Pseudoslice.init(data, "", self.provision.buffer); const first_chunk = try Server.prepare_send( self.runtime, self.provision, - .{ .sse = .{ - .func = then, - .ctx = sse, - } }, + .{ + .other = .{ + .func = then, + .ctx = ctx, + }, + }, pslice, ); try self.runtime.net.send( self.provision, - Server.send_then_sse_task, + Server.send_then_other_task, self.provision.socket, first_chunk, ); } - pub fn close(self: *Self) !void { - self.provision.job = .close; - try self.runtime.net.close( + pub fn send_then_recv(self: *Self, data: []const u8) !void { + const pslice = Pseudoslice.init(data, "", self.provision.buffer); + + const first_chunk = try Server.prepare_send( + self.runtime, self.provision, - Server.close_task, + .recv, + pslice, + ); + + try self.runtime.net.send( + self.provision, + Server.send_then_recv_task, self.provision.socket, + first_chunk, + ); + } + + // This will respond with the headers only. + // You will be in charge of sending the body. + pub fn respond_headers_only( + self: *Self, + options: ResponseSetOptions, + content_length: ?usize, + ctx: anytype, + then: TaskFn(bool, @TypeOf(ctx)), + ) !void { + assert(!self.triggered); + self.triggered = true; + + // the body should not be set. + assert(options.body == null); + self.response.set(options); + + const headers = try self.provision.response.headers_into_buffer( + self.provision.buffer, + content_length, ); + + try self.send_then(headers, ctx, then); } + /// This is your standard response. pub fn respond(self: *Self, options: ResponseSetOptions) !void { assert(!self.triggered); self.triggered = true; diff --git a/src/http/response.zig b/src/http/response.zig index 3335d9d..d67a2f5 100644 --- a/src/http/response.zig +++ b/src/http/response.zig @@ -50,7 +50,7 @@ pub const Response = struct { } } - pub fn headers_into_buffer(self: *Response, buffer: []u8, content_length: ?u32) ![]u8 { + pub fn headers_into_buffer(self: *Response, buffer: []u8, content_length: ?usize) ![]u8 { var index: usize = 0; // Status Line diff --git a/src/http/router.zig b/src/http/router.zig index ab12aab..1526be0 100644 --- a/src/http/router.zig +++ b/src/http/router.zig @@ -16,6 +16,7 @@ const QueryMap = @import("routing_trie.zig").QueryMap; const Runtime = @import("tardy").Runtime; const Task = @import("tardy").Task; +const Stat = @import("tardy").Stat; pub fn Router(comptime Server: type) type { return struct { @@ -50,9 +51,11 @@ pub fn Router(comptime Server: type) type { const FileProvision = struct { mime: Mime, context: *Context, + request: *const Request, + response: *Response, fd: std.posix.fd_t, - offset: usize, - list: std.ArrayList(u8), + file_size: u64, + rd_offset: usize, buffer: []u8, }; @@ -73,72 +76,132 @@ pub fn Router(comptime Server: type) type { } provision.fd = fd; - // TODO: If we have a If-None-Match by this point, we should fire off a stat request - // that way we can check the last modified time and compare that with our ETag. - // We generally avoid using the HTTP Date stuff since it can be so slow. - // - // If we have a matching etag, we can just respond with Not Modified. - // If we don't then we continue doing what we normally do. + try rt.fs.stat(provision, stat_file_task, fd); + } + + fn stat_file_task(_: *Runtime, stat: Stat, provision: *FileProvision) !void { + errdefer provision.context.respond(.{ + .status = .@"Internal Server Error", + .mime = Mime.HTML, + .body = "", + }) catch unreachable; + + // Set file size. + provision.file_size = stat.size; + log.err("file size: {d}", .{provision.file_size}); + + // generate the etag and attach it to the response. + var hash = std.hash.Wyhash.init(0); + hash.update(std.mem.asBytes(&stat.size)); + if (stat.modified) |modified| { + hash.update(std.mem.asBytes(&modified.seconds)); + hash.update(std.mem.asBytes(&modified.nanos)); + } + const etag_hash = hash.final(); + + const calc_etag = try std.fmt.allocPrint( + provision.context.allocator, + "\"{d}\"", + .{etag_hash}, + ); + try provision.response.headers.add("ETag", calc_etag); + + // If we have an ETag on the request... + if (provision.request.headers.get("If-None-Match")) |etag| { + if (std.mem.eql(u8, etag, calc_etag)) { + // If the ETag matches. + try provision.context.respond(.{ + .status = .@"Not Modified", + .mime = Mime.HTML, + .body = "", + }); + return; + } + } + + try provision.context.respond_headers_only(.{ + .status = .OK, + .mime = provision.mime, + }, stat.size, provision, start_stream_file_task); + } + + fn start_stream_file_task(rt: *Runtime, success: bool, provision: *FileProvision) !void { + errdefer { + std.posix.close(provision.fd); + provision.context.close() catch unreachable; + } + + if (!success) { + log.warn("starting file stream failed!", .{}); + try provision.context.close(); + return; + } + + // start streaming... try rt.fs.read( provision, read_file_task, - fd, + provision.fd, provision.buffer, - 0, + provision.rd_offset, ); } - fn read_file_task(rt: *Runtime, result: i32, provision: *FileProvision) !void { - errdefer provision.context.respond(.{ - .status = .@"Internal Server Error", - .mime = Mime.HTML, - .body = "", - }) catch unreachable; + fn read_file_task(_: *Runtime, result: i32, provision: *FileProvision) !void { + errdefer { + std.posix.close(provision.fd); + provision.context.close() catch unreachable; + } - if (result <= 0) { - // If we are done reading... - try rt.fs.close( - provision, - close_file_task, - provision.fd, - ); + if (result <= -1) { + log.warn("read file task failed", .{}); + std.posix.close(provision.fd); + try provision.context.close(); return; } const length: usize = @intCast(result); - try provision.list.appendSlice(provision.buffer[0..length]); + provision.rd_offset += length; + log.debug("current offset: {d} | fd: {}", .{ provision.rd_offset, provision.fd }); + + if (provision.rd_offset >= provision.file_size) { + log.debug("done streaming file | rd off: {d} | f size: {d} | result: {d}", .{ + provision.rd_offset, + provision.file_size, + result, + }); - // TODO: This needs to be a setting you pass in to the router. - // - //if (provision.list.items.len > 1024 * 1024 * 4) { - // provision.context.respond(.{ - // .status = .@"Content Too Large", - // .mime = Mime.HTML, - // .body = "File Too Large", - // }); - // return; - //} + std.posix.close(provision.fd); + try provision.context.send_then_recv(provision.buffer[0..length]); + } else { + try provision.context.send_then(provision.buffer[0..length], provision, send_file_task); + } + } + + fn send_file_task(rt: *Runtime, success: bool, provision: *FileProvision) !void { + errdefer { + std.posix.close(provision.fd); + provision.context.close() catch unreachable; + } - provision.offset += length; + if (!success) { + log.warn("send file stream failed!", .{}); + std.posix.close(provision.fd); + try provision.context.close(); + return; + } + // continue streaming.. try rt.fs.read( provision, read_file_task, provision.fd, provision.buffer, - provision.offset, + provision.rd_offset, ); } - fn close_file_task(_: *Runtime, _: void, provision: *FileProvision) !void { - try provision.context.respond(.{ - .status = .OK, - .mime = provision.mime, - .body = provision.list.items[0..], - }); - } - pub fn serve_fs_dir(self: *Self, comptime url_path: []const u8, comptime dir_path: []const u8) !void { assert(!self.locked); const arena = self.arena.allocator(); @@ -149,10 +212,6 @@ pub fn Router(comptime Server: type) type { const route = Route.init().get(slice, struct { fn handler_fn(ctx: *Context, real_dir: *const []const u8) !void { - // TODO: Add caching support. We shouldn't need to resend files - // all the time, especially if the user has gotten them before - // and has an ETag. - if (ctx.captures.len == 0) { try ctx.respond(.{ .status = .@"Not Found", @@ -197,14 +256,14 @@ pub fn Router(comptime Server: type) type { provision.* = .{ .mime = mime, .context = ctx, + .request = ctx.request, + .response = ctx.response, .fd = -1, - .offset = 0, - .list = std.ArrayList(u8).init(ctx.allocator), + .file_size = 0, + .rd_offset = 0, .buffer = ctx.provision.buffer, }; - // We also need to support chunked encoding. - // It makes a lot more sense for files atleast. try ctx.runtime.fs.open( provision, open_file_task, diff --git a/src/http/server.zig b/src/http/server.zig index b02ecfd..cd05850 100644 --- a/src/http/server.zig +++ b/src/http/server.zig @@ -486,12 +486,12 @@ pub fn Server(comptime security: Security) type { } } - pub const send_then_sse_task = send_then(struct { + pub const send_then_other_task = send_then(struct { fn inner(rt: *Runtime, success: bool, provision: *Provision) !void { const send_job = provision.job.send; - assert(send_job.after == .sse); - const func: TaskFn(bool, *anyopaque) = @ptrCast(@alignCast(send_job.after.sse.func)); - const ctx: *anyopaque = @ptrCast(@alignCast(send_job.after.sse.ctx)); + assert(send_job.after == .other); + const func: TaskFn(bool, *anyopaque) = @ptrCast(@alignCast(send_job.after.other.func)); + const ctx: *anyopaque = @ptrCast(@alignCast(send_job.after.other.ctx)); try @call(.auto, func, .{ rt, success, ctx }); if (!success) { @@ -527,7 +527,7 @@ pub fn Server(comptime security: Security) type { } }.inner); - fn send_then(comptime func: TaskFn(bool, *Provision)) TaskFn(i32, *Provision) { + pub fn send_then(comptime func: TaskFn(bool, *Provision)) TaskFn(i32, *Provision) { return struct { fn send_then_inner(rt: *Runtime, length: i32, provision: *Provision) !void { assert(provision.job == .send); @@ -586,7 +586,7 @@ pub fn Server(comptime security: Security) type { try rt.net.send( provision, - send_then_recv_task, + send_then_inner, provision.socket, job_tls.encrypted, ); @@ -600,7 +600,7 @@ pub fn Server(comptime security: Security) type { const remainder = job_tls.encrypted[job_tls.encrypted_count..]; try rt.net.send( provision, - send_then_recv_task, + send_then_inner, provision.socket, remainder, ); @@ -630,9 +630,13 @@ pub fn Server(comptime security: Security) type { plain_buffer.len + send_job.count, }); + // this is the problem. + // we are doing send then recv which is wrong!! + // + // we should be calling ourselves... try rt.net.send( provision, - send_then_recv_task, + send_then_inner, provision.socket, plain_buffer, ); diff --git a/src/http/sse.zig b/src/http/sse.zig index 2b5ee66..e6c8ab9 100644 --- a/src/http/sse.zig +++ b/src/http/sse.zig @@ -55,24 +55,7 @@ pub fn SSE(comptime Server: type) type { buffer[index] = '\n'; index += 1; - const pslice = Pseudoslice.init(buffer[0..index], "", buffer); - - const first_chunk = Server.prepare_send( - self.context.runtime, - self.context.provision, - .{ .sse = .{ - .func = then, - .ctx = then_context, - } }, - pslice, - ) catch unreachable; - - self.context.runtime.net.send( - self.context.provision, - Server.send_then_sse_task, - self.context.provision.socket, - first_chunk, - ) catch unreachable; + try self.context.send_then(buffer[0..index], then_context, then); } }; } From a2a011a334538a2de9f4b012da6dbdfd80603f73 Mon Sep 17 00:00:00 2001 From: Muki Kiboigo Date: Fri, 15 Nov 2024 17:13:08 -0800 Subject: [PATCH 7/8] fix(router): prevent double close on send fail --- src/http/router.zig | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/http/router.zig b/src/http/router.zig index 1526be0..395f21d 100644 --- a/src/http/router.zig +++ b/src/http/router.zig @@ -88,7 +88,7 @@ pub fn Router(comptime Server: type) type { // Set file size. provision.file_size = stat.size; - log.err("file size: {d}", .{provision.file_size}); + log.debug("file size: {d}", .{provision.file_size}); // generate the etag and attach it to the response. var hash = std.hash.Wyhash.init(0); @@ -134,7 +134,7 @@ pub fn Router(comptime Server: type) type { if (!success) { log.warn("starting file stream failed!", .{}); - try provision.context.close(); + std.posix.close(provision.fd); return; } @@ -188,7 +188,6 @@ pub fn Router(comptime Server: type) type { if (!success) { log.warn("send file stream failed!", .{}); std.posix.close(provision.fd); - try provision.context.close(); return; } From 791bb42fcc151a518146e4aefe71a250787f4598 Mon Sep 17 00:00:00 2001 From: Muki Kiboigo Date: Fri, 15 Nov 2024 22:36:45 -0800 Subject: [PATCH 8/8] feat(router): speed up fs directory serving --- src/http/router.zig | 53 +++++++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/src/http/router.zig b/src/http/router.zig index 395f21d..2d0795a 100644 --- a/src/http/router.zig +++ b/src/http/router.zig @@ -56,6 +56,7 @@ pub fn Router(comptime Server: type) type { fd: std.posix.fd_t, file_size: u64, rd_offset: usize, + current_length: usize, buffer: []u8, }; @@ -79,7 +80,7 @@ pub fn Router(comptime Server: type) type { try rt.fs.stat(provision, stat_file_task, fd); } - fn stat_file_task(_: *Runtime, stat: Stat, provision: *FileProvision) !void { + fn stat_file_task(rt: *Runtime, stat: Stat, provision: *FileProvision) !void { errdefer provision.context.respond(.{ .status = .@"Internal Server Error", .mime = Mime.HTML, @@ -120,35 +121,25 @@ pub fn Router(comptime Server: type) type { } } - try provision.context.respond_headers_only(.{ + provision.response.set(.{ .status = .OK, .mime = provision.mime, - }, stat.size, provision, start_stream_file_task); - } + .body = null, + }); - fn start_stream_file_task(rt: *Runtime, success: bool, provision: *FileProvision) !void { - errdefer { - std.posix.close(provision.fd); - provision.context.close() catch unreachable; - } + const headers = try provision.response.headers_into_buffer(provision.buffer, stat.size); + provision.current_length = headers.len; - if (!success) { - log.warn("starting file stream failed!", .{}); - std.posix.close(provision.fd); - return; - } - - // start streaming... try rt.fs.read( provision, read_file_task, provision.fd, - provision.buffer, + provision.buffer[provision.current_length..], provision.rd_offset, ); } - fn read_file_task(_: *Runtime, result: i32, provision: *FileProvision) !void { + fn read_file_task(rt: *Runtime, result: i32, provision: *FileProvision) !void { errdefer { std.posix.close(provision.fd); provision.context.close() catch unreachable; @@ -163,9 +154,10 @@ pub fn Router(comptime Server: type) type { const length: usize = @intCast(result); provision.rd_offset += length; + provision.current_length += length; log.debug("current offset: {d} | fd: {}", .{ provision.rd_offset, provision.fd }); - if (provision.rd_offset >= provision.file_size) { + if (provision.rd_offset >= provision.file_size or result == 0) { log.debug("done streaming file | rd off: {d} | f size: {d} | result: {d}", .{ provision.rd_offset, provision.file_size, @@ -173,9 +165,24 @@ pub fn Router(comptime Server: type) type { }); std.posix.close(provision.fd); - try provision.context.send_then_recv(provision.buffer[0..length]); + try provision.context.send_then_recv(provision.buffer[0..provision.current_length]); } else { - try provision.context.send_then(provision.buffer[0..length], provision, send_file_task); + assert(provision.current_length <= provision.buffer.len); + if (provision.current_length == provision.buffer.len) { + try provision.context.send_then( + provision.buffer[0..provision.current_length], + provision, + send_file_task, + ); + } else { + try rt.fs.read( + provision, + read_file_task, + provision.fd, + provision.buffer[provision.current_length..], + provision.rd_offset, + ); + } } } @@ -191,6 +198,9 @@ pub fn Router(comptime Server: type) type { return; } + // reset current length + provision.current_length = 0; + // continue streaming.. try rt.fs.read( provision, @@ -260,6 +270,7 @@ pub fn Router(comptime Server: type) type { .fd = -1, .file_size = 0, .rd_offset = 0, + .current_length = 0, .buffer = ctx.provision.buffer, };