diff --git a/src/api/twitch/eventsub/callback.rs b/src/api/twitch/eventsub/callback.rs index 7046b79..3cc61dc 100644 --- a/src/api/twitch/eventsub/callback.rs +++ b/src/api/twitch/eventsub/callback.rs @@ -82,7 +82,7 @@ pub async fn handle_webhook( }; let stream_payload = - match serde_json::from_value::(raw_payload.clone()) { + match serde_json::from_value::(raw_payload) { Ok(payload) => payload, Err(err) => { tracing::error!( @@ -111,19 +111,31 @@ pub async fn handle_webhook( .into_response(); }; // Save the stream to the database - if let Err(create_stream_err) = - Stream::create(user_account.user_id, start_time, &state.db).await + let stream = match Stream::create(user_account.user_id, start_time, &state.db) + .await { - tracing::error!("Failed to create stream: {}", create_stream_err); - return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create stream") - .into_response(); - } + Ok(stream) => stream, + Err(err) => { + tracing::error!("Failed to create stream: {}", err); + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create stream") + .into_response(); + } + }; // Lastly, publish the stream status event - let subject = Event::from(stream_payload).get_subject(); + let event = Event::from(stream_payload).set_stream_db_id(stream.id); + let subject = event.get_subject(); + let Ok(payload) = serde_json::to_string(&event) else { + tracing::error!("Failed to serialize event payload"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to serialize event payload", + ) + .into_response(); + }; state .event_stream - .publish(subject.to_string(), raw_payload.to_string()) // Pass the original payload so we can skip serialization + .publish(subject, payload) .await .map_err(|e| { tracing::error!("Failed to publish stream status event: {}", e); @@ -138,7 +150,7 @@ pub async fn handle_webhook( }; let stream_payload = - match serde_json::from_value::(raw_payload.clone()) { + match serde_json::from_value::(raw_payload) { Ok(payload) => payload, Err(err) => { tracing::error!( @@ -160,6 +172,7 @@ pub async fn handle_webhook( ) .into_response(); }; + // Get the last active stream for the user let Ok(last_active_stream) = Stream::find_most_recent_active_by_user_id(user_account.user_id, &state.db) .await @@ -168,30 +181,36 @@ pub async fn handle_webhook( return (StatusCode::BAD_REQUEST, "Failed to find last active stream") .into_response(); }; - match last_active_stream { - Some(mut stream) => { - let end_time = Utc::now(); - if let Err(e) = stream.end_stream(end_time, &state.db).await { - tracing::error!("Failed to end stream: {}", e); - return (StatusCode::BAD_REQUEST, "Failed to end stream") - .into_response(); - } - } - None => { - tracing::error!( - "Failed to find last active stream for user: {}", - user_account.user_id - ); - return (StatusCode::BAD_REQUEST, "Failed to find last active stream") - .into_response(); - } - } + // If there is not an active stream, something went wrong + let Some(mut stream) = last_active_stream else { + tracing::error!( + "Failed to find last active stream for user: {}", + user_account.user_id + ); + return (StatusCode::BAD_REQUEST, "Failed to find last active stream") + .into_response(); + }; + // Update the end time of the stream + let end_time = Utc::now(); + let Ok(stream) = stream.end_stream(end_time, &state.db).await else { + tracing::error!("Failed to end stream, could not update database"); + return (StatusCode::BAD_REQUEST, "Failed to end stream").into_response(); + }; // Lastly, publish the stream status event - let subject = Event::from(stream_payload).get_subject(); + let event = Event::from(stream_payload).set_stream_db_id(stream.id); + let subject = event.get_subject(); + let Ok(payload) = serde_json::to_string(&event) else { + tracing::error!("Failed to serialize stream status event"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to serialize stream status event", + ) + .into_response(); + }; state .event_stream - .publish(subject.to_string(), raw_payload.to_string()) // Pass the original payload so we can skip serialization + .publish(subject, payload) .await .map_err(|e| { tracing::error!("Failed to publish stream status event: {}", e); diff --git a/src/db/streams.rs b/src/db/streams.rs index 021cc5e..b6a8066 100644 --- a/src/db/streams.rs +++ b/src/db/streams.rs @@ -2,7 +2,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use sqlx::{types::Uuid, PgPool}; -#[derive(sqlx::FromRow, Debug, Serialize, Deserialize)] +#[derive(sqlx::FromRow, Debug, Serialize, Deserialize, Clone)] pub struct Stream { pub id: Uuid, pub user_id: Uuid, @@ -34,7 +34,7 @@ impl Stream { user_id: Uuid, start_time: DateTime, pool: &PgPool, - ) -> Result { + ) -> Result { let stream = Stream::new(user_id, start_time); sqlx::query_as::<_, Stream>( @@ -54,22 +54,22 @@ impl Stream { } /// Finds a stream by ID - pub async fn find_by_id(id: Uuid, pool: &PgPool) -> Result { - sqlx::query_as::<_, Stream>("SELECT * FROM streams WHERE id = $1") + pub async fn find_by_id(id: Uuid, pool: &PgPool) -> Result { + sqlx::query_as::<_, Self>("SELECT * FROM streams WHERE id = $1") .bind(id) .fetch_one(pool) .await } /// Finds all streams - pub async fn all(pool: &PgPool) -> Result, sqlx::Error> { - sqlx::query_as::<_, Stream>("SELECT * FROM streams ORDER BY start_time DESC") + pub async fn all(pool: &PgPool) -> Result, sqlx::Error> { + sqlx::query_as::<_, Self>("SELECT * FROM streams ORDER BY start_time DESC") .fetch_all(pool) .await } /// Finds all streams for a specific user - pub async fn find_by_user_id(user_id: Uuid, pool: &PgPool) -> Result, sqlx::Error> { - sqlx::query_as::<_, Stream>( + pub async fn find_by_user_id(user_id: Uuid, pool: &PgPool) -> Result, sqlx::Error> { + sqlx::query_as::<_, Self>( "SELECT * FROM streams WHERE user_id = $1 ORDER BY start_time DESC", ) .bind(user_id) @@ -81,8 +81,8 @@ impl Stream { pub async fn find_active_by_user_id( user_id: Uuid, pool: &PgPool, - ) -> Result, sqlx::Error> { - sqlx::query_as::<_, Stream>( + ) -> Result, sqlx::Error> { + sqlx::query_as::<_, Self>( "SELECT * FROM streams WHERE user_id = $1 AND end_time IS NULL ORDER BY start_time DESC", @@ -96,8 +96,8 @@ impl Stream { pub async fn find_most_recent_active_by_user_id( user_id: Uuid, pool: &PgPool, - ) -> Result, sqlx::Error> { - sqlx::query_as::<_, Stream>( + ) -> Result, sqlx::Error> { + sqlx::query_as::<_, Self>( "SELECT * FROM streams WHERE user_id = $1 AND end_time IS NULL ORDER BY start_time DESC @@ -109,8 +109,8 @@ impl Stream { } /// Finds all active streams (no end time) - pub async fn find_active(pool: &PgPool) -> Result, sqlx::Error> { - sqlx::query_as::<_, Stream>( + pub async fn find_active(pool: &PgPool) -> Result, sqlx::Error> { + sqlx::query_as::<_, Self>( "SELECT * FROM streams WHERE end_time IS NULL ORDER BY start_time DESC", ) .fetch_all(pool) @@ -122,7 +122,7 @@ impl Stream { &mut self, end_time: DateTime, pool: &PgPool, - ) -> Result<(), sqlx::Error> { + ) -> Result { sqlx::query( "UPDATE streams SET end_time = $1, @@ -136,7 +136,7 @@ impl Stream { self.end_time = Some(end_time); self.updated_at = Utc::now(); - Ok(()) + Ok(self.clone()) } /// Updates the stream's event log URL diff --git a/src/event/mod.rs b/src/event/mod.rs index 5c87842..6a7be96 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1,5 +1,7 @@ pub mod stream; +use serde::{Deserialize, Serialize}; pub use stream::Stream; +use uuid::Uuid; use crate::twitch::{ChatMessagePayload, StreamStatusPayload}; pub use stream::EVENT_STREAM; @@ -9,23 +11,29 @@ pub const EVENT_PREFIX: &str = "events"; pub const JOB_PREFIX: &str = "jobs"; pub const JOB_STREAM: &str = "FARMHAND_JOBS"; -/// Represents events we send and receive from NATS -/// Primarily used to get the appropriate subject name for an event -pub enum Event { +#[derive(Serialize, Deserialize)] +pub struct Event { + payload: EventPayload, + stream_db_id: Option, +} + +/// Represents event types we send and receive from NATS +#[derive(Serialize, Deserialize)] +pub enum EventPayload { ChatMessage(ChatMessagePayload), StreamStatus(StreamStatusPayload), } impl Event { pub fn get_subject(&self) -> String { - let raw_subject = match self { + let raw_subject = match &self.payload { // farmhand.events.twitch.{broadcaster_name}.chat_message - Event::ChatMessage(payload) => format!( + EventPayload::ChatMessage(payload) => format!( "{}.{}.twitch.events.{}.chat_message", MESSAGE_PREFIX, EVENT_PREFIX, payload.broadcaster_user_name ), // farmhand.events.twitch.{broadcaster_name}.stream_status - Event::StreamStatus(payload) => { + EventPayload::StreamStatus(payload) => { let status = if payload.started_at.is_some() { "online" } else { @@ -40,16 +48,26 @@ impl Event { // Make sure the subject is lowercase raw_subject.to_lowercase() } + pub fn set_stream_db_id(mut self, stream_db_id: Uuid) -> Self { + self.stream_db_id = Some(stream_db_id); + self + } } impl From for Event { fn from(payload: ChatMessagePayload) -> Self { - Event::ChatMessage(payload) + Event { + payload: EventPayload::ChatMessage(payload), + stream_db_id: None, + } } } impl From for Event { fn from(payload: StreamStatusPayload) -> Self { - Event::StreamStatus(payload) + Event { + payload: EventPayload::StreamStatus(payload), + stream_db_id: None, + } } }