Skip to content

Commit

Permalink
Implement server recovery and peer routing (ekzhang#15)
Browse files Browse the repository at this point in the history
* Add snapshot support and Redis client

* Add simple snapshotting test

* Server discovery, peer routing, and recovery

* Add immediate persistence and inactive expiry

* Decrease shell data snapshot to 32 KiB

* Enable Redis TLS (a couple duplicate dependencies)

* Edit background sync to respect sync_now()
  • Loading branch information
ekzhang authored Oct 29, 2023
1 parent 85e3d07 commit 53e0055
Show file tree
Hide file tree
Showing 30 changed files with 1,029 additions and 275 deletions.
268 changes: 165 additions & 103 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ rand = "0.8.5"
serde = { version = "1.0.188", features = ["derive", "rc"] }
tokio = { version = "1.32.0", features = ["full"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tonic = { version = "0.10.0", features = ["tls", "tls-roots"] }
tonic = { version = "0.10.0", features = ["tls", "tls-webpki-roots"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ FROM alpine:latest
WORKDIR /root
COPY --from=frontend /usr/src/app/build build
COPY --from=backend /home/rust/src/target/release/sshx-server .
CMD ["./sshx-server", "--host"]
CMD ["./sshx-server", "--listen", "::"]
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,19 @@ This will compile the `sshx` binary and place it in your `~/.cargo/bin` folder.

### Workflow

First, start service containers for development.

```shell
docker compose up -d
```

Install [Rust 1.70+](https://www.rust-lang.org/),
[Node v18](https://nodejs.org/), [NPM v9](https://www.npmjs.com/), and
[mprocs](https://github.com/pvolok/mprocs). Then, run

```shell
$ npm install
$ mprocs
npm install
mprocs
```

This will compile and start the server, an instance of the client, and the web
Expand Down
12 changes: 12 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Services used by sshx for development. These listen on ports 126XX, to reduce the chance that they
# conflict with other processes.
#
# You can start them with `docker compose up -d`.

services:
redis:
image: bitnami/redis:7.2
environment:
- ALLOW_EMPTY_PASSWORD=yes
ports:
- 127.0.0.1:12601:6379
1 change: 1 addition & 0 deletions crates/sshx-core/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let descriptor_path = PathBuf::from(env::var("OUT_DIR").unwrap()).join("sshx.bin");
tonic_build::configure()
.file_descriptor_set_path(descriptor_path)
.bytes(["."])
.compile(&["proto/sshx.proto"], &["proto/"])?;
Ok(())
}
38 changes: 28 additions & 10 deletions crates/sshx-core/proto/sshx.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,22 @@ message NewShell {
// Bidirectional streaming update from the client.
message ClientUpdate {
oneof client_message {
string hello = 1; // First stream message: "name,token".
TerminalData data = 2; // Stream data from the terminal.
string hello = 1; // First stream message: "name,token".
TerminalData data = 2; // Stream data from the terminal.
NewShell created_shell = 3; // Acknowledge that a new shell was created.
uint32 closed_shell = 4; // Acknowledge that a shell was closed.
uint32 closed_shell = 4; // Acknowledge that a shell was closed.
string error = 15;
}
}

// Bidirectional streaming update from the server.
message ServerUpdate {
oneof server_message {
TerminalInput input = 1; // Remote input bytes, received from the user.
NewShell create_shell = 2; // ID of a new shell.
uint32 close_shell = 3; // ID of a shell to close.
SequenceNumbers sync = 4; // Periodic sequence number sync.
TerminalSize resize = 5; // Resize a terminal window.
TerminalInput input = 1; // Remote input bytes, received from the user.
NewShell create_shell = 2; // ID of a new shell.
uint32 close_shell = 3; // ID of a shell to close.
SequenceNumbers sync = 4; // Periodic sequence number sync.
TerminalSize resize = 5; // Resize a terminal window.
string error = 15;
}
}
Expand All @@ -91,6 +91,24 @@ message CloseRequest {
}

// Server response to closing a session.
message CloseResponse {
bool exists = 1; // True if the session was found and closed.
message CloseResponse {}

// Snapshot of a session, used to restore state for persistence across servers.
message SerializedSession {
bytes encrypted_zeros = 1;
map<uint32, SerializedShell> shells = 2;
uint32 next_sid = 3;
uint32 next_uid = 4;
}

message SerializedShell {
uint64 seqnum = 1;
repeated bytes data = 2;
uint64 chunk_offset = 3;
uint64 byte_offset = 4;
bool closed = 5;
int32 winsize_x = 6;
int32 winsize_y = 7;
uint32 winsize_rows = 8;
uint32 winsize_cols = 9;
}
14 changes: 14 additions & 0 deletions crates/sshx-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,18 @@ impl IdCounter {
pub fn next_uid(&self) -> Uid {
Uid(self.next_uid.fetch_add(1, Ordering::Relaxed))
}

/// Return the current internal values of the counter.
pub fn get_current_values(&self) -> (Sid, Uid) {
(
Sid(self.next_sid.load(Ordering::Relaxed)),
Uid(self.next_uid.load(Ordering::Relaxed)),
)
}

/// Set the internal values of the counter.
pub fn set_current_values(&self, sid: Sid, uid: Uid) {
self.next_sid.store(sid.0, Ordering::Relaxed);
self.next_uid.store(uid.0, Ordering::Relaxed);
}
}
11 changes: 8 additions & 3 deletions crates/sshx-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,29 @@ bytes = { version = "1.5.0", features = ["serde"] }
ciborium = "0.2.1"
clap.workspace = true
dashmap = "5.5.3"
deadpool = "0.10.0"
deadpool-redis = "0.13.0"
futures-util = { version = "0.3.28", features = ["sink"] }
hmac = "0.12.1"
hyper = { version = "0.14.27", features = ["full"] }
parking_lot = "0.12.1"
prost.workspace = true
rand.workspace = true
redis = { version = "0.23.3", features = ["tokio-rustls-comp", "tls-rustls-webpki-roots"] }
serde.workspace = true
sha2 = "0.10.7"
sshx-core = { path = "../sshx-core" }
tokio.workspace = true
tokio-stream.workspace = true
tokio-tungstenite = "0.20.0"
tonic.workspace = true
tonic-reflection = "0.10.0"
tower = { version = "0.4.13", features = ["steer"] }
tower-http = { version = "0.4.4", features = ["fs", "redirect", "trace"] }
tracing.workspace = true
tracing-subscriber.workspace = true
zstd = "0.12.4"

[dev-dependencies]
futures-util = { version = "0.3.28", features = ["sink"] }
reqwest = "0.11.20"
reqwest = { version = "0.11.20", default-features = false, features = ["rustls-tls"] }
sshx = { path = "../sshx" }
tokio-tungstenite = "0.20.0"
52 changes: 24 additions & 28 deletions crates/sshx-server/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,23 @@ impl SshxService for GrpcServer {
type ChannelStream = ReceiverStream<Result<ServerUpdate, Status>>;

async fn open(&self, request: Request<OpenRequest>) -> RR<OpenResponse> {
use dashmap::mapref::entry::Entry::*;

let request = request.into_inner();
let origin = match &self.0.override_origin {
Some(origin) => origin.clone(),
None => request.origin,
};
let origin = self.0.override_origin().unwrap_or(request.origin);
if origin.is_empty() {
return Err(Status::invalid_argument("origin is empty"));
}
let name = rand_alphanumeric(10);
info!(%name, "creating new session");
match self.0.store.entry(name.clone()) {
Occupied(_) => return Err(Status::already_exists("generated duplicate ID")),
Vacant(v) => {
match self.0.lookup(&name) {
Some(_) => return Err(Status::already_exists("generated duplicate ID")),
None => {
let metadata = Metadata {
encrypted_zeros: request.encrypted_zeros.into(),
encrypted_zeros: request.encrypted_zeros,
};
v.insert(Session::new(metadata).into());
self.0.insert(&name, Arc::new(Session::new(metadata)));
}
};
let token = self.0.mac.clone().chain_update(&name).finalize();
let token = self.0.mac().chain_update(&name).finalize();
let url = format!("{origin}/s/{name}");
Ok(Response::new(OpenResponse {
name,
Expand All @@ -81,14 +76,18 @@ impl SshxService for GrpcServer {
let (name, token) = hello
.split_once(',')
.ok_or_else(|| Status::invalid_argument("missing name and token"))?;
validate_token(&self.0.mac, name, token)?;
validate_token(self.0.mac(), name, token)?;
name.to_string()
}
_ => return Err(Status::invalid_argument("invalid first message")),
};
let session = match self.0.store.get(&session_name) {
Some(session) => Arc::clone(&session),
None => return Err(Status::not_found("session not found")),
let session = match self.0.backend_connect(&session_name).await {
Ok(Some(session)) => session,
Ok(None) => return Err(Status::not_found("session not found")),
Err(err) => {
error!(?err, "failed to connect to backend session");
return Err(Status::internal(err.to_string()));
}
};

// We now spawn an asynchronous task that sends updates to the client. Note that
Expand All @@ -106,22 +105,19 @@ impl SshxService for GrpcServer {

async fn close(&self, request: Request<CloseRequest>) -> RR<CloseResponse> {
let request = request.into_inner();
validate_token(&self.0.mac, &request.name, &request.token)?;
let exists = match self.0.store.remove(&request.name) {
Some((_, session)) => {
session.shutdown();
true
}
None => false,
};
Ok(Response::new(CloseResponse { exists }))
validate_token(self.0.mac(), &request.name, &request.token)?;
if let Err(err) = self.0.close_session(&request.name).await {
error!(?err, "failed to close session");
return Err(Status::internal(err.to_string()));
}
Ok(Response::new(CloseResponse {}))
}
}

/// Validate the client token for a session.
fn validate_token(mac: &(impl Mac + Clone), name: &str, token: &str) -> Result<(), Status> {
fn validate_token(mac: impl Mac, name: &str, token: &str) -> Result<(), Status> {
if let Ok(token) = BASE64_STANDARD.decode(token) {
if mac.clone().chain_update(name).verify_slice(&token).is_ok() {
if mac.chain_update(name).verify_slice(&token).is_ok() {
return Ok(());
}
}
Expand Down Expand Up @@ -182,7 +178,7 @@ async fn handle_update(tx: &ServerTx, session: &Session, update: ClientUpdate) -
return send_err(tx, "unexpected hello".into()).await;
}
Some(ClientMessage::Data(data)) => {
if let Err(err) = session.add_data(Sid(data.id), data.data.into(), data.seq) {
if let Err(err) = session.add_data(Sid(data.id), data.data, data.seq) {
return send_err(tx, format!("add data: {:?}", err)).await;
}
}
Expand Down
66 changes: 26 additions & 40 deletions crates/sshx-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@
use std::{net::SocketAddr, sync::Arc};

use anyhow::Result;
use dashmap::DashMap;
use hmac::{Hmac, Mac as _};
use hyper::server::conn::AddrIncoming;
use session::Session;
use sha2::Sha256;
use sshx_core::rand_alphanumeric;
use utils::Shutdown;

use crate::state::ServerState;

pub mod grpc;
mod listen;
pub mod session;
pub mod state;
pub mod utils;
pub mod web;

Expand All @@ -38,30 +36,12 @@ pub struct ServerOptions {

/// Override the origin returned for the Open() RPC.
pub override_origin: Option<String>,
}

/// Shared state object for global server logic.
pub struct ServerState {
/// Message authentication code for signing tokens.
pub mac: Hmac<Sha256>,

/// Override the origin returned for the Open() RPC.
pub override_origin: Option<String>,

/// A concurrent map of session IDs to session objects.
pub store: DashMap<String, Arc<Session>>,
}
/// URL of the Redis server that stores session data.
pub redis_url: Option<String>,

impl ServerState {
/// Create an empty server state using the given secret.
pub fn new(options: ServerOptions) -> Self {
let secret = options.secret.unwrap_or_else(|| rand_alphanumeric(22));
Self {
mac: Hmac::new_from_slice(secret.as_bytes()).unwrap(),
override_origin: options.override_origin,
store: DashMap::new(),
}
}
/// Hostname of this server, if running multiple servers.
pub host: Option<String>,
}

/// Stateful object that manages the sshx server, with graceful termination.
Expand All @@ -72,26 +52,34 @@ pub struct Server {

impl Server {
/// Create a new application server, but do not listen for connections yet.
pub fn new(options: ServerOptions) -> Self {
Self {
state: Arc::new(ServerState::new(options)),
pub fn new(options: ServerOptions) -> Result<Self> {
Ok(Self {
state: Arc::new(ServerState::new(options)?),
shutdown: Shutdown::new(),
}
})
}

/// Returns the server's state object.
pub fn state(&self) -> Arc<ServerState> {
Arc::clone(&self.state)
}

/// Returns a future that resolves when the server is terminated.
async fn terminated(&self) {
self.shutdown.wait().await
}

/// Run the application server, listening on a stream of connections.
pub async fn listen(&self, incoming: AddrIncoming) -> Result<()> {
listen::start_server(self.state(), incoming, self.terminated()).await
let state = self.state.clone();
let terminated = self.shutdown.wait();
tokio::spawn(async move {
let background_tasks = futures_util::future::join(
state.listen_for_transfers(),
state.close_old_sessions(),
);
tokio::select! {
_ = terminated => {}
_ = background_tasks => {}
}
});

listen::start_server(self.state(), incoming, self.shutdown.wait()).await
}

/// Convenience function to call [`Server::listen`] bound to a TCP address.
Expand All @@ -104,8 +92,6 @@ impl Server {
// Stop receiving new network connections.
self.shutdown.shutdown();
// Terminate each of the existing sessions.
for entry in &self.state.store {
entry.value().shutdown();
}
self.state.shutdown();
}
}
Loading

0 comments on commit 53e0055

Please sign in to comment.