Skip to content

Commit

Permalink
Save streams representation to the database (#59)
Browse files Browse the repository at this point in the history
* Add stream DB module + migration

* Add processing to other routes for twitch callback

* Fix rollback

* Add user FK to stream

* Move ChatMessagePayload to new vendors module

* Move NATS to its own module

* Move events to their own module

* Move queue to its own module

* Delete workers module

* Add event stream support for stream.online|offline

* Add support for creating stream in db on stream.online event

* Check for stream online event when saving new stream
  • Loading branch information
sneakycrow authored Feb 24, 2025
1 parent 706d32b commit ac3ebd9
Show file tree
Hide file tree
Showing 25 changed files with 400 additions and 72 deletions.
4 changes: 2 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ create-db:
drop-db:
sqlx database drop

mig_source := "crates/common/migrations"
mig_source := "./migrations"

mig: migrate
migrate:
Expand All @@ -49,7 +49,7 @@ mig-add mig_name:
sqlx migrate add {{ mig_name }} --source {{ mig_source }}

revert:
sqlx migrate run --source {{ mig_source }}
sqlx migrate revert --source {{ mig_source }}

# Utility commands
sync: sync-web
Expand Down
3 changes: 3 additions & 0 deletions migrations/20250224032704_add_stream.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Revert the add_stream migration: drop the trigger first, then the table.
-- (DROP TABLE would remove the trigger implicitly, but dropping it explicitly
-- keeps the rollback symmetric with the up migration and is harmless thanks
-- to IF EXISTS.)
DROP TRIGGER IF EXISTS update_streams_updated_at ON streams;

DROP TABLE IF EXISTS streams;
27 changes: 27 additions & 0 deletions migrations/20250224032704_add_stream.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Streams table: one row per broadcast session, owned by a user.
-- end_time stays NULL while the stream is live; event_log_url/video_url are
-- filled in after post-processing.
CREATE TABLE IF NOT EXISTS streams (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid (),
    -- Owning user; cascade so deleting a user removes their streams
    user_id UUID NOT NULL REFERENCES users (id) ON DELETE CASCADE,
    start_time TIMESTAMP WITH TIME ZONE NOT NULL,
    end_time TIMESTAMP WITH TIME ZONE,
    event_log_url TEXT,
    video_url TEXT,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW (),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW ()
);

-- Add an index on start_time for efficient querying.
-- IF NOT EXISTS keeps the migration idempotent, matching the guarded
-- CREATE TABLE above (without it, a re-run against an existing table fails here).
CREATE INDEX IF NOT EXISTS idx_streams_start_time ON streams (start_time);

-- Index the FK for efficient per-user lookups and cascade deletes
CREATE INDEX IF NOT EXISTS idx_streams_user_id ON streams (user_id);

-- Create trigger using existing function.
-- NOTE(review): update_updated_at_column() must already exist from an earlier
-- migration — confirm ordering. Postgres has no CREATE TRIGGER IF NOT EXISTS,
-- so this statement is intentionally left unguarded.
CREATE TRIGGER update_streams_updated_at BEFORE
UPDATE ON streams FOR EACH ROW EXECUTE FUNCTION update_updated_at_column ();
4 changes: 1 addition & 3 deletions src/api/app_state.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use super::config::Config;
use crate::{
db::connect_to_database,
db::connect_to_database, event::Stream, nats::create_nats_client, queue::Queue,
storage::s3::create_s3_client,
workers::events::Stream,
workers::{create_nats_client, Queue},
};
use sqlx::PgPool;

Expand Down
100 changes: 71 additions & 29 deletions src/api/twitch/eventsub/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,20 @@ use axum::{
response::IntoResponse,
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use hmac::{Hmac, Mac};
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use std::sync::Arc;

use crate::{
api::{app_state::AppState, routes::auth::oauth::twitch::TwitchCredentials},
workers::events::{chat::ChatMessagePayload, Event},
db::streams::Stream,
event::Event,
twitch::{subscription::Notification, ChatMessagePayload, StreamStatusPayload},
};

type HmacSha256 = Hmac<Sha256>;
const HMAC_PREFIX: &str = "sha256=";

#[derive(Debug, Deserialize, Serialize)]
struct Subscription {
id: String,
status: String,
#[serde(rename = "type")]
event_type: String,
version: String,
cost: i32,
condition: serde_json::Value,
transport: Transport,
created_at: DateTime<Utc>,
}

#[derive(Debug, Deserialize, Serialize)]
struct Transport {
method: String,
callback: String,
}

#[derive(Debug, Deserialize, Serialize)]
struct Notification {
subscription: Subscription,
event: Option<serde_json::Value>,
challenge: Option<String>,
}

pub async fn handle_webhook(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Expand Down Expand Up @@ -100,6 +74,74 @@ pub async fn handle_webhook(
tracing::debug!("Event type: {}", notification.subscription.event_type);
let notification_type = notification.subscription.event_type;
match notification_type.as_str() {
"stream.online" | "stream.offline" => {
let Some(raw_payload) = notification.event else {
tracing::error!("Received stream status notification without event");
return (StatusCode::BAD_REQUEST, "Missing event data").into_response();
};

let stream_payload =
serde_json::from_value::<StreamStatusPayload>(raw_payload.clone());
let Ok(stream_payload) = stream_payload else {
tracing::error!("Failed to parse stream status notification");
return (StatusCode::BAD_REQUEST, "Invalid event data").into_response();
};

// If the stream is online, then we also want to start a new stream in the database for that user
if stream_payload.is_online() {
// Start by getting the user account by the payload
let Ok(user_account) =
stream_payload.find_broadcaster_account(&state.db).await
else {
tracing::error!("Failed to find broadcaster account");
return (
StatusCode::BAD_REQUEST,
"Failed to find broadcaster account",
)
.into_response();
};
// Parse the start time from the payload
let Some(start_time) = stream_payload.event.started_at else {
tracing::error!("Failed to find stream start time");
return (StatusCode::BAD_REQUEST, "Failed to find stream start time")
.into_response();
};
// Save the stream to the database
let Ok(_user_stream) =
Stream::create(user_account.user_id, start_time, &state.db).await
else {
tracing::error!("Failed to create stream");
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create stream")
.into_response();
};
}

// Lastly, publish the stream status event
let subject = Event::from(stream_payload).get_subject();
state
.event_stream
.publish(subject.to_string(), raw_payload.to_string()) // Pass the original payload so we can skip serialization
.await
.map_err(|e| {
tracing::error!("Failed to publish stream status event: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})
.expect("Failed to publish stream status event");
}
"channel.follow" => {
return (
StatusCode::NOT_IMPLEMENTED,
"Channel follow not implemented",
)
.into_response()
}
"channel.subscribe" => {
return (
StatusCode::NOT_IMPLEMENTED,
"Channel subscription not implemented",
)
.into_response()
}
"channel.chat.message" => {
tracing::debug!("Channel chat message received");
// Pull the raw payload out of the notification
Expand Down
5 changes: 1 addition & 4 deletions src/bin/down.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::Result;
use farmhand::{
db,
workers::{create_nats_client, Queue, Stream},
};
use farmhand::{db, event::Stream, nats::create_nats_client, queue::Queue};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::main]
Expand Down
10 changes: 7 additions & 3 deletions src/bin/job_runner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use anyhow::Result;
use async_nats::jetstream::AckKind;
use farmhand::workers::{self, events::MESSAGE_PREFIX, queue::process_message};
use farmhand::{
event::MESSAGE_PREFIX,
nats::create_nats_client,
queue::{process_message, Queue},
};
use futures::StreamExt;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -14,9 +18,9 @@ async fn main() -> Result<()> {
.init();
// Connect to the stream
tracing::debug!("Connecting to NATS server");
let nats_client = workers::create_nats_client().await?;
let nats_client = create_nats_client().await?;
tracing::debug!("Connecting to queue");
let queue = workers::Queue::connect(nats_client)
let queue = Queue::connect(nats_client)
.await
.expect("Failed to create worker queue");

Expand Down
9 changes: 6 additions & 3 deletions src/bin/stream_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
//! It intentionally does not do anything else
use anyhow::Result;
use farmhand::workers::{self, events::MESSAGE_PREFIX};
use farmhand::{
event::{Stream, MESSAGE_PREFIX},
nats::create_nats_client,
};
use futures::StreamExt;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -17,9 +20,9 @@ async fn main() -> Result<()> {
.init();
// Create the NATS client
tracing::debug!("Connecting to NATS server");
let nats_client = workers::create_nats_client().await?;
let nats_client = create_nats_client().await?;
// Setup the Jetstream queue
let listener = workers::Stream::connect(nats_client)
let listener = Stream::connect(nats_client)
.await
.expect("Failed to create worker queue");

Expand Down
13 changes: 6 additions & 7 deletions src/bin/up.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use anyhow::Result;
use farmhand::{
db,
workers::{
self,
events::{EVENT_PREFIX, EVENT_STREAM, JOB_PREFIX, JOB_STREAM, MESSAGE_PREFIX},
},
event::{Stream, EVENT_PREFIX, EVENT_STREAM, JOB_PREFIX, JOB_STREAM, MESSAGE_PREFIX},
nats::create_nats_client,
queue::Queue,
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand Down Expand Up @@ -49,13 +48,13 @@ async fn init_project_nats() {

// Connect to the NATS server
tracing::debug!("Connecting to NATS server");
let nats_client = workers::create_nats_client()
let nats_client = create_nats_client()
.await
.expect("Failed to connect to NATS");

// Create the event stream
let all_events_subject = format!("{}.{}.>", MESSAGE_PREFIX, EVENT_PREFIX);
workers::Stream::new(
Stream::new(
EVENT_STREAM.to_string(),
Some("All Farmhand events".to_string()),
vec![all_events_subject],
Expand All @@ -66,7 +65,7 @@ async fn init_project_nats() {

// Create the job stream
let all_jobs_subject = format!("{}.{}.>", MESSAGE_PREFIX, JOB_PREFIX);
workers::Queue::new(
Queue::new(
JOB_STREAM.to_string(),
Some("All Farmhand jobs".to_string()),
vec![all_jobs_subject],
Expand Down
1 change: 1 addition & 0 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod accounts;
pub mod streams;
pub mod users;
pub mod videos;

Expand Down
Loading

0 comments on commit ac3ebd9

Please sign in to comment.