Skip to content

Commit

Permalink
Clean up Stream logic (#57)
Browse files Browse the repository at this point in the history
* Rename event::receives to event::callback, add Event enum

* Update stream settings with new subject format

* Update prefix for jobs

* Enable listener

* Update stream name to FARMHAND_EVENTS

* Make stream name a constant
  • Loading branch information
sneakycrow authored Feb 22, 2025
1 parent 93faf64 commit 63f6270
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 31 deletions.
10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,21 @@ name = "api"
path = "src/bin/api.rs"

[[bin]]
name = "job_runner"
name = "queue"
path = "src/bin/job_runner.rs"

[[bin]]
name = "listener"
path = "src/bin/stream_listener.rs"

[[bin]]
name = "up"
path = "src/bin/up.rs"

[[bin]]
name = "down"
path = "src/bin/down.rs"

[dependencies]
anyhow = "1.0.95"
async-trait = "0.1"
Expand Down
6 changes: 5 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ dev-api:

# Run the job runner in dev mode
dev-queue:
cargo run --bin job_runner
cargo run --bin queue

# Run the listener in dev mode
dev-listener:
cargo run --bin listener

# Database commands
create-db:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::sync::Arc;

use crate::{
api::{app_state::AppState, routes::auth::oauth::twitch::TwitchCredentials},
workers::runner::chat,
workers::{events::Event, runner::chat::ChatMessagePayload},
};

type HmacSha256 = Hmac<Sha256>;
Expand Down Expand Up @@ -100,17 +100,28 @@ pub async fn handle_webhook(
tracing::debug!("Event type: {}", notification.subscription.event_type);
let notification_type = notification.subscription.event_type;
match notification_type.as_str() {
// TODO: Replace with channel.chat.message when ready
// NOTE: This is set as channel.subscribe because the twitch CLI does not support channel.chat.message yet
"channel.chat.message" => {
tracing::debug!("Channel chat message received");
let Some(event) = notification.event else {
// Pull the raw payload out of the notification
let Some(raw_payload) = notification.event else {
tracing::error!("Received channel.chat.message notification without event");
return (StatusCode::BAD_REQUEST, "Missing event data").into_response();
};
// Parse into a ChatMessagePayload so we can get the appropriate subject
let message_payload =
serde_json::from_value::<ChatMessagePayload>(raw_payload.clone());
let Ok(message_payload) = message_payload else {
tracing::error!("Failed to parse channel.chat.message notification");
return (StatusCode::BAD_REQUEST, "Invalid event data").into_response();
};
// Get the subject
let subject = Event::from(message_payload).get_subject();
state
.job_queue
.publish("farmhand_jobs.chat.save".to_string(), event.to_string())
.publish(
subject.to_string(),
raw_payload.to_string(), // Pass the original payload so we can skip serialization
)
.await
.map_err(|e| {
tracing::error!("Failed to publish chat message job: {}", e);
Expand Down
2 changes: 1 addition & 1 deletion src/api/twitch/eventsub/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub mod receivers;
pub mod callback;
pub mod subscribers;
2 changes: 1 addition & 1 deletion src/bin/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn main() {
.nest(
"/eventsub",
Router::new()
.route("/", post(twitch::eventsub::receivers::handle_webhook))
.route("/", post(twitch::eventsub::callback::handle_webhook))
.with_state(state.clone()),
)
.nest(
Expand Down
46 changes: 31 additions & 15 deletions src/bin/job_runner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use anyhow::Result;
use async_nats::jetstream::AckKind;
use farmhand::workers::{self, runner::process_message};
use farmhand::workers::{
self,
events::{EVENT_PREFIX, PRIMARY_STREAM},
runner::process_message,
};
use futures::StreamExt;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -15,41 +19,53 @@ async fn main() -> Result<()> {
// Connect to the stream
tracing::debug!("Connecting to NATS server");
let nats_client = workers::create_nats_client().await?;
let jq_name = "FARMHAND_JOBS".to_string();
let jq_name = PRIMARY_STREAM.to_string();
tracing::debug!("Connecting to queue");
let queue = workers::Queue::connect(jq_name, nats_client)
.await
.expect("Failed to create worker queue");

// Create a consumer for the queue
let subject = "farmhand_jobs.>".to_string(); // All jobs
// Get all jobs from the stream
let subject = format!("{}.jobs.>", EVENT_PREFIX); // All farmhand jobs

// TODO: Make this ID dynamic so we can run more than one runner at a time
// Make sure not too make it too dynamic, as they are intended to be re-used
let runner_name = "farmhand_runner_1".to_string();
tracing::info!("Listening for jobs {} on {}", subject, runner_name);
// Create the consumer to listen for jobs
let consumer = queue.create_consumer(Some(runner_name), subject).await?;

// Start consuming jobs
loop {
// TODO: Make this max_messages dynamic
let mut jobs = consumer.fetch().max_messages(3).messages().await?;

// Start processing jobs
let mut handles = Vec::new();
while let Some(job) = jobs.next().await {
// Make sure the job is good to go
let Ok(job) = job else {
tracing::error!("Failed to receive job");
continue;
};
// Process the message itself, ack on success, nack on failure
match process_message(&job.message).await {
Ok(_) => job.ack().await.expect("Failed to ack job"),
Err(err) => {
tracing::error!("Failed to process job: {}", err);
job.ack_with(AckKind::Nak(None))
.await
.expect("Failed to nack job");
let handle = tokio::spawn(async move {
match process_message(&job.message).await {
Ok(_) => job.ack().await.expect("Failed to ack job"),
Err(err) => {
tracing::error!("Failed to process job: {}", err);
job.ack_with(AckKind::Nak(None))
.await
.expect("Failed to nack job");
}
}
}
});
handles.push(handle);
}

for handle in handles {
handle.await.expect("Task failed");
}

// Optional: Add a small delay to prevent tight loops when there are no jobs
// Add a small delay to prevent tight loops when there are no jobs
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
49 changes: 49 additions & 0 deletions src/bin/stream_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//! This is a program for simply logging the stream of events going to farmhand nats
//! It intentionally does not do anything else
use anyhow::Result;
use farmhand::workers::{self, events::EVENT_PREFIX};
use futures::StreamExt;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::main]
async fn main() -> Result<()> {
// Initialize tracing subscriber
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
// Create the NATS client
tracing::debug!("Connecting to NATS server");
let nats_client = workers::create_nats_client().await?;
// Setup the Jetstream queue
let jq_name = std::env::args()
.nth(1)
.unwrap_or_else(|| "FARMHAND_JOBS".to_string());
let queue = workers::Queue::connect(jq_name, nats_client)
.await
.expect("Failed to create worker queue");

// Get all events from the stream
let subject = format!("{}.>", EVENT_PREFIX); // All farmhand events
let runner_name = "farmhand_listener_1".to_string();
tracing::info!("Listening for events {} on {}", subject, runner_name);
// Create the consumer to listen for events
let consumer = queue.create_consumer(Some(runner_name), subject).await?;
loop {
let mut jobs = consumer.fetch().max_messages(20).messages().await?;
while let Some(job) = jobs.next().await {
// Make sure the job is good to go
let Ok(job) = job else {
tracing::error!("Failed to receive job");
continue;
};
tracing::info!("{:?}", job);
}

// Add a small delay to prevent tight loops when there are no jobs
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
25 changes: 18 additions & 7 deletions src/bin/up.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use anyhow::Result;
use farmhand::{db, workers};
use farmhand::{
db,
workers::{
self,
events::{EVENT_PREFIX, PRIMARY_STREAM},
},
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::main]
Expand Down Expand Up @@ -48,12 +54,17 @@ async fn init_project_nats() {
.expect("Failed to connect to NATS");

// Create the job queue stream
let jq_name = "FARMHAND_JOBS".to_string();
let jq_desc = Some("Farmhand job runner queue".to_string());
let jq_subjects = vec!["farmhand_jobs.>".to_string()];
workers::Queue::new(jq_name, jq_desc, jq_subjects, nats_client)
.await
.expect("Failed to create worker queue");
let jq_desc = Some("All Farmhand events".to_string());
let all_events = format!("{}.>", EVENT_PREFIX);
let jq_subjects = vec![all_events];
workers::Queue::new(
PRIMARY_STREAM.to_string(),
jq_desc,
jq_subjects,
nats_client,
)
.await
.expect("Failed to create worker queue");

tracing::info!("Successfully initialized NATS worker queue");
}
28 changes: 28 additions & 0 deletions src/workers/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use super::runner::chat::ChatMessagePayload;

/// Represents events we send and receive from NATS
/// Primarily used to get the appropriate subject name for an event
pub enum Event {
ChatMessage(ChatMessagePayload),
}

pub const EVENT_PREFIX: &str = "farmhand";
pub const PRIMARY_STREAM: &str = "FARMHAND_EVENTS";

impl Event {
pub fn get_subject(&self) -> String {
match self {
// twitch.events.{broadcaster_name}.chat_message
Event::ChatMessage(payload) => format!(
"{}.twitch.events.{}.chat_message",
EVENT_PREFIX, payload.broadcaster_user_name
),
}
}
}

impl From<ChatMessagePayload> for Event {
fn from(payload: ChatMessagePayload) -> Self {
Event::ChatMessage(payload)
}
}
1 change: 1 addition & 0 deletions src/workers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod events;
pub mod runner;

pub use runner::Queue;
Expand Down

0 comments on commit 63f6270

Please sign in to comment.