From 63f62707b6e8b070f0f9bae4f7b96dcde0ba303e Mon Sep 17 00:00:00 2001 From: Zachary Corvidae Date: Fri, 21 Feb 2025 16:55:12 -0800 Subject: [PATCH] Clean up Stream logic (#57) * Rename event::receives to event::callback, add Event enum * Update stream settings with new subject format * Update prefix for jobs * Enable listener * Update stream name to FARMHAND_EVENTS * Make stream name a constant --- Cargo.toml | 10 +++- justfile | 6 ++- .../eventsub/{receivers.rs => callback.rs} | 21 ++++++-- src/api/twitch/eventsub/mod.rs | 2 +- src/bin/api.rs | 2 +- src/bin/job_runner.rs | 46 +++++++++++------ src/bin/stream_listener.rs | 49 +++++++++++++++++++ src/bin/up.rs | 25 +++++++--- src/workers/events.rs | 28 +++++++++++ src/workers/mod.rs | 1 + 10 files changed, 159 insertions(+), 31 deletions(-) rename src/api/twitch/eventsub/{receivers.rs => callback.rs} (83%) create mode 100644 src/bin/stream_listener.rs create mode 100644 src/workers/events.rs diff --git a/Cargo.toml b/Cargo.toml index c895506..a188702 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,13 +8,21 @@ name = "api" path = "src/bin/api.rs" [[bin]] -name = "job_runner" +name = "queue" path = "src/bin/job_runner.rs" +[[bin]] +name = "listener" +path = "src/bin/stream_listener.rs" + [[bin]] name = "up" path = "src/bin/up.rs" +[[bin]] +name = "down" +path = "src/bin/down.rs" + [dependencies] anyhow = "1.0.95" async-trait = "0.1" diff --git a/justfile b/justfile index d54dbf7..5c38b95 100644 --- a/justfile +++ b/justfile @@ -26,7 +26,11 @@ dev-api: # Run the job runner in dev mode dev-queue: - cargo run --bin job_runner + cargo run --bin queue + +# Run the listener in dev mode +dev-listener: + cargo run --bin listener # Database commands create-db: diff --git a/src/api/twitch/eventsub/receivers.rs b/src/api/twitch/eventsub/callback.rs similarity index 83% rename from src/api/twitch/eventsub/receivers.rs rename to src/api/twitch/eventsub/callback.rs index 315d945..c4430e8 100644 --- a/src/api/twitch/eventsub/receivers.rs +++ b/src/api/twitch/eventsub/callback.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use crate::{ api::{app_state::AppState, routes::auth::oauth::twitch::TwitchCredentials}, - workers::runner::chat, + workers::{events::Event, runner::chat::ChatMessagePayload}, }; type HmacSha256 = Hmac; @@ -100,17 +100,28 @@ pub async fn handle_webhook( tracing::debug!("Event type: {}", notification.subscription.event_type); let notification_type = notification.subscription.event_type; match notification_type.as_str() { - // TODO: Replace with channel.chat.message when ready - // NOTE: This is set as channel.subscribe because the twitch CLI does not support channel.chat.message yet "channel.chat.message" => { tracing::debug!("Channel chat message received"); - let Some(event) = notification.event else { + // Pull the raw payload out of the notification + let Some(raw_payload) = notification.event else { tracing::error!("Received channel.chat.message notification without event"); return (StatusCode::BAD_REQUEST, "Missing event data").into_response(); }; + // Parse into a ChatMessagePayload so we can get the appropriate subject + let message_payload = + serde_json::from_value::(raw_payload.clone()); + let Ok(message_payload) = message_payload else { + tracing::error!("Failed to parse channel.chat.message notification"); + return (StatusCode::BAD_REQUEST, "Invalid event data").into_response(); + }; + // Get the subject + let subject = Event::from(message_payload).get_subject(); state .job_queue - .publish("farmhand_jobs.chat.save".to_string(), event.to_string()) + .publish( + subject.to_string(), + raw_payload.to_string(), // Pass the original payload so we can skip serialization + ) .await .map_err(|e| { tracing::error!("Failed to publish chat message job: {}", e); diff --git a/src/api/twitch/eventsub/mod.rs b/src/api/twitch/eventsub/mod.rs index 087e6a1..3b4c66b 100644 --- a/src/api/twitch/eventsub/mod.rs +++ b/src/api/twitch/eventsub/mod.rs @@ -1,2 +1,2 @@ -pub mod receivers; +pub mod callback; pub mod subscribers; diff --git a/src/bin/api.rs b/src/bin/api.rs index a9bd820..1e91daf 100644 --- a/src/bin/api.rs +++ b/src/bin/api.rs @@ -62,7 +62,7 @@ async fn main() { .nest( "/eventsub", Router::new() - .route("/", post(twitch::eventsub::receivers::handle_webhook)) + .route("/", post(twitch::eventsub::callback::handle_webhook)) .with_state(state.clone()), ) .nest( diff --git a/src/bin/job_runner.rs b/src/bin/job_runner.rs index e695deb..aac1dd3 100644 --- a/src/bin/job_runner.rs +++ b/src/bin/job_runner.rs @@ -1,6 +1,10 @@ use anyhow::Result; use async_nats::jetstream::AckKind; -use farmhand::workers::{self, runner::process_message}; +use farmhand::workers::{ + self, + events::{EVENT_PREFIX, PRIMARY_STREAM}, + runner::process_message, +}; use futures::StreamExt; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -15,22 +19,27 @@ async fn main() -> Result<()> { // Connect to the stream tracing::debug!("Connecting to NATS server"); let nats_client = workers::create_nats_client().await?; - let jq_name = "FARMHAND_JOBS".to_string(); + let jq_name = PRIMARY_STREAM.to_string(); tracing::debug!("Connecting to queue"); let queue = workers::Queue::connect(jq_name, nats_client) .await .expect("Failed to create worker queue"); - // Create a consumer for the queue - let subject = "farmhand_jobs.>".to_string(); // All jobs + // Get all jobs from the stream + let subject = format!("{}.jobs.>", EVENT_PREFIX); // All farmhand jobs + + // TODO: Make this ID dynamic so we can run more than one runner at a time + // Make sure not too make it too dynamic, as they are intended to be re-used let runner_name = "farmhand_runner_1".to_string(); tracing::info!("Listening for jobs {} on {}", subject, runner_name); + // Create the consumer to listen for jobs let consumer = queue.create_consumer(Some(runner_name), subject).await?; - // Start consuming jobs loop { + // TODO: Make this max_messages dynamic let mut jobs = consumer.fetch().max_messages(3).messages().await?; - + // Start processing jobs + let mut handles = Vec::new(); while let Some(job) = jobs.next().await { // Make sure the job is good to go let Ok(job) = job else { @@ -38,18 +47,25 @@ async fn main() -> Result<()> { continue; }; // Process the message itself, ack on success, nack on failure - match process_message(&job.message).await { - Ok(_) => job.ack().await.expect("Failed to ack job"), - Err(err) => { - tracing::error!("Failed to process job: {}", err); - job.ack_with(AckKind::Nak(None)) - .await - .expect("Failed to nack job"); + let handle = tokio::spawn(async move { + match process_message(&job.message).await { + Ok(_) => job.ack().await.expect("Failed to ack job"), + Err(err) => { + tracing::error!("Failed to process job: {}", err); + job.ack_with(AckKind::Nak(None)) + .await + .expect("Failed to nack job"); + } } - } + }); + handles.push(handle); + } + + for handle in handles { + handle.await.expect("Task failed"); } - // Optional: Add a small delay to prevent tight loops when there are no jobs + // Add a small delay to prevent tight loops when there are no jobs tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; } } diff --git a/src/bin/stream_listener.rs b/src/bin/stream_listener.rs new file mode 100644 index 0000000..1651b6c --- /dev/null +++ b/src/bin/stream_listener.rs @@ -0,0 +1,49 @@ +//! This is a program for simply logging the stream of events going to farmhand nats +//! It intentionally does not do anything else + +use anyhow::Result; +use farmhand::workers::{self, events::EVENT_PREFIX}; +use futures::StreamExt; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize tracing subscriber + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + // Create the NATS client + tracing::debug!("Connecting to NATS server"); + let nats_client = workers::create_nats_client().await?; + // Setup the Jetstream queue + let jq_name = std::env::args() + .nth(1) + .unwrap_or_else(|| "FARMHAND_JOBS".to_string()); + let queue = workers::Queue::connect(jq_name, nats_client) + .await + .expect("Failed to create worker queue"); + + // Get all events from the stream + let subject = format!("{}.>", EVENT_PREFIX); // All farmhand events + let runner_name = "farmhand_listener_1".to_string(); + tracing::info!("Listening for events {} on {}", subject, runner_name); + // Create the consumer to listen for events + let consumer = queue.create_consumer(Some(runner_name), subject).await?; + loop { + let mut jobs = consumer.fetch().max_messages(20).messages().await?; + while let Some(job) = jobs.next().await { + // Make sure the job is good to go + let Ok(job) = job else { + tracing::error!("Failed to receive job"); + continue; + }; + tracing::info!("{:?}", job); + } + + // Add a small delay to prevent tight loops when there are no jobs + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } +} diff --git a/src/bin/up.rs b/src/bin/up.rs index c225c27..d245581 100644 --- a/src/bin/up.rs +++ b/src/bin/up.rs @@ -1,5 +1,11 @@ use anyhow::Result; -use farmhand::{db, workers}; +use farmhand::{ + db, + workers::{ + self, + events::{EVENT_PREFIX, PRIMARY_STREAM}, + }, +}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[tokio::main] @@ -48,12 +54,17 @@ async fn init_project_nats() { .expect("Failed to connect to NATS"); // Create the job queue stream - let jq_name = "FARMHAND_JOBS".to_string(); - let jq_desc = Some("Farmhand job runner queue".to_string()); - let jq_subjects = vec!["farmhand_jobs.>".to_string()]; - workers::Queue::new(jq_name, jq_desc, jq_subjects, nats_client) - .await - .expect("Failed to create worker queue"); + let jq_desc = Some("All Farmhand events".to_string()); + let all_events = format!("{}.>", EVENT_PREFIX); + let jq_subjects = vec![all_events]; + workers::Queue::new( + PRIMARY_STREAM.to_string(), + jq_desc, + jq_subjects, + nats_client, + ) + .await + .expect("Failed to create worker queue"); tracing::info!("Successfully initialized NATS worker queue"); } diff --git a/src/workers/events.rs b/src/workers/events.rs new file mode 100644 index 0000000..437a849 --- /dev/null +++ b/src/workers/events.rs @@ -0,0 +1,28 @@ +use super::runner::chat::ChatMessagePayload; + +/// Represents events we send and receive from NATS +/// Primarily used to get the appropriate subject name for an event +pub enum Event { + ChatMessage(ChatMessagePayload), +} + +pub const EVENT_PREFIX: &str = "farmhand"; +pub const PRIMARY_STREAM: &str = "FARMHAND_EVENTS"; + +impl Event { + pub fn get_subject(&self) -> String { + match self { + // twitch.events.{broadcaster_name}.chat_message + Event::ChatMessage(payload) => format!( + "{}.twitch.events.{}.chat_message", + EVENT_PREFIX, payload.broadcaster_user_name + ), + } + } +} + +impl From for Event { + fn from(payload: ChatMessagePayload) -> Self { + Event::ChatMessage(payload) + } +} diff --git a/src/workers/mod.rs b/src/workers/mod.rs index 46e2e8f..4c05f01 100644 --- a/src/workers/mod.rs +++ b/src/workers/mod.rs @@ -1,3 +1,4 @@ +pub mod events; pub mod runner; pub use runner::Queue;