Skip to content

Commit

Permalink
Serialize event log db id with payload
Browse files Browse the repository at this point in the history
  • Loading branch information
sneakycrow committed Feb 24, 2025
1 parent 6746a73 commit ee8e99a
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 54 deletions.
79 changes: 49 additions & 30 deletions src/api/twitch/eventsub/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub async fn handle_webhook(
};

let stream_payload =
match serde_json::from_value::<StreamStatusPayload>(raw_payload.clone()) {
match serde_json::from_value::<StreamStatusPayload>(raw_payload) {
Ok(payload) => payload,
Err(err) => {
tracing::error!(
Expand Down Expand Up @@ -111,19 +111,31 @@ pub async fn handle_webhook(
.into_response();
};
// Save the stream to the database
if let Err(create_stream_err) =
Stream::create(user_account.user_id, start_time, &state.db).await
let stream = match Stream::create(user_account.user_id, start_time, &state.db)
.await
{
tracing::error!("Failed to create stream: {}", create_stream_err);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create stream")
.into_response();
}
Ok(stream) => stream,
Err(err) => {
tracing::error!("Failed to create stream: {}", err);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create stream")
.into_response();
}
};

// Lastly, publish the stream status event
let subject = Event::from(stream_payload).get_subject();
let event = Event::from(stream_payload).set_stream_db_id(stream.id);
let subject = event.get_subject();
let Ok(payload) = serde_json::to_string(&event) else {
tracing::error!("Failed to serialize event payload");
return (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to serialize event payload",
)
.into_response();
};
state
.event_stream
.publish(subject.to_string(), raw_payload.to_string()) // Pass the original payload so we can skip serialization
.publish(subject, payload)
.await
.map_err(|e| {
tracing::error!("Failed to publish stream status event: {}", e);
Expand All @@ -138,7 +150,7 @@ pub async fn handle_webhook(
};

let stream_payload =
match serde_json::from_value::<StreamStatusPayload>(raw_payload.clone()) {
match serde_json::from_value::<StreamStatusPayload>(raw_payload) {
Ok(payload) => payload,
Err(err) => {
tracing::error!(
Expand All @@ -160,6 +172,7 @@ pub async fn handle_webhook(
)
.into_response();
};
// Get the last active stream for the user
let Ok(last_active_stream) =
Stream::find_most_recent_active_by_user_id(user_account.user_id, &state.db)
.await
Expand All @@ -168,30 +181,36 @@ pub async fn handle_webhook(
return (StatusCode::BAD_REQUEST, "Failed to find last active stream")
.into_response();
};
match last_active_stream {
Some(mut stream) => {
let end_time = Utc::now();
if let Err(e) = stream.end_stream(end_time, &state.db).await {
tracing::error!("Failed to end stream: {}", e);
return (StatusCode::BAD_REQUEST, "Failed to end stream")
.into_response();
}
}
None => {
tracing::error!(
"Failed to find last active stream for user: {}",
user_account.user_id
);
return (StatusCode::BAD_REQUEST, "Failed to find last active stream")
.into_response();
}
}
// If there is not an active stream, something went wrong
let Some(mut stream) = last_active_stream else {
tracing::error!(
"Failed to find last active stream for user: {}",
user_account.user_id
);
return (StatusCode::BAD_REQUEST, "Failed to find last active stream")
.into_response();
};
// Update the end time of the stream
let end_time = Utc::now();
let Ok(stream) = stream.end_stream(end_time, &state.db).await else {
tracing::error!("Failed to end stream, could not update database");
return (StatusCode::BAD_REQUEST, "Failed to end stream").into_response();
};

// Lastly, publish the stream status event
let subject = Event::from(stream_payload).get_subject();
let event = Event::from(stream_payload).set_stream_db_id(stream.id);
let subject = event.get_subject();
let Ok(payload) = serde_json::to_string(&event) else {
tracing::error!("Failed to serialize stream status event");
return (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to serialize stream status event",
)
.into_response();
};
state
.event_stream
.publish(subject.to_string(), raw_payload.to_string()) // Pass the original payload so we can skip serialization
.publish(subject, payload)
.await
.map_err(|e| {
tracing::error!("Failed to publish stream status event: {}", e);
Expand Down
32 changes: 16 additions & 16 deletions src/db/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{types::Uuid, PgPool};

#[derive(sqlx::FromRow, Debug, Serialize, Deserialize)]
#[derive(sqlx::FromRow, Debug, Serialize, Deserialize, Clone)]
pub struct Stream {
pub id: Uuid,
pub user_id: Uuid,
Expand Down Expand Up @@ -34,7 +34,7 @@ impl Stream {
user_id: Uuid,
start_time: DateTime<Utc>,
pool: &PgPool,
) -> Result<Stream, sqlx::Error> {
) -> Result<Self, sqlx::Error> {
let stream = Stream::new(user_id, start_time);

sqlx::query_as::<_, Stream>(
Expand All @@ -54,22 +54,22 @@ impl Stream {
}

/// Finds a stream by ID
pub async fn find_by_id(id: Uuid, pool: &PgPool) -> Result<Stream, sqlx::Error> {
sqlx::query_as::<_, Stream>("SELECT * FROM streams WHERE id = $1")
pub async fn find_by_id(id: Uuid, pool: &PgPool) -> Result<Self, sqlx::Error> {
sqlx::query_as::<_, Self>("SELECT * FROM streams WHERE id = $1")
.bind(id)
.fetch_one(pool)
.await
}

/// Finds all streams
pub async fn all(pool: &PgPool) -> Result<Vec<Stream>, sqlx::Error> {
sqlx::query_as::<_, Stream>("SELECT * FROM streams ORDER BY start_time DESC")
pub async fn all(pool: &PgPool) -> Result<Vec<Self>, sqlx::Error> {
sqlx::query_as::<_, Self>("SELECT * FROM streams ORDER BY start_time DESC")
.fetch_all(pool)
.await
}
/// Finds all streams for a specific user
pub async fn find_by_user_id(user_id: Uuid, pool: &PgPool) -> Result<Vec<Stream>, sqlx::Error> {
sqlx::query_as::<_, Stream>(
pub async fn find_by_user_id(user_id: Uuid, pool: &PgPool) -> Result<Vec<Self>, sqlx::Error> {
sqlx::query_as::<_, Self>(
"SELECT * FROM streams WHERE user_id = $1 ORDER BY start_time DESC",
)
.bind(user_id)
Expand All @@ -81,8 +81,8 @@ impl Stream {
pub async fn find_active_by_user_id(
user_id: Uuid,
pool: &PgPool,
) -> Result<Vec<Stream>, sqlx::Error> {
sqlx::query_as::<_, Stream>(
) -> Result<Vec<Self>, sqlx::Error> {
sqlx::query_as::<_, Self>(
"SELECT * FROM streams
WHERE user_id = $1 AND end_time IS NULL
ORDER BY start_time DESC",
Expand All @@ -96,8 +96,8 @@ impl Stream {
pub async fn find_most_recent_active_by_user_id(
user_id: Uuid,
pool: &PgPool,
) -> Result<Option<Stream>, sqlx::Error> {
sqlx::query_as::<_, Stream>(
) -> Result<Option<Self>, sqlx::Error> {
sqlx::query_as::<_, Self>(
"SELECT * FROM streams
WHERE user_id = $1 AND end_time IS NULL
ORDER BY start_time DESC
Expand All @@ -109,8 +109,8 @@ impl Stream {
}

/// Finds all active streams (no end time)
pub async fn find_active(pool: &PgPool) -> Result<Vec<Stream>, sqlx::Error> {
sqlx::query_as::<_, Stream>(
pub async fn find_active(pool: &PgPool) -> Result<Vec<Self>, sqlx::Error> {
sqlx::query_as::<_, Self>(
"SELECT * FROM streams WHERE end_time IS NULL ORDER BY start_time DESC",
)
.fetch_all(pool)
Expand All @@ -122,7 +122,7 @@ impl Stream {
&mut self,
end_time: DateTime<Utc>,
pool: &PgPool,
) -> Result<(), sqlx::Error> {
) -> Result<Self, sqlx::Error> {
sqlx::query(
"UPDATE streams
SET end_time = $1,
Expand All @@ -136,7 +136,7 @@ impl Stream {

self.end_time = Some(end_time);
self.updated_at = Utc::now();
Ok(())
Ok(self.clone())
}

/// Updates the stream's event log URL
Expand Down
34 changes: 26 additions & 8 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub mod stream;
use serde::{Deserialize, Serialize};
pub use stream::Stream;
use uuid::Uuid;

use crate::twitch::{ChatMessagePayload, StreamStatusPayload};
pub use stream::EVENT_STREAM;
Expand All @@ -9,23 +11,29 @@ pub const EVENT_PREFIX: &str = "events";
pub const JOB_PREFIX: &str = "jobs";
pub const JOB_STREAM: &str = "FARMHAND_JOBS";

/// Represents events we send and receive from NATS
/// Primarily used to get the appropriate subject name for an event
pub enum Event {
#[derive(Serialize, Deserialize)]
pub struct Event {
payload: EventPayload,
stream_db_id: Option<Uuid>,
}

/// Represents event types we send and receive from NATS
#[derive(Serialize, Deserialize)]
pub enum EventPayload {
ChatMessage(ChatMessagePayload),
StreamStatus(StreamStatusPayload),
}

impl Event {
pub fn get_subject(&self) -> String {
let raw_subject = match self {
let raw_subject = match &self.payload {
// farmhand.events.twitch.{broadcaster_name}.chat_message
Event::ChatMessage(payload) => format!(
EventPayload::ChatMessage(payload) => format!(
"{}.{}.twitch.events.{}.chat_message",
MESSAGE_PREFIX, EVENT_PREFIX, payload.broadcaster_user_name
),
// farmhand.events.twitch.{broadcaster_name}.stream_status
Event::StreamStatus(payload) => {
EventPayload::StreamStatus(payload) => {
let status = if payload.started_at.is_some() {
"online"
} else {
Expand All @@ -40,16 +48,26 @@ impl Event {
// Make sure the subject is lowercase
raw_subject.to_lowercase()
}
pub fn set_stream_db_id(mut self, stream_db_id: Uuid) -> Self {
self.stream_db_id = Some(stream_db_id);
self
}
}

impl From<ChatMessagePayload> for Event {
fn from(payload: ChatMessagePayload) -> Self {
Event::ChatMessage(payload)
Event {
payload: EventPayload::ChatMessage(payload),
stream_db_id: None,
}
}
}

impl From<StreamStatusPayload> for Event {
fn from(payload: StreamStatusPayload) -> Self {
Event::StreamStatus(payload)
Event {
payload: EventPayload::StreamStatus(payload),
stream_db_id: None,
}
}
}

0 comments on commit ee8e99a

Please sign in to comment.