Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add separate stream for jobs #58

Merged
merged 3 commits into from
Feb 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/api/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ use super::config::Config;
use crate::{
db::connect_to_database,
storage::s3::create_s3_client,
workers::{self, events::PRIMARY_STREAM},
workers::events::Stream,
workers::{create_nats_client, Queue},
};
use sqlx::PgPool;

/// Shared state available to the API
pub struct AppState {
pub db: PgPool,
pub job_queue: workers::Queue,
pub job_queue: Queue,
pub event_stream: Stream,
pub config: Config,
pub s3_client: aws_sdk_s3::Client,
}
Expand All @@ -24,17 +26,24 @@ impl AppState {
// Create the S3 Client
let s3_client = create_s3_client().await;

// Create a NATS client
let nats_client = create_nats_client().await?;

// Connect to the job queue
let nats_client = workers::create_nats_client().await?;
let jq_name = PRIMARY_STREAM.to_string();
let job_queue = workers::Queue::connect(jq_name, nats_client)
let job_queue = Queue::connect(nats_client.clone())
.await
.expect("Failed to create worker queue");

// Connect to the event stream
let event_stream = Stream::connect(nats_client.clone())
.await
.expect("Failed to connect to event stream");

Ok(Self {
config,
db,
job_queue,
event_stream,
s3_client,
})
}
Expand Down
4 changes: 2 additions & 2 deletions src/api/twitch/eventsub/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::sync::Arc;

use crate::{
api::{app_state::AppState, routes::auth::oauth::twitch::TwitchCredentials},
workers::{events::Event, runner::chat::ChatMessagePayload},
workers::events::{chat::ChatMessagePayload, Event},
};

type HmacSha256 = Hmac<Sha256>;
Expand Down Expand Up @@ -117,7 +117,7 @@ pub async fn handle_webhook(
// Get the subject
let subject = Event::from(message_payload).get_subject();
state
.job_queue
.event_stream
.publish(
subject.to_string(),
raw_payload.to_string(), // Pass the original payload so we can skip serialization
Expand Down
11 changes: 7 additions & 4 deletions src/bin/down.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use anyhow::Result;
use farmhand::{db, workers};
use farmhand::{
db,
workers::{create_nats_client, Queue, Stream},
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::main]
Expand Down Expand Up @@ -27,9 +30,9 @@ async fn main() -> Result<()> {

// Delete all streams
tracing::debug!("Deleting all streams");
let nats_client = workers::create_nats_client().await?;
let jq_name = "FARMHAND_JOBS".to_string();
workers::Queue::delete(jq_name, nats_client).await?;
let nats_client = create_nats_client().await?;
Queue::delete(nats_client.clone()).await?;
Stream::delete(nats_client.clone()).await?;

tracing::info!("Successfully deleted all streams");
Ok(())
Expand Down
11 changes: 3 additions & 8 deletions src/bin/job_runner.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use anyhow::Result;
use async_nats::jetstream::AckKind;
use farmhand::workers::{
self,
events::{EVENT_PREFIX, PRIMARY_STREAM},
runner::process_message,
};
use farmhand::workers::{self, events::MESSAGE_PREFIX, queue::process_message};
use futures::StreamExt;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -19,14 +15,13 @@ async fn main() -> Result<()> {
// Connect to the stream
tracing::debug!("Connecting to NATS server");
let nats_client = workers::create_nats_client().await?;
let jq_name = PRIMARY_STREAM.to_string();
tracing::debug!("Connecting to queue");
let queue = workers::Queue::connect(jq_name, nats_client)
let queue = workers::Queue::connect(nats_client)
.await
.expect("Failed to create worker queue");

// Get all jobs from the stream
let subject = format!("{}.jobs.>", EVENT_PREFIX); // All farmhand jobs
let subject = format!("{}.jobs.>", MESSAGE_PREFIX); // All farmhand jobs

// TODO: Make this ID dynamic so we can run more than one runner at a time
// Make sure not too make it too dynamic, as they are intended to be re-used
Expand Down
14 changes: 4 additions & 10 deletions src/bin/stream_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
//! It intentionally does not do anything else

use anyhow::Result;
use farmhand::workers::{
self,
events::{EVENT_PREFIX, PRIMARY_STREAM},
};
use farmhand::workers::{self, events::MESSAGE_PREFIX};
use futures::StreamExt;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -22,19 +19,16 @@ async fn main() -> Result<()> {
tracing::debug!("Connecting to NATS server");
let nats_client = workers::create_nats_client().await?;
// Setup the Jetstream queue
let jq_name = std::env::args()
.nth(1)
.unwrap_or_else(|| PRIMARY_STREAM.to_string());
let queue = workers::Queue::connect(jq_name, nats_client)
let listener = workers::Stream::connect(nats_client)
.await
.expect("Failed to create worker queue");

// Get all events from the stream
let subject = format!("{}.>", EVENT_PREFIX); // All farmhand events
let subject = format!("{}.>", MESSAGE_PREFIX); // All farmhand events
let runner_name = "farmhand_listener_1".to_string();
tracing::info!("Listening for events {} on {}", subject, runner_name);
// Create the consumer to listen for events
let consumer = queue.create_consumer(Some(runner_name), subject).await?;
let consumer = listener.create_consumer(Some(runner_name), subject).await?;
loop {
let mut jobs = consumer.fetch().max_messages(20).messages().await?;
while let Some(job) = jobs.next().await {
Expand Down
29 changes: 19 additions & 10 deletions src/bin/up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use farmhand::{
db,
workers::{
self,
events::{EVENT_PREFIX, PRIMARY_STREAM},
events::{EVENT_PREFIX, EVENT_STREAM, JOB_PREFIX, JOB_STREAM, MESSAGE_PREFIX},
},
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand Down Expand Up @@ -53,18 +53,27 @@ async fn init_project_nats() {
.await
.expect("Failed to connect to NATS");

// Create the job queue stream
let jq_desc = Some("All Farmhand events".to_string());
let all_events = format!("{}.>", EVENT_PREFIX);
let jq_subjects = vec![all_events];
workers::Queue::new(
PRIMARY_STREAM.to_string(),
jq_desc,
jq_subjects,
nats_client,
// Create the event stream
let all_events_subject = format!("{}.{}.>", MESSAGE_PREFIX, EVENT_PREFIX);
workers::Stream::new(
EVENT_STREAM.to_string(),
Some("All Farmhand events".to_string()),
vec![all_events_subject],
nats_client.clone(),
)
.await
.expect("Failed to create worker queue");

// Create the job stream
let all_jobs_subject = format!("{}.{}.>", MESSAGE_PREFIX, JOB_PREFIX);
workers::Queue::new(
JOB_STREAM.to_string(),
Some("All Farmhand jobs".to_string()),
vec![all_jobs_subject],
nats_client.clone(),
)
.await
.expect("Failed to create job queue");

tracing::info!("Successfully initialized NATS worker queue");
}
2 changes: 1 addition & 1 deletion src/error/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod queue;

pub use queue::QueueError;
pub use queue::{QueueError, StreamError};
6 changes: 6 additions & 0 deletions src/error/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@ pub enum QueueError {
#[error("Invalid Connection: {0}")]
InvalidConnection(String),
}

#[derive(Error, Debug)]
pub enum StreamError {
#[error("Invalid Connection: {0}")]
InvalidConnection(String),
}
28 changes: 0 additions & 28 deletions src/workers/events.rs

This file was deleted.

18 changes: 0 additions & 18 deletions src/workers/runner/chat.rs → src/workers/events/chat.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};

use super::Runner;

#[derive(Deserialize, Serialize)]
pub struct ChatMessagePayload {
pub message: Message,
Expand Down Expand Up @@ -85,18 +82,3 @@ impl ChatMessagePayload {
}
}
}

/// A runner for processing chat messages
pub struct ChatMessageRunner;

impl Runner for ChatMessageRunner {
type Payload = ChatMessagePayload;

async fn process_job(&self, payload: Self::Payload) -> Result<()> {
tracing::debug!(
"Processing job with runner ChatMessageRunner for broadcaster: {broadcaster}",
broadcaster = payload.broadcaster_user_name
);
Ok(())
}
}
35 changes: 35 additions & 0 deletions src/workers/events/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use chat::ChatMessagePayload;

pub mod chat;
pub mod stream;
pub use stream::Stream;

/// Represents events we send and receive from NATS
/// Primarily used to get the appropriate subject name for an event
pub enum Event {
ChatMessage(ChatMessagePayload),
}

pub const MESSAGE_PREFIX: &str = "farmhand";
pub const EVENT_PREFIX: &str = "events";
pub const JOB_PREFIX: &str = "jobs";
pub const EVENT_STREAM: &str = "FARMHAND_EVENTS";
pub const JOB_STREAM: &str = "FARMHAND_JOBS";

impl Event {
pub fn get_subject(&self) -> String {
match self {
// farmhand.events.twitch.{broadcaster_name}.chat_message
Event::ChatMessage(payload) => format!(
"{}.{}.twitch.events.{}.chat_message",
MESSAGE_PREFIX, EVENT_PREFIX, payload.broadcaster_user_name
),
}
}
}

impl From<ChatMessagePayload> for Event {
fn from(payload: ChatMessagePayload) -> Self {
Event::ChatMessage(payload)
}
}
Loading