From 8e203ac888e77457091ef07e9d8e2964a20efdd5 Mon Sep 17 00:00:00 2001 From: William Stein Date: Thu, 23 Jan 2025 03:17:31 +0000 Subject: [PATCH] NATS: reimplement terminal initialization... yet again! --- docs/nats/devlog.md | 5 +- .../nats-terminal-connection.ts | 89 +++++++++++-------- src/packages/project/nats/terminal.ts | 46 +++++++--- 3 files changed, 90 insertions(+), 50 deletions(-) diff --git a/docs/nats/devlog.md b/docs/nats/devlog.md index 5f21207f02..958e06546f 100644 --- a/docs/nats/devlog.md +++ b/docs/nats/devlog.md @@ -219,8 +219,9 @@ NOTE!!! The above consumer is ephemeral -- it disappears if we don't grab it via ## [ ] Goal: Jetstream permissions -- [ ] project should set up the stream for capturing terminal outputs. -- [ ] delete old messages with a given subject. `nats stream purge project-81e0c408-ac65-4114-bad5-5f4b6539bd0e-terminal --seq=7000` +- [x] project should set up the stream for capturing terminal outputs. +- [x] delete old messages with a given subject. `nats stream purge project-81e0c408-ac65-4114-bad5-5f4b6539bd0e-terminal --seq=7000` + - there is a setting max\_msgs\_per\_subject on a stream, so **we just set that and are done!** Gees. It is too easy. - [ ] handle the other messages like resize - [ ] permissions for jetstream usage and access diff --git a/src/packages/frontend/frame-editors/terminal-editor/nats-terminal-connection.ts b/src/packages/frontend/frame-editors/terminal-editor/nats-terminal-connection.ts index 6ed80ec90e..3b89a0a6bc 100644 --- a/src/packages/frontend/frame-editors/terminal-editor/nats-terminal-connection.ts +++ b/src/packages/frontend/frame-editors/terminal-editor/nats-terminal-connection.ts @@ -4,25 +4,38 @@ import { JSONCodec } from "nats.ws"; import sha1 from "sha1"; import { reuseInFlight } from "@cocalc/util/reuse-in-flight"; +const jc = JSONCodec(); + export class NatsTerminalConnection extends EventEmitter { private project_id: string; private path: string; private subject: string; - private state: null | "running" | "off" | "closed"; + private state: null | "running" | "init" | "closed"; private consumer?; - private startInit: number = 0; + // keep = optional number of messages to retain between clients/sessions/view, i.e., + // "amount of history". This is global to all terminals in the project. + private keep?: number; - constructor({ project_id, path }) { + constructor({ + project_id, + path, + keep, + }: { + project_id: string; + path: string; + keep?: number; + }) { super(); this.project_id = project_id; this.path = path; + this.keep = keep; // move to util so guaranteed in sync with project this.subject = `project.${project_id}.terminal.${sha1(path)}`; } write = async (data) => { - if (Date.now() - this.startInit <= 2000) { - // ignore initial data while initializing (e.g., first 2 seconds for now -- TODO: use nats more cleverly) + if (this.state == "init") { + // ignore initial data while initializing. // This is the trickt to avoid "junk characters" on refresh/reconnect. return; } @@ -30,30 +43,22 @@ export class NatsTerminalConnection extends EventEmitter { await this.start(); } if (typeof data != "string") { - //console.log("write -- todo:", data); - // TODO: not yet implemented, e.g., {cmd: 'size', rows: 18, cols: 180} + console.log(data); return; } - const write = async () => { - const f = async () => { - await webapp_client.nats_client.project({ - project_id: this.project_id, - endpoint: "write-to-terminal", - params: { path: this.path, data }, - }); - }; - try { - await f(); - } catch (_err) { - await this.start(); - await f(); - } + const f = async () => { + await webapp_client.nats_client.project({ + project_id: this.project_id, + endpoint: "write-to-terminal", + params: { path: this.path, data, keep: this.keep }, + }); }; + try { - await write(); - } catch (_) { + await f(); + } catch (_err) { await this.start(); - await write(); + await f(); } }; @@ -87,28 +92,42 @@ export class NatsTerminalConnection extends EventEmitter { }; init = async () => { + this.state = "init"; await this.start(); this.consumer = await this.getConsumer(); this.run(); }; + private handle = (mesg) => { + if (this.state == "closed") { + return true; + } + const { data } = jc.decode(mesg.data) as any; + if (data != null) { + this.emit("data", data); + } + }; + private run = async () => { if (this.consumer == null) { return; } - const jc = JSONCodec(); - // this loop runs forever (or until state = closed or this.consumer.closed())... - this.startInit = Date.now(); - for await (const mesg of await this.consumer.consume()) { - if (this.state == "closed") { + const messages = await this.consumer.fetch({ + max_messages: this.keep, + expires: 1000, + }); + for await (const mesg of messages) { + if (this.handle(mesg)) { return; } - const { exit, data } = jc.decode(mesg.data) as any; - if (exit) { - this.state = "off"; - } else if (data != null) { - this.state = "running"; - this.emit("data", data); + } + if (this.state == "init") { + this.state = "running"; + } + // TODO: this loop runs until state = closed or this.consumer.closed()... ? + for await (const mesg of await this.consumer.consume()) { + if (this.handle(mesg)) { + return; } } }; diff --git a/src/packages/project/nats/terminal.ts b/src/packages/project/nats/terminal.ts index 9e8c552e74..2fdb18ef24 100644 --- a/src/packages/project/nats/terminal.ts +++ b/src/packages/project/nats/terminal.ts @@ -1,5 +1,7 @@ /* -Quick very simple terminal proof of concept for testing NATS +Terminal + +- using NATS */ import { spawn } from "node-pty"; @@ -11,8 +13,14 @@ import { project_id } from "@cocalc/project/data"; import { sha1 } from "@cocalc/backend/sha1"; import { reuseInFlight } from "@cocalc/util/reuse-in-flight"; import { JSONCodec } from "nats"; -import { /*jetstream,*/ jetstreamManager } from "@nats-io/jetstream"; +import { jetstreamManager } from "@nats-io/jetstream"; +import { getLogger } from "@cocalc/project/logger"; + +const logger = getLogger("server:nats:terminal"); +const DEFAULT_KEEP = 300; +const MIN_KEEP = 5; +const MAX_KEEP = 2000; const EXIT_MESSAGE = "\r\n\r\n[Process completed - press any key]\r\n\r\n"; const DEFAULT_COMMAND = "/bin/bash"; const jc = JSONCodec(); @@ -24,7 +32,7 @@ export const createTerminal = reuseInFlight( if (params == null) { throw Error("params must be specified"); } - const { path, options } = params; + const { path, ...options } = params; if (!path) { throw Error("path must be specified"); } @@ -69,17 +77,22 @@ class Session { public subject: string; private state: "running" | "off" = "off"; private streamName: string; + private keep: number; constructor({ path, options, nc }) { + logger.debug("create session ", { path, options }); this.nc = nc; this.path = path; - this.options = options ?? {}; + this.options = options; + this.keep = Math.max( + MIN_KEEP, + Math.min(this.options.keep ?? DEFAULT_KEEP, MAX_KEEP), + ); this.subject = `project.${project_id}.terminal.${sha1(path)}`; this.streamName = `project-${project_id}-terminal`; } write = async (data) => { - console.log("write", { data }); if (this.state == "off") { await this.restart(); } @@ -96,11 +109,21 @@ class Session { // idempotent so don't have to check if there is already a stream const nc = this.nc; const jsm = await jetstreamManager(nc); - await jsm.streams.add({ - name: this.streamName, - subjects: [`project.${project_id}.terminal.>`], - compression: "s2", - }); + try { + await jsm.streams.add({ + name: this.streamName, + subjects: [`project.${project_id}.terminal.>`], + compression: "s2", + max_msgs_per_subject: this.keep, + }); + } catch (_err) { + // probably already exists + await jsm.streams.update(this.streamName, { + subjects: [`project.${project_id}.terminal.>`], + compression: "s2" as any, + max_msgs_per_subject: this.keep, + }); + } }; init = async () => { @@ -127,10 +150,7 @@ class Session { }); this.state = "running"; await this.getStream(); - //const js = await jetstream(this.nc); this.pty.onData(async (data) => { - // console.log("onData", { data }); - //await js.publish(this.streamName, jc.encode({ data })); this.nc.publish(this.subject, jc.encode({ data })); }); this.pty.onExit((status) => {