Skip to content

Commit

Permalink
feat(deno-runtime): restart app process if it times out consecutively (
Browse files Browse the repository at this point in the history
  • Loading branch information
d-gubert authored Jul 9, 2024
1 parent 7f60119 commit 02d8fec
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 78 deletions.
10 changes: 5 additions & 5 deletions src/server/AppManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,9 @@ export class AppManager {
app.getStorageItem().marketplaceInfo = storageItem.marketplaceInfo;
await app.validateLicense().catch();

storageItem.status = await app.getStatus();
// This is async, but we don't care since it only updates in the database
// and it should not mutate any properties we care about
storageItem.status = await app.getStatus();
await this.appMetadataStorage.update(storageItem).catch();

return true;
Expand Down Expand Up @@ -549,7 +549,7 @@ export class AppManager {
// the App instance from the source.
const app = await this.getCompiler().toSandBox(this, descriptor, result);

undoSteps.push(() => app.getDenoRuntime().stopApp());
undoSteps.push(() => this.getRuntime().stopRuntime(app.getDenoRuntime()));

// Create a user for the app
try {
Expand Down Expand Up @@ -641,7 +641,7 @@ export class AppManager {
await this.appMetadataStorage.remove(app.getID());
await this.appSourceStorage.remove(app.getStorageItem()).catch();

app.getDenoRuntime().stopApp();
await this.getRuntime().stopRuntime(app.getDenoRuntime());

this.apps.delete(app.getID());
}
Expand Down Expand Up @@ -687,7 +687,7 @@ export class AppManager {
descriptor.signature = await this.signatureManager.signApp(descriptor);
const stored = await this.appMetadataStorage.update(descriptor);

this.apps.get(old.id).getDenoRuntime().stopApp();
await this.getRuntime().stopRuntime(this.apps.get(old.id).getDenoRuntime());

const app = await this.getCompiler().toSandBox(this, descriptor, result);

Expand Down Expand Up @@ -731,7 +731,7 @@ export class AppManager {
if (appPackageOrInstance instanceof Buffer) {
const parseResult = await this.getParser().unpackageApp(appPackageOrInstance);

this.apps.get(stored.id).getDenoRuntime().stopApp();
await this.getRuntime().stopRuntime(this.apps.get(stored.id).getDenoRuntime());

return this.getCompiler().toSandBox(this, stored, parseResult);
}
Expand Down
6 changes: 4 additions & 2 deletions src/server/managers/AppRuntimeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ export class AppRuntimeManager {
return subprocess.sendRequest(execRequest);
}

public stopRuntime(runtime: DenoRuntimeSubprocessController): void {
const appId = runtime.getAppId();
public async stopRuntime(controller: DenoRuntimeSubprocessController): Promise<void> {
await controller.stopApp();

const appId = controller.getAppId();

if (appId in this.subprocesses) {
delete this.subprocesses[appId];
Expand Down
149 changes: 78 additions & 71 deletions src/server/runtime/deno/AppsEngineDenoRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import { type Readable, EventEmitter } from 'stream';
import * as jsonrpc from 'jsonrpc-lite';
import debugFactory from 'debug';

import { encoder, decoder } from './codec';
import { decoder } from './codec';
import type { AppManager } from '../../AppManager';
import type { AppLogStorage } from '../../storage';
import type { AppBridges } from '../../bridges';
import type { IParseAppPackageResult } from '../../compiler';
import type { AppAccessorManager, AppApiManager } from '../../managers';
import type { ILoggerStorageEntry } from '../../logging';
import type { AppRuntimeManager } from '../../managers/AppRuntimeManager';
import { AppStatus } from '../../../definition/AppStatus';
import { bundleLegacyApp } from './bundler';
import { ProcessMessenger } from './ProcessMessenger';
import { LivenessManager } from './LivenessManager';

const baseDebug = debugFactory('appsEngine:runtime:deno');

Expand Down Expand Up @@ -47,7 +48,6 @@ export const ALLOWED_ENVIRONMENT_VARIABLES = [
'NODE_EXTRA_CA_CERTS', // Accessed by the `https` node module
];

const COMMAND_PING = '_zPING';
const COMMAND_PONG = '_zPONG';

export const JSONRPC_METHOD_NOT_FOUND = -32601;
Expand Down Expand Up @@ -80,16 +80,16 @@ export type DenoRuntimeOptions = {
};

export class DenoRuntimeSubprocessController extends EventEmitter {
private readonly deno: child_process.ChildProcess;
private deno: child_process.ChildProcess;

private state: 'uninitialized' | 'ready' | 'invalid' | 'restarting' | 'unknown' | 'stopped';

private readonly debug: debug.Debugger;

private readonly options = {
timeout: 10000,
};

private state: 'uninitialized' | 'ready' | 'invalid' | 'unknown' | 'stopped';

private readonly accessors: AppAccessorManager;

private readonly api: AppApiManager;
Expand All @@ -98,16 +98,31 @@ export class DenoRuntimeSubprocessController extends EventEmitter {

private readonly bridges: AppBridges;

private readonly runtimeManager: AppRuntimeManager;
private readonly messenger: ProcessMessenger;

private readonly livenessManager: LivenessManager;

// We need to keep the appSource around in case the Deno process needs to be restarted
constructor(manager: AppManager, private readonly appPackage: IParseAppPackageResult) {
super();

this.debug = baseDebug.extend(appPackage.info.id);
this.messenger = new ProcessMessenger(this.debug);
this.livenessManager = new LivenessManager({
controller: this,
messenger: this.messenger,
debug: this.debug,
});

this.state = 'uninitialized';

this.debug = baseDebug.extend(appPackage.info.id);
this.accessors = manager.getAccessorManager();
this.api = manager.getApiManager();
this.logStorage = manager.getLogStorage();
this.bridges = manager.getBridges();
}

public spawnProcess(): void {
try {
const denoExePath = getDenoExecutablePath();
const denoWrapperPath = getDenoWrapperPath();
Expand All @@ -121,7 +136,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter {

// If the app doesn't request any permissions, it gets the default set of permissions, which includes "networking"
// If the app requests specific permissions, we need to check whether it requests "networking" or not
if (!appPackage.info.permissions || appPackage.info.permissions.findIndex((p) => p.name === 'networking.default')) {
if (!this.appPackage.info.permissions || this.appPackage.info.permissions.findIndex((p) => p.name === 'networking.default')) {
hasNetworkingPermission = true;
}

Expand All @@ -132,59 +147,39 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
`--allow-env=${ALLOWED_ENVIRONMENT_VARIABLES.join(',')}`,
denoWrapperPath,
'--subprocess',
this.appPackage.info.id,
];

this.debug('Starting Deno subprocess for app with options %O', options);

this.deno = child_process.spawn(denoExePath, options, { env: null });
this.messenger.setReceiver(this.deno);
this.livenessManager.attach(this.deno);

this.debug('Started subprocess %d with options %O', this.deno.pid, options);

this.setupListeners();
this.startPing();
} catch (e) {
this.state = 'invalid';
console.error(`Failed to start Deno subprocess for app ${this.getAppId()}`, e);
}

this.accessors = manager.getAccessorManager();
this.api = manager.getApiManager();
this.logStorage = manager.getLogStorage();
this.bridges = manager.getBridges();
this.runtimeManager = manager.getRuntime();
}

/**
* Start up the process of ping/pong for liveness check
*
* The message exchange does not use JSON RPC as it adds a lot of overhead
* with the creation and encoding of a full object for transfer. By using a
* string the process is less intensive.
*/
private startPing() {
const ping = () => {
const start = Date.now();

const responsePromise = new Promise<void>((resolve, reject) => {
const onceCallback = () => {
clearTimeout(timeoutId);
this.debug('Ping successful in %d ms', Date.now() - start);
resolve();
};

const timeoutId = setTimeout(() => {
this.debug('Ping failed in %d ms', Date.now() - start);
this.off('pong', onceCallback);
reject();
}, this.options.timeout);

this.once('pong', onceCallback);
}).catch(() => {});

this.send(COMMAND_PING);

responsePromise.finally(() => setTimeout(ping, 5000));
};
public async killProcess(): Promise<void> {
// This field is not populated if the process is killed by the OS
if (this.deno.killed) {
this.debug('App process was already killed');
return;
}

ping();
// What else should we do?
if (this.deno.kill('SIGKILL')) {
// Let's wait until we get confirmation the process exited
await new Promise<void>((r) => this.deno.on('exit', r));
} else {
this.debug('Tried killing the process but failed. Was it already dead?');
}

delete this.deno;
this.messenger.clearReceiver();
}

// Debug purposes, could be deleted later
Expand All @@ -207,10 +202,14 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
if (this.deno.exitCode !== null) {
return AppStatus.UNKNOWN;
}

return this.sendRequest({ method: 'app:getStatus', params: [] }) as Promise<AppStatus>;
}

public async setupApp() {
this.debug('Setting up app subprocess');
this.spawnProcess();

// If there is more than one file in the package, then it is a legacy app that has not been bundled
if (Object.keys(this.appPackage.files).length > 1) {
await bundleLegacyApp(this.appPackage);
Expand All @@ -221,34 +220,35 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
await this.sendRequest({ method: 'app:construct', params: [this.appPackage] });
}

public stopApp() {
this.debug('Stopping app');
public async stopApp() {
this.debug('Stopping app subprocess');

if (this.deno.killed) {
return true;
}
this.state = 'stopped';

// What else should we do?
if (!this.deno.kill('SIGKILL')) {
return false;
}
await this.killProcess();
}

this.state = 'stopped';
public async restartApp() {
this.debug('Restarting app subprocess');

this.state = 'restarting';

await this.killProcess();

await this.setupApp();

// setupApp() changes the state to 'ready' - we'll need to workaround that for now
this.state = 'restarting';

this.runtimeManager.stopRuntime(this);
await this.sendRequest({ method: 'app:initialize' });

return true;
this.state = 'ready';
}

public getAppId(): string {
return this.appPackage.info.id;
}

private send(message: jsonrpc.JsonRpc | typeof COMMAND_PING) {
this.debug('Sending message to subprocess %o', message);
this.deno.stdin.write(encoder.encode(message));
}

public async sendRequest(message: Pick<jsonrpc.RequestObject, 'method' | 'params'>, options = this.options): Promise<unknown> {
const id = String(Math.random().toString(36)).substring(2);

Expand All @@ -260,7 +260,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
this.debug('Request %s for method %s took %dms', id, message.method, Date.now() - start);
});

this.send(request);
this.messenger.send(request);

return promise;
}
Expand Down Expand Up @@ -314,7 +314,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
this.state = 'invalid';
console.error('Failed to startup Deno subprocess', err);
});
this.on('ready', this.onReady.bind(this));
this.once('ready', this.onReady.bind(this));
this.parseStdout(this.deno.stdout);
}

Expand All @@ -327,6 +327,13 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
const managerOrigin = accessorMethods.shift();
const tailMethodName = accessorMethods.pop();

// If we're restarting the app, we can't register resources again, so we
// hijack requests for the `ConfigurationExtend` accessor and don't let them through
// This needs to be refactored ASAP
if (this.state === 'restarting' && managerOrigin === 'getConfigurationExtend') {
return jsonrpc.success(id, null);
}

if (managerOrigin === 'api' && tailMethodName === 'listApis') {
const result = this.api.listApis(this.appPackage.info.id);

Expand Down Expand Up @@ -450,15 +457,15 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
if (method.startsWith('accessor:')) {
const result = await this.handleAccessorMessage(message as jsonrpc.IParsedObjectRequest);

this.send(result);
this.messenger.send(result);

return;
}

if (method.startsWith('bridges:')) {
const result = await this.handleBridgeMessage(message as jsonrpc.IParsedObjectRequest);

this.send(result);
this.messenger.send(result);

return;
}
Expand Down
Loading

0 comments on commit 02d8fec

Please sign in to comment.