Skip to content

Commit

Permalink
NATS: reimplement terminal initialization... yet again!
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Jan 23, 2025
1 parent 362568e commit 8e203ac
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 50 deletions.
5 changes: 3 additions & 2 deletions docs/nats/devlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,9 @@ NOTE!!! The above consumer is ephemeral -- it disappears if we don't grab it via
## [ ] Goal: Jetstream permissions
- [ ] project should set up the stream for capturing terminal outputs.
- [ ] delete old messages with a given subject. `nats stream purge project-81e0c408-ac65-4114-bad5-5f4b6539bd0e-terminal --seq=7000`
- [x] project should set up the stream for capturing terminal outputs.
- [x] delete old messages with a given subject. `nats stream purge project-81e0c408-ac65-4114-bad5-5f4b6539bd0e-terminal --seq=7000`
- there is a setting max\_msgs\_per\_subject on a stream, so **we just set that and are done!** Gees. It is too easy.
- [ ] handle the other messages like resize
- [ ] permissions for jetstream usage and access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,56 +4,61 @@ import { JSONCodec } from "nats.ws";
import sha1 from "sha1";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";

const jc = JSONCodec();

export class NatsTerminalConnection extends EventEmitter {
private project_id: string;
private path: string;
private subject: string;
private state: null | "running" | "off" | "closed";
private state: null | "running" | "init" | "closed";
private consumer?;
private startInit: number = 0;
// keep = optional number of messages to retain between clients/sessions/view, i.e.,
// "amount of history". This is global to all terminals in the project.
private keep?: number;

constructor({ project_id, path }) {
constructor({
project_id,
path,
keep,
}: {
project_id: string;
path: string;
keep?: number;
}) {
super();
this.project_id = project_id;
this.path = path;
this.keep = keep;
// move to util so guaranteed in sync with project
this.subject = `project.${project_id}.terminal.${sha1(path)}`;
}

write = async (data) => {
if (Date.now() - this.startInit <= 2000) {
// ignore initial data while initializing (e.g., first 2 seconds for now -- TODO: use nats more cleverly)
if (this.state == "init") {
// ignore initial data while initializing.
// This is the trickt to avoid "junk characters" on refresh/reconnect.
return;
}
if (this.state != "running") {
await this.start();
}
if (typeof data != "string") {
//console.log("write -- todo:", data);
// TODO: not yet implemented, e.g., {cmd: 'size', rows: 18, cols: 180}
console.log(data);
return;
}
const write = async () => {
const f = async () => {
await webapp_client.nats_client.project({
project_id: this.project_id,
endpoint: "write-to-terminal",
params: { path: this.path, data },
});
};
try {
await f();
} catch (_err) {
await this.start();
await f();
}
const f = async () => {
await webapp_client.nats_client.project({
project_id: this.project_id,
endpoint: "write-to-terminal",
params: { path: this.path, data, keep: this.keep },
});
};

try {
await write();
} catch (_) {
await f();
} catch (_err) {
await this.start();
await write();
await f();
}
};

Expand Down Expand Up @@ -87,28 +92,42 @@ export class NatsTerminalConnection extends EventEmitter {
};

init = async () => {
this.state = "init";
await this.start();
this.consumer = await this.getConsumer();
this.run();
};

private handle = (mesg) => {
if (this.state == "closed") {
return true;
}
const { data } = jc.decode(mesg.data) as any;
if (data != null) {
this.emit("data", data);
}
};

private run = async () => {
if (this.consumer == null) {
return;
}
const jc = JSONCodec();
// this loop runs forever (or until state = closed or this.consumer.closed())...
this.startInit = Date.now();
for await (const mesg of await this.consumer.consume()) {
if (this.state == "closed") {
const messages = await this.consumer.fetch({
max_messages: this.keep,
expires: 1000,
});
for await (const mesg of messages) {
if (this.handle(mesg)) {
return;
}
const { exit, data } = jc.decode(mesg.data) as any;
if (exit) {
this.state = "off";
} else if (data != null) {
this.state = "running";
this.emit("data", data);
}
if (this.state == "init") {
this.state = "running";
}
// TODO: this loop runs until state = closed or this.consumer.closed()... ?
for await (const mesg of await this.consumer.consume()) {
if (this.handle(mesg)) {
return;
}
}
};
Expand Down
46 changes: 33 additions & 13 deletions src/packages/project/nats/terminal.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/*
Quick very simple terminal proof of concept for testing NATS
Terminal
- using NATS
*/

import { spawn } from "node-pty";
Expand All @@ -11,8 +13,14 @@ import { project_id } from "@cocalc/project/data";
import { sha1 } from "@cocalc/backend/sha1";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { JSONCodec } from "nats";
import { /*jetstream,*/ jetstreamManager } from "@nats-io/jetstream";
import { jetstreamManager } from "@nats-io/jetstream";
import { getLogger } from "@cocalc/project/logger";

const logger = getLogger("server:nats:terminal");

const DEFAULT_KEEP = 300;
const MIN_KEEP = 5;
const MAX_KEEP = 2000;
const EXIT_MESSAGE = "\r\n\r\n[Process completed - press any key]\r\n\r\n";
const DEFAULT_COMMAND = "/bin/bash";
const jc = JSONCodec();
Expand All @@ -24,7 +32,7 @@ export const createTerminal = reuseInFlight(
if (params == null) {
throw Error("params must be specified");
}
const { path, options } = params;
const { path, ...options } = params;
if (!path) {
throw Error("path must be specified");
}
Expand Down Expand Up @@ -69,17 +77,22 @@ class Session {
public subject: string;
private state: "running" | "off" = "off";
private streamName: string;
private keep: number;

constructor({ path, options, nc }) {
logger.debug("create session ", { path, options });
this.nc = nc;
this.path = path;
this.options = options ?? {};
this.options = options;
this.keep = Math.max(
MIN_KEEP,
Math.min(this.options.keep ?? DEFAULT_KEEP, MAX_KEEP),
);
this.subject = `project.${project_id}.terminal.${sha1(path)}`;
this.streamName = `project-${project_id}-terminal`;
}

write = async (data) => {
console.log("write", { data });
if (this.state == "off") {
await this.restart();
}
Expand All @@ -96,11 +109,21 @@ class Session {
// idempotent so don't have to check if there is already a stream
const nc = this.nc;
const jsm = await jetstreamManager(nc);
await jsm.streams.add({
name: this.streamName,
subjects: [`project.${project_id}.terminal.>`],
compression: "s2",
});
try {
await jsm.streams.add({
name: this.streamName,
subjects: [`project.${project_id}.terminal.>`],
compression: "s2",
max_msgs_per_subject: this.keep,
});
} catch (_err) {
// probably already exists
await jsm.streams.update(this.streamName, {
subjects: [`project.${project_id}.terminal.>`],
compression: "s2" as any,
max_msgs_per_subject: this.keep,
});
}
};

init = async () => {
Expand All @@ -127,10 +150,7 @@ class Session {
});
this.state = "running";
await this.getStream();
//const js = await jetstream(this.nc);
this.pty.onData(async (data) => {
// console.log("onData", { data });
//await js.publish(this.streamName, jc.encode({ data }));
this.nc.publish(this.subject, jc.encode({ data }));
});
this.pty.onExit((status) => {
Expand Down

0 comments on commit 8e203ac

Please sign in to comment.