Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add useJSONContent option for express and sveltekit #41

Merged
merged 5 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions examples/express/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,94 @@ app.use('/workflow', serve<{ message: string }>(async (context) => {
console.log('step 2 input', result1, 'output', output)
return output
})
}, {
verbose: true,
retries: 0
}));

app.post("/get-data", (req, res) => {
res.send("hey there");
});

// here we are

app.use(
"/test",
serveQstashWorkflow(async (context: Parameters<RouteFunction<unknown>>[0]) => {
await context.run("test", async () => {
// console.log("test");
});
}, {
useJSONContent: true
})
);
CahidArda marked this conversation as resolved.
Show resolved Hide resolved

import { IncomingHttpHeaders } from "http";
import { RouteFunction, serve as basicServe, WorkflowServeOptions } from "@upstash/workflow";
import { Request as ExpressRequest, Response, Router } from "express";

function transformHeaders(headers: IncomingHttpHeaders): [string, string][] {
return Object.entries(headers).map(([key, value]) => [
key,
Array.isArray(value) ? value.join(", ") : value ?? "",
]);
}

export function serveQstashWorkflow<TInitialPayload = unknown>(
routeFunction: RouteFunction<TInitialPayload>,
options?: Omit<WorkflowServeOptions<globalThis.Response, TInitialPayload>, "onStepFinish">
): Router {
const router = express.Router();

router.post("*", async (req: ExpressRequest, res: Response) => {
const protocol = (req.headers["x-forwarded-proto"] as string) || req.protocol;
const host = req.headers.host;
const url = `${protocol}://${host}${req.originalUrl}`;
const headers = transformHeaders(req.headers);

let reqBody: string | undefined;

if (req.headers["content-type"]?.includes("text/plain")) {
reqBody = req.body;
} else if (req.headers["content-type"]?.includes("application/json")) {
reqBody = JSON.stringify(req.body);
}

const request = new Request(url, {
headers: headers,
body: reqBody || "{}",
method: "POST",
});

const { handler: serveHandler } = basicServe<TInitialPayload>(routeFunction, options);

try {
const response = await serveHandler(request);

// Set status code
res.status(response.status);

// Set headers
response.headers.forEach((value, key) => {
res.setHeader(key, value);
});

// Send body
CahidArda marked this conversation as resolved.
Show resolved Hide resolved
if (response.body) {
const buffer = await response.arrayBuffer();
res.send(Buffer.from(buffer));
} else {
res.end();
}
} catch (error) {
console.error("Error in workflow handler:", error);
res.status(500).json({ error: "Internal Server Error" });
}
});

return router;
}

app.listen(3001, () => {
console.log('Server running on port 3001');
});
12 changes: 4 additions & 8 deletions examples/nextjs/app/path/route.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export const runtime = 'nodejs';

import { serve } from '@upstash/workflow/nextjs'

const someWork = (input: string) => {
Expand All @@ -7,13 +9,7 @@ const someWork = (input: string) => {
export const { POST } = serve<string>(async (context) => {
const input = context.requestPayload
const result1 = await context.run('step1', async () => {
const output = someWork(input)
console.log('step 1 input', input, 'output', output)
return output
})

await context.run('step2', async () => {
const output = someWork(result1)
console.log('step 2 input', result1, 'output', output)
console.log(someWork(input));
})
}, {
})
4 changes: 2 additions & 2 deletions platforms/astro.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import type { APIContext, APIRoute } from "astro";

import type { WorkflowServeOptions, WorkflowContext } from "../src";
import type { PublicServeOptions, WorkflowContext } from "../src";
import { serve as serveBase } from "../src";

export function serve<TInitialPayload = unknown>(
routeFunction: (
workflowContext: WorkflowContext<TInitialPayload>,
apiContext: APIContext
) => Promise<void>,
options?: Omit<WorkflowServeOptions<Response, TInitialPayload>, "onStepFinish">
options?: PublicServeOptions<TInitialPayload>
) {
const POST: APIRoute = (apiContext) => {
const { handler } = serveBase<TInitialPayload>(
Expand Down
4 changes: 2 additions & 2 deletions platforms/cloudflare.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { RouteFunction, WorkflowServeOptions } from "../src";
import type { PublicServeOptions, RouteFunction } from "../src";
import { serve as serveBase } from "../src";

export type WorkflowBindings = {
Expand Down Expand Up @@ -58,7 +58,7 @@ const getArgs = (
*/
export const serve = <TInitialPayload = unknown>(
routeFunction: RouteFunction<TInitialPayload>,
options?: Omit<WorkflowServeOptions<Response, TInitialPayload>, "onStepFinish">
options?: PublicServeOptions<TInitialPayload>
): { fetch: (...args: PagesHandlerArgs | WorkersHandlerArgs) => Promise<Response> } => {
const fetch = async (...args: PagesHandlerArgs | WorkersHandlerArgs) => {
const { request, env } = getArgs(args);
Expand Down
21 changes: 15 additions & 6 deletions platforms/express.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ export function serve<TInitialPayload = unknown>(
return;
}

// only allow application/json content type
if (request_.headers["content-type"] !== "application/json") {
res.status(400).json("Only application/json content type is allowed in express workflows");
return;
let requestBody: string;
if (request_.headers["content-type"]?.includes("text/plain")) {
requestBody = request_.body;
} else if (request_.headers["content-type"]?.includes("application/json")) {
requestBody = JSON.stringify(request_.body);
} else {
requestBody =
typeof request_.body === "string" ? request_.body : JSON.stringify(request_.body);
}

console.log(request_.headers, request_.body);

// create Request
const protocol = request_.protocol;
const host = request_.get("host") || "localhost";
Expand All @@ -34,13 +40,16 @@ export function serve<TInitialPayload = unknown>(
const webRequest = new Request(url, {
method: request_.method,
headers: new Headers(request_.headers as Record<string, string>),
body: JSON.stringify(request_.body),
body: requestBody,
});

// create handler
const { handler: serveHandler } = serveBase<TInitialPayload>(
(workflowContext) => routeFunction(workflowContext),
options
{
...options,
useJSONContent: true,
}
);

const response = await serveHandler(webRequest);
Expand Down
4 changes: 2 additions & 2 deletions platforms/h3.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { defineEventHandler, readRawBody } from "h3";

import type { RouteFunction, WorkflowServeOptions } from "../src";
import type { PublicServeOptions, RouteFunction } from "../src";
import { serve as serveBase } from "../src";
import type { IncomingHttpHeaders } from "node:http";

Expand All @@ -14,7 +14,7 @@ function transformHeaders(headers: IncomingHttpHeaders): [string, string][] {

export const serve = <TInitialPayload = unknown>(
routeFunction: RouteFunction<TInitialPayload>,
options?: Omit<WorkflowServeOptions<Response, TInitialPayload>, "onStepFinish">
options?: PublicServeOptions<TInitialPayload>
) => {
const handler = defineEventHandler(async (event) => {
const method = event.node.req.method;
Expand Down
4 changes: 2 additions & 2 deletions platforms/hono.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Context } from "hono";
import type { RouteFunction, WorkflowServeOptions } from "../src";
import type { PublicServeOptions, RouteFunction } from "../src";
import { serve as serveBase } from "../src";
import { Variables } from "hono/types";

Expand All @@ -26,7 +26,7 @@ export const serve = <
TVariables extends Variables = Variables,
>(
routeFunction: RouteFunction<TInitialPayload>,
options?: Omit<WorkflowServeOptions<Response, TInitialPayload>, "onStepFinish">
options?: PublicServeOptions<TInitialPayload>
): ((context: Context<{ Bindings: TBindings; Variables: TVariables }>) => Promise<Response>) => {
const handler = async (context: Context<{ Bindings: TBindings; Variables: TVariables }>) => {
const environment = context.env;
Expand Down
6 changes: 3 additions & 3 deletions platforms/nextjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/* eslint-disable @typescript-eslint/ban-ts-comment */
import type { NextApiHandler, NextApiRequest, NextApiResponse } from "next";

import type { WorkflowServeOptions, RouteFunction } from "../src";
import type { RouteFunction, PublicServeOptions } from "../src";
import { serve as serveBase } from "../src";

/**
Expand All @@ -16,7 +16,7 @@ import { serve as serveBase } from "../src";
*/
export const serve = <TInitialPayload = unknown>(
routeFunction: RouteFunction<TInitialPayload>,
options?: Omit<WorkflowServeOptions<Response, TInitialPayload>, "onStepFinish">
options?: PublicServeOptions<TInitialPayload>
): { POST: (request: Request) => Promise<Response> } => {
const { handler: serveHandler } = serveBase<TInitialPayload, Request, Response>(
routeFunction,
Expand All @@ -32,7 +32,7 @@ export const serve = <TInitialPayload = unknown>(

export const servePagesRouter = <TInitialPayload = unknown>(
routeFunction: RouteFunction<TInitialPayload>,
options?: Omit<WorkflowServeOptions<Response, TInitialPayload>, "onStepFinish">
options?: PublicServeOptions<TInitialPayload>
): { handler: NextApiHandler } => {
const { handler: serveHandler } = serveBase(routeFunction, options);

Expand Down
4 changes: 2 additions & 2 deletions platforms/solidjs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { APIEvent } from "@solidjs/start/server";

import type { RouteFunction, WorkflowServeOptions } from "../src";
import type { PublicServeOptions, RouteFunction } from "../src";
import { serve as serveBase } from "../src";

/**
Expand All @@ -14,7 +14,7 @@ import { serve as serveBase } from "../src";
*/
export const serve = <TInitialPayload = unknown>(
routeFunction: RouteFunction<TInitialPayload>,
options?: Omit<WorkflowServeOptions<Response, TInitialPayload>, "onStepFinish">
options?: PublicServeOptions<TInitialPayload>
) => {
// Create a handler which receives an event and calls the
// serveBase method
Expand Down
11 changes: 7 additions & 4 deletions platforms/svelte.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { RequestHandler } from "@sveltejs/kit";

import type { RouteFunction, WorkflowServeOptions } from "../src";
import type { PublicServeOptions, RouteFunction } from "../src";
import { serve as serveBase } from "../src";

/**
Expand All @@ -14,12 +14,15 @@ import { serve as serveBase } from "../src";
*/
export const serve = <TInitialPayload = unknown>(
routeFunction: RouteFunction<TInitialPayload>,
options: Omit<WorkflowServeOptions<Response, TInitialPayload>, "onStepFinish"> & {
env: WorkflowServeOptions["env"];
options: PublicServeOptions<TInitialPayload> & {
env: PublicServeOptions["env"]; // make env required
}
): { POST: RequestHandler } => {
const handler: RequestHandler = async ({ request }) => {
const { handler: serveHandler } = serveBase<TInitialPayload>(routeFunction, options);
const { handler: serveHandler } = serveBase<TInitialPayload>(routeFunction, {
...options,
useJSONContent: true,
ytkimirti marked this conversation as resolved.
Show resolved Hide resolved
});
return await serveHandler(request);
};

Expand Down
3 changes: 2 additions & 1 deletion src/serve/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export const serve = <
baseUrl,
env,
retries,
useJSONContent,
} = processOptions<TResponse, TInitialPayload>(options);
const debug = WorkflowLogger.getLogger(verbose);

Expand Down Expand Up @@ -161,7 +162,7 @@ export const serve = <
} else if (callReturnCheck.value === "continue-workflow") {
// request is not third party call. Continue workflow as usual
const result = isFirstInvocation
? await triggerFirstInvocation(workflowContext, retries, debug)
? await triggerFirstInvocation(workflowContext, retries, useJSONContent, debug)
: await triggerRouteFunction({
onStep: async () => routeFunction(workflowContext),
onCleanup: async () => {
Expand Down
1 change: 1 addition & 0 deletions src/serve/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export const processOptions = <TResponse extends Response = Response, TInitialPa
baseUrl: environment.UPSTASH_WORKFLOW_URL,
env: environment,
retries: DEFAULT_RETRIES,
useJSONContent: false,
...options,
};
};
Expand Down
13 changes: 13 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ export type FinishCondition =
| "fromCallback"
| "auth-fail"
| "failure-callback";

export type WorkflowServeOptions<
TResponse extends Response = Response,
TInitialPayload = unknown,
Expand Down Expand Up @@ -218,8 +219,20 @@ export type WorkflowServeOptions<
* 3 by default
*/
retries?: number;
/**
* Whether the framework should use `content-type: application/json`
* in `triggerFirstInvocation`.
Comment on lines +223 to +224
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this very confusing because we cannot assume anyone is familiar with our internal workings, they will not know what triggerFirstInvocationis. To me a much more friendly approach would be directly specifying headers that are then forwarded, those can include the content type but are not limited

Copy link
Collaborator Author

@CahidArda CahidArda Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useJSONContent is not part of the options we allow in serve methods for frameworks (see PublicServeOptions). So it won't be visible to users unless they they import serve from the route like import { serve } from "@upstash/workflow"

The reason we need this parameter is because we used to have publishJSON in client.trigger. this didn't work because publishJSON adds content-type: application/json, which breaks some frameworks. So we used publish. But some frameworks still need the content type to be application/json. sveltekit is one of them.

If you have a better alternative to the current implementation we can change it

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed useJSONContent from the public API

*
* Not part of the public API.
*/
useJSONContent?: boolean;
};

export type PublicServeOptions<
TInitialPayload = unknown,
TResponse extends Response = Response,
> = Omit<WorkflowServeOptions<TResponse, TInitialPayload>, "onStepFinish" | "useJSONContent">;

/**
* Payload passed as body in failureFunction
*/
Expand Down
14 changes: 7 additions & 7 deletions src/workflow-requests.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -563,9 +563,9 @@ describe("Workflow Requests", () => {
url: WORKFLOW_ENDPOINT,
});

await triggerFirstInvocation(context, 3);
const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" });
const spy = spyOn(debug, "log");
await triggerFirstInvocation(context, 3);
const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" });
const spy = spyOn(debug, "log");

const firstDelete = await triggerWorkflowDelete(context, debug);
expect(firstDelete).toEqual({ deleted: true });
Expand Down Expand Up @@ -606,7 +606,7 @@ describe("Workflow Requests", () => {
const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" });
const spy = spyOn(debug, "log");

await triggerFirstInvocation(context, 3, debug);
await triggerFirstInvocation(context, 3, false, debug);
expect(spy).toHaveBeenCalledTimes(1);

await workflowClient.cancel({ ids: [workflowRunId] });
Expand Down Expand Up @@ -657,7 +657,7 @@ describe("Workflow Requests", () => {
const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" });
const spy = spyOn(debug, "log");

await triggerFirstInvocation(context, 3, debug);
await triggerFirstInvocation(context, 3, false, debug);
expect(spy).toHaveBeenCalledTimes(1);

await workflowClient.cancel({ ids: [workflowRunId] });
Expand Down Expand Up @@ -707,14 +707,14 @@ describe("Workflow Requests", () => {
const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" });
const spy = spyOn(debug, "log");

const resultOne = await triggerFirstInvocation(context, 3, debug);
const resultOne = await triggerFirstInvocation(context, 3, false, debug);
expect(resultOne.isOk()).toBeTrue();
// @ts-expect-error value will exist because of isOk
expect(resultOne.value).toBe("success");

expect(spy).toHaveBeenCalledTimes(1);

const resultTwo = await triggerFirstInvocation(context, 0, debug);
const resultTwo = await triggerFirstInvocation(context, 0, false, debug);
expect(resultTwo.isOk()).toBeTrue();
// @ts-expect-error value will exist because of isOk
expect(resultTwo.value).toBe("workflow-run-already-exists");
Expand Down
Loading
Loading