diff --git a/src/server/AppManager.ts b/src/server/AppManager.ts index 40533fb1b..bafe0383b 100644 --- a/src/server/AppManager.ts +++ b/src/server/AppManager.ts @@ -484,9 +484,9 @@ export class AppManager { app.getStorageItem().marketplaceInfo = storageItem.marketplaceInfo; await app.validateLicense().catch(); + storageItem.status = await app.getStatus(); // This is async, but we don't care since it only updates in the database // and it should not mutate any properties we care about - storageItem.status = await app.getStatus(); await this.appMetadataStorage.update(storageItem).catch(); return true; @@ -549,7 +549,7 @@ export class AppManager { // the App instance from the source. const app = await this.getCompiler().toSandBox(this, descriptor, result); - undoSteps.push(() => app.getDenoRuntime().stopApp()); + undoSteps.push(() => this.getRuntime().stopRuntime(app.getDenoRuntime())); // Create a user for the app try { @@ -641,7 +641,7 @@ export class AppManager { await this.appMetadataStorage.remove(app.getID()); await this.appSourceStorage.remove(app.getStorageItem()).catch(); - app.getDenoRuntime().stopApp(); + await this.getRuntime().stopRuntime(app.getDenoRuntime()); this.apps.delete(app.getID()); } @@ -687,7 +687,7 @@ export class AppManager { descriptor.signature = await this.signatureManager.signApp(descriptor); const stored = await this.appMetadataStorage.update(descriptor); - this.apps.get(old.id).getDenoRuntime().stopApp(); + await this.getRuntime().stopRuntime(this.apps.get(old.id).getDenoRuntime()); const app = await this.getCompiler().toSandBox(this, descriptor, result); @@ -731,7 +731,7 @@ export class AppManager { if (appPackageOrInstance instanceof Buffer) { const parseResult = await this.getParser().unpackageApp(appPackageOrInstance); - this.apps.get(stored.id).getDenoRuntime().stopApp(); + await this.getRuntime().stopRuntime(this.apps.get(stored.id).getDenoRuntime()); return this.getCompiler().toSandBox(this, stored, parseResult); } diff --git a/src/server/managers/AppRuntimeManager.ts b/src/server/managers/AppRuntimeManager.ts index 5f9a3f15a..64adf9b0a 100644 --- a/src/server/managers/AppRuntimeManager.ts +++ b/src/server/managers/AppRuntimeManager.ts @@ -45,8 +45,10 @@ export class AppRuntimeManager { return subprocess.sendRequest(execRequest); } - public stopRuntime(runtime: DenoRuntimeSubprocessController): void { - const appId = runtime.getAppId(); + public async stopRuntime(controller: DenoRuntimeSubprocessController): Promise { + await controller.stopApp(); + + const appId = controller.getAppId(); if (appId in this.subprocesses) { delete this.subprocesses[appId]; diff --git a/src/server/runtime/deno/AppsEngineDenoRuntime.ts b/src/server/runtime/deno/AppsEngineDenoRuntime.ts index f741d9b38..2355749da 100644 --- a/src/server/runtime/deno/AppsEngineDenoRuntime.ts +++ b/src/server/runtime/deno/AppsEngineDenoRuntime.ts @@ -5,16 +5,17 @@ import { type Readable, EventEmitter } from 'stream'; import * as jsonrpc from 'jsonrpc-lite'; import debugFactory from 'debug'; -import { encoder, decoder } from './codec'; +import { decoder } from './codec'; import type { AppManager } from '../../AppManager'; import type { AppLogStorage } from '../../storage'; import type { AppBridges } from '../../bridges'; import type { IParseAppPackageResult } from '../../compiler'; import type { AppAccessorManager, AppApiManager } from '../../managers'; import type { ILoggerStorageEntry } from '../../logging'; -import type { AppRuntimeManager } from '../../managers/AppRuntimeManager'; import { AppStatus } from '../../../definition/AppStatus'; import { bundleLegacyApp } from './bundler'; +import { ProcessMessenger } from './ProcessMessenger'; +import { LivenessManager } from './LivenessManager'; const baseDebug = debugFactory('appsEngine:runtime:deno'); @@ -47,7 +48,6 @@ export const ALLOWED_ENVIRONMENT_VARIABLES = [ 'NODE_EXTRA_CA_CERTS', // Accessed by the `https` node module ]; -const COMMAND_PING = '_zPING'; const COMMAND_PONG = '_zPONG'; export const JSONRPC_METHOD_NOT_FOUND = -32601; @@ -80,7 +80,9 @@ export type DenoRuntimeOptions = { }; export class DenoRuntimeSubprocessController extends EventEmitter { - private readonly deno: child_process.ChildProcess; + private deno: child_process.ChildProcess; + + private state: 'uninitialized' | 'ready' | 'invalid' | 'restarting' | 'unknown' | 'stopped'; private readonly debug: debug.Debugger; @@ -88,8 +90,6 @@ export class DenoRuntimeSubprocessController extends EventEmitter { timeout: 10000, }; - private state: 'uninitialized' | 'ready' | 'invalid' | 'unknown' | 'stopped'; - private readonly accessors: AppAccessorManager; private readonly api: AppApiManager; @@ -98,16 +98,31 @@ export class DenoRuntimeSubprocessController extends EventEmitter { private readonly bridges: AppBridges; - private readonly runtimeManager: AppRuntimeManager; + private readonly messenger: ProcessMessenger; + + private readonly livenessManager: LivenessManager; // We need to keep the appSource around in case the Deno process needs to be restarted constructor(manager: AppManager, private readonly appPackage: IParseAppPackageResult) { super(); + this.debug = baseDebug.extend(appPackage.info.id); + this.messenger = new ProcessMessenger(this.debug); + this.livenessManager = new LivenessManager({ + controller: this, + messenger: this.messenger, + debug: this.debug, + }); + this.state = 'uninitialized'; - this.debug = baseDebug.extend(appPackage.info.id); + this.accessors = manager.getAccessorManager(); + this.api = manager.getApiManager(); + this.logStorage = manager.getLogStorage(); + this.bridges = manager.getBridges(); + } + public spawnProcess(): void { try { const denoExePath = getDenoExecutablePath(); const denoWrapperPath = getDenoWrapperPath(); @@ -121,7 +136,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter { // If the app doesn't request any permissions, it gets the default set of permissions, which includes "networking" // If the app requests specific permissions, we need to check whether it requests "networking" or not - if (!appPackage.info.permissions || appPackage.info.permissions.findIndex((p) => p.name === 'networking.default')) { + if (!this.appPackage.info.permissions || this.appPackage.info.permissions.findIndex((p) => p.name === 'networking.default')) { hasNetworkingPermission = true; } @@ -132,59 +147,39 @@ export class DenoRuntimeSubprocessController extends EventEmitter { `--allow-env=${ALLOWED_ENVIRONMENT_VARIABLES.join(',')}`, denoWrapperPath, '--subprocess', + this.appPackage.info.id, ]; - this.debug('Starting Deno subprocess for app with options %O', options); - this.deno = child_process.spawn(denoExePath, options, { env: null }); + this.messenger.setReceiver(this.deno); + this.livenessManager.attach(this.deno); + + this.debug('Started subprocess %d with options %O', this.deno.pid, options); this.setupListeners(); - this.startPing(); } catch (e) { this.state = 'invalid'; console.error(`Failed to start Deno subprocess for app ${this.getAppId()}`, e); } - - this.accessors = manager.getAccessorManager(); - this.api = manager.getApiManager(); - this.logStorage = manager.getLogStorage(); - this.bridges = manager.getBridges(); - this.runtimeManager = manager.getRuntime(); } - /** - * Start up the process of ping/pong for liveness check - * - * The message exchange does not use JSON RPC as it adds a lot of overhead - * with the creation and encoding of a full object for transfer. By using a - * string the process is less intensive. - */ - private startPing() { - const ping = () => { - const start = Date.now(); - - const responsePromise = new Promise((resolve, reject) => { - const onceCallback = () => { - clearTimeout(timeoutId); - this.debug('Ping successful in %d ms', Date.now() - start); - resolve(); - }; - - const timeoutId = setTimeout(() => { - this.debug('Ping failed in %d ms', Date.now() - start); - this.off('pong', onceCallback); - reject(); - }, this.options.timeout); - - this.once('pong', onceCallback); - }).catch(() => {}); - - this.send(COMMAND_PING); - - responsePromise.finally(() => setTimeout(ping, 5000)); - }; + public async killProcess(): Promise { + // This field is not populated if the process is killed by the OS + if (this.deno.killed) { + this.debug('App process was already killed'); + return; + } - ping(); + // What else should we do? + if (this.deno.kill('SIGKILL')) { + // Let's wait until we get confirmation the process exited + await new Promise((r) => this.deno.on('exit', r)); + } else { + this.debug('Tried killing the process but failed. Was it already dead?'); + } + + delete this.deno; + this.messenger.clearReceiver(); } // Debug purposes, could be deleted later @@ -207,10 +202,14 @@ export class DenoRuntimeSubprocessController extends EventEmitter { if (this.deno.exitCode !== null) { return AppStatus.UNKNOWN; } + return this.sendRequest({ method: 'app:getStatus', params: [] }) as Promise; } public async setupApp() { + this.debug('Setting up app subprocess'); + this.spawnProcess(); + // If there is more than one file in the package, then it is a legacy app that has not been bundled if (Object.keys(this.appPackage.files).length > 1) { await bundleLegacyApp(this.appPackage); @@ -221,34 +220,35 @@ export class DenoRuntimeSubprocessController extends EventEmitter { await this.sendRequest({ method: 'app:construct', params: [this.appPackage] }); } - public stopApp() { - this.debug('Stopping app'); + public async stopApp() { + this.debug('Stopping app subprocess'); - if (this.deno.killed) { - return true; - } + this.state = 'stopped'; - // What else should we do? - if (!this.deno.kill('SIGKILL')) { - return false; - } + await this.killProcess(); + } - this.state = 'stopped'; + public async restartApp() { + this.debug('Restarting app subprocess'); + + this.state = 'restarting'; + + await this.killProcess(); + + await this.setupApp(); + + // setupApp() changes the state to 'ready' - we'll need to workaround that for now + this.state = 'restarting'; - this.runtimeManager.stopRuntime(this); + await this.sendRequest({ method: 'app:initialize' }); - return true; + this.state = 'ready'; } public getAppId(): string { return this.appPackage.info.id; } - private send(message: jsonrpc.JsonRpc | typeof COMMAND_PING) { - this.debug('Sending message to subprocess %o', message); - this.deno.stdin.write(encoder.encode(message)); - } - public async sendRequest(message: Pick, options = this.options): Promise { const id = String(Math.random().toString(36)).substring(2); @@ -260,7 +260,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter { this.debug('Request %s for method %s took %dms', id, message.method, Date.now() - start); }); - this.send(request); + this.messenger.send(request); return promise; } @@ -314,7 +314,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter { this.state = 'invalid'; console.error('Failed to startup Deno subprocess', err); }); - this.on('ready', this.onReady.bind(this)); + this.once('ready', this.onReady.bind(this)); this.parseStdout(this.deno.stdout); } @@ -327,6 +327,13 @@ export class DenoRuntimeSubprocessController extends EventEmitter { const managerOrigin = accessorMethods.shift(); const tailMethodName = accessorMethods.pop(); + // If we're restarting the app, we can't register resources again, so we + // hijack requests for the `ConfigurationExtend` accessor and don't let them through + // This needs to be refactored ASAP + if (this.state === 'restarting' && managerOrigin === 'getConfigurationExtend') { + return jsonrpc.success(id, null); + } + if (managerOrigin === 'api' && tailMethodName === 'listApis') { const result = this.api.listApis(this.appPackage.info.id); @@ -450,7 +457,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter { if (method.startsWith('accessor:')) { const result = await this.handleAccessorMessage(message as jsonrpc.IParsedObjectRequest); - this.send(result); + this.messenger.send(result); return; } @@ -458,7 +465,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter { if (method.startsWith('bridges:')) { const result = await this.handleBridgeMessage(message as jsonrpc.IParsedObjectRequest); - this.send(result); + this.messenger.send(result); return; } diff --git a/src/server/runtime/deno/LivenessManager.ts b/src/server/runtime/deno/LivenessManager.ts new file mode 100644 index 000000000..4fb2433e8 --- /dev/null +++ b/src/server/runtime/deno/LivenessManager.ts @@ -0,0 +1,184 @@ +import type { ChildProcess } from 'child_process'; +import { EventEmitter } from 'stream'; + +import type { DenoRuntimeSubprocessController } from './AppsEngineDenoRuntime'; +import type { ProcessMessenger } from './ProcessMessenger'; + +const COMMAND_PING = '_zPING'; + +const defaultOptions: LivenessManager['options'] = { + pingRequestTimeout: 10000, + pingFrequencyInMS: 10000, + consecutiveTimeoutLimit: 4, + maxRestarts: 3, +}; + +/** + * Responsible for pinging the Deno subprocess and for restarting it + * if something doesn't look right + */ +export class LivenessManager { + private readonly controller: DenoRuntimeSubprocessController; + + private readonly messenger: ProcessMessenger; + + private readonly debug: debug.Debugger; + + private readonly options: { + // How long should we wait for a response to the ping request + pingRequestTimeout: number; + + // How long is the delay between ping messages + pingFrequencyInMS: number; + + // Limit of times the process can timeout the ping response before we consider it as unresponsive + consecutiveTimeoutLimit: number; + + // Limit of times we can try to restart a process + maxRestarts: number; + }; + + private subprocess: ChildProcess; + + // This is the perfect use-case for an AbortController, but it's experimental in Node 14.x + private pingAbortController: EventEmitter; + + private pingTimeoutConsecutiveCount = 0; + + private restartCount = 0; + + private restartLog: Record[] = []; + + constructor( + deps: { + controller: DenoRuntimeSubprocessController; + messenger: ProcessMessenger; + debug: debug.Debugger; + }, + options: Partial = {}, + ) { + this.controller = deps.controller; + this.messenger = deps.messenger; + this.debug = deps.debug; + this.pingAbortController = new EventEmitter(); + + this.options = Object.assign({}, defaultOptions, options); + } + + public attach(deno: ChildProcess) { + this.subprocess = deno; + + this.pingTimeoutConsecutiveCount = 0; + + this.controller.once('ready', () => this.ping()); + this.subprocess.once('exit', this.handleExit.bind(this)); + } + + /** + * Start up the process of ping/pong for liveness check + * + * The message exchange does not use JSON RPC as it adds a lot of overhead + * with the creation and encoding of a full object for transfer. By using a + * string the process is less intensive. + */ + private ping() { + const start = Date.now(); + + let aborted = false; + + const setAborted = () => { + this.debug('Ping aborted'); + + aborted = true; + }; + + // If we get an abort, ping should not continue + this.pingAbortController.once('abort', setAborted); + + new Promise((resolve, reject) => { + const onceCallback = () => { + this.debug('Ping successful in %d ms', Date.now() - start); + clearTimeout(timeoutId); + this.pingTimeoutConsecutiveCount = 0; + resolve(); + }; + + const timeoutCallback = () => { + this.debug('Ping failed in %d ms (consecutive failure #%d)', Date.now() - start, this.pingTimeoutConsecutiveCount); + this.controller.off('pong', onceCallback); + this.pingTimeoutConsecutiveCount++; + reject('timeout'); + }; + + const timeoutId = setTimeout(timeoutCallback, this.options.pingRequestTimeout); + + this.controller.once('pong', onceCallback); + }) + .then(() => !aborted) + .catch((reason) => { + if (aborted) { + return false; + } + + if (reason === 'timeout' && this.pingTimeoutConsecutiveCount >= this.options.consecutiveTimeoutLimit) { + this.debug('Subprocess failed to respond to pings %d consecutive times. Attempting restart...', this.options.consecutiveTimeoutLimit); + this.restartProcess(); + return false; + } + + return true; + }) + .then((shouldContinue) => { + if (!shouldContinue) { + this.pingAbortController.off('abort', setAborted); + return; + } + + setTimeout(() => { + if (aborted) return; + + this.pingAbortController.off('abort', setAborted); + this.ping(); + }, this.options.pingFrequencyInMS); + }); + + this.messenger.send(COMMAND_PING); + } + + private handleExit(exitCode: number, signal: string) { + this.pingAbortController.emit('abort'); + + const processState = this.controller.getProcessState(); + // If the we're restarting the process, or want to stop the process, or it exited cleanly, nothing else for us to do + if (processState === 'restarting' || processState === 'stopped' || (exitCode === 0 && !signal)) { + return; + } + + // Otherwise we try to restart the subprocess, if possible + if (signal) { + this.debug('App has been killed (%s). Attempting restart #%d...', signal, this.restartCount + 1); + } else { + this.debug('App has exited with code %d. Attempting restart #%d...', exitCode, this.restartCount + 1); + } + + this.restartProcess(); + } + + private restartProcess() { + if (this.restartCount >= this.options.maxRestarts) { + this.debug('Limit of restarts reached (%d). Aborting restart...', this.options.maxRestarts); + this.controller.stopApp(); + return; + } + + this.pingTimeoutConsecutiveCount = 0; + this.restartCount++; + this.restartLog.push({ + restartedAt: new Date(), + source: 'liveness-manager', + pid: this.subprocess.pid, + }); + + this.controller.restartApp(); + } +} diff --git a/src/server/runtime/deno/ProcessMessenger.ts b/src/server/runtime/deno/ProcessMessenger.ts new file mode 100644 index 000000000..03d03d125 --- /dev/null +++ b/src/server/runtime/deno/ProcessMessenger.ts @@ -0,0 +1,48 @@ +import { ChildProcess } from 'child_process'; + +import type { JsonRpc } from 'jsonrpc-lite'; + +import { encoder } from './codec'; + +export class ProcessMessenger { + private deno: ChildProcess; + + private _sendStrategy: (message: JsonRpc) => void; + + constructor(private readonly debug: debug.Debugger) { + this._sendStrategy = this.strategyError; + } + + public get send() { + return this._sendStrategy.bind(this); + } + + public setReceiver(deno: ChildProcess) { + this.deno = deno; + + this.switchStrategy(); + } + + public clearReceiver() { + delete this.deno; + + this.switchStrategy(); + } + + private switchStrategy() { + if (this.deno instanceof ChildProcess) { + this._sendStrategy = this.strategySend.bind(this); + } else { + this._sendStrategy = this.strategyError.bind(this); + } + } + + private strategyError(_message: JsonRpc) { + throw new Error('No process configured to receive a message'); + } + + private strategySend(message: JsonRpc) { + this.debug('Sending message to subprocess %o', message); + this.deno.stdin.write(encoder.encode(message)); + } +} diff --git a/tests/server/runtime/DenoRuntimeSubprocessController.spec.ts b/tests/server/runtime/DenoRuntimeSubprocessController.spec.ts index 10ee0365f..1cc1846ea 100644 --- a/tests/server/runtime/DenoRuntimeSubprocessController.spec.ts +++ b/tests/server/runtime/DenoRuntimeSubprocessController.spec.ts @@ -40,6 +40,7 @@ export class DenuRuntimeSubprocessControllerTestFixture { @Setup public setup() { this.controller = new DenoRuntimeSubprocessController(this.manager, this.appPackage); + this.controller.setupApp(); } @Teardown