Skip to content

Commit

Permalink
Add separate stream for jobs (#58)
Browse files Browse the repository at this point in the history
* Restructure workers

* Add separate stream for events v jobs

* Fix descriptions
  • Loading branch information
sneakycrow authored Feb 22, 2025
1 parent 83b3afe commit 706d32b
Show file tree
Hide file tree
Showing 18 changed files with 208 additions and 105 deletions.
19 changes: 14 additions & 5 deletions src/api/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ use super::config::Config;
use crate::{
db::connect_to_database,
storage::s3::create_s3_client,
workers::{self, events::PRIMARY_STREAM},
workers::events::Stream,
workers::{create_nats_client, Queue},
};
use sqlx::PgPool;

/// Shared state available to the API
pub struct AppState {
pub db: PgPool,
pub job_queue: workers::Queue,
pub job_queue: Queue,
pub event_stream: Stream,
pub config: Config,
pub s3_client: aws_sdk_s3::Client,
}
Expand All @@ -24,17 +26,24 @@ impl AppState {
// Create the S3 Client
let s3_client = create_s3_client().await;

// Create a NATS client
let nats_client = create_nats_client().await?;

// Connect to the job queue
let nats_client = workers::create_nats_client().await?;
let jq_name = PRIMARY_STREAM.to_string();
let job_queue = workers::Queue::connect(jq_name, nats_client)
let job_queue = Queue::connect(nats_client.clone())
.await
.expect("Failed to create worker queue");

// Connect to the event stream
let event_stream = Stream::connect(nats_client.clone())
.await
.expect("Failed to connect to event stream");

Ok(Self {
config,
db,
job_queue,
event_stream,
s3_client,
})
}
Expand Down
4 changes: 2 additions & 2 deletions src/api/twitch/eventsub/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::sync::Arc;

use crate::{
api::{app_state::AppState, routes::auth::oauth::twitch::TwitchCredentials},
workers::{events::Event, runner::chat::ChatMessagePayload},
workers::events::{chat::ChatMessagePayload, Event},
};

type HmacSha256 = Hmac<Sha256>;
Expand Down Expand Up @@ -117,7 +117,7 @@ pub async fn handle_webhook(
// Get the subject
let subject = Event::from(message_payload).get_subject();
state
.job_queue
.event_stream
.publish(
subject.to_string(),
raw_payload.to_string(), // Pass the original payload so we can skip serialization
Expand Down
11 changes: 7 additions & 4 deletions src/bin/down.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use anyhow::Result;
use farmhand::{db, workers};
use farmhand::{
db,
workers::{create_nats_client, Queue, Stream},
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::main]
Expand Down Expand Up @@ -27,9 +30,9 @@ async fn main() -> Result<()> {

// Delete all streams
tracing::debug!("Deleting all streams");
let nats_client = workers::create_nats_client().await?;
let jq_name = "FARMHAND_JOBS".to_string();
workers::Queue::delete(jq_name, nats_client).await?;
let nats_client = create_nats_client().await?;
Queue::delete(nats_client.clone()).await?;
Stream::delete(nats_client.clone()).await?;

tracing::info!("Successfully deleted all streams");
Ok(())
Expand Down
11 changes: 3 additions & 8 deletions src/bin/job_runner.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use anyhow::Result;
use async_nats::jetstream::AckKind;
use farmhand::workers::{
self,
events::{EVENT_PREFIX, PRIMARY_STREAM},
runner::process_message,
};
use farmhand::workers::{self, events::MESSAGE_PREFIX, queue::process_message};
use futures::StreamExt;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -19,14 +15,13 @@ async fn main() -> Result<()> {
// Connect to the stream
tracing::debug!("Connecting to NATS server");
let nats_client = workers::create_nats_client().await?;
let jq_name = PRIMARY_STREAM.to_string();
tracing::debug!("Connecting to queue");
let queue = workers::Queue::connect(jq_name, nats_client)
let queue = workers::Queue::connect(nats_client)
.await
.expect("Failed to create worker queue");

// Get all jobs from the stream
let subject = format!("{}.jobs.>", EVENT_PREFIX); // All farmhand jobs
let subject = format!("{}.jobs.>", MESSAGE_PREFIX); // All farmhand jobs

// TODO: Make this ID dynamic so we can run more than one runner at a time
// Make sure not too make it too dynamic, as they are intended to be re-used
Expand Down
14 changes: 4 additions & 10 deletions src/bin/stream_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
//! It intentionally does not do anything else
use anyhow::Result;
use farmhand::workers::{
self,
events::{EVENT_PREFIX, PRIMARY_STREAM},
};
use farmhand::workers::{self, events::MESSAGE_PREFIX};
use futures::StreamExt;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -22,19 +19,16 @@ async fn main() -> Result<()> {
tracing::debug!("Connecting to NATS server");
let nats_client = workers::create_nats_client().await?;
// Setup the Jetstream queue
let jq_name = std::env::args()
.nth(1)
.unwrap_or_else(|| PRIMARY_STREAM.to_string());
let queue = workers::Queue::connect(jq_name, nats_client)
let listener = workers::Stream::connect(nats_client)
.await
.expect("Failed to create worker queue");

// Get all events from the stream
let subject = format!("{}.>", EVENT_PREFIX); // All farmhand events
let subject = format!("{}.>", MESSAGE_PREFIX); // All farmhand events
let runner_name = "farmhand_listener_1".to_string();
tracing::info!("Listening for events {} on {}", subject, runner_name);
// Create the consumer to listen for events
let consumer = queue.create_consumer(Some(runner_name), subject).await?;
let consumer = listener.create_consumer(Some(runner_name), subject).await?;
loop {
let mut jobs = consumer.fetch().max_messages(20).messages().await?;
while let Some(job) = jobs.next().await {
Expand Down
29 changes: 19 additions & 10 deletions src/bin/up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use farmhand::{
db,
workers::{
self,
events::{EVENT_PREFIX, PRIMARY_STREAM},
events::{EVENT_PREFIX, EVENT_STREAM, JOB_PREFIX, JOB_STREAM, MESSAGE_PREFIX},
},
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand Down Expand Up @@ -53,18 +53,27 @@ async fn init_project_nats() {
.await
.expect("Failed to connect to NATS");

// Create the job queue stream
let jq_desc = Some("All Farmhand events".to_string());
let all_events = format!("{}.>", EVENT_PREFIX);
let jq_subjects = vec![all_events];
workers::Queue::new(
PRIMARY_STREAM.to_string(),
jq_desc,
jq_subjects,
nats_client,
// Create the event stream
let all_events_subject = format!("{}.{}.>", MESSAGE_PREFIX, EVENT_PREFIX);
workers::Stream::new(
EVENT_STREAM.to_string(),
Some("All Farmhand events".to_string()),
vec![all_events_subject],
nats_client.clone(),
)
.await
.expect("Failed to create worker queue");

// Create the job stream
let all_jobs_subject = format!("{}.{}.>", MESSAGE_PREFIX, JOB_PREFIX);
workers::Queue::new(
JOB_STREAM.to_string(),
Some("All Farmhand jobs".to_string()),
vec![all_jobs_subject],
nats_client.clone(),
)
.await
.expect("Failed to create job queue");

tracing::info!("Successfully initialized NATS worker queue");
}
2 changes: 1 addition & 1 deletion src/error/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod queue;

pub use queue::QueueError;
pub use queue::{QueueError, StreamError};
6 changes: 6 additions & 0 deletions src/error/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@ pub enum QueueError {
#[error("Invalid Connection: {0}")]
InvalidConnection(String),
}

#[derive(Error, Debug)]
pub enum StreamError {
#[error("Invalid Connection: {0}")]
InvalidConnection(String),
}
28 changes: 0 additions & 28 deletions src/workers/events.rs

This file was deleted.

18 changes: 0 additions & 18 deletions src/workers/runner/chat.rs → src/workers/events/chat.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};

use super::Runner;

#[derive(Deserialize, Serialize)]
pub struct ChatMessagePayload {
pub message: Message,
Expand Down Expand Up @@ -85,18 +82,3 @@ impl ChatMessagePayload {
}
}
}

/// A runner for processing chat messages
pub struct ChatMessageRunner;

impl Runner for ChatMessageRunner {
type Payload = ChatMessagePayload;

async fn process_job(&self, payload: Self::Payload) -> Result<()> {
tracing::debug!(
"Processing job with runner ChatMessageRunner for broadcaster: {broadcaster}",
broadcaster = payload.broadcaster_user_name
);
Ok(())
}
}
35 changes: 35 additions & 0 deletions src/workers/events/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use chat::ChatMessagePayload;

pub mod chat;
pub mod stream;
pub use stream::Stream;

/// Represents events we send and receive from NATS
/// Primarily used to get the appropriate subject name for an event
pub enum Event {
ChatMessage(ChatMessagePayload),
}

pub const MESSAGE_PREFIX: &str = "farmhand";
pub const EVENT_PREFIX: &str = "events";
pub const JOB_PREFIX: &str = "jobs";
pub const EVENT_STREAM: &str = "FARMHAND_EVENTS";
pub const JOB_STREAM: &str = "FARMHAND_JOBS";

impl Event {
pub fn get_subject(&self) -> String {
match self {
// farmhand.events.twitch.{broadcaster_name}.chat_message
Event::ChatMessage(payload) => format!(
"{}.{}.twitch.events.{}.chat_message",
MESSAGE_PREFIX, EVENT_PREFIX, payload.broadcaster_user_name
),
}
}
}

impl From<ChatMessagePayload> for Event {
fn from(payload: ChatMessagePayload) -> Self {
Event::ChatMessage(payload)
}
}
Loading

0 comments on commit 706d32b

Please sign in to comment.