Skip to content

Commit

Permalink
Replace Queue with NATS-based system (#56)
Browse files Browse the repository at this point in the history
* Add binaries up/down, add initial queue modules

* Allow dead code to Queue struct

* Fix subjects, make queue name uppercase

* Initialize job runner

* Add new nats-based runner system

* Post twitch channel chat messages to nats

* Remove query macro
  • Loading branch information
sneakycrow authored Feb 17, 2025
1 parent e7910f8 commit 93faf64
Show file tree
Hide file tree
Showing 21 changed files with 829 additions and 27 deletions.
267 changes: 262 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ path = "src/bin/api.rs"
name = "job_runner"
path = "src/bin/job_runner.rs"

[[bin]]
name = "up"
path = "src/bin/up.rs"

[dependencies]
anyhow = "1.0.95"
async-trait = "0.1"
Expand Down Expand Up @@ -48,3 +52,4 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
urlencoding = "2.1"
uuid = { version = "1.6", features = ["v4", "serde"] }
walkdir = "2.5.0"
async-nats = "0.38.0"
29 changes: 21 additions & 8 deletions justfile
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
set dotenv-load := true

# Initialization Commands
up:
@just create-db
cargo run --bin up

# De-initialization Commands
down:
cargo run --bin down

# Build Commands
build-api:
cargo build --bin api

# Dev Commands
dev-ui: dev-web

alias dev-ui := dev-web
# Run the web server in dev mode
dev-web:
yarn dev

# Database commands
init-db: create-db migrate
# Run the api server in dev mode
dev-api:
cargo run --bin api

# Run the job runner in dev mode
dev-queue:
cargo run --bin job_runner

# Database commands
create-db:
sqlx database create

Expand Down Expand Up @@ -51,15 +68,11 @@ verify:

# Nuke all data
nuke:
@just drop-db
@just down
@just clean

# Install all dependencies
# TODO: Install ffmpeg dependency
install:
yarn
cargo check

init:
@just install
@just init-db
3 changes: 2 additions & 1 deletion k8s/base/api/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ metadata:
name: farmhand-api-config
data:
FRONTEND_URL: "https://staging.farmhand.witchscrow.com"
RUST_LOG: "api=debug,db=debug,queue=debug,tower_http=debug,axum::rejection=trace"
RUST_LOG: "api=debug,db=debug,farmhand=debug,tower_http=debug,axum::rejection=trace"
FFMPEG_LOCATION: "/opt/homebrew/bin/ffmpeg" # TODO: Make this is the right path for docker context
TWITCH_REDIRECT_URI: "https://staging.api.farmhand.witchscrow.com/auth/twitch/callback"
STORAGE: "videos/staging" # TODO: Make this dynamic per environment
UPLOAD_BUCKET: "farmhand"
NATS_URL: "nats://nats.staging.svc.cluster.local:4222"
11 changes: 10 additions & 1 deletion src/api/app_state.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use super::config::Config;
use crate::{db::connect_to_database, storage::s3::create_s3_client};
use crate::{db::connect_to_database, storage::s3::create_s3_client, workers};
use sqlx::PgPool;

/// Shared state available to the API
pub struct AppState {
pub db: PgPool,
pub job_queue: workers::Queue,
pub config: Config,
pub s3_client: aws_sdk_s3::Client,
}
Expand All @@ -19,9 +20,17 @@ impl AppState {
// Create the S3 Client
let s3_client = create_s3_client().await;

// Connect to the job queue
let nats_client = workers::create_nats_client().await?;
let jq_name = "FARMHAND_JOBS".to_string();
let job_queue = workers::Queue::connect(jq_name, nats_client)
.await
.expect("Failed to create worker queue");

Ok(Self {
config,
db,
job_queue,
s3_client,
})
}
Expand Down
35 changes: 28 additions & 7 deletions src/api/twitch/eventsub/receivers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use serde::{Deserialize, Serialize};
use sha2::Sha256;
use std::sync::Arc;

use crate::api::{app_state::AppState, routes::auth::oauth::twitch::TwitchCredentials};
use crate::{
api::{app_state::AppState, routes::auth::oauth::twitch::TwitchCredentials},
workers::runner::chat,
};

type HmacSha256 = Hmac<Sha256>;
const HMAC_PREFIX: &str = "sha256=";
Expand Down Expand Up @@ -42,7 +45,7 @@ struct Notification {
}

pub async fn handle_webhook(
State(_state): State<Arc<AppState>>,
State(state): State<Arc<AppState>>,
headers: HeaderMap,
body: Bytes,
) -> impl IntoResponse {
Expand Down Expand Up @@ -95,11 +98,29 @@ pub async fn handle_webhook(
match message_type {
"notification" => {
tracing::debug!("Event type: {}", notification.subscription.event_type);
if let Some(event) = notification.event {
tracing::debug!(
"Event data: {}",
serde_json::to_string_pretty(&event).unwrap()
);
let notification_type = notification.subscription.event_type;
match notification_type.as_str() {
// TODO: Replace with channel.chat.message when ready
// NOTE: This is set as channel.subscribe because the twitch CLI does not support channel.chat.message yet
"channel.chat.message" => {
tracing::debug!("Channel chat message received");
let Some(event) = notification.event else {
tracing::error!("Received channel.chat.message notification without event");
return (StatusCode::BAD_REQUEST, "Missing event data").into_response();
};
state
.job_queue
.publish("farmhand_jobs.chat.save".to_string(), event.to_string())
.await
.map_err(|e| {
tracing::error!("Failed to publish chat message job: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})
.expect("Failed to publish chat message job");
}
_ => {
tracing::warn!("Unhandled notification event type: {}", notification_type);
}
}
StatusCode::NO_CONTENT.into_response()
}
Expand Down
5 changes: 2 additions & 3 deletions src/bin/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ async fn main() {
// Start the tracer
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
"api=debug,db=debug,queue=debug,tower_http=debug,axum::rejection=trace".into()
}),
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "api=info,tower_http=debug,axum::rejection=trace".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
Expand Down
36 changes: 36 additions & 0 deletions src/bin/down.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use anyhow::Result;
use farmhand::{db, workers};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

/// Tears the project down: wipes every table in the database and removes
/// the NATS job stream. Destructive — guarded by a five-second countdown
/// so the operator has a window to abort.
#[tokio::main]
async fn main() -> Result<()> {
    // Default to `down=info` logging unless RUST_LOG overrides it.
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| "down=info".into());
    tracing_subscriber::registry()
        .with(env_filter)
        .with(tracing_subscriber::fmt::layer())
        .init();

    tracing::warn!("Deleting all data from the project, this is a destructive operation");
    // Count down from 5 so the operator can Ctrl-C before anything is destroyed.
    let mut remaining = 5;
    while remaining > 0 {
        tracing::warn!("Deleting all data in {} seconds...", remaining);
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        remaining -= 1;
    }

    tracing::info!("Starting deletion process");
    let pool = db::connect_to_database().await?;

    // Drop everything stored in Postgres.
    tracing::debug!("Deleting all data from the database");
    db::delete_all_data(&pool).await?;

    tracing::info!("Successfully deleted all data from the database");

    // Remove the JetStream stream backing the job queue.
    tracing::debug!("Deleting all streams");
    let nats_client = workers::create_nats_client().await?;
    let stream_name = "FARMHAND_JOBS".to_string();
    workers::Queue::delete(stream_name, nats_client).await?;

    tracing::info!("Successfully deleted all streams");
    Ok(())
}
56 changes: 54 additions & 2 deletions src/bin/job_runner.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,55 @@
pub fn main() {
tracing::info!("Hello from the job runner!");
use anyhow::Result;
use async_nats::jetstream::AckKind;
use farmhand::workers::{self, runner::process_message};
use futures::StreamExt;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
// Connect to the stream
tracing::debug!("Connecting to NATS server");
let nats_client = workers::create_nats_client().await?;
let jq_name = "FARMHAND_JOBS".to_string();
tracing::debug!("Connecting to queue");
let queue = workers::Queue::connect(jq_name, nats_client)
.await
.expect("Failed to create worker queue");

// Create a consumer for the queue
let subject = "farmhand_jobs.>".to_string(); // All jobs
let runner_name = "farmhand_runner_1".to_string();
tracing::info!("Listening for jobs {} on {}", subject, runner_name);
let consumer = queue.create_consumer(Some(runner_name), subject).await?;

// Start consuming jobs
loop {
let mut jobs = consumer.fetch().max_messages(3).messages().await?;

while let Some(job) = jobs.next().await {
// Make sure the job is good to go
let Ok(job) = job else {
tracing::error!("Failed to receive job");
continue;
};
// Process the message itself, ack on success, nack on failure
match process_message(&job.message).await {
Ok(_) => job.ack().await.expect("Failed to ack job"),
Err(err) => {
tracing::error!("Failed to process job: {}", err);
job.ack_with(AckKind::Nak(None))
.await
.expect("Failed to nack job");
}
}
}

// Optional: Add a small delay to prevent tight loops when there are no jobs
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
59 changes: 59 additions & 0 deletions src/bin/up.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use anyhow::Result;
use farmhand::{db, workers};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

/// Entry point for project initialization: prepares the database and the
/// NATS job queue concurrently.
#[tokio::main]
async fn main() -> Result<()> {
    let filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| "up=info".into());
    tracing_subscriber::registry()
        .with(filter)
        .with(tracing_subscriber::fmt::layer())
        .init();

    tracing::info!("Initializing project");
    // Database and NATS setup are independent, so run them concurrently.
    // Each task panics internally on failure, so there is nothing to unwrap.
    tokio::join!(init_project_db(), init_project_nats());

    Ok(())
}

/// Prepares the database for use: establishes a connection and applies all
/// pending migrations. Panics on any failure, since the project cannot be
/// initialized without a working database.
async fn init_project_db() {
    tracing::debug!("Starting database initialization");

    tracing::debug!("Connecting to database");
    let pool = db::connect_to_database()
        .await
        .expect("Failed to connect to database");

    // Bring the schema up to date before anything else uses it.
    tracing::debug!("Running migrations");
    db::run_migrations(&pool)
        .await
        .expect("Failed to run migrations");

    tracing::info!("Successfully initialized database");
}

/// Prepares the NATS side of the project: connects to the server and makes
/// sure the job queue stream exists. Panics on failure, since workers are
/// unusable without the stream.
async fn init_project_nats() {
    tracing::debug!("Starting NATS initialization");

    tracing::debug!("Connecting to NATS server");
    let client = workers::create_nats_client()
        .await
        .expect("Failed to connect to NATS");

    // Stream that receives every job published under `farmhand_jobs.>`.
    let name = "FARMHAND_JOBS".to_string();
    let description = Some("Farmhand job runner queue".to_string());
    let subjects = vec!["farmhand_jobs.>".to_string()];
    workers::Queue::new(name, description, subjects, client)
        .await
        .expect("Failed to create worker queue");

    tracing::info!("Successfully initialized NATS worker queue");
}
33 changes: 33 additions & 0 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,36 @@ pub async fn run_migrations(pool: &PgPool) -> Result<(), sqlx::Error> {

Ok(())
}
/// Function for deleting all data from the database.
///
/// Drops every object in the `public` schema — tables, types, routines, and
/// sequences — via a single PL/pgSQL `DO` block. Used by the `down` binary
/// to fully reset the database.
pub async fn delete_all_data(pool: &PgPool) -> Result<(), sqlx::Error> {
    tracing::debug!("Deleting all data from the database");
    sqlx::query(
        "DO $$ DECLARE
            r RECORD;
        BEGIN
            -- Drop all tables
            FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public') LOOP
                EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(r.tablename) || ' CASCADE';
            END LOOP;
            -- Drop all types/enums. Implicit array types (typcategory 'A')
            -- are dropped automatically with their element type and cannot
            -- be dropped directly, so they are filtered out.
            FOR r IN (SELECT typname FROM pg_type WHERE typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AND typcategory <> 'A') LOOP
                EXECUTE 'DROP TYPE IF EXISTS ' || quote_ident(r.typname) || ' CASCADE';
            END LOOP;
            -- Drop all functions/triggers. oid::regprocedure renders the
            -- name together with its argument list, so overloaded functions
            -- (same name, different signatures) drop unambiguously — a bare
            -- name would raise 'function name is not unique'.
            FOR r IN (SELECT oid::regprocedure::text AS fname FROM pg_proc WHERE pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')) LOOP
                EXECUTE 'DROP FUNCTION IF EXISTS ' || r.fname || ' CASCADE';
            END LOOP;
            -- Drop all sequences
            FOR r IN (SELECT sequence_name FROM information_schema.sequences WHERE sequence_schema = 'public') LOOP
                EXECUTE 'DROP SEQUENCE IF EXISTS ' || quote_ident(r.sequence_name) || ' CASCADE';
            END LOOP;
        END $$;"
    )
    .execute(pool)
    .await?;

    Ok(())
}
3 changes: 3 additions & 0 deletions src/error/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Crate-wide error types.

// Queue-related errors live in their own submodule.
pub mod queue;

// Re-exported so callers can write `error::QueueError` directly.
pub use queue::QueueError;
7 changes: 7 additions & 0 deletions src/error/queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use thiserror::Error;

/// Errors that can occur while working with the job queue.
#[derive(Error, Debug)]
pub enum QueueError {
    /// The queue's connection was invalid; the inner string carries the
    /// detail of what went wrong.
    #[error("Invalid Connection: {0}")]
    InvalidConnection(String),
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod api;
pub mod db;
pub mod error;
pub mod prelude;
pub mod storage;
pub mod vod;
Expand Down
4 changes: 4 additions & 0 deletions src/workers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! NATS-backed job queue and worker runner.

pub mod runner;

// Re-exports so callers can use `workers::Queue` and the NATS helpers
// without reaching into the `runner` submodule.
pub use runner::Queue;
pub use runner::{create_nats_client, get_nats_url};
Loading

0 comments on commit 93faf64

Please sign in to comment.