diff --git a/justfile b/justfile index 5c38b95..82f6089 100644 --- a/justfile +++ b/justfile @@ -39,7 +39,7 @@ create-db: drop-db: sqlx database drop -mig_source := "crates/common/migrations" +mig_source := "./migrations" mig: migrate migrate: @@ -49,7 +49,7 @@ mig-add mig_name: sqlx migrate add {{ mig_name }} --source {{ mig_source }} revert: - sqlx migrate run --source {{ mig_source }} + sqlx migrate revert --source {{ mig_source }} # Utility commands sync: sync-web diff --git a/migrations/20250224032704_add_stream.down.sql b/migrations/20250224032704_add_stream.down.sql new file mode 100644 index 0000000..907b048 --- /dev/null +++ b/migrations/20250224032704_add_stream.down.sql @@ -0,0 +1,3 @@ +DROP TRIGGER IF EXISTS update_streams_updated_at ON streams; + +DROP TABLE IF EXISTS streams; diff --git a/migrations/20250224032704_add_stream.up.sql b/migrations/20250224032704_add_stream.up.sql new file mode 100644 index 0000000..80e438e --- /dev/null +++ b/migrations/20250224032704_add_stream.up.sql @@ -0,0 +1,27 @@ +CREATE TABLE IF NOT EXISTS streams ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid (), + user_id UUID NOT NULL REFERENCES users (id) ON DELETE CASCADE, + start_time TIMESTAMP + WITH + TIME ZONE NOT NULL, + end_time TIMESTAMP + WITH + TIME ZONE, + event_log_url TEXT, + video_url TEXT, + created_at TIMESTAMP + WITH + TIME ZONE NOT NULL DEFAULT NOW (), + updated_at TIMESTAMP + WITH + TIME ZONE NOT NULL DEFAULT NOW () +); + +-- Add an index on start_time for efficient querying +CREATE INDEX idx_streams_start_time ON streams (start_time); + +CREATE INDEX idx_streams_user_id ON streams (user_id); + +-- Create trigger using existing function +CREATE TRIGGER update_streams_updated_at BEFORE +UPDATE ON streams FOR EACH ROW EXECUTE FUNCTION update_updated_at_column (); diff --git a/src/api/app_state.rs b/src/api/app_state.rs index 161310d..8b973e9 100644 --- a/src/api/app_state.rs +++ b/src/api/app_state.rs @@ -1,9 +1,7 @@ use super::config::Config; use crate::{ - 
db::connect_to_database, + db::connect_to_database, event::Stream, nats::create_nats_client, queue::Queue, storage::s3::create_s3_client, - workers::events::Stream, - workers::{create_nats_client, Queue}, }; use sqlx::PgPool; diff --git a/src/api/twitch/eventsub/callback.rs b/src/api/twitch/eventsub/callback.rs index d64f73f..979cbd9 100644 --- a/src/api/twitch/eventsub/callback.rs +++ b/src/api/twitch/eventsub/callback.rs @@ -4,46 +4,20 @@ use axum::{ response::IntoResponse, }; use bytes::Bytes; -use chrono::{DateTime, Utc}; use hmac::{Hmac, Mac}; -use serde::{Deserialize, Serialize}; use sha2::Sha256; use std::sync::Arc; use crate::{ api::{app_state::AppState, routes::auth::oauth::twitch::TwitchCredentials}, - workers::events::{chat::ChatMessagePayload, Event}, + db::streams::Stream, + event::Event, + twitch::{subscription::Notification, ChatMessagePayload, StreamStatusPayload}, }; type HmacSha256 = Hmac; const HMAC_PREFIX: &str = "sha256="; -#[derive(Debug, Deserialize, Serialize)] -struct Subscription { - id: String, - status: String, - #[serde(rename = "type")] - event_type: String, - version: String, - cost: i32, - condition: serde_json::Value, - transport: Transport, - created_at: DateTime, -} - -#[derive(Debug, Deserialize, Serialize)] -struct Transport { - method: String, - callback: String, -} - -#[derive(Debug, Deserialize, Serialize)] -struct Notification { - subscription: Subscription, - event: Option, - challenge: Option, -} - pub async fn handle_webhook( State(state): State>, headers: HeaderMap, @@ -100,6 +74,74 @@ pub async fn handle_webhook( tracing::debug!("Event type: {}", notification.subscription.event_type); let notification_type = notification.subscription.event_type; match notification_type.as_str() { + "stream.online" | "stream.offline" => { + let Some(raw_payload) = notification.event else { + tracing::error!("Received stream status notification without event"); + return (StatusCode::BAD_REQUEST, "Missing event data").into_response(); + }; 
+ + + let stream_payload = + serde_json::from_value::<StreamStatusPayload>(raw_payload.clone()); + let Ok(stream_payload) = stream_payload else { + tracing::error!("Failed to parse stream status notification"); + return (StatusCode::BAD_REQUEST, "Invalid event data").into_response(); + }; + + // If the stream is online, then we also want to start a new stream in the database for that user + if stream_payload.is_online() { + // Start by getting the user account by the payload + let Ok(user_account) = + stream_payload.find_broadcaster_account(&state.db).await + else { + tracing::error!("Failed to find broadcaster account"); + return ( + StatusCode::BAD_REQUEST, + "Failed to find broadcaster account", + ) + .into_response(); + }; + // Parse the start time from the payload + let Some(start_time) = stream_payload.event.started_at else { + tracing::error!("Failed to find stream start time"); + return (StatusCode::BAD_REQUEST, "Failed to find stream start time") + .into_response(); + }; + // Save the stream to the database + let Ok(_user_stream) = + Stream::create(user_account.user_id, start_time, &state.db).await + else { + tracing::error!("Failed to create stream"); + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create stream") + .into_response(); + }; + } + + // Lastly, publish the stream status event + let subject = Event::from(stream_payload).get_subject(); + let publish_result = state + .event_stream + .publish(subject.to_string(), raw_payload.to_string()) // Pass the original payload so we can skip serialization + .await; + if let Err(e) = publish_result { + tracing::error!("Failed to publish stream status event: {}", e); + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish stream status event") + .into_response(); + } + } + "channel.follow" => { + return ( + StatusCode::NOT_IMPLEMENTED, + "Channel follow not implemented", + ) + .into_response() + } + "channel.subscribe" => { + return ( + StatusCode::NOT_IMPLEMENTED, + "Channel subscription not implemented", + ) + .into_response() + } "channel.chat.message" => {
tracing::debug!("Channel chat message received"); // Pull the raw payload out of the notification diff --git a/src/bin/down.rs b/src/bin/down.rs index a847637..17a3ad0 100644 --- a/src/bin/down.rs +++ b/src/bin/down.rs @@ -1,8 +1,5 @@ use anyhow::Result; -use farmhand::{ - db, - workers::{create_nats_client, Queue, Stream}, -}; +use farmhand::{db, event::Stream, nats::create_nats_client, queue::Queue}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[tokio::main] diff --git a/src/bin/job_runner.rs b/src/bin/job_runner.rs index 3f7882b..8502aad 100644 --- a/src/bin/job_runner.rs +++ b/src/bin/job_runner.rs @@ -1,6 +1,10 @@ use anyhow::Result; use async_nats::jetstream::AckKind; -use farmhand::workers::{self, events::MESSAGE_PREFIX, queue::process_message}; +use farmhand::{ + event::MESSAGE_PREFIX, + nats::create_nats_client, + queue::{process_message, Queue}, +}; use futures::StreamExt; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -14,9 +18,9 @@ async fn main() -> Result<()> { .init(); // Connect to the stream tracing::debug!("Connecting to NATS server"); - let nats_client = workers::create_nats_client().await?; + let nats_client = create_nats_client().await?; tracing::debug!("Connecting to queue"); - let queue = workers::Queue::connect(nats_client) + let queue = Queue::connect(nats_client) .await .expect("Failed to create worker queue"); diff --git a/src/bin/stream_listener.rs b/src/bin/stream_listener.rs index 0c4cb5b..b15f77d 100644 --- a/src/bin/stream_listener.rs +++ b/src/bin/stream_listener.rs @@ -2,7 +2,10 @@ //! 
It intentionally does not do anything else use anyhow::Result; -use farmhand::workers::{self, events::MESSAGE_PREFIX}; +use farmhand::{ + event::{Stream, MESSAGE_PREFIX}, + nats::create_nats_client, +}; use futures::StreamExt; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -17,9 +20,9 @@ async fn main() -> Result<()> { .init(); // Create the NATS client tracing::debug!("Connecting to NATS server"); - let nats_client = workers::create_nats_client().await?; + let nats_client = create_nats_client().await?; // Setup the Jetstream queue - let listener = workers::Stream::connect(nats_client) + let listener = Stream::connect(nats_client) .await .expect("Failed to create worker queue"); diff --git a/src/bin/up.rs b/src/bin/up.rs index 90cc402..8fea707 100644 --- a/src/bin/up.rs +++ b/src/bin/up.rs @@ -1,10 +1,9 @@ use anyhow::Result; use farmhand::{ db, - workers::{ - self, - events::{EVENT_PREFIX, EVENT_STREAM, JOB_PREFIX, JOB_STREAM, MESSAGE_PREFIX}, - }, + event::{Stream, EVENT_PREFIX, EVENT_STREAM, JOB_PREFIX, JOB_STREAM, MESSAGE_PREFIX}, + nats::create_nats_client, + queue::Queue, }; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -49,13 +48,13 @@ async fn init_project_nats() { // Connect to the NATS server tracing::debug!("Connecting to NATS server"); - let nats_client = workers::create_nats_client() + let nats_client = create_nats_client() .await .expect("Failed to connect to NATS"); // Create the event stream let all_events_subject = format!("{}.{}.>", MESSAGE_PREFIX, EVENT_PREFIX); - workers::Stream::new( + Stream::new( EVENT_STREAM.to_string(), Some("All Farmhand events".to_string()), vec![all_events_subject], @@ -66,7 +65,7 @@ async fn init_project_nats() { // Create the job stream let all_jobs_subject = format!("{}.{}.>", MESSAGE_PREFIX, JOB_PREFIX); - workers::Queue::new( + Queue::new( JOB_STREAM.to_string(), Some("All Farmhand jobs".to_string()), vec![all_jobs_subject], diff --git a/src/db/mod.rs 
b/src/db/mod.rs index 9ce5587..66dbe1e 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,4 +1,5 @@ pub mod accounts; +pub mod streams; pub mod users; pub mod videos; diff --git a/src/db/streams.rs b/src/db/streams.rs new file mode 100644 index 0000000..6c89d4f --- /dev/null +++ b/src/db/streams.rs @@ -0,0 +1,162 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::{types::Uuid, PgPool}; + +#[derive(sqlx::FromRow, Debug, Serialize, Deserialize)] +pub struct Stream { + pub id: Uuid, + pub user_id: Uuid, + pub start_time: DateTime<Utc>, + pub end_time: Option<DateTime<Utc>>, + pub event_log_url: Option<String>, + pub video_url: Option<String>, + pub created_at: DateTime<Utc>, + pub updated_at: DateTime<Utc>, +} + +impl Stream { + /// Creates a new stream instance (not persisted) + pub fn new(user_id: Uuid, start_time: DateTime<Utc>) -> Self { + Stream { + id: Uuid::new_v4(), + user_id, + start_time, + end_time: None, + event_log_url: None, + video_url: None, + created_at: Utc::now(), + updated_at: Utc::now(), + } + } + + /// Creates a new stream in the database + pub async fn create(user_id: Uuid, start_time: DateTime<Utc>, pool: &PgPool) -> Result<Self, sqlx::Error> { + let stream = Stream::new(user_id, start_time); + + sqlx::query_as::<_, Stream>( + "INSERT INTO streams ( + id, user_id, start_time, end_time, event_log_url, video_url + ) VALUES ($1, $2, $3, $4, $5, $6) + RETURNING *", + ) + .bind(stream.id).bind(stream.user_id) + .bind(stream.start_time) + .bind(stream.end_time) + .bind(&stream.event_log_url) + .bind(&stream.video_url) + .fetch_one(pool) + .await + } + + /// Finds a stream by ID + pub async fn find_by_id(id: Uuid, pool: &PgPool) -> Result<Self, sqlx::Error> { + sqlx::query_as::<_, Stream>("SELECT * FROM streams WHERE id = $1") + .bind(id) + .fetch_one(pool) + .await + } + + /// Finds all streams + pub async fn all(pool: &PgPool) -> Result<Vec<Stream>, sqlx::Error> { + sqlx::query_as::<_, Stream>("SELECT * FROM streams ORDER BY start_time DESC") + .fetch_all(pool) + .await + } + /// Finds all streams for a specific user + pub async fn
find_by_user_id(user_id: Uuid, pool: &PgPool) -> Result<Vec<Stream>, sqlx::Error> { + sqlx::query_as::<_, Stream>( + "SELECT * FROM streams WHERE user_id = $1 ORDER BY start_time DESC" + ) + .bind(user_id) + .fetch_all(pool) + .await + } + + /// Finds active streams for a specific user + pub async fn find_active_by_user_id(user_id: Uuid, pool: &PgPool) -> Result<Vec<Stream>, sqlx::Error> { + sqlx::query_as::<_, Stream>( + "SELECT * FROM streams + WHERE user_id = $1 AND end_time IS NULL + ORDER BY start_time DESC" + ) + .bind(user_id) + .fetch_all(pool) + .await + } + + /// Finds all active streams (no end time) + pub async fn find_active(pool: &PgPool) -> Result<Vec<Stream>, sqlx::Error> { + sqlx::query_as::<_, Stream>( + "SELECT * FROM streams WHERE end_time IS NULL ORDER BY start_time DESC", + ) + .fetch_all(pool) + .await + } + + /// Updates the stream end time + pub async fn end_stream( + &mut self, + end_time: DateTime<Utc>, + pool: &PgPool, + ) -> Result<(), sqlx::Error> { + sqlx::query( + "UPDATE streams + SET end_time = $1, + updated_at = CURRENT_TIMESTAMP + WHERE id = $2", + ) + .bind(end_time) + .bind(self.id) + .execute(pool) + .await?; + + self.end_time = Some(end_time); + self.updated_at = Utc::now(); + Ok(()) + } + + /// Updates the stream's event log URL + pub async fn set_event_log(&mut self, url: String, pool: &PgPool) -> Result<(), sqlx::Error> { + sqlx::query( + "UPDATE streams + SET event_log_url = $1, + updated_at = CURRENT_TIMESTAMP + WHERE id = $2", + ) + .bind(&url) + .bind(self.id) + .execute(pool) + .await?; + + self.event_log_url = Some(url); + self.updated_at = Utc::now(); + Ok(()) + } + + /// Updates the stream's video URL + pub async fn set_video(&mut self, url: String, pool: &PgPool) -> Result<(), sqlx::Error> { + sqlx::query( + "UPDATE streams + SET video_url = $1, + updated_at = CURRENT_TIMESTAMP + WHERE id = $2", + ) + .bind(&url) + .bind(self.id) + .execute(pool) + .await?; + + self.video_url = Some(url); + self.updated_at = Utc::now(); + Ok(()) + } + + /// Deletes a
stream + pub async fn delete(&self, pool: &PgPool) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM streams WHERE id = $1") + .bind(self.id) + .execute(pool) + .await?; + Ok(()) + } +} diff --git a/src/workers/events/mod.rs b/src/event/mod.rs similarity index 54% rename from src/workers/events/mod.rs rename to src/event/mod.rs index 3ebd482..257e35c 100644 --- a/src/workers/events/mod.rs +++ b/src/event/mod.rs @@ -1,21 +1,21 @@ -use chat::ChatMessagePayload; - -pub mod chat; pub mod stream; pub use stream::Stream; -/// Represents events we send and receive from NATS -/// Primarily used to get the appropriate subject name for an event -pub enum Event { - ChatMessage(ChatMessagePayload), -} +use crate::twitch::{ChatMessagePayload, StreamStatusPayload}; +pub use stream::EVENT_STREAM; pub const MESSAGE_PREFIX: &str = "farmhand"; pub const EVENT_PREFIX: &str = "events"; pub const JOB_PREFIX: &str = "jobs"; -pub const EVENT_STREAM: &str = "FARMHAND_EVENTS"; pub const JOB_STREAM: &str = "FARMHAND_JOBS"; +/// Represents events we send and receive from NATS +/// Primarily used to get the appropriate subject name for an event +pub enum Event { + ChatMessage(ChatMessagePayload), + StreamStatus(StreamStatusPayload), +} + impl Event { pub fn get_subject(&self) -> String { match self { @@ -24,6 +24,18 @@ impl Event { "{}.{}.twitch.events.{}.chat_message", MESSAGE_PREFIX, EVENT_PREFIX, payload.broadcaster_user_name ), + // farmhand.events.twitch.{broadcaster_name}.stream_status + Event::StreamStatus(payload) => { + let status = if payload.is_online() { + "online" + } else { + "offline" + }; + format!( + "{}.{}.twitch.events.{}.stream_{}", + MESSAGE_PREFIX, EVENT_PREFIX, payload.event.broadcaster_user_name, status + ) + } } } } @@ -33,3 +45,9 @@ impl From for Event { Event::ChatMessage(payload) } } + +impl From for Event { + fn from(payload: StreamStatusPayload) -> Self { + Event::StreamStatus(payload) + } +} diff --git a/src/workers/events/stream.rs b/src/event/stream.rs 
similarity index 97% rename from src/workers/events/stream.rs rename to src/event/stream.rs index bb92702..4dff28e 100644 --- a/src/workers/events/stream.rs +++ b/src/event/stream.rs @@ -7,7 +7,9 @@ use async_nats::{ Client, }; -use crate::{error::StreamError, workers::EVENT_STREAM}; +use crate::error::StreamError; + +pub const EVENT_STREAM: &str = "FARMHAND_EVENTS"; #[allow(dead_code)] /// TODO: Remove dead code annotation after implementing diff --git a/src/lib.rs b/src/lib.rs index dfc8cca..e9b1b5c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,12 @@ pub mod api; pub mod db; pub mod error; +pub mod event; +pub mod nats; pub mod prelude; +pub mod queue; pub mod storage; +pub mod vendors; pub mod vod; -pub mod workers; + +pub use vendors::twitch; diff --git a/src/workers/nats.rs b/src/nats/mod.rs similarity index 100% rename from src/workers/nats.rs rename to src/nats/mod.rs diff --git a/src/workers/queue/hls_stream.rs b/src/queue/hls_stream.rs similarity index 100% rename from src/workers/queue/hls_stream.rs rename to src/queue/hls_stream.rs diff --git a/src/workers/queue/legacy.rs b/src/queue/legacy.rs similarity index 100% rename from src/workers/queue/legacy.rs rename to src/queue/legacy.rs diff --git a/src/workers/queue/mod.rs b/src/queue/mod.rs similarity index 100% rename from src/workers/queue/mod.rs rename to src/queue/mod.rs diff --git a/src/workers/queue/queue.rs b/src/queue/queue.rs similarity index 98% rename from src/workers/queue/queue.rs rename to src/queue/queue.rs index 7de9f25..b6be895 100644 --- a/src/workers/queue/queue.rs +++ b/src/queue/queue.rs @@ -8,7 +8,7 @@ use async_nats::{ Client, }; -use crate::{error::QueueError, workers::JOB_STREAM}; +use crate::{error::QueueError, event::JOB_STREAM}; #[allow(dead_code)] /// TODO: Remove dead code annotation after implementing diff --git a/src/vendors/mod.rs b/src/vendors/mod.rs new file mode 100644 index 0000000..b41b76c --- /dev/null +++ b/src/vendors/mod.rs @@ -0,0 +1 @@ +pub mod twitch; 
diff --git a/src/workers/events/chat.rs b/src/vendors/twitch/chat.rs similarity index 100% rename from src/workers/events/chat.rs rename to src/vendors/twitch/chat.rs diff --git a/src/vendors/twitch/mod.rs b/src/vendors/twitch/mod.rs new file mode 100644 index 0000000..97dcaa7 --- /dev/null +++ b/src/vendors/twitch/mod.rs @@ -0,0 +1,6 @@ +pub mod chat; +pub mod stream; +pub mod subscription; + +pub use chat::ChatMessagePayload; +pub use stream::StreamStatusPayload; diff --git a/src/vendors/twitch/stream.rs b/src/vendors/twitch/stream.rs new file mode 100644 index 0000000..9a78362 --- /dev/null +++ b/src/vendors/twitch/stream.rs @@ -0,0 +1,41 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; + +use crate::db::accounts::Account; + +#[derive(Debug, Deserialize, Serialize)] +pub struct StreamStatusPayload { + pub subscription: super::subscription::Subscription, + pub event: StreamEvent, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct StreamEvent { + #[serde(default)] + pub id: Option<String>, + pub broadcaster_user_id: String, + pub broadcaster_user_login: String, + pub broadcaster_user_name: String, + #[serde(default, rename = "type")] + pub stream_type: Option<String>, + #[serde(default)] + pub started_at: Option<DateTime<Utc>>, +} + +impl StreamStatusPayload { + /// Check if this is an online event + pub fn is_online(&self) -> bool { + self.subscription.event_type == "stream.online" + } + + /// Check if this is an offline event + pub fn is_offline(&self) -> bool { + self.subscription.event_type == "stream.offline" + } + + /// Find the associated user account based on the broadcaster ID + pub async fn find_broadcaster_account(&self, pool: &PgPool) -> Result<Account, sqlx::Error> { + Account::find_by_provider("twitch", &self.event.broadcaster_user_id, pool).await + } +} diff --git a/src/vendors/twitch/subscription.rs b/src/vendors/twitch/subscription.rs new file mode 100644 index 0000000..d0d36b3 --- /dev/null +++ b/src/vendors/twitch/subscription.rs @@ -0,0 +1,28 @@
+use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +pub struct Subscription { + pub id: String, + pub status: String, + #[serde(rename = "type")] + pub event_type: String, + pub version: String, + pub cost: i32, + pub condition: serde_json::Value, + pub transport: Transport, + pub created_at: DateTime<Utc>, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct Transport { + pub method: String, + pub callback: String, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct Notification { + pub subscription: Subscription, + pub event: Option<serde_json::Value>, + pub challenge: Option<String>, +} diff --git a/src/workers/mod.rs b/src/workers/mod.rs deleted file mode 100644 index 10761d1..0000000 --- a/src/workers/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -pub mod events; -pub mod nats; -pub mod queue; - -pub use nats::{create_nats_client, get_nats_url}; - -pub use events::Stream; -pub use events::{EVENT_STREAM, JOB_STREAM, MESSAGE_PREFIX}; -pub use queue::Queue;