diff --git a/.env-sample b/.env-sample index d99b6965..959225e4 100644 --- a/.env-sample +++ b/.env-sample @@ -10,7 +10,7 @@ SERVER_PORT=8080 # Watcher TESTNET=true -# Tesnet bootstrap IP:PORT +# Testnet bootstrap IP:PORT BOOTSTRAP=127.0.0.1:6881 # Homeserver ID. Needed for event streams. HOMESERVER= diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e2f59875..863a4420 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -69,7 +69,7 @@ jobs: sleep 10 # Give the service a moment to start - name: Run integration tests - run: cargo nextest run -no-fail-fast + run: cargo nextest run --no-fail-fast - name: Show service logs if tests fail if: failure() diff --git a/Cargo.lock b/Cargo.lock index 17d86b76..29f453e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -154,13 +154,13 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "async-trait" -version = "0.1.85" +version = "0.1.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f934833b4b7233644e5848f235df3f57ed8c80f1528a26c3dfa13d2147fa056" +checksum = "644dd749086bf3771a2fbc5f256fdb982d53f011c7d5d560304eafeecebce79d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -328,7 +328,7 @@ checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -471,9 +471,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" +checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9" dependencies = [ "serde", ] @@ -486,9 +486,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] 
name = "cc" -version = "1.2.10" +version = "1.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13208fcbb66eaeffe09b99fffbe1af420f00a7b35aa99ad683dfc1aa76145229" +checksum = "e4730490333d58093109dc02c23174c3f4d490998c3fed3cc8e82d57afedb9cf" dependencies = [ "shlex", ] @@ -580,9 +580,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.27" +version = "4.5.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "769b0145982b4b48713e01ec42d61614425f27b7058bda7180a3a41f30104796" +checksum = "3e77c3243bd94243c03672cb5154667347c457ca271254724f9f393aee1c05ff" dependencies = [ "clap_builder", "clap_derive", @@ -602,14 +602,14 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.24" +version = "4.5.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54b755194d6389280185988721fffba69495eed5ee9feeee9a599b53db80318c" +checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed" dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -899,7 +899,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -972,7 +972,7 @@ checksum = "30542c1ad912e0e3d22a1935c290e12e8a29d704a420177a31faad4a601a0800" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -1015,7 +1015,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -1044,9 +1044,9 @@ dependencies = [ [[package]] name = "dyn-clone" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" +checksum = "feeef44e73baff3a26d371801df019877a9866a8c493d315ab00177843314f35" 
[[package]] name = "ed25519" @@ -1283,7 +1283,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -1877,7 +1877,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -2223,7 +2223,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53a0d57c55d2d1dc62a2b1d16a0a1079eb78d67c36bdf468d582ab4482ec7002" dependencies = [ "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -2328,9 +2328,9 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "openssl" -version = "0.10.69" +version = "0.10.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5e534d133a060a3c19daec1eb3e98ec6f4685978834f2dbadfe2ec215bab64e" +checksum = "61cfb4e166a8bb8c9b55c500bc2308550148ece889be90f609377e58140f42c6" dependencies = [ "bitflags", "cfg-if", @@ -2349,7 +2349,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -2360,9 +2360,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "openssl-sys" -version = "0.9.104" +version = "0.9.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" +checksum = "8b22d5b84be05a8d6947c7cb71f7c849aa0f112acd4bf51c2a7c1c988ac0a9dc" dependencies = [ "cc", "libc", @@ -2487,7 +2487,7 @@ dependencies = [ "phf_shared", "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -2501,22 +2501,22 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.8" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum 
= "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916" +checksum = "dfe2e71e1471fe07709406bf725f710b02927c9c54b2b5b2ec0e8087d97c327d" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.8" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" +checksum = "f6e859e6e5bd50440ab63c47e3ebabc90f26251f7c73c3d3e837b74a1cc3fa67" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -2730,7 +2730,7 @@ dependencies = [ [[package]] name = "pubky-app-specs" version = "0.3.0" -source = "git+https://github.com/pubky/pubky-app-specs#9278a996d0804cd5b15d58dae9fc606ea0f4977c" +source = "git+https://github.com/pubky/pubky-app-specs#7300263e79a610725800a2ec17c1c200b1c87594" dependencies = [ "base32", "blake3", @@ -3204,7 +3204,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.96", + "syn 2.0.98", "walkdir", ] @@ -3435,7 +3435,7 @@ checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -3647,9 +3647,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.96" +version = "2.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5d0adab1ae378d7f53bdebc67a39f1f151407ef230f0ce2883572f5d8985c80" +checksum = "36147f1a48ae0ec2b5b3bc5b537d267457555a10dc06f3dbc8cb11ba3006d3b1" dependencies = [ "proc-macro2", "quote", @@ -3682,7 +3682,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -3746,7 +3746,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -3757,7 +3757,7 @@ checksum = 
"26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -3862,7 +3862,7 @@ checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -3904,7 +3904,7 @@ checksum = "1fe49a94e3a984b0d0ab97343dc3dcd52baae1ee13f005bfad39faea47d051dc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -4075,7 +4075,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -4210,7 +4210,7 @@ source = "git+https://github.com/juhaku/utoipa?rev=d522f744259dc4fde5f45d187983f dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -4304,7 +4304,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", "wasm-bindgen-shared", ] @@ -4339,7 +4339,7 @@ checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4536,9 +4536,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e49d2d35d3fad69b39b94139037ecfb4f359f08958b9c11e7315ce770462419" +checksum = "86e376c75f4f43f44db463cf729e0d3acbf954d13e22c51e26e4c264b4ab545f" dependencies = [ "memchr", ] @@ -4584,7 +4584,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", "synstructure", ] @@ -4606,7 +4606,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", 
"quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] @@ -4626,7 +4626,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", "synstructure", ] @@ -4655,7 +4655,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.98", ] [[package]] diff --git a/benches/watcher.rs b/benches/watcher.rs index 3fc44d13..536c506d 100644 --- a/benches/watcher.rs +++ b/benches/watcher.rs @@ -73,10 +73,14 @@ fn bench_create_delete_user(c: &mut Criterion) { let (_, homeserver_url) = rt.block_on(create_homeserver_with_events()); c.bench_function("create_delete_homeserver_user", |b| { - b.to_async(&rt).iter(|| async { - // Benchmark the event processor initialization and run - let mut event_processor = EventProcessor::test(homeserver_url.clone()).await; - event_processor.run().await.unwrap(); + b.to_async(&rt).iter(|| { + // Clone the sender for each iteration + let homeserver_url_clone = homeserver_url.clone(); + async move { + // Benchmark the event processor initialization and run + let mut event_processor = EventProcessor::test(homeserver_url_clone).await; + event_processor.run().await.unwrap(); + } }); }); } diff --git a/examples/from_file.rs b/examples/from_file.rs index 8171b70b..5b7e57bc 100644 --- a/examples/from_file.rs +++ b/examples/from_file.rs @@ -17,7 +17,6 @@ async fn main() -> Result<(), DynError> { let mut event_processor = EventProcessor::from_config(&config).await?; let events = read_events_from_file().unwrap(); - event_processor.process_event_lines(events).await?; Ok(()) diff --git a/src/db/connectors/pubky.rs b/src/db/connectors/pubky.rs index 690c91e3..98ea92c9 100644 --- a/src/db/connectors/pubky.rs +++ b/src/db/connectors/pubky.rs @@ -2,76 +2,60 @@ use crate::Config; use mainline::Testnet; use pubky::Client; use std::sync::Arc; +use thiserror::Error; use 
tokio::sync::OnceCell; -static PUBKY_CONNECTOR_SINGLETON: OnceCell = OnceCell::const_new(); +static PUBKY_CONNECTOR_SINGLETON: OnceCell> = OnceCell::const_new(); -#[derive(Debug, Clone)] -pub struct PubkyConnector { - pubky_client: Arc, -} - -#[derive(Debug)] +#[derive(Debug, Error)] pub enum PubkyConnectorError { + #[error("PubkyConnector has already been initialized")] AlreadyInitialized, + + #[error("PubkyConnector not initialized")] NotInitialized, - IoError(std::io::Error), -} -impl From for PubkyConnectorError { - fn from(e: std::io::Error) -> Self { - PubkyConnectorError::IoError(e) - } -} + #[error("I/O error: {0}")] + IoError(#[from] std::io::Error), -impl std::fmt::Display for PubkyConnectorError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - PubkyConnectorError::AlreadyInitialized => { - write!(f, "PubkyConnector has already been initialized") - } - PubkyConnectorError::NotInitialized => write!(f, "PubkyConnector not initialized"), - PubkyConnectorError::IoError(e) => write!(f, "I/O error: {}", e), - } - } + #[error("Client initialization error: {0}")] + ClientError(String), } -impl std::error::Error for PubkyConnectorError {} +pub struct PubkyConnector; impl PubkyConnector { /// Initializes the PubkyConnector singleton with the given configuration - pub fn initialise( + pub async fn initialise( config: &Config, testnet: Option<&Testnet>, ) -> Result<(), PubkyConnectorError> { - // There is not need to initialise, already in the global context - if PUBKY_CONNECTOR_SINGLETON.get().is_some() { - return Ok(()); - } - let pubky_client = match testnet { - Some(testnet) => Client::builder().testnet(testnet).build()?, - None => match config.testnet { - true => Client::testnet()?, - false => Client::new()?, - }, - }; - - let manager = Self { - pubky_client: Arc::new(pubky_client), - }; - PUBKY_CONNECTOR_SINGLETON - .set(manager) - .map_err(|_| PubkyConnectorError::AlreadyInitialized)?; - Ok(()) + .get_or_try_init(|| 
async { + let pubky_client = match testnet { + Some(testnet) => Client::builder() + .testnet(testnet) + .build() + .map_err(|e| PubkyConnectorError::ClientError(e.to_string()))?, + None => match config.testnet { + true => Client::testnet() + .map_err(|e| PubkyConnectorError::ClientError(e.to_string()))?, + false => Client::new() + .map_err(|e| PubkyConnectorError::ClientError(e.to_string()))?, + }, + }; + + Ok(Arc::new(pubky_client)) + }) + .await + .map(|_| ()) } /// Retrieves the shared Client connection. pub fn get_pubky_client() -> Result, PubkyConnectorError> { - if let Some(resolver) = PUBKY_CONNECTOR_SINGLETON.get() { - Ok(resolver.pubky_client.clone()) - } else { - Err(PubkyConnectorError::NotInitialized) - } + PUBKY_CONNECTOR_SINGLETON + .get() + .cloned() + .ok_or(PubkyConnectorError::NotInitialized) } } diff --git a/src/db/graph/exec.rs b/src/db/graph/exec.rs index b7aa9274..8d14ef2e 100644 --- a/src/db/graph/exec.rs +++ b/src/db/graph/exec.rs @@ -13,7 +13,7 @@ pub enum OperationOutcome { CreatedOrDeleted, /// A required node/relationship was not found, indicating a missing dependency /// (often due to the node/relationship not yet being indexed or otherwise unavailable). - Pending, + MissingDependency, } /// Executes a graph query expected to return exactly one row containing a boolean column named @@ -22,7 +22,7 @@ pub enum OperationOutcome { /// - `true` => Returns [`OperationOutcome::Updated`] /// - `false` => Returns [`OperationOutcome::CreatedOrDeleted`] /// -/// If no rows are returned, this function returns [`OperationOutcome::Pending`], typically +/// If no rows are returned, this function returns [`OperationOutcome::MissingDependency`], typically /// indicating a missing dependency or an unmatched query condition. 
pub async fn execute_graph_operation(query: Query) -> Result { let mut result; @@ -38,7 +38,7 @@ pub async fn execute_graph_operation(query: Query) -> Result Ok(OperationOutcome::Updated), false => Ok(OperationOutcome::CreatedOrDeleted), }, - None => Ok(OperationOutcome::Pending), + None => Ok(OperationOutcome::MissingDependency), } } diff --git a/src/db/graph/queries/get.rs b/src/db/graph/queries/get.rs index c32ef019..78afa5ce 100644 --- a/src/db/graph/queries/get.rs +++ b/src/db/graph/queries/get.rs @@ -674,8 +674,6 @@ pub fn post_stream( cypher.push_str(&format!("LIMIT {}\n", limit)); } - println!("{:?}", cypher); - // Build the query and apply parameters using `param` method build_query_with_params(&cypher, &source, tags, kind, &pagination) } diff --git a/src/db/graph/queries/put.rs b/src/db/graph/queries/put.rs index c8abd309..9beb16ef 100644 --- a/src/db/graph/queries/put.rs +++ b/src/db/graph/queries/put.rs @@ -1,8 +1,8 @@ -use crate::events::uri::ParsedUri; use crate::models::post::PostRelationships; use crate::models::{file::FileDetails, post::PostDetails, user::UserDetails}; use crate::types::DynError; use neo4rs::{query, Query}; +use pubky_app_specs::{ParsedUri, Resource}; // Create a user node pub fn create_user(user: &UserDetails) -> Result { @@ -110,7 +110,10 @@ fn add_relationship_params( if let Some(uri) = uri { let parsed_uri = ParsedUri::try_from(uri.as_str())?; let parent_author_id = parsed_uri.user_id; - let parent_post_id = parsed_uri.post_id.ok_or("Missing post ID")?; + let parent_post_id = match parsed_uri.resource { + Resource::Post(id) => id, + _ => return Err("Reposted uri is not a Post resource".into()), + }; return Ok(cypher_query .param(author_param, parent_author_id.as_str()) diff --git a/src/db/kv/traits.rs b/src/db/kv/traits.rs index fd00de86..c822797b 100644 --- a/src/db/kv/traits.rs +++ b/src/db/kv/traits.rs @@ -531,12 +531,14 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync { /// * `key_parts` - A slice of string 
slices that represent the parts used to form the key under which the sorted set is stored. /// * `member` - A slice of string slices that represent the parts used to form the key identifying the member within the sorted set. async fn check_sorted_set_member( + prefix: Option<&str>, key_parts: &[&str], member: &[&str], ) -> Result, DynError> { + let prefix = prefix.unwrap_or(SORTED_PREFIX); let key = key_parts.join(":"); let member_key = member.join(":"); - sorted_sets::check_member(SORTED_PREFIX, &key, &member_key).await + sorted_sets::check_member(prefix, &key, &member_key).await } /// Adds elements to a Redis sorted set using the provided key parts. @@ -594,6 +596,7 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync { /// /// # Arguments /// + /// * `prefix` - An optional string representing the prefix for the Redis keys. If `Some(String)`, the prefix will be used /// * `key_parts` - A slice of string slices that represent the parts used to form the key under which the sorted set is stored. /// * `items` - A slice of string slices representing the elements to be removed from the sorted set. /// @@ -601,6 +604,7 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync { /// /// Returns an error if the operation fails, such as if the Redis connection is unavailable. async fn remove_from_index_sorted_set( + prefix: Option<&str>, key_parts: &[&str], items: &[&str], ) -> Result<(), DynError> { @@ -608,11 +612,11 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync { return Ok(()); } + let prefix = prefix.unwrap_or(SORTED_PREFIX); // Create the key by joining the key parts let key = key_parts.join(":"); - // Call the sorted_sets::del function to remove the items from the sorted set - sorted_sets::del("Sorted", &key, items).await + sorted_sets::del(prefix, &key, items).await } /// Retrieves a range of elements from a Redis sorted set using the provided key parts. 
diff --git a/src/events/error.rs b/src/events/error.rs new file mode 100644 index 00000000..c9a6a107 --- /dev/null +++ b/src/events/error.rs @@ -0,0 +1,25 @@ +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +#[derive(Error, Debug, Clone, Serialize, Deserialize)] +pub enum EventProcessorError { + // Failed to execute query in the graph database + #[error("GraphQueryFailed: {message}")] + GraphQueryFailed { message: String }, + // The event could not be indexed due to missing graph dependencies + #[error("MissingDependency: Could not be indexed")] + MissingDependency { dependency: Vec }, + // The event appears to be unindexed. Verify the event in the retry queue + #[error("SkipIndexing: The PUT event appears to be unindexed")] + SkipIndexing, + // The event could not be parsed from a line + #[error("InvalidEventLine: {message}")] + InvalidEventLine { message: String }, + // The Pubky client could not resolve the pubky + #[error("PubkyClientError: {message}")] + PubkyClientError { message: String }, + // #[error("The event does not exist anymore in the homeserver")] + // ContentNotFound { dependency: String }, + // #[error("PubkyClient could not reach/resolve the homeserver")] + // NotResolvedHomeserver, +} diff --git a/src/events/handlers/bookmark.rs b/src/events/handlers/bookmark.rs index dfdf7b80..896f1e15 100644 --- a/src/events/handlers/bookmark.rs +++ b/src/events/handlers/bookmark.rs @@ -1,35 +1,26 @@ use crate::db::graph::exec::OperationOutcome; use crate::db::kv::index::json::JsonAction; -use crate::events::uri::ParsedUri; +use crate::events::error::EventProcessorError; use crate::models::post::Bookmark; use crate::models::user::UserCounts; use crate::types::DynError; use chrono::Utc; use log::debug; -use pubky_app_specs::traits::Validatable; -use pubky_app_specs::{PubkyAppBookmark, PubkyId}; - -//TODO: only /posts/ are bookmarkable as of now. 
-pub async fn put(user_id: PubkyId, bookmark_id: String, blob: &[u8]) -> Result<(), DynError> { - debug!("Indexing new bookmark: {} -> {}", user_id, bookmark_id); - - // Deserialize and validate bookmark - let bookmark = ::try_from(blob, &bookmark_id)?; - - sync_put(user_id, bookmark, bookmark_id).await -} +use pubky_app_specs::{ParsedUri, PubkyAppBookmark, PubkyId, Resource}; pub async fn sync_put( user_id: PubkyId, bookmark: PubkyAppBookmark, id: String, ) -> Result<(), DynError> { + debug!("Indexing new bookmark: {} -> {}", user_id, id); // Parse the URI to extract author_id and post_id using the updated parse_post_uri let parsed_uri = ParsedUri::try_from(bookmark.uri.as_str())?; - let (author_id, post_id) = ( - parsed_uri.user_id, - parsed_uri.post_id.ok_or("Bookmarked URI missing post_id")?, - ); + let author_id = parsed_uri.user_id; + let post_id = match parsed_uri.resource { + Resource::Post(id) => id, + _ => return Err("Bookmarked uri is not a Post resource".into()), + }; // Save new bookmark relationship to the graph, only if the bookmarked user exists let indexed_at = Utc::now().timestamp_millis(); @@ -37,9 +28,9 @@ pub async fn sync_put( match Bookmark::put_to_graph(&author_id, &post_id, &user_id, &id, indexed_at).await? 
{ OperationOutcome::CreatedOrDeleted => false, OperationOutcome::Updated => true, - // TODO: Should return an error that should be processed by RetryManager - OperationOutcome::Pending => { - return Err("WATCHER: Missing some dependency to index the model".into()) + OperationOutcome::MissingDependency => { + let dependency = vec![format!("{author_id}:posts:{post_id}")]; + return Err(EventProcessorError::MissingDependency { dependency }.into()); } }; @@ -63,17 +54,16 @@ pub async fn del(user_id: PubkyId, bookmark_id: String) -> Result<(), DynError> } pub async fn sync_del(user_id: PubkyId, bookmark_id: String) -> Result<(), DynError> { - // DELETE FROM GRAPH let deleted_bookmark_info = Bookmark::del_from_graph(&user_id, &bookmark_id).await?; + // Ensure the bookmark exists in the graph before proceeding + let (post_id, author_id) = match deleted_bookmark_info { + Some(info) => info, + None => return Err(EventProcessorError::SkipIndexing.into()), + }; - if let Some((post_id, author_id)) = deleted_bookmark_info { - // DELETE FROM INDEXes - Bookmark::del_from_index(&user_id, &post_id, &author_id).await?; - - // Update user counts with the new bookmark - // Skip updating counts if bookmark was not found in graph - UserCounts::update(&user_id, "bookmarks", JsonAction::Decrement(1)).await?; - } + Bookmark::del_from_index(&user_id, &post_id, &author_id).await?; + // Update user counts + UserCounts::update(&user_id, "bookmarks", JsonAction::Decrement(1)).await?; Ok(()) } diff --git a/src/events/handlers/file.rs b/src/events/handlers/file.rs index 43b61ea3..95fe4ad7 100644 --- a/src/events/handlers/file.rs +++ b/src/events/handlers/file.rs @@ -11,30 +11,27 @@ use crate::{ Config, }; use log::{debug, error}; -use pubky_app_specs::{traits::Validatable, PubkyAppFile, PubkyId}; +use pubky_app_specs::{PubkyAppFile, PubkyId}; use tokio::{ fs::{self, remove_file, File}, io::AsyncWriteExt, }; -pub async fn put( +pub async fn sync_put( + file: PubkyAppFile, uri: String, user_id: 
PubkyId, file_id: String, - blob: &[u8], ) -> Result<(), DynError> { debug!("Indexing new file resource at {}/{}", user_id, file_id); - // Serialize and validate - let file_input = ::try_from(blob, &file_id)?; + debug!("file input {:?}", file); - debug!("file input {:?}", file_input); - - let file_meta = ingest(&user_id, file_id.as_str(), &file_input).await?; + let file_meta = ingest(&user_id, file_id.as_str(), &file).await?; // Create FileDetails object let file_details = - FileDetails::from_homeserver(&file_input, uri, user_id.to_string(), file_id, file_meta); + FileDetails::from_homeserver(&file, uri, user_id.to_string(), file_id, file_meta); // save new file into the Graph file_details.put_to_graph().await?; diff --git a/src/events/handlers/follow.rs b/src/events/handlers/follow.rs index a1328055..95445e9b 100644 --- a/src/events/handlers/follow.rs +++ b/src/events/handlers/follow.rs @@ -1,31 +1,28 @@ use crate::db::graph::exec::OperationOutcome; use crate::db::kv::index::json::JsonAction; +use crate::events::error::EventProcessorError; +use crate::events::retry::event::RetryEvent; use crate::models::follow::{Followers, Following, Friends, UserFollows}; use crate::models::notification::Notification; use crate::models::user::UserCounts; use crate::types::DynError; use log::debug; -use pubky_app_specs::PubkyId; - -pub async fn put(follower_id: PubkyId, followee_id: PubkyId, _blob: &[u8]) -> Result<(), DynError> { - debug!("Indexing new follow: {} -> {}", follower_id, followee_id); - - // TODO: in case we want to validate the content of this homeserver object or its `created_at` timestamp - // let _follow = ::try_from(&blob, &followee_id).await?; - - sync_put(follower_id, followee_id).await -} +use pubky_app_specs::{user_uri_builder, PubkyId}; pub async fn sync_put(follower_id: PubkyId, followee_id: PubkyId) -> Result<(), DynError> { + debug!("Indexing new follow: {} -> {}", follower_id, followee_id); // SAVE TO GRAPH // (follower_id)-[:FOLLOWS]->(followee_id) 
match Followers::put_to_graph(&follower_id, &followee_id).await? { // Do not duplicate the follow relationship OperationOutcome::Updated => return Ok(()), - // TODO: Should return an error that should be processed by RetryManager - // WIP: Create a custom error type to pass enough info to the RetryManager - OperationOutcome::Pending => { - return Err("WATCHER: Missing some dependency to index the model".into()) + OperationOutcome::MissingDependency => { + if let Some(key) = + RetryEvent::generate_index_key(&user_uri_builder(followee_id.to_string())) + { + let dependency = vec![key]; + return Err(EventProcessorError::MissingDependency { dependency }.into()); + } } // The relationship did not exist, create all related indexes OperationOutcome::CreatedOrDeleted => { @@ -66,13 +63,10 @@ pub async fn del(follower_id: PubkyId, followee_id: PubkyId) -> Result<(), DynEr } pub async fn sync_del(follower_id: PubkyId, followee_id: PubkyId) -> Result<(), DynError> { - // DELETE FROM GRAPH match Followers::del_from_graph(&follower_id, &followee_id).await? { // Both users exists but they do not have that relationship OperationOutcome::Updated => Ok(()), - OperationOutcome::Pending => { - Err("WATCHER: Missing some dependency to index the model".into()) - } + OperationOutcome::MissingDependency => Err(EventProcessorError::SkipIndexing.into()), OperationOutcome::CreatedOrDeleted => { // Check if the users are friends. Is this a break? 
:( let were_friends = Friends::check(&follower_id, &followee_id).await?; diff --git a/src/events/handlers/mute.rs b/src/events/handlers/mute.rs index a5874dcb..bfff2efe 100644 --- a/src/events/handlers/mute.rs +++ b/src/events/handlers/mute.rs @@ -1,29 +1,26 @@ use crate::db::graph::exec::OperationOutcome; +use crate::events::error::EventProcessorError; +use crate::events::retry::event::RetryEvent; use crate::models::user::Muted; use crate::types::DynError; use log::debug; -use pubky_app_specs::PubkyId; - -pub async fn put(user_id: PubkyId, muted_id: PubkyId, _blob: &[u8]) -> Result<(), DynError> { - debug!("Indexing new mute: {} -> {}", user_id, muted_id); - - // TODO: in case we want to validate the content of this homeserver object or its `created_at` timestamp - // let _mute = ::try_from(&blob, &muted_id).await?; - - sync_put(user_id, muted_id).await -} +use pubky_app_specs::{user_uri_builder, PubkyId}; pub async fn sync_put(user_id: PubkyId, muted_id: PubkyId) -> Result<(), DynError> { - // SAVE TO GRAPH + debug!("Indexing new mute: {} -> {}", user_id, muted_id); // (user_id)-[:MUTED]->(muted_id) match Muted::put_to_graph(&user_id, &muted_id).await? 
{ OperationOutcome::Updated => Ok(()), - // TODO: Should return an error that should be processed by RetryManager - OperationOutcome::Pending => { - Err("WATCHER: Missing some dependency to index the model".into()) + OperationOutcome::MissingDependency => { + match RetryEvent::generate_index_key(&user_uri_builder(muted_id.to_string())) { + Some(key) => { + let dependency = vec![key]; + Err(EventProcessorError::MissingDependency { dependency }.into()) + } + None => Err("Could not generate missing dependency key".into()), + } } OperationOutcome::CreatedOrDeleted => { - // SAVE TO INDEX Muted(vec![muted_id.to_string()]) .put_to_index(&user_id) .await @@ -37,15 +34,10 @@ pub async fn del(user_id: PubkyId, muted_id: PubkyId) -> Result<(), DynError> { } pub async fn sync_del(user_id: PubkyId, muted_id: PubkyId) -> Result<(), DynError> { - // DELETE FROM GRAPH match Muted::del_from_graph(&user_id, &muted_id).await? { OperationOutcome::Updated => Ok(()), - // TODO: Should return an error that should be processed by RetryManager - OperationOutcome::Pending => { - Err("WATCHER: Missing some dependency to index the model".into()) - } + OperationOutcome::MissingDependency => Err(EventProcessorError::SkipIndexing.into()), OperationOutcome::CreatedOrDeleted => { - // REMOVE FROM INDEX Muted(vec![muted_id.to_string()]) .del_from_index(&user_id) .await diff --git a/src/events/handlers/post.rs b/src/events/handlers/post.rs index 67223901..b2ece67a 100644 --- a/src/events/handlers/post.rs +++ b/src/events/handlers/post.rs @@ -1,6 +1,7 @@ use crate::db::graph::exec::{exec_single_row, execute_graph_operation, OperationOutcome}; use crate::db::kv::index::json::JsonAction; -use crate::events::uri::ParsedUri; +use crate::events::error::EventProcessorError; +use crate::events::retry::event::RetryEvent; use crate::models::notification::{Notification, PostChangedSource, PostChangedType}; use crate::models::post::{ PostCounts, PostDetails, PostRelationships, PostStream, 
POST_TOTAL_ENGAGEMENT_KEY_PARTS, @@ -10,25 +11,18 @@ use crate::queries::get::post_is_safe_to_delete; use crate::types::DynError; use crate::{queries, RedisOps, ScoreAction}; use log::debug; -use pubky_app_specs::{traits::Validatable, PubkyAppPost, PubkyAppPostKind, PubkyId}; +use pubky_app_specs::{ + user_uri_builder, ParsedUri, PubkyAppPost, PubkyAppPostKind, PubkyId, Resource, +}; use super::utils::post_relationships_is_reply; -pub async fn put(author_id: PubkyId, post_id: String, blob: &[u8]) -> Result<(), DynError> { - // Process Post resource and update the databases - debug!("Indexing new post: {}/{}", author_id, post_id); - - // Serialize and validate - let post = ::try_from(blob, &post_id)?; - - sync_put(post, author_id, post_id).await -} - pub async fn sync_put( post: PubkyAppPost, author_id: PubkyId, post_id: String, ) -> Result<(), DynError> { + debug!("Indexing new post: {}/{}", author_id, post_id); // Create PostDetails object let post_details = PostDetails::from_homeserver(post.clone(), &author_id, &post_id).await?; // We avoid indexing replies into global feed sorted sets @@ -39,10 +33,28 @@ pub async fn sync_put( let existed = match post_details.put_to_graph(&post_relationships).await? 
{ OperationOutcome::CreatedOrDeleted => false, OperationOutcome::Updated => true, - // TODO: Should return an error that should be processed by RetryManager - // WIP: Create a custom error type to pass enough info to the RetryManager - OperationOutcome::Pending => { - return Err("WATCHER: Missing some dependency to index the model".into()) + OperationOutcome::MissingDependency => { + let mut dependency = Vec::new(); + if let Some(replied_uri) = &post_relationships.replied { + let reply_dependency = RetryEvent::generate_index_key(replied_uri) + // This block is unlikely to be reached, as it would typically fail during the validation process + .unwrap_or_else(|| replied_uri.clone()); + dependency.push(reply_dependency); + } + if let Some(reposted_uri) = &post_relationships.reposted { + let reply_dependency = RetryEvent::generate_index_key(reposted_uri) + // This block is unlikely to be reached, as it would typically fail during the validation process + .unwrap_or_else(|| reposted_uri.clone()); + dependency.push(reply_dependency); + } + if dependency.is_empty() { + if let Some(key) = + RetryEvent::generate_index_key(&user_uri_builder(author_id.to_string())) + { + dependency.push(key); + } + } + return Err(EventProcessorError::MissingDependency { dependency }.into()); + } }; @@ -92,7 +104,10 @@ pub async fn sync_put( let parsed_uri = ParsedUri::try_from(replied_uri.as_str())?; let parent_author_id = parsed_uri.user_id; - let parent_post_id = parsed_uri.post_id.ok_or("Missing post ID")?; + let parent_post_id = match parsed_uri.resource { + Resource::Post(id) => id, + _ => return Err("Replied uri is not a Post resource".into()), + }; let parent_post_key_parts: &[&str; 2] = &[&parent_author_id, &parent_post_id]; @@ -133,7 +148,10 @@ pub async fn sync_put( let parsed_uri = ParsedUri::try_from(reposted_uri.as_str())?; let parent_author_id = parsed_uri.user_id; - let parent_post_id = match 
parsed_uri.resource { + Resource::Post(id) => id, + _ => return Err("Reposted uri is not a Post resource".into()), + }; let parent_post_key_parts: &[&str; 2] = &[&parent_author_id, &parent_post_id]; @@ -272,11 +290,7 @@ pub async fn del(author_id: PubkyId, post_id: String) -> Result<(), DynError> { sync_put(dummy_deleted_post, author_id, post_id).await?; } - // TODO: Should return an error that should be processed by RetryManager - // WIP: Create a custom error type to pass enough info to the RetryManager - OperationOutcome::Pending => { - return Err("WATCHER: Missing some dependency to index the model".into()) - } + OperationOutcome::MissingDependency => return Err(EventProcessorError::SkipIndexing.into()), }; Ok(()) @@ -305,7 +319,10 @@ pub async fn sync_del(author_id: PubkyId, post_id: String) -> Result<(), DynErro // Decrement counts for resposted post if existed if let Some(reposted) = relationships.reposted { let parsed_uri = ParsedUri::try_from(reposted.as_str())?; - let parent_post_id = parsed_uri.post_id.ok_or("Missing post ID")?; + let parent_post_id = match parsed_uri.resource { + Resource::Post(id) => id, + _ => return Err("Reposted uri is not a Post resource".into()), + }; let parent_post_key_parts: &[&str] = &[&parsed_uri.user_id, &parent_post_id]; @@ -341,7 +358,10 @@ pub async fn sync_del(author_id: PubkyId, post_id: String) -> Result<(), DynErro if let Some(replied) = relationships.replied { let parsed_uri = ParsedUri::try_from(replied.as_str())?; let parent_user_id = parsed_uri.user_id; - let parent_post_id = parsed_uri.post_id.ok_or("Missing post ID")?; + let parent_post_id = match parsed_uri.resource { + Resource::Post(id) => id, + _ => return Err("Replied uri is not a Post resource".into()), + }; let parent_post_key_parts: [&str; 2] = [&parent_user_id, &parent_post_id]; reply_parent_post_key_wrapper = diff --git a/src/events/handlers/tag.rs b/src/events/handlers/tag.rs index 0a1f8deb..3a5d2966 100644 --- a/src/events/handlers/tag.rs +++ 
b/src/events/handlers/tag.rs @@ -1,6 +1,7 @@ use crate::db::graph::exec::OperationOutcome; use crate::db::kv::index::json::JsonAction; -use crate::events::uri::ParsedUri; +use crate::events::error::EventProcessorError; +use crate::events::retry::event::RetryEvent; use crate::models::notification::Notification; use crate::models::post::{PostCounts, PostStream}; use crate::models::tag::post::TagPost; @@ -12,23 +13,25 @@ use crate::types::DynError; use crate::ScoreAction; use chrono::Utc; use log::debug; -use pubky_app_specs::{traits::Validatable, PubkyAppTag, PubkyId}; +use pubky_app_specs::{user_uri_builder, Resource}; +use pubky_app_specs::{ParsedUri, PubkyAppTag, PubkyId}; use super::utils::post_relationships_is_reply; -pub async fn put(tagger_id: PubkyId, tag_id: String, blob: &[u8]) -> Result<(), DynError> { +pub async fn sync_put( + tag: PubkyAppTag, + tagger_id: PubkyId, + tag_id: String, +) -> Result<(), DynError> { debug!("Indexing new tag: {} -> {}", tagger_id, tag_id); - // Deserialize and validate tag - let tag = ::try_from(blob, &tag_id)?; - // Parse the embeded URI to extract author_id and post_id using parse_tagged_post_uri let parsed_uri = ParsedUri::try_from(tag.uri.as_str())?; let indexed_at = Utc::now().timestamp_millis(); - match parsed_uri.post_id { + match parsed_uri.resource { // If post_id is in the tagged URI, we place tag to a post. - Some(post_id) => { + Resource::Post(post_id) => { put_sync_post( tagger_id, parsed_uri.user_id, @@ -41,7 +44,14 @@ pub async fn put(tagger_id: PubkyId, tag_id: String, blob: &[u8]) -> Result<(), .await } // If no post_id in the tagged URI, we place tag to a user. - None => put_sync_user(tagger_id, parsed_uri.user_id, tag_id, tag.label, indexed_at).await, + Resource::User => { + put_sync_user(tagger_id, parsed_uri.user_id, tag_id, tag.label, indexed_at).await + } + other => Err(format!( + "The tagged resource is not Post or User resource. 
Tagged resource: {:?}", + other + ) + .into()), } } @@ -64,7 +74,6 @@ async fn put_sync_post( post_uri: String, indexed_at: i64, ) -> Result<(), DynError> { - // SAVE TO GRAPH match TagPost::put_to_graph( &tagger_user_id, &author_id, @@ -76,10 +85,10 @@ async fn put_sync_post( .await? { OperationOutcome::Updated => Ok(()), - // TODO: Should return an error that should be processed by RetryManager - // WIP: Create a custom error type to pass enough info to the RetryManager - OperationOutcome::Pending => { - Err("WATCHER: Missing some dependency to index the model".into()) + OperationOutcome::MissingDependency => { + // Ensure that dependencies follow the same format as the RetryManager keys + let dependency = vec![format!("{author_id}:posts:{post_id}")]; + Err(EventProcessorError::MissingDependency { dependency }.into()) } OperationOutcome::CreatedOrDeleted => { // SAVE TO INDEXES @@ -139,7 +148,6 @@ async fn put_sync_user( tag_label: String, indexed_at: i64, ) -> Result<(), DynError> { - // SAVE TO GRAPH match TagUser::put_to_graph( &tagger_user_id, &tagged_user_id, @@ -151,10 +159,14 @@ async fn put_sync_user( .await? 
{ OperationOutcome::Updated => Ok(()), - // TODO: Should return an error that should be processed by RetryManager - // WIP: Create a custom error type to pass enough info to the RetryManager - OperationOutcome::Pending => { - Err("WATCHER: Missing some dependency to index the model".into()) + OperationOutcome::MissingDependency => { + match RetryEvent::generate_index_key(&user_uri_builder(tagged_user_id.to_string())) { + Some(key) => { + let dependency = vec![key]; + Err(EventProcessorError::MissingDependency { dependency }.into()) + } + None => Err("Could not generate missing dependency key".into()), + } } OperationOutcome::CreatedOrDeleted => { // SAVE TO INDEX @@ -186,8 +198,6 @@ async fn put_sync_user( pub async fn del(user_id: PubkyId, tag_id: String) -> Result<(), DynError> { debug!("Deleting tag: {} -> {}", user_id, tag_id); - // DELETE FROM GRAPH - // Maybe better if we add as a local function instead of part of the trait? let tag_details = TagUser::del_from_graph(&user_id, &tag_id).await?; // CHOOSE THE EVENT TYPE if let Some((tagged_user_id, post_id, author_id, label)) = tag_details { @@ -206,9 +216,7 @@ pub async fn del(user_id: PubkyId, tag_id: String) -> Result<(), DynError> { } } } else { - // TODO: Should return an error that should be processed by RetryManager - // WIP: Create a custom error type to pass enough info to the RetryManager - return Err("WATCHER: Missing some dependency to index the model".into()); + return Err(EventProcessorError::SkipIndexing.into()); } Ok(()) } diff --git a/src/events/handlers/user.rs b/src/events/handlers/user.rs index 016d8daa..726a6f84 100644 --- a/src/events/handlers/user.rs +++ b/src/events/handlers/user.rs @@ -1,4 +1,5 @@ use crate::db::graph::exec::{execute_graph_operation, OperationOutcome}; +use crate::events::error::EventProcessorError; use crate::models::user::UserSearch; use crate::models::{ traits::Collection, @@ -7,23 +8,18 @@ use crate::models::{ use crate::queries::get::user_is_safe_to_delete; use 
crate::types::DynError; use log::debug; -use pubky_app_specs::{traits::Validatable, PubkyAppUser, PubkyId}; - -pub async fn put(user_id: PubkyId, blob: &[u8]) -> Result<(), DynError> { - // Process profile.json and update the databases - debug!("Indexing new user profile: {}", user_id); - - // Serialize and validate - let user = ::try_from(blob, &user_id)?; - - sync_put(user, user_id).await -} +use pubky_app_specs::{PubkyAppUser, PubkyId}; pub async fn sync_put(user: PubkyAppUser, user_id: PubkyId) -> Result<(), DynError> { + debug!("Indexing new user profile: {}", user_id); // Create UserDetails object let user_details = UserDetails::from_homeserver(user, &user_id).await?; - // SAVE TO GRAPH - user_details.put_to_graph().await?; + user_details + .put_to_graph() + .await + .map_err(|e| EventProcessorError::GraphQueryFailed { + message: format!("{:?}", e), + })?; // SAVE TO INDEX let user_id = user_details.id.clone(); UserSearch::put_to_index(&[&user_details]).await?; @@ -61,11 +57,7 @@ pub async fn del(user_id: PubkyId) -> Result<(), DynError> { sync_put(deleted_user, user_id).await?; } - // Should return an error that could not be inserted in the RetryManager - // TODO: WIP, it will be fixed in the comming PRs the error messages - OperationOutcome::Pending => { - return Err("WATCHER: Missing some dependency to index the model".into()) - } + OperationOutcome::MissingDependency => return Err(EventProcessorError::SkipIndexing.into()), } // TODO notifications for deleted user diff --git a/src/events/mod.rs b/src/events/mod.rs index a7fd1d66..ef248707 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1,201 +1,172 @@ -use crate::db::connectors::pubky::PubkyConnector; -use log::{debug, error}; -use pubky_app_specs::PubkyId; -use reqwest; -use uri::ParsedUri; - +use crate::{db::connectors::pubky::PubkyConnector, types::DynError}; +use error::EventProcessorError; +use log::debug; +use pubky_app_specs::{ParsedUri, PubkyAppObject, Resource}; +use 
serde::{Deserialize, Serialize}; +use std::fmt; + +pub mod error; pub mod handlers; pub mod processor; -pub mod uri; - -#[derive(Debug, Clone)] -enum ResourceType { - User { - user_id: PubkyId, - }, - Post { - author_id: PubkyId, - post_id: String, - }, - Follow { - follower_id: PubkyId, - followee_id: PubkyId, - }, - Mute { - user_id: PubkyId, - muted_id: PubkyId, - }, - Bookmark { - user_id: PubkyId, - bookmark_id: String, - }, - Tag { - user_id: PubkyId, - tag_id: String, - }, - File { - user_id: PubkyId, - file_id: String, - }, -} +pub mod retry; // Look for the end pattern after the start index, or use the end of the string if not found -#[derive(Debug, Clone)] -enum EventType { +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum EventType { Put, Del, } +impl fmt::Display for EventType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let upper_case_str = match self { + EventType::Put => "PUT", + EventType::Del => "DEL", + }; + write!(f, "{}", upper_case_str) + } +} + #[derive(Debug, Clone)] pub struct Event { - uri: String, - event_type: EventType, - resource_type: ResourceType, + pub uri: String, + pub event_type: EventType, + pub parsed_uri: ParsedUri, } impl Event { - pub fn parse_event( - line: &str, - ) -> Result, Box> { - debug!("New event: {}", line); + pub fn parse_event(line: &str) -> Result, DynError> { + log::debug!("New event: {}", line); let parts: Vec<&str> = line.split(' ').collect(); if parts.len() != 2 { - return Err(format!("Malformed event line: {}", line).into()); + return Err(EventProcessorError::InvalidEventLine { + message: format!("Malformed event line, {}", line), + } + .into()); } let event_type = match parts[0] { "PUT" => EventType::Put, "DEL" => EventType::Del, - _ => { - return Err(format!("Unknown event type: {}", parts[0]).into()); + other => { + return Err(EventProcessorError::InvalidEventLine { + message: format!("Unknown event type: {}", other), + } + .into()) } }; + // Validate and parse the 
URI using pubky-app-specs let uri = parts[1].to_string(); - let parsed_uri = ParsedUri::try_from(uri.as_str()).unwrap_or_default(); - - //TODO: This conversion to a match statement that only uses IF conditions is silly. - // We could be patter matching the split test for "posts", "follows", etc maybe? - let resource_type = match uri { - _ if uri.ends_with("/pub/pubky.app/profile.json") => ResourceType::User { - user_id: parsed_uri.user_id, - }, - _ if uri.contains("/posts/") => ResourceType::Post { - author_id: parsed_uri.user_id, - post_id: parsed_uri.post_id.ok_or("Missing post_id")?, - }, - _ if uri.contains("/follows/") => ResourceType::Follow { - follower_id: parsed_uri.user_id, - followee_id: parsed_uri.follow_id.ok_or("Missing followee_id")?, - }, - _ if uri.contains("/mutes/") => ResourceType::Mute { - user_id: parsed_uri.user_id, - muted_id: parsed_uri.muted_id.ok_or("Missing muted_id")?, - }, - _ if uri.contains("/bookmarks/") => ResourceType::Bookmark { - user_id: parsed_uri.user_id, - bookmark_id: parsed_uri.bookmark_id.ok_or("Missing bookmark_id")?, - }, - _ if uri.contains("/tags/") => ResourceType::Tag { - user_id: parsed_uri.user_id, - tag_id: parsed_uri.tag_id.ok_or("Missing tag_id")?, - }, - _ if uri.contains("/files/") => ResourceType::File { - user_id: parsed_uri.user_id, - file_id: parsed_uri.file_id.ok_or("Missing file_id")?, - }, - _ if uri.contains("/blobs") => return Ok(None), - _ if uri.contains("/last_read") => return Ok(None), - _ if uri.contains("/settings") => return Ok(None), - _ => { - error!("Unrecognized resource in URI: {}", uri); - return Err("Unrecognized resource in URI".into()); + let parsed_uri = ParsedUri::try_from(uri.as_str()).map_err(|e| { + { + EventProcessorError::InvalidEventLine { + message: format!("Cannot parse event URI: {}", e), + } } - }; + })?; + + if parsed_uri.resource == Resource::Unknown { + return Err(EventProcessorError::InvalidEventLine { + message: format!("Unknown resource in URI: {}", uri), + } + 
.into()); + } Ok(Some(Event { uri, event_type, - resource_type, + parsed_uri, })) } - pub async fn handle(self) -> Result<(), Box> { + pub async fn handle(self) -> Result<(), DynError> { match self.event_type { EventType::Put => self.handle_put_event().await, EventType::Del => self.handle_del_event().await, } } - async fn handle_put_event(self) -> Result<(), Box> { - debug!("Handling PUT event for {:?}", self.resource_type); - - // User PUT event's into the homeserver write new data. We fetch the data - // for every Resource Type - let url = reqwest::Url::parse(&self.uri)?; - let pubky_client = PubkyConnector::get_pubky_client()?; + /// Handles a PUT event by fetching the blob from the homeserver + /// and using the importer to convert it to a PubkyAppObject. + pub async fn handle_put_event(self) -> Result<(), DynError> { + log::debug!("Handling PUT event for URI: {}", self.uri); + + let response; + { + let pubky_client = PubkyConnector::get_pubky_client()?; + response = match pubky_client.get(&self.uri).send().await { + Ok(response) => response, + Err(e) => { + return Err(EventProcessorError::PubkyClientError { + message: format!("{}", e), + } + .into()) + } + }; + } // drop the pubky_client lock - let response = match pubky_client.get(url).send().await { - Ok(response) => response, - Err(e) => { - error!("WATCHER: Failed to fetch content at {}: {}", self.uri, e); - return Err(e.into()); - } - }; let blob = response.bytes().await?; + let resource = self.parsed_uri.resource; + + // Use the new importer from pubky-app-specs + let pubky_object = PubkyAppObject::from_resource(&resource, &blob).map_err(|e| { + EventProcessorError::PubkyClientError { + message: format!( + "The importer could not create PubkyAppObject from Uri and Blob: {}", + e + ), + } + })?; - match self.resource_type { - ResourceType::User { user_id } => handlers::user::put(user_id, &blob).await?, - ResourceType::Post { author_id, post_id } => { - handlers::post::put(author_id, post_id, &blob).await? 
+ let user_id = self.parsed_uri.user_id; + match (pubky_object, resource) { + (PubkyAppObject::User(user), Resource::User) => { + handlers::user::sync_put(user, user_id).await? + } + (PubkyAppObject::Post(post), Resource::Post(post_id)) => { + handlers::post::sync_put(post, user_id, post_id).await? + } + (PubkyAppObject::Follow(_follow), Resource::Follow(followee_id)) => { + handlers::follow::sync_put(user_id, followee_id).await? } - ResourceType::Follow { - follower_id, - followee_id, - } => handlers::follow::put(follower_id, followee_id, &blob).await?, - ResourceType::Mute { user_id, muted_id } => { - handlers::mute::put(user_id, muted_id, &blob).await? + (PubkyAppObject::Mute(_mute), Resource::Mute(muted_id)) => { + handlers::mute::sync_put(user_id, muted_id).await? } - ResourceType::Bookmark { - user_id, - bookmark_id, - } => handlers::bookmark::put(user_id, bookmark_id, &blob).await?, - ResourceType::Tag { user_id, tag_id } => { - handlers::tag::put(user_id, tag_id, &blob).await? + (PubkyAppObject::Bookmark(bookmark), Resource::Bookmark(bookmark_id)) => { + handlers::bookmark::sync_put(user_id, bookmark, bookmark_id).await? } - ResourceType::File { user_id, file_id } => { - handlers::file::put(self.uri, user_id, file_id, &blob).await? + (PubkyAppObject::Tag(tag), Resource::Tag(tag_id)) => { + handlers::tag::sync_put(tag, user_id, tag_id).await? + } + (PubkyAppObject::File(file), Resource::File(file_id)) => { + handlers::file::sync_put(file, self.uri, user_id, file_id).await? + } + other => { + log::debug!("Event type not handled, Resource: {:?}", other); } } - Ok(()) } - async fn handle_del_event(self) -> Result<(), Box> { - debug!("Handling DEL event for {:?}", self.resource_type); - - match self.resource_type { - ResourceType::User { user_id } => handlers::user::del(user_id).await?, - ResourceType::Post { author_id, post_id } => { - handlers::post::del(author_id, post_id).await? 
- } - ResourceType::Follow { - follower_id, - followee_id, - } => handlers::follow::del(follower_id, followee_id).await?, - ResourceType::Mute { user_id, muted_id } => { - handlers::mute::del(user_id, muted_id).await? + pub async fn handle_del_event(self) -> Result<(), DynError> { + debug!("Handling DEL event for URI: {}", self.uri); + + let user_id = self.parsed_uri.user_id; + match self.parsed_uri.resource { + Resource::User => handlers::user::del(user_id).await?, + Resource::Post(post_id) => handlers::post::del(user_id, post_id).await?, + Resource::Follow(followee_id) => handlers::follow::del(user_id, followee_id).await?, + Resource::Mute(muted_id) => handlers::mute::del(user_id, muted_id).await?, + Resource::Bookmark(bookmark_id) => { + handlers::bookmark::del(user_id, bookmark_id).await? } - ResourceType::Bookmark { - user_id, - bookmark_id, - } => handlers::bookmark::del(user_id, bookmark_id).await?, - ResourceType::Tag { user_id, tag_id } => handlers::tag::del(user_id, tag_id).await?, - ResourceType::File { user_id, file_id } => { - handlers::file::del(&user_id, file_id).await? 
+ Resource::Tag(tag_id) => handlers::tag::del(user_id, tag_id).await?, + Resource::File(file_id) => handlers::file::del(&user_id, file_id).await?, + other => { + debug!("DEL event type not handled for resource: {:?}", other); } } - Ok(()) } } diff --git a/src/events/processor.rs b/src/events/processor.rs index 7ea7d2ad..a0e21f7c 100644 --- a/src/events/processor.rs +++ b/src/events/processor.rs @@ -1,6 +1,6 @@ -use std::time::Duration; - +use super::error::EventProcessorError; use super::Event; +use crate::events::retry::event::RetryEvent; use crate::types::DynError; use crate::PubkyConnector; use crate::{models::homeserver::Homeserver, Config}; @@ -8,46 +8,44 @@ use log::{debug, error, info}; use pubky_app_specs::PubkyId; pub struct EventProcessor { - homeserver: Homeserver, + pub homeserver: Homeserver, limit: u32, - max_retries: u64, } impl EventProcessor { - pub async fn from_config(config: &Config) -> Result { - let homeserver = Homeserver::from_config(config).await?; - let limit = config.events_limit; - let max_retries = config.max_retries; - - info!( - "Initialized Event Processor for homeserver: {:?}", - homeserver - ); - - Ok(Self { - homeserver, - limit, - max_retries, - }) - } - /// Creates a new `EventProcessor` instance for testing purposes. /// - /// Initializes an `EventProcessor` with a mock homeserver and a default configuration, - /// making it suitable for use in integration tests and benchmarking scenarios. + /// This function initializes an `EventProcessor` configured with: + /// - A mock homeserver constructed using the provided `homeserver_url` and `homeserver_pubky`. + /// - A default configuration, including an HTTP client, a limit of 1000 events, and a sender channel. + /// + /// It is designed for use in integration tests, benchmarking scenarios, or other test environments + /// where a controlled and predictable `EventProcessor` instance is required. 
/// /// # Parameters - /// - `homeserver_url`: The URL of the homeserver to be used in the test environment. + /// - `homeserver_id`: A `String` representing the URL of the homeserver to be used in the test environment. + /// - `tx`: A `RetryManagerSenderChannel` used to handle outgoing messages or events. pub async fn test(homeserver_id: String) -> Self { let id = PubkyId::try_from(&homeserver_id).expect("Homeserver ID should be valid"); let homeserver = Homeserver::new(id).await.unwrap(); Self { homeserver, limit: 1000, - max_retries: 3, } } + pub async fn from_config(config: &Config) -> Result { + let homeserver = Homeserver::from_config(config).await?; + let limit = config.events_limit; + + info!( + "Initialized Event Processor for homeserver: {:?}", + homeserver + ); + + Ok(Self { homeserver, limit }) + } + pub async fn run(&mut self) -> Result<(), DynError> { let lines = { self.poll_events().await.unwrap_or_default() }; if let Some(lines) = lines { @@ -56,7 +54,13 @@ impl EventProcessor { Ok(()) } - async fn poll_events(&mut self) -> Result>, Box> { + /// Polls new events from the homeserver. + /// + /// It sends a GET request to the homeserver's events endpoint + /// using the current cursor and a specified limit. It retrieves new event + /// URIs in a newline-separated format, processes it into a vector of strings, + /// and returns the result. + async fn poll_events(&mut self) -> Result>, DynError> { debug!("Polling new events from homeserver"); let response: String; @@ -84,6 +88,14 @@ impl EventProcessor { } } + /// Processes a batch of event lines retrieved from the homeserver. + /// + /// This function iterates over a vector of event URIs, handling each line based on its content: + /// - Lines starting with `cursor:` update the cursor for the homeserver and save it to the index. + /// - Other lines are parsed into events and processed accordingly. If parsing fails, an error is logged. 
+ /// + /// # Parameters + /// - `lines`: A vector of strings representing event lines retrieved from the homeserver. pub async fn process_event_lines(&mut self, lines: Vec) -> Result<(), DynError> { for line in &lines { if line.starts_with("cursor:") { @@ -96,13 +108,13 @@ impl EventProcessor { let event = match Event::parse_event(line) { Ok(event) => event, Err(e) => { - error!("Error while creating event line from line: {}", e); + error!("{}", e); None } }; if let Some(event) = event { debug!("Processing event: {:?}", event); - self.handle_event_with_retry(event).await?; + self.handle_event(event).await?; } } } @@ -110,37 +122,47 @@ impl EventProcessor { Ok(()) } - // Generic retry on event handler - async fn handle_event_with_retry(&self, event: Event) -> Result<(), DynError> { - let mut attempts = 0; - loop { - match event.clone().handle().await { - Ok(_) => break Ok(()), - Err(e) => { - // TODO: Failing to index the profile.json, the error message might be different - // WIP: It will be fixed in the comming PRs the error messages - if e.to_string() != "WATCHER: Missing some dependency to index the model" { - attempts += 1; - if attempts >= self.max_retries { - error!( - "Error while handling event after {} attempts: {}", - attempts, e - ); - break Ok(()); - } else { - error!( - "Error while handling event: {}. Retrying attempt {}/{}", - e, attempts, self.max_retries - ); - // Optionally, add a delay between retries - tokio::time::sleep(Duration::from_millis(100)).await; - } - } else { - error!("PROCESSOR: Sending the event to RetryManager... 
Missing node(s) and/or relationship(s) to execute PUT or DEL operation(s)"); - return Ok(()); - } - } } } + /// Processes an event and tracks the failed event if necessary + /// # Parameters: + /// - `event`: The event to be processed + async fn handle_event(&mut self, event: Event) -> Result<(), DynError> { + if let Err(e) = event.clone().handle().await { + if let Some((index_key, retry_event)) = extract_retry_event_info(&event, e) { + error!("{}, {}", retry_event.error_type, index_key); + if let Err(err) = retry_event.put_to_index(index_key).await { + error!("Failed to put event to retry index: {}", err); } } } } + Ok(()) } } + +/// Extracts retry-related information from an event and its associated error +/// +/// # Parameters +/// - `event`: Reference to the event for which retry information is being extracted +/// - `error`: The error that determines whether the event is eligible for a retry or should be discarded +fn extract_retry_event_info(event: &Event, error: DynError) -> Option<(String, RetryEvent)> { + let retry_event = match error.downcast_ref::() { + Some(EventProcessorError::InvalidEventLine { message }) => { + error!("{}", message); + return None; + } + Some(event_processor_error) => RetryEvent::new(event_processor_error.clone()), + // Other errors must be logged at least for now + None => { + error!("Unhandled error type for URI: {}, {:?}", event.uri, error); + return None; + } + }; + + // Generate a compressed index to save in the cache + let index = match RetryEvent::generate_index_key(&event.uri) { + Some(retry_index) => retry_index, + None => { + return None; + } + }; + Some((format!("{}:{}", event.event_type, index), retry_event)) +} diff --git a/src/events/retry/event.rs b/src/events/retry/event.rs new file mode 100644 index 00000000..9d76e9df --- /dev/null +++ b/src/events/retry/event.rs @@ -0,0 +1,97 @@ +use async_trait::async_trait; +use chrono::Utc; +use pubky_app_specs::ParsedUri; +use serde::{Deserialize, Serialize}; +use 
crate::{events::error::EventProcessorError, types::DynError, RedisOps}; + +pub const RETRY_MAMAGER_PREFIX: &str = "RetryManager"; +pub const RETRY_MANAGER_EVENTS_INDEX: [&str; 1] = ["events"]; +pub const RETRY_MANAGER_STATE_INDEX: [&str; 1] = ["state"]; + +/// Represents an event in the retry queue and it is used to manage events that have failed +/// to process and need to be retried +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RetryEvent { + /// Retry attempts made for this event + pub retry_count: u32, + /// The type of error that caused the event to fail + /// This determines how the event should be processed during the retry process + pub error_type: EventProcessorError, +} + +#[async_trait] +impl RedisOps for RetryEvent { + async fn prefix() -> String { + String::from(RETRY_MAMAGER_PREFIX) + } +} + +impl RetryEvent { + pub fn new(error_type: EventProcessorError) -> Self { + Self { + retry_count: 0, + error_type, + } + } + + /// It processes a homeserver URI and extracts specific components to form a index key + /// in the format `"{pubkyId}:{repository_model}:{event_id}"` + /// # Parameters + /// - `event_uri`: A string slice representing the event URI to be processed + pub fn generate_index_key(event_uri: &str) -> Option { + let parsed_uri = match ParsedUri::try_from(event_uri) { + Ok(parsed_uri) => parsed_uri, + Err(_) => return None, + }; + + let user_id = parsed_uri.user_id; + let key = match parsed_uri.resource.id() { + Some(id) => format!("{}:{}:{}", user_id, parsed_uri.resource, id), + None => format!("{}:{}", user_id, parsed_uri.resource), + }; + + Some(key) + } + + /// Stores an event in both a sorted set and a JSON index in Redis. + /// It adds an event line to a Redis sorted set with a timestamp-based score + /// and also stores the event details in a separate JSON index for retrieval. + /// # Arguments + /// * `event_line` - A `String` representing the event line to be indexed. 
+ pub async fn put_to_index(&self, event_line: String) -> Result<(), DynError> { + Self::put_index_sorted_set( + &RETRY_MANAGER_EVENTS_INDEX, + // NOTE: Don't know if we should use now timestamp or the event timestamp + &[(Utc::now().timestamp_millis() as f64, &event_line)], + Some(RETRY_MAMAGER_PREFIX), + None, + ) + .await?; + + let index = &[RETRY_MANAGER_STATE_INDEX, [&event_line]].concat(); + self.put_index_json(index, None, None).await?; + + Ok(()) + } + + /// Checks if a specific event exists in the Redis sorted set + /// # Arguments + /// * `event_index` - A `&str` representing the event index to check + pub async fn check_uri(event_index: &str) -> Result, DynError> { + Self::check_sorted_set_member( + Some(RETRY_MAMAGER_PREFIX), + &RETRY_MANAGER_EVENTS_INDEX, + &[event_index], + ) + .await + } + + /// Retrieves an event from the JSON index in Redis based on its index + /// # Arguments + /// * `event_index` - A `&str` representing the event index to retrieve + pub async fn get_from_index(event_index: &str) -> Result, DynError> { + let index: &Vec<&str> = &[RETRY_MANAGER_STATE_INDEX, [event_index]].concat(); + Self::try_from_index_json(index, None).await + } +} diff --git a/src/events/retry/mod.rs b/src/events/retry/mod.rs new file mode 100644 index 00000000..53f11265 --- /dev/null +++ b/src/events/retry/mod.rs @@ -0,0 +1 @@ +pub mod event; diff --git a/src/events/uri.rs b/src/events/uri.rs deleted file mode 100644 index fd225436..00000000 --- a/src/events/uri.rs +++ /dev/null @@ -1,85 +0,0 @@ -use crate::types::DynError; -use pubky_app_specs::PubkyId; -use std::convert::TryFrom; - -#[derive(Default, Debug)] -pub struct ParsedUri { - pub user_id: PubkyId, - pub post_id: Option, - pub follow_id: Option, - pub muted_id: Option, - pub bookmark_id: Option, - pub tag_id: Option, - pub file_id: Option, -} - -impl TryFrom<&str> for ParsedUri { - type Error = DynError; - - fn try_from(uri: &str) -> Result { - let mut parsed_uri = ParsedUri::default(); - - // Ensure 
the URI starts with the correct prefix - if !uri.starts_with("pubky://") { - return Err("Invalid URI, must start with pubky://".into()); - } - - // Extract the user_id from the initial part of the URI - if let Some(user_id) = extract_segment(uri, "pubky://", "/pub/") { - parsed_uri.user_id = PubkyId::try_from(user_id)?; - } else { - return Err("Uri Pubky ID is invalid".into()); - } - - // Ensure that the URI belongs to pubky.app - if let Some(app_segment) = extract_segment(uri, "/pub/", "/") { - if app_segment != "pubky.app" { - return Err("The Event URI does not belong to pubky.app".into()); - } - } else { - return Err("The Event URI is malformed".into()); - } - - // Extract post_id if present - if let Some(post_id) = extract_segment(uri, "/posts/", "/") { - parsed_uri.post_id = Some(post_id.to_string()); - } - - // Extract follow_id if present - if let Some(follow_id) = extract_segment(uri, "/follows/", "/") { - parsed_uri.follow_id = Some(PubkyId::try_from(follow_id)?); - } - - // Extract muted_id if present - if let Some(muted_id) = extract_segment(uri, "/mutes/", "/") { - parsed_uri.muted_id = Some(PubkyId::try_from(muted_id)?); - } - - // Extract bookmark_id if present - if let Some(bookmark_id) = extract_segment(uri, "/bookmarks/", "/") { - parsed_uri.bookmark_id = Some(bookmark_id.to_string()); - } - - // Extract tag_id if present - if let Some(tag_id) = extract_segment(uri, "/tags/", "/") { - parsed_uri.tag_id = Some(tag_id.to_string()); - } - - // Extract file_id if present - if let Some(file_id) = extract_segment(uri, "/files/", "/") { - parsed_uri.file_id = Some(file_id.to_string()); - } - - Ok(parsed_uri) - } -} - -fn extract_segment<'a>(uri: &'a str, start_pattern: &str, end_pattern: &str) -> Option<&'a str> { - let start_idx = uri.find(start_pattern)? + start_pattern.len(); - let end_idx = uri[start_idx..] 
- .find(end_pattern) - .map(|i| i + start_idx) - .unwrap_or_else(|| uri.len()); - - Some(&uri[start_idx..end_idx]) -} diff --git a/src/models/post/stream.rs b/src/models/post/stream.rs index d71e9994..5d965c7a 100644 --- a/src/models/post/stream.rs +++ b/src/models/post/stream.rs @@ -518,7 +518,8 @@ impl PostStream { post_id: &str, ) -> Result<(), DynError> { let element = format!("{}:{}", author_id, post_id); - Self::remove_from_index_sorted_set(&POST_TIMELINE_KEY_PARTS, &[element.as_str()]).await + Self::remove_from_index_sorted_set(None, &POST_TIMELINE_KEY_PARTS, &[element.as_str()]) + .await } /// Adds the post to a Redis sorted set using the `indexed_at` timestamp as the score. @@ -534,7 +535,7 @@ impl PostStream { post_id: &str, ) -> Result<(), DynError> { let key_parts = [&POST_PER_USER_KEY_PARTS[..], &[author_id]].concat(); - Self::remove_from_index_sorted_set(&key_parts, &[post_id]).await + Self::remove_from_index_sorted_set(None, &key_parts, &[post_id]).await } /// Adds the post response to a Redis sorted set using the `indexed_at` timestamp as the score. @@ -560,7 +561,7 @@ impl PostStream { ) -> Result<(), DynError> { let key_parts = [&POST_REPLIES_PER_POST_KEY_PARTS[..], parent_post_key_parts].concat(); let element = format!("{}:{}", author_id, reply_id); - Self::remove_from_index_sorted_set(&key_parts, &[element.as_str()]).await + Self::remove_from_index_sorted_set(None, &key_parts, &[element.as_str()]).await } /// Adds the post to a Redis sorted set of replies per author using the `indexed_at` timestamp as the score. @@ -580,7 +581,7 @@ impl PostStream { post_id: &str, ) -> Result<(), DynError> { let key_parts = [&POST_REPLIES_PER_USER_KEY_PARTS[..], &[author_id]].concat(); - Self::remove_from_index_sorted_set(&key_parts, &[post_id]).await + Self::remove_from_index_sorted_set(None, &key_parts, &[post_id]).await } /// Adds a bookmark to Redis sorted set using the `indexed_at` timestamp as the score. 
@@ -604,7 +605,7 @@ impl PostStream { ) -> Result<(), DynError> { let key_parts = [&BOOKMARKS_USER_KEY_PARTS[..], &[bookmarker_id]].concat(); let post_key = format!("{}:{}", author_id, post_id); - Self::remove_from_index_sorted_set(&key_parts, &[&post_key]).await + Self::remove_from_index_sorted_set(None, &key_parts, &[&post_key]).await } /// Adds the post to a Redis sorted set using the total engagement as the score. @@ -631,7 +632,8 @@ impl PostStream { post_id: &str, ) -> Result<(), DynError> { let post_key = format!("{}:{}", author_id, post_id); - Self::remove_from_index_sorted_set(&POST_TOTAL_ENGAGEMENT_KEY_PARTS, &[&post_key]).await + Self::remove_from_index_sorted_set(None, &POST_TOTAL_ENGAGEMENT_KEY_PARTS, &[&post_key]) + .await } pub async fn update_index_score( diff --git a/src/models/tag/search.rs b/src/models/tag/search.rs index 170eab6a..b3bee500 100644 --- a/src/models/tag/search.rs +++ b/src/models/tag/search.rs @@ -129,7 +129,7 @@ impl TagSearch { ) -> Result<(), DynError> { let post_key_slice: &[&str] = &[author_id, post_id]; let key_parts = [&TAG_GLOBAL_POST_TIMELINE[..], &[tag_label]].concat(); - let tag_search = Self::check_sorted_set_member(&key_parts, post_key_slice).await?; + let tag_search = Self::check_sorted_set_member(None, &key_parts, post_key_slice).await?; if tag_search.is_none() { let option = PostDetails::try_from_index_json(post_key_slice, None).await?; if let Some(post_details) = option { @@ -157,7 +157,7 @@ impl TagSearch { if label_taggers.is_none() { let key_parts = [&TAG_GLOBAL_POST_TIMELINE[..], &[tag_label]].concat(); let post_key = format!("{}:{}", author_id, post_id); - Self::remove_from_index_sorted_set(&key_parts, &[&post_key]).await?; + Self::remove_from_index_sorted_set(None, &key_parts, &[&post_key]).await?; } Ok(()) } diff --git a/src/models/user/search.rs b/src/models/user/search.rs index 124c78ac..7bcf45e4 100644 --- a/src/models/user/search.rs +++ b/src/models/user/search.rs @@ -124,6 +124,7 @@ impl UserSearch { } 
Self::remove_from_index_sorted_set( + None, &USER_NAME_KEY_PARTS, records_to_delete .iter() diff --git a/src/routes/v0/stream/posts.rs b/src/routes/v0/stream/posts.rs index 16a61bfc..b94e4ea8 100644 --- a/src/routes/v0/stream/posts.rs +++ b/src/routes/v0/stream/posts.rs @@ -93,8 +93,6 @@ pub async fn stream_posts_handler( query.initialize_defaults(); - println!("QUERY: {:?}", query); - // Enforce maximum number of tags if let Some(ref tags) = query.tags { if tags.len() > MAX_TAGS { diff --git a/src/watcher.rs b/src/watcher.rs index e3d64e29..475b74a5 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -8,8 +8,13 @@ use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() -> Result<(), Box> { let config = Config::from_env(); + // Initializes database connectors for Neo4j and Redis setup(&config).await; - PubkyConnector::initialise(&config, None)?; + + // Initializes the PubkyConnector with the configuration + PubkyConnector::initialise(&config, None).await?; + + // Create and configure the event processor let mut event_processor = EventProcessor::from_config(&config).await?; loop { diff --git a/tests/service/stream/post/reach/utils.rs b/tests/service/stream/post/reach/utils.rs index c9b54fe2..31ba22ed 100644 --- a/tests/service/stream/post/reach/utils.rs +++ b/tests/service/stream/post/reach/utils.rs @@ -39,8 +39,6 @@ pub async fn test_reach_filter_with_posts( path.push_str(&format!("&limit={}", limit)); } - println!("PATH: {:?}", path); - let body = make_request(&path).await?; if verify_timeline { diff --git a/tests/watcher/bookmarks/fail_index.rs b/tests/watcher/bookmarks/fail_index.rs index f9a75154..6878327a 100644 --- a/tests/watcher/bookmarks/fail_index.rs +++ b/tests/watcher/bookmarks/fail_index.rs @@ -41,7 +41,7 @@ async fn test_homeserver_bookmark_without_user() -> Result<()> { uri: format!("pubky://{}/pub/pubky.app/posts/{}", author_id, post_id), created_at: chrono::Utc::now().timestamp_millis(), }; - let bookmark_blob = 
serde_json::to_vec(&bookmark)?; + // Create the bookmark of the shadow user let bookmark_id = bookmark.create_id(); let bookmark_url = format!( @@ -52,7 +52,7 @@ async fn test_homeserver_bookmark_without_user() -> Result<()> { // Switch OFF the event processor to simulate the pending events to index test = test.remove_event_processing().await; // Put bookmark - test.put(&bookmark_url, bookmark_blob).await?; + test.put(&bookmark_url, bookmark).await?; // Create raw event line to retrieve the content from the homeserver let bookmark_event = format!("PUT {}", bookmark_url); diff --git a/tests/watcher/bookmarks/mod.rs b/tests/watcher/bookmarks/mod.rs index 46cec66d..b77d9430 100644 --- a/tests/watcher/bookmarks/mod.rs +++ b/tests/watcher/bookmarks/mod.rs @@ -1,5 +1,6 @@ mod del; mod fail_index; mod raw; +mod retry_bookmark; mod utils; mod viewer; diff --git a/tests/watcher/bookmarks/retry_bookmark.rs b/tests/watcher/bookmarks/retry_bookmark.rs new file mode 100644 index 00000000..f4a88371 --- /dev/null +++ b/tests/watcher/bookmarks/retry_bookmark.rs @@ -0,0 +1,97 @@ +use crate::watcher::utils::watcher::{assert_eventually_exists, WatcherTest}; +use anyhow::Result; +use pubky_app_specs::{traits::HashId, PubkyAppBookmark, PubkyAppUser}; +use pubky_common::crypto::Keypair; +use pubky_nexus::events::{error::EventProcessorError, retry::event::RetryEvent, EventType}; + +/// The user profile is stored in the homeserver. 
Missing the post to connect the bookmark +#[tokio_shared_rt::test(shared)] +async fn test_homeserver_bookmark_cannot_index() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + let keypair = Keypair::random(); + let user = PubkyAppUser { + bio: Some("test_homeserver_bookmark_cannot_index".to_string()), + image: None, + links: None, + name: "Watcher:IndexFail:Bookmark:User".to_string(), + status: None, + }; + let user_id = test.create_user(&keypair, &user).await?; + + // Use a placeholder parent post ID to intentionally avoid resolving it in the graph database + let fake_post_id = "0032QB10HCRHG"; + let fake_user_id = "ba3e8qeby33uq9cughpxdf7bew9etn1eq8bc3yhwg7p1f54yaozy"; + // Create parent post uri + let post_uri = format!("pubky://{fake_user_id}/pub/pubky.app/posts/{fake_post_id}"); + + // Create a bookmark content + let bookmark = PubkyAppBookmark { + uri: post_uri, + created_at: chrono::Utc::now().timestamp_millis(), + }; + + // Create the bookmark of the shadow user + let bookmark_id = bookmark.create_id(); + let bookmark_url = format!( + "pubky://{}/pub/pubky.app/bookmarks/{}", + user_id, bookmark_id + ); + // PUT bookmark + test.put(&bookmark_url, bookmark).await?; + + let put_index_key = format!( + "{}:{}", + EventType::Put, + RetryEvent::generate_index_key(&bookmark_url).unwrap() + ); + + assert_eventually_exists(&put_index_key).await; + + let timestamp = RetryEvent::check_uri(&put_index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&put_index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + let dependency_uri = format!("{fake_user_id}:posts:{fake_post_id}"); + match event_state.error_type { + EventProcessorError::MissingDependency { dependency } => { + assert_eq!(dependency.len(), 1); + assert_eq!(dependency[0], dependency_uri); + } + _ => panic!("The error type has to be MissingDependency 
type"), + }; + + // DEL bookmark + test.del(&bookmark_url).await?; + + let del_index_key = format!( + "{}:{}", + EventType::Del, + RetryEvent::generate_index_key(&bookmark_url).unwrap() + ); + + assert_eventually_exists(&del_index_key).await; + + let timestamp = RetryEvent::check_uri(&del_index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&del_index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::SkipIndexing => (), + _ => panic!("The error type has to be SkipIndexing type"), + }; + + Ok(()) +} diff --git a/tests/watcher/follows/mod.rs b/tests/watcher/follows/mod.rs index 5c0f725a..70ab8c20 100644 --- a/tests/watcher/follows/mod.rs +++ b/tests/watcher/follows/mod.rs @@ -7,4 +7,5 @@ mod put; mod put_friends; mod put_notification; mod put_sequential; +mod retry_follow; mod utils; diff --git a/tests/watcher/follows/retry_follow.rs b/tests/watcher/follows/retry_follow.rs new file mode 100644 index 00000000..f4810afa --- /dev/null +++ b/tests/watcher/follows/retry_follow.rs @@ -0,0 +1,85 @@ +use crate::watcher::utils::watcher::{assert_eventually_exists, WatcherTest}; +use anyhow::Result; +use pubky_app_specs::{user_uri_builder, PubkyAppUser}; +use pubky_common::crypto::Keypair; +use pubky_nexus::events::{error::EventProcessorError, retry::event::RetryEvent, EventType}; + +/// The user profile is stored in the homeserver. 
Missing the followee to connect with follower +#[tokio_shared_rt::test(shared)] +async fn test_homeserver_follow_cannot_index() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + let followee_keypair = Keypair::random(); + let followee_id = followee_keypair.public_key().to_z32(); + // In that case, that user will act as a NotSyncUser or user not registered in pubky.app + // It will not have a profile.json + test.register_user(&followee_keypair).await?; + + let follower_keypair = Keypair::random(); + let follower_user = PubkyAppUser { + bio: Some("test_homeserver_follow_cannot_index".to_string()), + image: None, + links: None, + name: "Watcher:IndexFail:Follower".to_string(), + status: None, + }; + let follower_id = test.create_user(&follower_keypair, &follower_user).await?; + + test.create_follow(&follower_id, &followee_id).await?; + + let follow_url = format!("pubky://{follower_id}/pub/pubky.app/follows/{followee_id}"); + + let index_key = format!( + "{}:{}", + EventType::Put, + RetryEvent::generate_index_key(&follow_url).unwrap() + ); + assert_eventually_exists(&index_key).await; + + let timestamp = RetryEvent::check_uri(&index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + let dependency_key = RetryEvent::generate_index_key(&user_uri_builder(followee_id.to_string())); + + match event_state.error_type { + EventProcessorError::MissingDependency { dependency } => { + assert_eq!(dependency.len(), 1); + assert_eq!(dependency[0], dependency_key.unwrap()) + } + _ => panic!("The error type has to be MissingDependency type"), + }; + + test.del(&follow_url).await?; + + let del_index_key = format!( + "{}:{}", + EventType::Del, + RetryEvent::generate_index_key(&follow_url).unwrap() + ); + + assert_eventually_exists(&del_index_key).await; + + let 
timestamp = RetryEvent::check_uri(&del_index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&del_index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::SkipIndexing => (), + _ => panic!("The error type has to be SkipIndexing type"), + }; + + Ok(()) +} diff --git a/tests/watcher/mutes/mod.rs b/tests/watcher/mutes/mod.rs index 51dc8e21..7d57bbf5 100644 --- a/tests/watcher/mutes/mod.rs +++ b/tests/watcher/mutes/mod.rs @@ -1,4 +1,5 @@ mod del; mod fail_index; mod put; +mod retry_mute; mod utils; diff --git a/tests/watcher/mutes/retry_mute.rs b/tests/watcher/mutes/retry_mute.rs new file mode 100644 index 00000000..8bf83249 --- /dev/null +++ b/tests/watcher/mutes/retry_mute.rs @@ -0,0 +1,87 @@ +use crate::watcher::utils::watcher::{assert_eventually_exists, WatcherTest}; +use anyhow::Result; +use pubky_app_specs::{user_uri_builder, PubkyAppUser}; +use pubky_common::crypto::Keypair; +use pubky_nexus::events::{error::EventProcessorError, retry::event::RetryEvent, EventType}; + +/// The user profile is stored in the homeserver. 
Missing the mutee to connect with muter +#[tokio_shared_rt::test(shared)] +async fn test_homeserver_mute_cannot_index() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + let mutee_keypair = Keypair::random(); + let mutee_id = mutee_keypair.public_key().to_z32(); + // In that case, that user will act as a NotSyncUser or user not registered in pubky.app + // It will not have a profile.json + test.register_user(&mutee_keypair).await?; + + let muter_keypair = Keypair::random(); + let muter_user = PubkyAppUser { + bio: Some("test_homeserver_mute_cannot_index".to_string()), + image: None, + links: None, + name: "Watcher:IndexFail:Muter".to_string(), + status: None, + }; + let muter_id = test.create_user(&muter_keypair, &muter_user).await?; + + // Mute the user + test.create_mute(&muter_id, &mutee_id).await?; + + let mute_url = format!("pubky://{muter_id}/pub/pubky.app/mutes/{mutee_id}"); + + let index_key = format!( + "{}:{}", + EventType::Put, + RetryEvent::generate_index_key(&mute_url).unwrap() + ); + + assert_eventually_exists(&index_key).await; + + let timestamp = RetryEvent::check_uri(&index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + let dependency_key = RetryEvent::generate_index_key(&user_uri_builder(mutee_id.to_string())); + + match event_state.error_type { + EventProcessorError::MissingDependency { dependency } => { + assert_eq!(dependency.len(), 1); + assert_eq!(dependency[0], dependency_key.unwrap()) + } + _ => panic!("The error type has to be MissingDependency type"), + }; + + test.del(&mute_url).await?; + + let del_index_key = format!( + "{}:{}", + EventType::Del, + RetryEvent::generate_index_key(&mute_url).unwrap() + ); + + assert_eventually_exists(&del_index_key).await; + + let timestamp = 
RetryEvent::check_uri(&del_index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&del_index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::SkipIndexing => (), + _ => panic!("The error type has to be SkipIndexing type"), + }; + + Ok(()) +} diff --git a/tests/watcher/posts/mod.rs b/tests/watcher/posts/mod.rs index bfb554ac..14d85ff1 100644 --- a/tests/watcher/posts/mod.rs +++ b/tests/watcher/posts/mod.rs @@ -24,4 +24,8 @@ mod reply_notification; mod reply_repost; mod repost; mod repost_notification; +mod retry_all; +mod retry_post; +mod retry_reply; +mod retry_repost; pub mod utils; diff --git a/tests/watcher/posts/retry_all.rs b/tests/watcher/posts/retry_all.rs new file mode 100644 index 00000000..485e7e34 --- /dev/null +++ b/tests/watcher/posts/retry_all.rs @@ -0,0 +1,105 @@ +use crate::watcher::utils::watcher::{assert_eventually_exists, WatcherTest}; +use anyhow::Result; +use pubky_app_specs::{PubkyAppPost, PubkyAppPostEmbed, PubkyAppPostKind, PubkyAppUser}; +use pubky_common::crypto::Keypair; +use pubky_nexus::events::{error::EventProcessorError, retry::event::RetryEvent, EventType}; + +/// The user profile is stored in the homeserver. 
Missing the post to connect the new one +#[tokio_shared_rt::test(shared)] +async fn test_homeserver_post_with_reply_repost_cannot_index() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + let keypair = Keypair::random(); + + let user = PubkyAppUser { + bio: Some("test_homeserver_post_reply".to_string()), + image: None, + links: None, + name: "Watcher:IndexFail:PostRepost:User".to_string(), + status: None, + }; + + let user_id = test.create_user(&keypair, &user).await?; + + // Use a placeholder parent post ID to intentionally avoid resolving it in the graph database + let reply_fake_post_id = "0032QB10HCRHG"; + let repost_fake_post_id = "0032QB10HP6JJ"; + // Create parent post uri + let reply_uri = format!("pubky://{user_id}/pub/pubky.app/posts/{reply_fake_post_id}"); + let repost_uri = format!("pubky://{user_id}/pub/pubky.app/posts/{repost_fake_post_id}"); + + let repost_reply_post = PubkyAppPost { + content: "Watcher:IndexFail:PostRepost:User:Reply".to_string(), + kind: PubkyAppPostKind::Short, + parent: Some(reply_uri.clone()), + embed: Some(PubkyAppPostEmbed { + kind: PubkyAppPostKind::Short, + uri: repost_uri.clone(), + }), + attachments: None, + }; + + let repost_reply_post_id = test.create_post(&user_id, &repost_reply_post).await?; + + let repost_reply_url = format!("pubky://{user_id}/pub/pubky.app/posts/{repost_reply_post_id}"); + + let index_key = format!( + "{}:{}", + EventType::Put, + RetryEvent::generate_index_key(&repost_reply_url).unwrap() + ); + + assert_eventually_exists(&index_key).await; + + let timestamp = RetryEvent::check_uri(&index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::MissingDependency { dependency } => { + assert_eq!(dependency.len(), 2); + assert_eq!( + 
dependency[0], + RetryEvent::generate_index_key(&reply_uri).unwrap() + ); + assert_eq!( + dependency[1], + RetryEvent::generate_index_key(&repost_uri).unwrap() + ); + } + _ => panic!("The error type has to be MissingDependency type"), + }; + + test.del(&repost_reply_url).await?; + + let del_index_key = format!( + "{}:{}", + EventType::Del, + RetryEvent::generate_index_key(&repost_reply_url).unwrap() + ); + + assert_eventually_exists(&del_index_key).await; + + let timestamp = RetryEvent::check_uri(&del_index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&del_index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::SkipIndexing => (), + _ => panic!("The error type has to be SkipIndexing type"), + }; + + Ok(()) +} diff --git a/tests/watcher/posts/retry_post.rs b/tests/watcher/posts/retry_post.rs new file mode 100644 index 00000000..72c544ab --- /dev/null +++ b/tests/watcher/posts/retry_post.rs @@ -0,0 +1,88 @@ +use crate::watcher::utils::watcher::{assert_eventually_exists, WatcherTest}; +use anyhow::Result; +use pubky_app_specs::{PubkyAppPost, PubkyAppPostKind}; +use pubky_common::crypto::Keypair; +use pubky_nexus::events::{error::EventProcessorError, retry::event::RetryEvent, EventType}; + +/// The user profile is stored in the homeserver. 
Missing the author to connect the post +#[tokio_shared_rt::test(shared)] +async fn test_homeserver_post_cannot_index() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + let keypair = Keypair::random(); + let user_id = keypair.public_key().to_z32(); + + // In that case, that user will act as a NotSyncUser or user not registered in pubky.app + // It will not have a profile.json + test.register_user(&keypair).await?; + + let post = PubkyAppPost { + content: "Watcher:IndexFail:PostEvent:PostWithoutUser".to_string(), + kind: PubkyAppPostKind::Short, + parent: None, + embed: None, + attachments: None, + }; + + let post_id = test.create_post(&user_id, &post).await?; + + let post_url = format!("pubky://{user_id}/pub/pubky.app/posts/{post_id}"); + + let index_key = format!( + "{}:{}", + EventType::Put, + RetryEvent::generate_index_key(&post_url).unwrap() + ); + + assert_eventually_exists(&index_key).await; + + let timestamp = RetryEvent::check_uri(&index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + let dependency_uri = format!("pubky://{user_id}/pub/pubky.app/profile.json"); + + match event_state.error_type { + EventProcessorError::MissingDependency { dependency } => { + assert_eq!(dependency.len(), 1); + assert_eq!( + dependency[0], + RetryEvent::generate_index_key(&dependency_uri).unwrap() + ); + } + _ => panic!("The error type has to be MissingDependency type"), + }; + + test.del(&post_url).await?; + + let del_index_key = format!( + "{}:{}", + EventType::Del, + RetryEvent::generate_index_key(&post_url).unwrap() + ); + + assert_eventually_exists(&del_index_key).await; + + let timestamp = RetryEvent::check_uri(&del_index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = 
RetryEvent::get_from_index(&del_index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::SkipIndexing => (), + _ => panic!("The error type has to be SkipIndexing type"), + }; + + Ok(()) +} diff --git a/tests/watcher/posts/retry_reply.rs b/tests/watcher/posts/retry_reply.rs new file mode 100644 index 00000000..6dd6e7a9 --- /dev/null +++ b/tests/watcher/posts/retry_reply.rs @@ -0,0 +1,96 @@ +use crate::watcher::utils::watcher::{assert_eventually_exists, WatcherTest}; +use anyhow::Result; +use pubky_app_specs::{PubkyAppPost, PubkyAppPostKind, PubkyAppUser}; +use pubky_common::crypto::Keypair; +use pubky_nexus::events::{error::EventProcessorError, retry::event::RetryEvent, EventType}; + +/// The user profile is stored in the homeserver. Missing the post to connect the new one +#[tokio_shared_rt::test(shared)] +async fn test_homeserver_post_reply_cannot_index() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + let keypair = Keypair::random(); + + let user = PubkyAppUser { + bio: Some("test_homeserver_post_reply".to_string()), + image: None, + links: None, + name: "Watcher:IndexFail:PostReply:User".to_string(), + status: None, + }; + + let user_id = test.create_user(&keypair, &user).await?; + + // Use a placeholder parent post ID to intentionally avoid resolving it in the graph database + let parent_fake_post_id = "0032QB10HCRHG"; + // Create parent post uri + let dependency_uri = format!("pubky://{user_id}/pub/pubky.app/posts/{parent_fake_post_id}"); + + let reply_post = PubkyAppPost { + content: "Watcher:IndexFail:PostReply:User:Reply".to_string(), + kind: PubkyAppPostKind::Short, + parent: Some(dependency_uri.clone()), + embed: None, + attachments: None, + }; + + let reply_id = test.create_post(&user_id, &reply_post).await?; + + let reply_url = format!("pubky://{user_id}/pub/pubky.app/posts/{reply_id}"); 
+ + let index_key = format!( + "{}:{}", + EventType::Put, + RetryEvent::generate_index_key(&reply_url).unwrap() + ); + + assert_eventually_exists(&index_key).await; + + let timestamp = RetryEvent::check_uri(&index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::MissingDependency { dependency } => { + assert_eq!(dependency.len(), 1); + assert_eq!( + dependency[0], + RetryEvent::generate_index_key(&dependency_uri).unwrap() + ); + } + _ => panic!("The error type has to be MissingDependency type"), + }; + + test.del(&reply_url).await?; + + let del_index_key = format!( + "{}:{}", + EventType::Del, + RetryEvent::generate_index_key(&reply_url).unwrap() + ); + + assert_eventually_exists(&del_index_key).await; + + let timestamp = RetryEvent::check_uri(&del_index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&del_index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::SkipIndexing => (), + _ => panic!("The error type has to be SkipIndexing type"), + }; + + Ok(()) +} diff --git a/tests/watcher/posts/retry_repost.rs b/tests/watcher/posts/retry_repost.rs new file mode 100644 index 00000000..4fdddb9c --- /dev/null +++ b/tests/watcher/posts/retry_repost.rs @@ -0,0 +1,99 @@ +use crate::watcher::utils::watcher::{assert_eventually_exists, WatcherTest}; +use anyhow::Result; +use pubky_app_specs::{PubkyAppPost, PubkyAppPostEmbed, PubkyAppPostKind, PubkyAppUser}; +use pubky_common::crypto::Keypair; +use pubky_nexus::events::{error::EventProcessorError, retry::event::RetryEvent, EventType}; + +/// The user profile is stored in 
the homeserver. Missing the post to connect the new one +#[tokio_shared_rt::test(shared)] +async fn test_homeserver_post_repost_cannot_index() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + let keypair = Keypair::random(); + + let user = PubkyAppUser { + bio: Some("test_homeserver_post_reply".to_string()), + image: None, + links: None, + name: "Watcher:IndexFail:PostRepost:User".to_string(), + status: None, + }; + + let user_id = test.create_user(&keypair, &user).await?; + + // Use a placeholder parent post ID to intentionally avoid resolving it in the graph database + let repost_fake_post_id = "0032QB10HCRHG"; + // Create parent post uri + let dependency_uri = format!("pubky://{user_id}/pub/pubky.app/posts/{repost_fake_post_id}"); + + let repost_post = PubkyAppPost { + content: "Watcher:IndexFail:PostRepost:User:Reply".to_string(), + kind: PubkyAppPostKind::Short, + parent: None, + embed: Some(PubkyAppPostEmbed { + kind: PubkyAppPostKind::Short, + uri: dependency_uri.clone(), + }), + attachments: None, + }; + + let repost_id = test.create_post(&user_id, &repost_post).await?; + + let repost_url = format!("pubky://{user_id}/pub/pubky.app/posts/{repost_id}"); + + let index_key = format!( + "{}:{}", + EventType::Put, + RetryEvent::generate_index_key(&repost_url).unwrap() + ); + + assert_eventually_exists(&index_key).await; + + let timestamp = RetryEvent::check_uri(&index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::MissingDependency { dependency } => { + assert_eq!(dependency.len(), 1); + assert_eq!( + dependency[0], + RetryEvent::generate_index_key(&dependency_uri).unwrap() + ); + } + _ => panic!("The error type has to be MissingDependency type"), + }; + + test.del(&repost_url).await?; + 
+ let del_index_key = format!( + "{}:{}", + EventType::Del, + RetryEvent::generate_index_key(&repost_url).unwrap() + ); + + assert_eventually_exists(&del_index_key).await; + + let timestamp = RetryEvent::check_uri(&del_index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&del_index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::SkipIndexing => (), + _ => panic!("The error type has to be SkipIndexing type"), + }; + + Ok(()) +} diff --git a/tests/watcher/posts/utils.rs b/tests/watcher/posts/utils.rs index 1b48ebe9..27ecf839 100644 --- a/tests/watcher/posts/utils.rs +++ b/tests/watcher/posts/utils.rs @@ -40,7 +40,7 @@ pub async fn check_member_global_timeline_user_post( ) -> Result> { let post_key: &[&str] = &[user_id, post_id]; let global_timeline_timestamp = - PostStream::check_sorted_set_member(&POST_TIMELINE_KEY_PARTS, post_key) + PostStream::check_sorted_set_member(None, &POST_TIMELINE_KEY_PARTS, post_key) .await .unwrap(); Ok(global_timeline_timestamp) @@ -52,7 +52,7 @@ pub async fn check_member_user_post_timeline( ) -> Result> { let post_stream_key_parts = [&POST_PER_USER_KEY_PARTS[..], &[user_id]].concat(); let post_timeline_timestamp = - PostStream::check_sorted_set_member(&post_stream_key_parts, &[post_id]) + PostStream::check_sorted_set_member(None, &post_stream_key_parts, &[post_id]) .await .unwrap(); Ok(post_timeline_timestamp) @@ -64,7 +64,7 @@ pub async fn check_member_user_replies_timeline( ) -> Result> { let post_stream_key_parts = [&POST_REPLIES_PER_USER_KEY_PARTS[..], &[user_id]].concat(); let post_timeline_timestamp = - PostStream::check_sorted_set_member(&post_stream_key_parts, &[post_id]) + PostStream::check_sorted_set_member(None, &post_stream_key_parts, &[post_id]) .await .unwrap(); Ok(post_timeline_timestamp) @@ -72,7 +72,7 @@ pub async 
fn check_member_user_replies_timeline( pub async fn check_member_total_engagement_user_posts(post_key: &[&str]) -> Result> { let total_engagement = - PostStream::check_sorted_set_member(&POST_TOTAL_ENGAGEMENT_KEY_PARTS, post_key) + PostStream::check_sorted_set_member(None, &POST_TOTAL_ENGAGEMENT_KEY_PARTS, post_key) .await .unwrap(); Ok(total_engagement) @@ -85,7 +85,7 @@ pub async fn check_member_post_replies( ) -> Result> { let key_parts = [&POST_REPLIES_PER_POST_KEY_PARTS[..], &[author_id, post_id]].concat(); - let post_replies = PostStream::check_sorted_set_member(&key_parts, post_key) + let post_replies = PostStream::check_sorted_set_member(None, &key_parts, post_key) .await .unwrap(); Ok(post_replies) diff --git a/tests/watcher/tags/fail_index.rs b/tests/watcher/tags/fail_index.rs index c7a6c318..8be9c966 100644 --- a/tests/watcher/tags/fail_index.rs +++ b/tests/watcher/tags/fail_index.rs @@ -6,7 +6,7 @@ use pubky_app_specs::{traits::HashId, PubkyAppPost, PubkyAppTag, PubkyAppUser}; use pubky_common::crypto::Keypair; #[tokio_shared_rt::test(shared)] -async fn test_homeserver_tag_user_not_found() -> Result<()> { +async fn test_homeserver_tag_cannot_add_while_index() -> Result<()> { let mut test = WatcherTest::setup().await?; let tagged_keypair = Keypair::random(); diff --git a/tests/watcher/tags/mod.rs b/tests/watcher/tags/mod.rs index 38cb8bd4..c38aa3e5 100644 --- a/tests/watcher/tags/mod.rs +++ b/tests/watcher/tags/mod.rs @@ -3,6 +3,8 @@ mod post_del; mod post_muti_user; mod post_notification; mod post_put; +mod retry_post_tag; +mod retry_user_tag; mod user_notification; mod user_to_self_put; mod user_to_user_del; diff --git a/tests/watcher/tags/retry_post_tag.rs b/tests/watcher/tags/retry_post_tag.rs new file mode 100644 index 00000000..bad37cc1 --- /dev/null +++ b/tests/watcher/tags/retry_post_tag.rs @@ -0,0 +1,113 @@ +use crate::watcher::utils::watcher::{assert_eventually_exists, WatcherTest}; +use anyhow::Result; +use chrono::Utc; +use 
pubky_app_specs::{traits::HashId, PubkyAppTag, PubkyAppUser}; +use pubky_common::crypto::Keypair; +use pubky_nexus::events::{error::EventProcessorError, retry::event::RetryEvent, EventType}; + +#[tokio_shared_rt::test(shared)] +async fn test_homeserver_post_tag_event_to_queue() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + let tagger_keypair = Keypair::random(); + let tagger_user = PubkyAppUser { + bio: Some("test_homeserver_user_tag_event_to_queue".to_string()), + image: None, + links: None, + name: "Watcher:Retry:Post:CannotTag:Tagger:Sync".to_string(), + status: None, + }; + let tagger_user_id = test.create_user(&tagger_keypair, &tagger_user).await?; + + // Create a key but it would not be synchronised in nexus + let author_keypair = Keypair::random(); + let author = PubkyAppUser { + bio: Some("test_homeserver_user_tag_event_to_queue".to_string()), + image: None, + links: None, + name: "Watcher:Retry:Post:CannotTag:Author:Sync".to_string(), + status: None, + }; + let author_id = test.create_user(&author_keypair, &author).await?; + + // => Create user tag + let label = "peak_limit"; + + let dependency_uri = format!( + "pubky://{author_id}/pub/pubky.app/posts/{}", + "0032Q4SFBFD4G" + ); + + // Create a tag in a fake post + let tag = PubkyAppTag { + uri: dependency_uri.clone(), + label: label.to_string(), + created_at: Utc::now().timestamp_millis(), + }; + let tag_url = format!( + "pubky://{tagger_user_id}/pub/pubky.app/tags/{}", + tag.create_id() + ); + + // PUT user tag + // That operation is going to write the event in the pending events queue, so block a bit the thread + // to let write the indexes + test.put(tag_url.as_str(), tag).await?; + + let index_key = format!( + "{}:{}", + EventType::Put, + RetryEvent::generate_index_key(&tag_url).unwrap() + ); + + assert_eventually_exists(&index_key).await; + + let timestamp = RetryEvent::check_uri(&index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = 
RetryEvent::get_from_index(&index_key).await.unwrap(); + + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::MissingDependency { dependency } => { + assert_eq!(dependency.len(), 1); + assert_eq!( + dependency[0], + RetryEvent::generate_index_key(&dependency_uri).unwrap() + ); + } + _ => panic!("The error type has to be MissingDependency type"), + }; + + test.del(&tag_url).await?; + + let del_index_key = format!( + "{}:{}", + EventType::Del, + RetryEvent::generate_index_key(&tag_url).unwrap() + ); + + assert_eventually_exists(&del_index_key).await; + + let timestamp = RetryEvent::check_uri(&del_index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&del_index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::SkipIndexing => (), + _ => panic!("The error type has to be SkipIndexing type"), + }; + + Ok(()) +} diff --git a/tests/watcher/tags/retry_user_tag.rs b/tests/watcher/tags/retry_user_tag.rs new file mode 100644 index 00000000..4a61910f --- /dev/null +++ b/tests/watcher/tags/retry_user_tag.rs @@ -0,0 +1,103 @@ +use crate::watcher::utils::watcher::{assert_eventually_exists, WatcherTest}; +use anyhow::Result; +use chrono::Utc; +use pubky_app_specs::{traits::HashId, PubkyAppTag, PubkyAppUser}; +use pubky_common::crypto::Keypair; +use pubky_nexus::events::{error::EventProcessorError, retry::event::RetryEvent, EventType}; + +#[tokio_shared_rt::test(shared)] +async fn test_homeserver_user_tag_event_to_queue() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + let tagger_keypair = Keypair::random(); + let tagger_user = PubkyAppUser { + bio: Some("test_homeserver_user_tag_event_to_queue".to_string()), + image: None, + links: None, 
+ name: "Watcher:Retry:User:CannotTag:Tagger:Sync".to_string(), + status: None, + }; + let tagger_user_id = test.create_user(&tagger_keypair, &tagger_user).await?; + + // Create a user key but it would not be synchronised in nexus + let shadow_keypair = Keypair::random(); + test.register_user(&shadow_keypair).await?; + let shadow_user_id = shadow_keypair.public_key().to_z32(); + + // => Create user tag + let label = "friendly"; + + let dependency_uri = format!("pubky://{shadow_user_id}/pub/pubky.app/profile.json"); + + let tag = PubkyAppTag { + uri: dependency_uri.clone(), + label: label.to_string(), + created_at: Utc::now().timestamp_millis(), + }; + let tag_url = format!( + "pubky://{}/pub/pubky.app/tags/{}", + tagger_user_id, + tag.create_id() + ); + + // PUT user tag + // That operation is going to write the event in the pending events queue, so block a bit the thread + // to let write the indexes + test.put(tag_url.as_str(), tag).await?; + + let index_key = format!( + "{}:{}", + EventType::Put, + RetryEvent::generate_index_key(&tag_url).unwrap() + ); + + assert_eventually_exists(&index_key).await; + + let timestamp = RetryEvent::check_uri(&index_key).await.unwrap(); + assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::MissingDependency { dependency } => { + assert_eq!(dependency.len(), 1); + assert_eq!( + dependency[0], + RetryEvent::generate_index_key(&dependency_uri).unwrap() + ); + } + _ => panic!("The error type has to be MissingDependency type"), + }; + + test.del(&tag_url).await?; + + let del_index_key = format!( + "{}:{}", + EventType::Del, + RetryEvent::generate_index_key(&tag_url).unwrap() + ); + + assert_eventually_exists(&del_index_key).await; + + let timestamp = RetryEvent::check_uri(&del_index_key).await.unwrap(); + 
assert!(timestamp.is_some()); + + let event_retry = RetryEvent::get_from_index(&del_index_key).await.unwrap(); + assert!(event_retry.is_some()); + + let event_state = event_retry.unwrap(); + + assert_eq!(event_state.retry_count, 0); + + match event_state.error_type { + EventProcessorError::SkipIndexing => (), + _ => panic!("The error type has to be SkipIndexing type"), + }; + + Ok(()) +} diff --git a/tests/watcher/tags/utils.rs b/tests/watcher/tags/utils.rs index 6ebb9e3e..0dfc682e 100644 --- a/tests/watcher/tags/utils.rs +++ b/tests/watcher/tags/utils.rs @@ -63,6 +63,7 @@ pub async fn check_member_total_engagement_post_tag( label: &str, ) -> Result> { let total_engagement = TagSearch::check_sorted_set_member( + None, &[&TAG_GLOBAL_POST_ENGAGEMENT[..], &[label]].concat(), post_key, ) @@ -76,6 +77,7 @@ pub async fn check_member_post_tag_global_timeline( label: &str, ) -> Result> { let exist_in_timeline = TagSearch::check_sorted_set_member( + None, &[&TAG_GLOBAL_POST_TIMELINE[..], &[label]].concat(), post_key, ) diff --git a/tests/watcher/users/raw.rs b/tests/watcher/users/raw.rs index 66be1874..2de3a306 100644 --- a/tests/watcher/users/raw.rs +++ b/tests/watcher/users/raw.rs @@ -65,6 +65,7 @@ async fn test_homeserver_user_put_event() -> Result<()> { // Sorted:Users:Name let is_member = UserSearch::check_sorted_set_member( + None, &USER_NAME_KEY_PARTS, &[&user.name.to_lowercase(), &user_id], ) diff --git a/tests/watcher/users/utils.rs b/tests/watcher/users/utils.rs index 2de7d68f..d7249abf 100644 --- a/tests/watcher/users/utils.rs +++ b/tests/watcher/users/utils.rs @@ -9,16 +9,17 @@ use pubky_nexus::{ pub async fn check_member_most_followed(user_id: &str) -> Result> { let pioneer_score = - UserStream::check_sorted_set_member(&USER_MOSTFOLLOWED_KEY_PARTS, &[user_id]) + UserStream::check_sorted_set_member(None, &USER_MOSTFOLLOWED_KEY_PARTS, &[user_id]) .await .unwrap(); Ok(pioneer_score) } pub async fn check_member_user_pioneer(user_id: &str) -> Result> { - let 
pioneer_score = UserStream::check_sorted_set_member(&USER_PIONEERS_KEY_PARTS, &[user_id]) - .await - .unwrap(); + let pioneer_score = + UserStream::check_sorted_set_member(None, &USER_PIONEERS_KEY_PARTS, &[user_id]) + .await + .unwrap(); Ok(pioneer_score) } diff --git a/tests/watcher/utils/dht.rs b/tests/watcher/utils/dht.rs index 7017aad9..11663afd 100644 --- a/tests/watcher/utils/dht.rs +++ b/tests/watcher/utils/dht.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, Error}; +use log::warn; use mainline::Testnet; use std::sync::Arc; use tokio::sync::OnceCell; @@ -26,9 +27,11 @@ impl TestnetDHTNetwork { // TODO: maybe add the node number in environment variable nodes: Arc::new(Testnet::new(nodes)?), }; - DHT_TESTNET_NETWORK_SINGLETON - .set(testnet) - .map_err(|_| anyhow!("Already initiailsed"))?; + if let Err(_) = DHT_TESTNET_NETWORK_SINGLETON.set(testnet) { + warn!("DHT Testnet network was already initialized."); + return Ok(()); + } + Ok(()) } diff --git a/tests/watcher/utils/watcher.rs b/tests/watcher/utils/watcher.rs index 5125d985..b208fb02 100644 --- a/tests/watcher/utils/watcher.rs +++ b/tests/watcher/utils/watcher.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use super::dht::TestnetDHTNetwork; use anyhow::{anyhow, Result}; use chrono::Utc; @@ -7,7 +9,10 @@ use pubky_app_specs::{ }; use pubky_common::crypto::Keypair; use pubky_homeserver::Homeserver; -use pubky_nexus::{events::Event, setup, types::DynError, Config, EventProcessor, PubkyConnector}; +use pubky_nexus::events::retry::event::RetryEvent; +use pubky_nexus::events::Event; +use pubky_nexus::types::DynError; +use pubky_nexus::{setup, Config, EventProcessor, PubkyConnector}; /// Struct to hold the setup environment for tests pub struct WatcherTest { @@ -25,8 +30,9 @@ impl WatcherTest { /// 2. Initializes database connectors for Neo4j and Redis. /// 3. Sets up the global DHT test network for the watcher. /// 4. Creates and starts a test homeserver instance. - /// 5. 
Initializes the PubkyConnector with the configuration and global test DHT nodes. - /// 6. Creates and configures the event processor with the homeserver URL. + /// 5. Initializes a retry manager and ensures robustness by managing retries asynchronously. + /// 6. Initializes the PubkyConnector with the configuration and global test DHT nodes. + /// 7. Creates and configures the event processor with the homeserver URL. /// /// # Returns /// Returns an instance of `Self` containing the configuration, homeserver, @@ -41,7 +47,7 @@ impl WatcherTest { let homeserver = Homeserver::start_test(&testnet).await?; let homeserver_id = homeserver.public_key().to_string(); - match PubkyConnector::initialise(&config, Some(&testnet)) { + match PubkyConnector::initialise(&config, Some(&testnet)).await { Ok(_) => debug!("WatcherTest: PubkyConnector initialised"), Err(e) => debug!("WatcherTest: {}", e), } @@ -56,11 +62,13 @@ impl WatcherTest { }) } + /// Disables event processing and returns the modified instance. pub async fn remove_event_processing(mut self) -> Self { self.ensure_event_processing = false; self } + /// Ensures that event processing is completed if it is enabled. 
pub async fn ensure_event_processing_complete(&mut self) -> Result<()> { if self.ensure_event_processing { self.event_processor.run().await.map_err(|e| anyhow!(e))?; @@ -251,3 +259,32 @@ pub async fn retrieve_and_handle_event_line(event_line: &str) -> Result<(), DynE Ok(()) } + +/// Attempts to read an event index with retries before timing out +/// # Arguments +/// * `event_index` - A string slice representing the index to check +pub async fn assert_eventually_exists(event_index: &str) { + const SLEEP_MS: u64 = 3; + const MAX_RETRIES: usize = 50; + + for attempt in 0..MAX_RETRIES { + debug!( + "RetryEvent: Trying to read index {:?}, attempt {}/{} ({}ms)", + event_index, + attempt + 1, + MAX_RETRIES, + SLEEP_MS * attempt as u64 + ); + match RetryEvent::check_uri(event_index).await { + Ok(timeframe) => { + if timeframe.is_some() { + return (); + } + } + Err(e) => panic!("Error while getting index: {:?}", e), + }; + // Nap time + tokio::time::sleep(Duration::from_millis(SLEEP_MS)).await; + } + panic!("TIMEOUT: It takes too much time to read the RetryManager new index") +}