From 7e15d047ef026fb24f30e6b81011eaa721529de3 Mon Sep 17 00:00:00 2001 From: Douglas Gubert Date: Fri, 3 Nov 2023 17:16:48 -0300 Subject: [PATCH] Adopt jsonrpc-lite package --- deno-runtime/deno.jsonc | 1 + deno-runtime/deno.lock | 7 +- deno-runtime/lib/messenger.ts | 151 +++++++++----------- deno-runtime/main.ts | 54 ++++--- package-lock.json | 5 + package.json | 3 +- src/server/runtime/AppsEngineDenoRuntime.ts | 66 ++++++--- 7 files changed, 159 insertions(+), 128 deletions(-) diff --git a/deno-runtime/deno.jsonc b/deno-runtime/deno.jsonc index 56764e1ae..e6ab99e27 100644 --- a/deno-runtime/deno.jsonc +++ b/deno-runtime/deno.jsonc @@ -3,5 +3,6 @@ "acorn": "npm:acorn@8.10.0", "acorn-walk": "npm:acorn-walk@8.2.0", "astring": "npm:astring@1.8.6" + "jsonrpc-lite": "npm:jsonrpc-lite@2.2.0", } } diff --git a/deno-runtime/deno.lock b/deno-runtime/deno.lock index c1966f8a4..5c46ee52e 100644 --- a/deno-runtime/deno.lock +++ b/deno-runtime/deno.lock @@ -40,7 +40,8 @@ "specifiers": { "acorn-walk@8.2.0": "acorn-walk@8.2.0", "acorn@8.10.0": "acorn@8.10.0", - "astring@1.8.6": "astring@1.8.6" + "astring@1.8.6": "astring@1.8.6", + "jsonrpc-lite@2.2.0": "jsonrpc-lite@2.2.0" }, "packages": { "acorn-walk@8.2.0": { @@ -54,6 +55,10 @@ "astring@1.8.6": { "integrity": "sha512-ISvCdHdlTDlH5IpxQJIex7BWBywFWgjJSVdwst+/iQCoEYnyOaQ95+X1JGshuBjGp6nxKUy1jMgE3zPqN7fQdg==", "dependencies": {} + }, + "jsonrpc-lite@2.2.0": { + "integrity": "sha512-/cbbSxtZWs1O7R4tWqabrCM/t3N8qKUZMAg9IUqpPvUs6UyRvm6pCNYkskyKN/XU0UgffW+NY2ZRr8t0AknX7g==", + "dependencies": {} } } } diff --git a/deno-runtime/lib/messenger.ts b/deno-runtime/lib/messenger.ts index 98e50028e..1b8951726 100644 --- a/deno-runtime/lib/messenger.ts +++ b/deno-runtime/lib/messenger.ts @@ -1,113 +1,90 @@ -export type JSONRPC_Message = { - jsonrpc: '2.0-rc'; -}; - -export type RequestDescriptor = { - method: string; - params: any[]; -}; - -export type Request = JSONRPC_Message & - RequestDescriptor & { - id: string; - }; - -export type SuccessResponseDescriptor = { - id: string; - result: any; -}; - -export type SuccessResponse = JSONRPC_Message & SuccessResponseDescriptor; - -export type ErrorResponseDescriptor = { - error: { - code: number; - message: string; - data?: Record; - }; - id: string | null; -}; - -export type ErrorResponse = JSONRPC_Message & ErrorResponseDescriptor; - -export type Response = SuccessResponse | ErrorResponse; - -export function isJSONRPCMessage(message: object): message is JSONRPC_Message { - return 'jsonrpc' in message && message['jsonrpc'] === '2.0-rc'; -} +import * as jsonrpc from 'jsonrpc-lite'; -export function isRequest(message: object): message is Request { - return isJSONRPCMessage(message) && 'method' in message && 'params' in message && 'id' in message; -} +export type RequestDescriptor = Pick; + +export type NotificationDescriptor = Pick; + +export type SuccessResponseDescriptor = Pick; -export function isResponse(message: object): message is Response { - return isJSONRPCMessage(message) && ('result' in message || 'error' in message); +export type ErrorResponseDescriptor = Pick; + +export type JsonRpcRequest = jsonrpc.IParsedObjectRequest | jsonrpc.IParsedObjectNotification; +export type JsonRpcResponse = jsonrpc.IParsedObjectSuccess | jsonrpc.IParsedObjectError; + +export function isRequest(message: jsonrpc.IParsedObject): message is JsonRpcRequest { + return message.type === 'request' || message.type === 'notification'; } -export function isErrorResponse(response: Response): response is ErrorResponse { - return 'error' in response; +export function isResponse(message: jsonrpc.IParsedObject): message is JsonRpcResponse { + return message.type === 'success' || message.type === 'error'; } -export function isSuccessResponse(response: Response): response is SuccessResponse { - return 'result' in response; +export function isErrorResponse(message: jsonrpc.JsonRpc): message is jsonrpc.ErrorObject { + return message instanceof jsonrpc.ErrorObject; } const encoder = new TextEncoder(); export const RPCResponseObserver = new EventTarget(); -export async function serverParseError(): Promise { - const rpc: ErrorResponse = { - jsonrpc: '2.0-rc', - id: null, - error: { message: 'Parse error', code: -32700 }, - }; +export function parseMessage(message: string) { + const parsed = jsonrpc.parse(message); + + if (Array.isArray(parsed)) { + throw jsonrpc.error(null, jsonrpc.JsonRpcError.invalidRequest(null)); + } - const encoded = encoder.encode(JSON.stringify(rpc)); + if (parsed.type === 'invalid') { + throw jsonrpc.error(null, parsed.payload); + } + + return parsed; +} + +export async function send(message: jsonrpc.JsonRpc): Promise { + const encoded = encoder.encode(message.serialize()); await Deno.stdout.write(encoded); } -export async function serverMethodNotFound(id: string): Promise { - const rpc: ErrorResponse = { - jsonrpc: '2.0-rc', - id, - error: { message: 'Method not found', code: -32601 }, - }; +export async function sendInvalidRequestError(): Promise { + const rpc = jsonrpc.error(null, jsonrpc.JsonRpcError.invalidRequest(null)); - const encoded = encoder.encode(JSON.stringify(rpc)); - await Deno.stdout.write(encoded); + await send(rpc); +} + +export async function sendInvalidParamsError(id: jsonrpc.ID): Promise { + const rpc = jsonrpc.error(id, jsonrpc.JsonRpcError.invalidParams(null)); + + await send(rpc); +} + +export async function sendParseError(): Promise { + const rpc = jsonrpc.error(null, jsonrpc.JsonRpcError.parseError(null)); + + await send(rpc); +} + +export async function sendMethodNotFound(id: jsonrpc.ID): Promise { + const rpc = jsonrpc.error(id, jsonrpc.JsonRpcError.methodNotFound(null)); + + await send(rpc); } export async function errorResponse({ error: { message, code = -32000, data }, id }: ErrorResponseDescriptor): Promise { - const rpc: ErrorResponse = { - jsonrpc: '2.0-rc', - id, - error: { message, code, ...(data && { data }) }, - }; - - const encoded = encoder.encode(JSON.stringify(rpc)); - Deno.stdout.write(encoded); + const rpc = jsonrpc.error(id, new jsonrpc.JsonRpcError(message, code, data)); + + await send(rpc); } export async function successResponse({ id, result }: SuccessResponseDescriptor): Promise { - const rpc: SuccessResponse = { - jsonrpc: '2.0-rc', - id, - result, - }; + const rpc = jsonrpc.success(id, result); - const encoded = encoder.encode(JSON.stringify(rpc)); - await Deno.stdout.write(encoded); + await send(rpc); } -export async function sendRequest(requestDescriptor: RequestDescriptor): Promise { - const request: Request = { - jsonrpc: '2.0-rc', - id: Math.random().toString(36).slice(2), - ...requestDescriptor, - }; +export async function sendRequest(requestDescriptor: RequestDescriptor): Promise { + const request = jsonrpc.request(Math.random().toString(36).slice(2), requestDescriptor.method, requestDescriptor.params); - const encoded = encoder.encode(JSON.stringify(request)); - await Deno.stdout.write(encoded); + await send(request); return new Promise((resolve, reject) => { const handler = (event: Event) => { @@ -125,3 +102,9 @@ export async function sendRequest(requestDescriptor: RequestDescriptor): Promise RPCResponseObserver.addEventListener(`response:${request.id}`, handler); }); } + +export function sendNotification({ method, params }: NotificationDescriptor) { + const request = jsonrpc.notification(method, params); + + send(request); +} diff --git a/deno-runtime/main.ts b/deno-runtime/main.ts index 9e50220a8..1bf162b18 100644 --- a/deno-runtime/main.ts +++ b/deno-runtime/main.ts @@ -18,13 +18,6 @@ const require = createRequire(import.meta.url); // @deno-types='../definition/App.d.ts' const { App } = require('../definition/App'); -async function notifyEngine(notify: Record): Promise { - const encoder = new TextEncoder(); - const encoded = encoder.encode(JSON.stringify(notify)); - await Deno.stdout.write(encoded); - return undefined; -} - const ALLOWED_NATIVE_MODULES = ['path', 'url', 'crypto', 'buffer', 'stream', 'net', 'http', 'https', 'zlib', 'util', 'punycode', 'os', 'querystring']; const ALLOWED_EXTERNAL_MODULES = ['uuid']; @@ -95,12 +88,24 @@ async function handlInitializeApp({ id, source }: { id: string; source: string } return app; } -async function handleRequest({ method, params, id }: Messenger.Request): Promise { +async function handleRequest({ type, payload }: Messenger.JsonRpcRequest): Promise { + // We're not handling notifications at the moment + if (type === 'notification') { + return Messenger.sendInvalidRequestError(); + } + + const { id, method, params } = payload; + switch (method) { case 'construct': { - const [appId, source] = params; - app = await handlInitializeApp({ id: appId, source }) - Messenger.successResponse(id, { result: "hooray!" }); + const [appId, source] = params as [string, string]; + + if (!appId || !source) { + return Messenger.sendInvalidParamsError(id); + } + + const app = await handlInitializeApp({ id: appId, source }) + Messenger.successResponse({ id, result: 'hooray' }); break; } default: { @@ -113,32 +118,37 @@ async function handleRequest({ method, params, id }: Messenger.Request): Promise } } -async function handleResponse(response: Messenger.Response): Promise { +function handleResponse(response: Messenger.JsonRpcResponse): void { let event: Event; - if (Messenger.isErrorResponse(response)) { - event = new ErrorEvent(`response:${response.id}`, { error: response.error }); + if (response.type === 'error') { + event = new ErrorEvent(`response:${response.payload.id}`, { error: response.payload.error }); } else { - event = new CustomEvent(`response:${response.id}`, { detail: response.result }); + event = new CustomEvent(`response:${response.payload.id}`, { detail: response.payload.result }); } Messenger.RPCResponseObserver.dispatchEvent(event); } async function main() { - setTimeout(() => notifyEngine({ method: 'ready' }), 1_780); + setTimeout(() => Messenger.sendNotification({ method: 'ready' }), 1_780); const decoder = new TextDecoder(); - let app: typeof App; for await (const chunk of Deno.stdin.readable) { const message = decoder.decode(chunk); - let JSONRPCMessage + let JSONRPCMessage; try { - JSONRPCMessage = JSON.parse(message); - } catch (_) { - return Messenger.serverParseError(); + JSONRPCMessage = Messenger.parseMessage(message); + } catch (error) { + if (Messenger.isErrorResponse(error)) { + await Messenger.send(error); + } else { + await Messenger.sendParseError(); + } + + continue; } if (Messenger.isRequest(JSONRPCMessage)) { @@ -146,7 +156,7 @@ async function main() { } if (Messenger.isResponse(JSONRPCMessage)) { - await handleResponse(JSONRPCMessage); + handleResponse(JSONRPCMessage); } } } diff --git a/package-lock.json b/package-lock.json index 60e3df93f..055288bc9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6845,6 +6845,11 @@ "integrity": "sha512-POQXvpdL69+CluYsillJ7SUhKvytYjW9vG/GKpnf+xP8UWgYEM/RaMzHHofbALDiKbbP1W8UEYmgGl39WkPZsg==", "dev": true }, + "jsonrpc-lite": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/jsonrpc-lite/-/jsonrpc-lite-2.2.0.tgz", + "integrity": "sha512-/cbbSxtZWs1O7R4tWqabrCM/t3N8qKUZMAg9IUqpPvUs6UyRvm6pCNYkskyKN/XU0UgffW+NY2ZRr8t0AknX7g==" + }, "just-debounce": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/just-debounce/-/just-debounce-1.1.0.tgz", diff --git a/package.json b/package.json index b6e0fa22f..58a98f435 100644 --- a/package.json +++ b/package.json @@ -93,8 +93,9 @@ "dependencies": { "adm-zip": "^0.5.9", "cryptiles": "^4.1.3", - "deno-bin": "^1.36.2", + "deno-bin": "1.36.2", "jose": "^4.11.1", + "jsonrpc-lite": "^2.2.0", "lodash.clonedeep": "^4.5.0", "semver": "^5.7.1", "stack-trace": "0.0.10", diff --git a/src/server/runtime/AppsEngineDenoRuntime.ts b/src/server/runtime/AppsEngineDenoRuntime.ts index 743e6d77e..bb93072cf 100644 --- a/src/server/runtime/AppsEngineDenoRuntime.ts +++ b/src/server/runtime/AppsEngineDenoRuntime.ts @@ -2,6 +2,8 @@ import * as child_process from 'child_process'; import * as path from 'path'; import { EventEmitter } from 'stream'; +import * as jsonrpc from 'jsonrpc-lite'; + import type { AppAccessorManager, AppApiManager } from '../managers'; import type { AppManager } from '../AppManager'; @@ -29,6 +31,11 @@ export function getDenoWrapperPath(): string { } } +type ControllerDeps = { + accessors: AppAccessorManager; + api: AppApiManager; +}; + export class DenoRuntimeSubprocessController extends EventEmitter { private readonly deno: child_process.ChildProcess; @@ -38,8 +45,12 @@ export class DenoRuntimeSubprocessController extends EventEmitter { private state: 'uninitialized' | 'ready' | 'invalid' | 'unknown'; + private readonly accessors: AppAccessorManager; + + private readonly api: AppApiManager; + // We need to keep the appSource around in case the Deno process needs to be restarted - constructor(private readonly appId: string, private readonly appSource: string) { + constructor(private readonly appId: string, private readonly appSource: string, deps: ControllerDeps) { super(); this.state = 'uninitialized'; @@ -55,6 +66,9 @@ export class DenoRuntimeSubprocessController extends EventEmitter { } catch { this.state = 'invalid'; } + + this.accessors = deps.accessors; + this.api = deps.api; } emit(eventName: string | symbol, ...args: any[]): boolean { @@ -78,10 +92,10 @@ export class DenoRuntimeSubprocessController extends EventEmitter { this.sendRequest({ method: 'construct', params: [this.appId, this.appSource] }); } - public async sendRequest(message: Record): Promise { + public async sendRequest(message: Pick): Promise { const id = String(Math.random()).substring(2); - this.deno.stdin.write(JSON.stringify({ id, ...message })); + this.deno.stdin.write(jsonrpc.request(id, message.method, message.params).serialize()); return this.waitForResult(id); } @@ -94,15 +108,15 @@ export class DenoRuntimeSubprocessController extends EventEmitter { this.once('ready', resolve); - setTimeout(reject, this.options.timeout); + setTimeout(() => reject(new Error('Timeout: app process not ready')), this.options.timeout); }); } private waitForResult(id: string): Promise { return new Promise((resolve, reject) => { - this.once(`result:${id}`, (result: unknown[]) => resolve(...result)); + this.once(`result:${id}`, (result: unknown[]) => resolve(result)); - setTimeout(reject, this.options.timeout); + setTimeout(() => reject(new Error('Request timed out')), this.options.timeout); }); } @@ -117,8 +131,8 @@ export class DenoRuntimeSubprocessController extends EventEmitter { this.on('ready', this.onReady.bind(this)); } - private async handleIncomingMessage(message: Record): Promise { - const { method, id } = message; + private async handleIncomingMessage(message: jsonrpc.IParsedObjectNotification | jsonrpc.IParsedObjectRequest): Promise { + const { method, id } = message.payload; switch (method) { case 'ready': @@ -130,23 +144,35 @@ export class DenoRuntimeSubprocessController extends EventEmitter { } } - private async handleResultMessage(message: Record): Promise { - const { id, result } = message; + private async handleResultMessage(message: jsonrpc.IParsedObjectError | jsonrpc.IParsedObjectSuccess): Promise { + const { id } = message.payload; - this.emit(`result:${id}`, result); + let param; + + if (message.type === 'success') { + param = message.payload.result; + } else { + param = message.payload.error; + } + + this.emit(`result:${id}`, param); } private async parseOutput(chunk: Buffer): Promise { let message; try { - message = JSON.parse(chunk.toString()); + message = jsonrpc.parse(chunk.toString()); - if ('method' in message) { + if (Array.isArray(message)) { + throw new Error('Invalid message format'); + } + + if (message.type === 'request' || message.type === 'notification') { return this.handleIncomingMessage(message); } - if ('result' in message && 'id' in message) { + if (message.type === 'success' || message.type === 'error') { return this.handleResultMessage(message); } @@ -172,14 +198,14 @@ type ExecRequestContext = { export class AppsEngineDenoRuntime { private readonly subprocesses: Record = {}; - private readonly accessorManager: AppAccessorManager; + // private readonly accessorManager: AppAccessorManager; - private readonly apiManager: AppApiManager; + // private readonly apiManager: AppApiManager; - constructor(manager: AppManager) { - this.accessorManager = manager.getAccessorManager(); - this.apiManager = manager.getApiManager(); - } + // constructor(manager: AppManager) { + // this.accessorManager = manager.getAccessorManager(); + // this.apiManager = manager.getApiManager(); + // } public async startRuntimeForApp({ appId, appSource }: AppRuntimeParams, options = { force: false }): Promise { if (appId in this.subprocesses && !options.force) {