Skip to content

Commit

Permalink
Fix Save Stream + Add End Stream logic (#60)
Browse files Browse the repository at this point in the history
* Fix parsing for started at timestamp

* Make stream_payload more debuggable

* Separate online from offline further, obscure subscription wrapper

* Add debugging for save stream

* Make sure create method passes user_id

* Add logic to add end time to stream for stream.offline event

* Fix lookup by account id rather than user id

* Make sure to lowercase-ify the subject
  • Loading branch information
sneakycrow authored Feb 24, 2025
1 parent ac3ebd9 commit 6746a73
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 56 deletions.
131 changes: 101 additions & 30 deletions src/api/twitch/eventsub/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use axum::{
response::IntoResponse,
};
use bytes::Bytes;
use chrono::Utc;
use hmac::{Hmac, Mac};
use sha2::Sha256;
use std::sync::Arc;
Expand Down Expand Up @@ -74,46 +75,116 @@ pub async fn handle_webhook(
tracing::debug!("Event type: {}", notification.subscription.event_type);
let notification_type = notification.subscription.event_type;
match notification_type.as_str() {
"stream.online" | "stream.offline" => {
"stream.online" => {
let Some(raw_payload) = notification.event else {
tracing::error!("Received stream status notification without event");
return (StatusCode::BAD_REQUEST, "Missing event data").into_response();
};

let stream_payload =
serde_json::from_value::<StreamStatusPayload>(raw_payload.clone());
let Ok(stream_payload) = stream_payload else {
tracing::error!("Failed to parse stream status notification");
return (StatusCode::BAD_REQUEST, "Invalid event data").into_response();
match serde_json::from_value::<StreamStatusPayload>(raw_payload.clone()) {
Ok(payload) => payload,
Err(err) => {
tracing::error!(
"Failed to parse stream status notification: {}",
err
);
return (StatusCode::BAD_REQUEST, "Invalid event data")
.into_response();
}
};

// Start by getting the user account by the payload
let Ok(user_account) = stream_payload.find_broadcaster_account(&state.db).await
else {
tracing::error!("Failed to find broadcaster account");
return (
StatusCode::BAD_REQUEST,
"Failed to find broadcaster account",
)
.into_response();
};
// Parse the start time from the payload
let Some(start_time) = stream_payload.started_at() else {
tracing::error!("Failed to find stream start time");
return (StatusCode::BAD_REQUEST, "Failed to find stream start time")
.into_response();
};
// Save the stream to the database
if let Err(create_stream_err) =
Stream::create(user_account.user_id, start_time, &state.db).await
{
tracing::error!("Failed to create stream: {}", create_stream_err);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create stream")
.into_response();
}

// If the stream is online, then we also want to start a new stream in the database for that user
if stream_payload.is_online() {
// Start by getting the user account by the payload
let Ok(user_account) =
stream_payload.find_broadcaster_account(&state.db).await
else {
tracing::error!("Failed to find broadcaster account");
return (
StatusCode::BAD_REQUEST,
"Failed to find broadcaster account",
)
.into_response();
};
// Parse the start time from the payload
let Some(start_time) = stream_payload.event.started_at else {
tracing::error!("Failed to find stream start time");
return (StatusCode::BAD_REQUEST, "Failed to find stream start time")
.into_response();
// Lastly, publish the stream status event
let subject = Event::from(stream_payload).get_subject();
state
.event_stream
.publish(subject.to_string(), raw_payload.to_string()) // Pass the original payload so we can skip serialization
.await
.map_err(|e| {
tracing::error!("Failed to publish stream status event: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})
.expect("Failed to publish stream status event");
}
"stream.offline" => {
let Some(raw_payload) = notification.event else {
tracing::error!("Received stream status notification without event");
return (StatusCode::BAD_REQUEST, "Missing event data").into_response();
};

let stream_payload =
match serde_json::from_value::<StreamStatusPayload>(raw_payload.clone()) {
Ok(payload) => payload,
Err(err) => {
tracing::error!(
"Failed to parse stream status notification: {}",
err
);
return (StatusCode::BAD_REQUEST, "Invalid event data")
.into_response();
}
};
// Save the stream to the database
let Ok(_user_stream) =
Stream::create(user_account.user_id, start_time, &state.db).await
else {
tracing::error!("Failed to create stream");
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create stream")

// Start by getting the user account by the payload
let Ok(user_account) = stream_payload.find_broadcaster_account(&state.db).await
else {
tracing::error!("Failed to find broadcaster account");
return (
StatusCode::BAD_REQUEST,
"Failed to find broadcaster account",
)
.into_response();
};
let Ok(last_active_stream) =
Stream::find_most_recent_active_by_user_id(user_account.user_id, &state.db)
.await
else {
tracing::error!("Failed to find last active stream");
return (StatusCode::BAD_REQUEST, "Failed to find last active stream")
.into_response();
};
match last_active_stream {
Some(mut stream) => {
let end_time = Utc::now();
if let Err(e) = stream.end_stream(end_time, &state.db).await {
tracing::error!("Failed to end stream: {}", e);
return (StatusCode::BAD_REQUEST, "Failed to end stream")
.into_response();
}
}
None => {
tracing::error!(
"Failed to find last active stream for user: {}",
user_account.user_id
);
return (StatusCode::BAD_REQUEST, "Failed to find last active stream")
.into_response();
};
}
}

// Lastly, publish the stream status event
Expand Down
36 changes: 30 additions & 6 deletions src/db/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,21 @@ impl Stream {
}

/// Creates a new stream in the database
pub async fn create(user_id: Uuid, start_time: DateTime<Utc>, pool: &PgPool) -> Result<Stream, sqlx::Error> {
pub async fn create(
user_id: Uuid,
start_time: DateTime<Utc>,
pool: &PgPool,
) -> Result<Stream, sqlx::Error> {
let stream = Stream::new(user_id, start_time);

sqlx::query_as::<_, Stream>(
"INSERT INTO streams (
id, start_time, end_time, event_log_url, video_url
) VALUES ($1, $2, $3, $4, $5)
id, user_id, start_time, end_time, event_log_url, video_url
) VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *",
)
.bind(stream.id)
.bind(stream.user_id)
.bind(stream.start_time)
.bind(stream.end_time)
.bind(&stream.event_log_url)
Expand All @@ -65,25 +70,44 @@ impl Stream {
/// Finds all streams for a specific user
pub async fn find_by_user_id(user_id: Uuid, pool: &PgPool) -> Result<Vec<Stream>, sqlx::Error> {
sqlx::query_as::<_, Stream>(
"SELECT * FROM streams WHERE user_id = $1 ORDER BY start_time DESC"
"SELECT * FROM streams WHERE user_id = $1 ORDER BY start_time DESC",
)
.bind(user_id)
.fetch_all(pool)
.await
}

/// Finds active streams for a specific user
pub async fn find_active_by_user_id(user_id: Uuid, pool: &PgPool) -> Result<Vec<Stream>, sqlx::Error> {
pub async fn find_active_by_user_id(
user_id: Uuid,
pool: &PgPool,
) -> Result<Vec<Stream>, sqlx::Error> {
sqlx::query_as::<_, Stream>(
"SELECT * FROM streams
WHERE user_id = $1 AND end_time IS NULL
ORDER BY start_time DESC"
ORDER BY start_time DESC",
)
.bind(user_id)
.fetch_all(pool)
.await
}

/// Gets the most recent active stream for a user
pub async fn find_most_recent_active_by_user_id(
user_id: Uuid,
pool: &PgPool,
) -> Result<Option<Stream>, sqlx::Error> {
sqlx::query_as::<_, Stream>(
"SELECT * FROM streams
WHERE user_id = $1 AND end_time IS NULL
ORDER BY start_time DESC
LIMIT 1",
)
.bind(user_id)
.fetch_optional(pool)
.await
}

/// Finds all active streams (no end time)
pub async fn find_active(pool: &PgPool) -> Result<Vec<Stream>, sqlx::Error> {
sqlx::query_as::<_, Stream>(
Expand Down
10 changes: 6 additions & 4 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,27 @@ pub enum Event {

impl Event {
pub fn get_subject(&self) -> String {
match self {
let raw_subject = match self {
// farmhand.events.twitch.{broadcaster_name}.chat_message
Event::ChatMessage(payload) => format!(
"{}.{}.twitch.events.{}.chat_message",
MESSAGE_PREFIX, EVENT_PREFIX, payload.broadcaster_user_name
),
// farmhand.events.twitch.{broadcaster_name}.stream_status
Event::StreamStatus(payload) => {
let status = if payload.is_online() {
let status = if payload.started_at.is_some() {
"online"
} else {
"offline"
};
format!(
"{}.{}.twitch.events.{}.stream_{}",
MESSAGE_PREFIX, EVENT_PREFIX, payload.event.broadcaster_user_name, status
MESSAGE_PREFIX, EVENT_PREFIX, payload.broadcaster_user_name, status
)
}
}
};
// Make sure the subject is lowercase
raw_subject.to_lowercase()
}
}

Expand Down
24 changes: 8 additions & 16 deletions src/vendors/twitch/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@ use crate::db::accounts::Account;

#[derive(Debug, Deserialize, Serialize)]
pub struct StreamStatusPayload {
pub subscription: super::subscription::Subscription,
pub event: StreamEvent,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct StreamEvent {
#[serde(default)]
pub id: Option<String>,
pub broadcaster_user_id: String,
Expand All @@ -20,22 +14,20 @@ pub struct StreamEvent {
#[serde(default, rename = "type")]
pub stream_type: Option<String>,
#[serde(default)]
pub started_at: Option<DateTime<Utc>>,
pub started_at: Option<String>,
}

impl StreamStatusPayload {
/// Check if this is an online event
pub fn is_online(&self) -> bool {
self.subscription.event_type == "stream.online"
}

/// Check if this is an offline event
pub fn is_offline(&self) -> bool {
self.subscription.event_type == "stream.offline"
/// Get the started timestamp as DateTime if available
pub fn started_at(&self) -> Option<DateTime<Utc>> {
self.started_at
.as_ref()
.and_then(|ts| DateTime::parse_from_rfc3339(ts).ok())
.map(|dt| dt.with_timezone(&Utc))
}

/// Find the associated user account based on the broadcaster ID
pub async fn find_broadcaster_account(&self, pool: &PgPool) -> Result<Account, sqlx::Error> {
Account::find_by_provider("twitch", &self.event.broadcaster_user_id, pool).await
Account::find_by_provider("twitch", &self.broadcaster_user_id, pool).await
}
}

0 comments on commit 6746a73

Please sign in to comment.