diff --git a/Cargo.lock b/Cargo.lock index 84e8126..e7ce739 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -674,6 +674,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -2993,6 +3002,7 @@ dependencies = [ "thiserror 2.0.3", "tokio", "tracing", + "tracing-appender", "tracing-subscriber", "web3-utils", ] @@ -4579,6 +4589,18 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" +dependencies = [ + "crossbeam-channel", + "thiserror 1.0.64", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.28" @@ -4734,9 +4756,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.11.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +checksum = "b3758f5e68192bb96cc8f9b7e2c2cfdabb435499a28499a42f8f984092adad4b" dependencies = [ "getrandom", ] diff --git a/cli/src/main.rs b/cli/src/main.rs index 24f7047..49c658f 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -2,9 +2,8 @@ use clap::{Args, Parser, Subcommand}; use futures::{stream, StreamExt, TryStreamExt}; use ipfs::IpfsClient; use sdk::mapping::{Entity, Named}; -use sdk::{ids, pb}; +use sdk::{ids, neo4rs, pb}; use sink::bootstrap::constants; -use sink::kg; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -21,7 +20,7 @@ async fn main() -> anyhow::Result<()> { init_tracing(); let args = AppArgs::parse(); - let kg = kg::Client::new( + let neo4j: neo4rs::Graph = neo4rs::Graph::new( &args.neo4j_args.neo4j_uri, &args.neo4j_args.neo4j_user, &args.neo4j_args.neo4j_pass, @@ -45,7 +44,7 @@ async fn main() -> anyhow::Result<()> { unimplemented!() } Command::Describe { id, space_id } => { - let entity = Entity::::find_by_id(&kg.neo4j, &id, &space_id) + let entity = Entity::::find_by_id(&neo4j, &id, &space_id) .await? .expect("Entity not found"); @@ -77,12 +76,12 @@ async fn main() -> anyhow::Result<()> { } Command::ImportSpace { ipfs_hash, - space_id, + space_id: _, } => { - let ops = import_space(&ipfs_client, &ipfs_hash).await?; + let _ = import_space(&ipfs_client, &ipfs_hash).await?; // let rollups = conversions::batch_ops(ops); - kg.process_ops(&Default::default(), &space_id, ops).await? + // neo4j.process_ops(&Default::default(), &space_id, ops).await? } Command::CreateEntityId { n } => { for _ in 0..n { diff --git a/codegen/src/lib.rs b/codegen/src/lib.rs index c5740d8..4635bd1 100644 --- a/codegen/src/lib.rs +++ b/codegen/src/lib.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use futures::{stream, StreamExt, TryStreamExt}; use sdk::mapping::{Entity, Named}; -use sdk::system_ids; +use sdk::{neo4rs, system_ids}; use swc::config::SourceMapsConfig; use swc::PrintArgs; use swc_common::{sync::Lrc, SourceMap, Span}; @@ -160,7 +160,7 @@ impl EntityExt for Entity { /// Generate a TypeScript class declaration from an entity. /// Note: The entity must be a `Type` entity. -pub async fn gen_type(_kg: &sink::kg::Client, entity: &Entity) -> anyhow::Result { +pub async fn gen_type(_kg: &neo4rs::Graph, entity: &Entity) -> anyhow::Result { // let attrs = kg.attribute_nodes::(entity.id()).await?; let attrs = vec![]; // FIXME: Temporary while we figure out what to do with codegen @@ -236,7 +236,7 @@ pub async fn gen_type(_kg: &sink::kg::Client, entity: &Entity) -> anyhow: } /// Generate a TypeScript module containing class definitions from all types in the knowledge graph. -pub async fn gen_types(kg: &sink::kg::Client) -> anyhow::Result { +pub async fn gen_types(kg: &neo4rs::Graph) -> anyhow::Result { let import_stmts = vec![ quote!("import { Driver, Node } from 'neo4j-driver';" as ModuleItem), quote!("import { Entity } from './kg';" as ModuleItem), @@ -271,7 +271,7 @@ pub async fn gen_types(kg: &sink::kg::Client) -> anyhow::Result { } /// Generate and render TypeScript code from the knowledge graph. -pub async fn codegen(kg: &sink::kg::Client) -> anyhow::Result { +pub async fn codegen(kg: &neo4rs::Graph) -> anyhow::Result { let cm: Lrc = Default::default(); let compiler = swc::Compiler::new(cm.clone()); diff --git a/sdk/src/ids/base58.rs b/sdk/src/ids/base58.rs index ea9a5b7..6bb21b9 100644 --- a/sdk/src/ids/base58.rs +++ b/sdk/src/ids/base58.rs @@ -52,16 +52,24 @@ mod tests { #[test] fn test_base58_encoding() { assert_eq!( + encode_uuid_to_base58("1cc6995f-6cc2-4c7a-9592-1466bf95f6be"), "4Z6VLmpipszCVZb21Fey5F", - encode_uuid_to_base58("1cc6995f-6cc2-4c7a-9592-1466bf95f6be") + ) + } + + #[test] + fn test_base58_encoding_2() { + assert_eq!( + encode_uuid_to_base58("08c4f093-7858-4b7c-9b94-b82e448abcff"), + "25omwWh6HYgeRQKCaSpVpa", ) } #[test] fn test_base58_decoding() { assert_eq!( + decode_base58_to_uuid("4Z6VLmpipszCVZb21Fey5F").unwrap(), "1cc6995f-6cc2-4c7a-9592-1466bf95f6be", - decode_base58_to_uuid("4Z6VLmpipszCVZb21Fey5F").unwrap() ) } diff --git a/sdk/src/ids/id.rs b/sdk/src/ids/id.rs index fb8e354..91feee2 100644 --- a/sdk/src/ids/id.rs +++ b/sdk/src/ids/id.rs @@ -1,7 +1,7 @@ use std::fmt::Display; use md5::{Digest, Md5}; -use uuid::Uuid; +use uuid::Builder; use super::base58::encode_uuid_to_base58; @@ -58,12 +58,29 @@ pub fn create_space_id(network: &str, address: &str) -> String { pub fn create_id_from_unique_string(text: &str) -> String { let mut hasher = Md5::new(); hasher.update(text); - let hashed = hasher.finalize(); + let hashed: [u8; 16] = hasher.finalize().into(); - let uuid = Uuid::from_slice(&hashed[..]).unwrap(); + let uuid = Builder::from_random_bytes(hashed).into_uuid(); encode_uuid_to_base58(&uuid.to_string()) } pub fn create_geo_id() -> String { encode_uuid_to_base58(&uuid::Uuid::new_v4().to_string()) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::network_ids; + + #[test] + fn test_space_id() { + assert_eq!( + create_space_id( + network_ids::GEO, + "0xcD48eF54771d9cf7dDA324c64bF4e53C161aF294" + ), + "25omwWh6HYgeRQKCaSpVpa" + ) + } +} diff --git a/sdk/src/models/account.rs b/sdk/src/models/account.rs index 96c1df3..5cd6be3 100644 --- a/sdk/src/models/account.rs +++ b/sdk/src/models/account.rs @@ -12,7 +12,7 @@ pub struct GeoAccount { impl GeoAccount { pub fn new(address: String, block: &BlockMetadata) -> Entity { - let checksummed_address = checksum_address(&address, None); + let checksummed_address = checksum_address(&address); Entity::new( &ids::create_id_from_unique_string(&checksummed_address), indexer_ids::INDEXER_SPACE_ID, @@ -25,6 +25,6 @@ impl GeoAccount { } pub fn new_id(address: &str) -> String { - ids::create_id_from_unique_string(&checksum_address(address, None)) + ids::create_id_from_unique_string(&checksum_address(address)) } } diff --git a/sdk/src/models/proposal.rs b/sdk/src/models/proposal.rs index 04971f7..c8c9c00 100644 --- a/sdk/src/models/proposal.rs +++ b/sdk/src/models/proposal.rs @@ -90,7 +90,7 @@ impl Proposal { let query = neo4rs::query(QUERY) .param("proposal_id", proposal_id) - .param("plugin_address", checksum_address(plugin_address, None)); + .param("plugin_address", checksum_address(plugin_address)); #[derive(Debug, Deserialize)] struct ResultRow { diff --git a/sdk/src/models/space.rs b/sdk/src/models/space.rs index 021732c..813c567 100644 --- a/sdk/src/models/space.rs +++ b/sdk/src/models/space.rs @@ -33,7 +33,7 @@ pub struct Space { impl Space { pub fn new_id(network: &str, address: &str) -> String { - ids::create_id_from_unique_string(&format!("{network}:{}", checksum_address(address, None))) + ids::create_id_from_unique_string(&format!("{network}:{}", checksum_address(address))) } pub fn builder(id: &str, dao_contract_address: &str, block: &BlockMetadata) -> SpaceBuilder { @@ -57,7 +57,7 @@ impl Space { let query = neo4rs::query(QUERY).param( "dao_contract_address", - checksum_address(dao_contract_address, None), + checksum_address(dao_contract_address), ); #[derive(Debug, Deserialize)] @@ -89,7 +89,7 @@ impl Space { let query = neo4rs::query(QUERY).param( "space_plugin_address", - checksum_address(space_plugin_address, None), + checksum_address(space_plugin_address), ); #[derive(Debug, Deserialize)] @@ -121,7 +121,7 @@ impl Space { let query = neo4rs::query(QUERY).param( "voting_plugin_address", - checksum_address(voting_plugin_address, None), + checksum_address(voting_plugin_address), ); #[derive(Debug, Deserialize)] @@ -153,7 +153,7 @@ impl Space { let query = neo4rs::query(QUERY).param( "member_access_plugin", - checksum_address(member_access_plugin, None), + checksum_address(member_access_plugin), ); #[derive(Debug, Deserialize)] @@ -185,7 +185,7 @@ impl Space { let query = neo4rs::query(QUERY).param( "personal_space_admin_plugin", - checksum_address(personal_space_admin_plugin, None), + checksum_address(personal_space_admin_plugin), ); #[derive(Debug, Deserialize)] @@ -256,7 +256,7 @@ impl SpaceBuilder { block: block.clone(), network: network_ids::GEO.to_string(), r#type: SpaceType::Public, - dao_contract_address: checksum_address(dao_contract_address, None), + dao_contract_address: checksum_address(dao_contract_address), space_plugin_address: None, voting_plugin_address: None, member_access_plugin: None, @@ -275,28 +275,27 @@ impl SpaceBuilder { } pub fn dao_contract_address(mut self, dao_contract_address: &str) -> Self { - self.dao_contract_address = checksum_address(dao_contract_address, None); + self.dao_contract_address = checksum_address(dao_contract_address); self } pub fn space_plugin_address(mut self, space_plugin_address: &str) -> Self { - self.space_plugin_address = Some(checksum_address(space_plugin_address, None)); + self.space_plugin_address = Some(checksum_address(space_plugin_address)); self } pub fn voting_plugin_address(mut self, voting_plugin_address: &str) -> Self { - self.voting_plugin_address = Some(checksum_address(voting_plugin_address, None)); + self.voting_plugin_address = Some(checksum_address(voting_plugin_address)); self } pub fn member_access_plugin(mut self, member_access_plugin: &str) -> Self { - self.member_access_plugin = Some(checksum_address(member_access_plugin, None)); + self.member_access_plugin = Some(checksum_address(member_access_plugin)); self } pub fn personal_space_admin_plugin(mut self, personal_space_admin_plugin: &str) -> Self { - self.personal_space_admin_plugin = - Some(checksum_address(personal_space_admin_plugin, None)); + self.personal_space_admin_plugin = Some(checksum_address(personal_space_admin_plugin)); self } diff --git a/sink/Cargo.toml b/sink/Cargo.toml index bfc1975..e512b27 100644 --- a/sink/Cargo.toml +++ b/sink/Cargo.toml @@ -29,6 +29,7 @@ substreams-utils = { version = "0.1.0", path = "../substreams-utils" } sdk = { version = "0.1.0", path = "../sdk" } ipfs = { version = "0.1.0", path = "../ipfs" } web3-utils = { version = "0.1.0", path = "../web3-utils" } +tracing-appender = "0.2.3" [dev-dependencies] serde_path_to_error = "0.1.16" diff --git a/sink/src/bootstrap/constants.rs b/sink/src/bootstrap/constants.rs index ca4f4b4..985db59 100644 --- a/sink/src/bootstrap/constants.rs +++ b/sink/src/bootstrap/constants.rs @@ -2,7 +2,7 @@ pub const ROOT_SPACE_CREATED_AT: u32 = 1670280473; pub const ROOT_SPACE_CREATED_AT_BLOCK: u32 = 620; pub const ROOT_SPACE_CREATED_BY_ID: &str = "0x66703c058795B9Cb215fbcc7c6b07aee7D216F24"; -pub const ROOT_SPACE_ID: &str = "BJqiLPcSgfF8FRxkFr76Uy"; +pub const ROOT_SPACE_ID: &str = "EHoZ9qvSPmzxNmReVcCTSw"; pub const ROOT_SPACE_DAO_ADDRESS: &str = "0xB3191d353c4e409Add754112544296449B18c1Af"; pub const ROOT_SPACE_PLUGIN_ADDRESS: &str = "0x2a2d20e5262b27e6383da774E942dED3e4Bf5FaF"; pub const ROOT_SPACE_MAIN_VOTING_ADDRESS: &str = "0x9445A38102792654D92F1ba76Ee26a52Aa1E466e"; diff --git a/sink/src/events/edit_published.rs b/sink/src/events/edit_published.rs index 8db0d8e..7612676 100644 --- a/sink/src/events/edit_published.rs +++ b/sink/src/events/edit_published.rs @@ -1,6 +1,8 @@ use futures::{stream, StreamExt, TryStreamExt}; use ipfs::deserialize; use sdk::{ + error::DatabaseError, + mapping::{Entity, Relation}, models::{self, EditProposal, Space}, pb::{self, geo}, }; @@ -42,9 +44,7 @@ impl EventHandler { proposal.proposal_id ); - self.kg - .process_ops(block, &proposal.space, proposal.ops) - .await + self.process_ops(block, &proposal.space, proposal.ops).await }) .await .map_err(|e| HandlerError::Other(format!("{e:?}").into()))?; // TODO: Convert anyhow::Error to HandlerError properly @@ -57,13 +57,13 @@ impl EventHandler { edit: &geo::EditPublished, ) -> Result, HandlerError> { let space = if let Some(space) = - Space::find_by_space_plugin_address(&self.kg.neo4j, &edit.plugin_address) + Space::find_by_space_plugin_address(&self.neo4j, &edit.plugin_address) .await .map_err(|e| { HandlerError::Other( format!( "Error querying space with plugin address {} {e:?}", - checksum_address(&edit.plugin_address, None) + checksum_address(&edit.plugin_address) ) .into(), ) @@ -135,4 +135,103 @@ impl EventHandler { _ => Ok(vec![]), } } + + pub async fn process_ops( + &self, + block: &models::BlockMetadata, + space_id: &str, + ops: impl IntoIterator, + ) -> Result<(), DatabaseError> { + for op in ops { + match (op.r#type(), op) { + ( + pb::ipfs::OpType::SetTriple, + pb::ipfs::Op { + triple: + Some(pb::ipfs::Triple { + entity, + attribute, + value: Some(value), + }), + .. + }, + ) => { + tracing::info!("SetTriple: {}, {}, {:?}", entity, attribute, value,); + + Entity::<()>::set_triple( + &self.neo4j, + block, + space_id, + &entity, + &attribute, + &value, + ) + .await? + } + ( + pb::ipfs::OpType::DeleteTriple, + pb::ipfs::Op { + triple: Some(triple), + .. + }, + ) => { + tracing::info!( + "DeleteTriple: {}, {}, {:?}", + triple.entity, + triple.attribute, + triple.value, + ); + + Entity::<()>::delete_triple(&self.neo4j, block, space_id, triple).await? + } + // TODO: Handle these cases + // (pb::ipfs::OpType::SetTripleBatch, op) => { + // } + // (pb::ipfs::OpType::DeleteEntity, op) => { + // } + ( + pb::ipfs::OpType::CreateRelation, + pb::ipfs::Op { + relation: Some(relation), + .. + }, + ) => { + tracing::info!( + "CreateRelation: {}, {}, {}, {}", + relation.id, + relation.r#type, + relation.from_entity, + relation.to_entity, + ); + + Relation::<()>::new( + &relation.id, + space_id, + &relation.r#type, + &relation.from_entity, + &relation.to_entity, + block, + (), + ) + .upsert(&self.neo4j) + .await? + } + ( + pb::ipfs::OpType::DeleteRelation, + pb::ipfs::Op { + relation: Some(relation), + .. + }, + ) => { + tracing::info!("DeleteRelation: {}", relation.id); + Entity::<()>::delete(&self.neo4j, block, &relation.id, space_id).await? + } + (typ, maybe_triple) => { + tracing::warn!("Unhandled case: {:?} {:?}", typ, maybe_triple); + } + } + } + + Ok(()) + } } diff --git a/sink/src/events/editor_added.rs b/sink/src/events/editor_added.rs index befddd3..76a84cb 100644 --- a/sink/src/events/editor_added.rs +++ b/sink/src/events/editor_added.rs @@ -15,11 +15,11 @@ impl EventHandler { ) -> Result<(), HandlerError> { match try_join!( Space::find_by_voting_plugin_address( - &self.kg.neo4j, + &self.neo4j, &editor_added.main_voting_plugin_address, ), Space::find_by_personal_plugin_address( - &self.kg.neo4j, + &self.neo4j, &editor_added.main_voting_plugin_address ) )? { @@ -28,11 +28,11 @@ impl EventHandler { let editor = models::GeoAccount::new(editor_added.editor_address.clone(), block); // Add geo account - editor.upsert(&self.kg.neo4j).await?; + editor.upsert(&self.neo4j).await?; // Add space editor relation SpaceEditor::new(editor.id(), space.id(), block) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; } // Space not found @@ -41,7 +41,7 @@ impl EventHandler { "Block #{} ({}): Could not add editor for unknown space with voting_plugin_address = {}", block.block_number, block.timestamp, - checksum_address(&editor_added.main_voting_plugin_address, None) + checksum_address(&editor_added.main_voting_plugin_address) ); } } diff --git a/sink/src/events/editor_removed.rs b/sink/src/events/editor_removed.rs index 088e119..4128f3c 100644 --- a/sink/src/events/editor_removed.rs +++ b/sink/src/events/editor_removed.rs @@ -11,11 +11,11 @@ impl EventHandler { editor_removed: &geo::EditorRemoved, block: &models::BlockMetadata, ) -> Result<(), HandlerError> { - let space = Space::find_by_dao_address(&self.kg.neo4j, &editor_removed.dao_address).await?; + let space = Space::find_by_dao_address(&self.neo4j, &editor_removed.dao_address).await?; if let Some(space) = space { SpaceEditor::remove( - &self.kg.neo4j, + &self.neo4j, &GeoAccount::new_id(&editor_removed.editor_address), space.id(), ) diff --git a/sink/src/events/handler.rs b/sink/src/events/handler.rs index 2ab2430..1442047 100644 --- a/sink/src/events/handler.rs +++ b/sink/src/events/handler.rs @@ -5,7 +5,7 @@ use prost::Message; use sdk::{error::DatabaseError, ids::create_geo_id, models::BlockMetadata, pb::geo::GeoOutput}; use substreams_utils::pb::sf::substreams::rpc::v2::BlockScopedData; -use crate::{kg, metrics}; +use crate::metrics; #[derive(thiserror::Error, Debug)] pub enum HandlerError { @@ -26,14 +26,14 @@ pub enum HandlerError { pub struct EventHandler { pub(crate) ipfs: IpfsClient, - pub(crate) kg: kg::Client, + pub(crate) neo4j: neo4rs::Graph, } impl EventHandler { - pub fn new(kg: kg::Client) -> Self { + pub fn new(neo4j: neo4rs::Graph) -> Self { Self { ipfs: IpfsClient::from_url("https://gateway.lighthouse.storage/ipfs/"), - kg, + neo4j, } } } @@ -82,7 +82,10 @@ impl substreams_utils::Sink for EventHandler { ); } let created_space_ids = stream::iter(&value.spaces_created) - .then(|event| async { self.handle_space_created(event, &block).await }) + .then(|event| async { + self.handle_space_created(event, &value.edits_published, &block) + .await + }) .try_collect::>() .await?; diff --git a/sink/src/events/initial_editors_added.rs b/sink/src/events/initial_editors_added.rs index 40f36bc..7a992f7 100644 --- a/sink/src/events/initial_editors_added.rs +++ b/sink/src/events/initial_editors_added.rs @@ -12,11 +12,9 @@ impl EventHandler { initial_editor_added: &geo::InitialEditorAdded, block: &models::BlockMetadata, ) -> Result<(), HandlerError> { - let space = Space::find_by_voting_plugin_address( - &self.kg.neo4j, - &initial_editor_added.plugin_address, - ) - .await?; + let space = + Space::find_by_voting_plugin_address(&self.neo4j, &initial_editor_added.plugin_address) + .await?; if let Some(space) = &space { stream::iter(&initial_editor_added.addresses) @@ -25,11 +23,11 @@ impl EventHandler { let editor = GeoAccount::new(editor.clone(), block); // Add geo account - editor.upsert(&self.kg.neo4j).await?; + editor.upsert(&self.neo4j).await?; // Add space editor relation SpaceEditor::new(editor.id(), space.id(), block) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; Ok(()) diff --git a/sink/src/events/member_added.rs b/sink/src/events/member_added.rs index ae36b1d..a554606 100644 --- a/sink/src/events/member_added.rs +++ b/sink/src/events/member_added.rs @@ -14,11 +14,11 @@ impl EventHandler { ) -> Result<(), HandlerError> { match try_join!( Space::find_by_voting_plugin_address( - &self.kg.neo4j, + &self.neo4j, &member_added.main_voting_plugin_address ), Space::find_by_personal_plugin_address( - &self.kg.neo4j, + &self.neo4j, &member_added.main_voting_plugin_address ) )? { @@ -27,11 +27,11 @@ impl EventHandler { let member = GeoAccount::new(member_added.member_address.clone(), block); // Add geo account - member.upsert(&self.kg.neo4j).await?; + member.upsert(&self.neo4j).await?; // Add space member relation SpaceMember::new(member.id(), space.id(), block) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; } // Space not found diff --git a/sink/src/events/member_removed.rs b/sink/src/events/member_removed.rs index e048cfa..5debf3d 100644 --- a/sink/src/events/member_removed.rs +++ b/sink/src/events/member_removed.rs @@ -12,11 +12,11 @@ impl EventHandler { block: &models::BlockMetadata, ) -> Result<(), HandlerError> { let space = - models::Space::find_by_dao_address(&self.kg.neo4j, &member_removed.dao_address).await?; + models::Space::find_by_dao_address(&self.neo4j, &member_removed.dao_address).await?; if let Some(space) = space { SpaceMember::remove( - &self.kg.neo4j, + &self.neo4j, &models::GeoAccount::new_id(&member_removed.member_address), space.id(), ) diff --git a/sink/src/events/proposal_created.rs b/sink/src/events/proposal_created.rs index 4a8155d..5d5e97e 100644 --- a/sink/src/events/proposal_created.rs +++ b/sink/src/events/proposal_created.rs @@ -19,7 +19,7 @@ impl EventHandler { }, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create Space > PROPOSALS > Proposal relation @@ -28,7 +28,7 @@ impl EventHandler { &models::Proposal::new_id(&add_member_proposal.proposal_id), block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create Proposal > CREATOR > Account relation @@ -37,7 +37,7 @@ impl EventHandler { &add_member_proposal.creator, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; Ok(()) @@ -59,7 +59,7 @@ impl EventHandler { }, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create Space > PROPOSALS > Proposal relation @@ -68,7 +68,7 @@ impl EventHandler { &models::Proposal::new_id(&remove_member_proposal.proposal_id), block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create Proposal > CREATOR > Account relation @@ -77,7 +77,7 @@ impl EventHandler { &remove_member_proposal.creator, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; Ok(()) @@ -99,7 +99,7 @@ impl EventHandler { }, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create Space > PROPOSALS > Proposal relation @@ -108,7 +108,7 @@ impl EventHandler { &models::Proposal::new_id(&add_editor_proposal.proposal_id), block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create Proposal > CREATOR > Account relation @@ -117,7 +117,7 @@ impl EventHandler { &add_editor_proposal.creator, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; Ok(()) @@ -139,7 +139,7 @@ impl EventHandler { }, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create Space > PROPOSALS > Proposal relation @@ -148,7 +148,7 @@ impl EventHandler { &models::Proposal::new_id(&remove_editor_proposal.proposal_id), block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create Proposal > CREATOR > Account relation @@ -157,7 +157,7 @@ impl EventHandler { &remove_editor_proposal.creator, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; Ok(()) @@ -179,7 +179,7 @@ impl EventHandler { }, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create Space > PROPOSALS > Proposal relation @@ -188,7 +188,7 @@ impl EventHandler { &models::Proposal::new_id(&add_subspace_proposal.proposal_id), block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create Proposal > CREATOR > Account relation @@ -197,7 +197,7 @@ impl EventHandler { &add_subspace_proposal.creator, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; Ok(()) @@ -219,7 +219,7 @@ impl EventHandler { }, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create Space > PROPOSALS > Proposal relation @@ -228,7 +228,7 @@ impl EventHandler { &models::Proposal::new_id(&remove_subspace_proposal.proposal_id), block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create Proposal > CREATOR > Account relation @@ -237,7 +237,7 @@ impl EventHandler { &remove_subspace_proposal.creator, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; Ok(()) diff --git a/sink/src/events/proposal_executed.rs b/sink/src/events/proposal_executed.rs index 63b0e10..d68f1c9 100644 --- a/sink/src/events/proposal_executed.rs +++ b/sink/src/events/proposal_executed.rs @@ -9,7 +9,7 @@ impl EventHandler { block: &models::BlockMetadata, ) -> Result<(), HandlerError> { Ok(models::Proposal::set_status( - &self.kg.neo4j, + &self.neo4j, block, &proposal_executed.proposal_id, models::proposal::ProposalStatus::Executed, diff --git a/sink/src/events/space_created.rs b/sink/src/events/space_created.rs index b610315..97a16ab 100644 --- a/sink/src/events/space_created.rs +++ b/sink/src/events/space_created.rs @@ -1,7 +1,7 @@ use sdk::{ models::{self, GeoAccount, Space, SpaceType}, network_ids, - pb::geo, + pb::{self, geo}, }; use web3_utils::checksum_address; @@ -12,46 +12,53 @@ impl EventHandler { pub async fn handle_space_created( &self, space_created: &geo::GeoSpaceCreated, - // edits_published: &[geo::EditPublished], + edits_published: &[geo::EditPublished], block: &models::BlockMetadata, ) -> Result { - // Match the space creation events with their corresponding initial proposal (if any) - // let initial_proposals = spaces_created - // .iter() - // .filter_map(|event| { - // edits_published - // .iter() - // .find(|proposal| { - // checksum_address(&proposal.plugin_address, None) - // == checksum_address(&event.space_address, None) - // }) - // .map(|proposal| (event.space_address.clone(), proposal)) - // }) - // .collect::>(); - - // tracing::info!() - - // For spaces with an initial proposal, get the space ID from the import (if available) - // let space_ids = stream::iter(initial_proposals) - // .filter_map(|(space_address, proposal_processed)| async move { - // let ipfs_hash = proposal_processed.content_uri.replace("ipfs://", ""); - // self.ipfs - // .get::(&ipfs_hash, true) - // .await - // .ok() - // .map(|import| { - // ( - // space_address, - // Space::new_id( - // &import.previous_network, - // &import.previous_contract_address, - // ), - // ) - // }) - // }) - // .collect::>() - // .await; - let space_id = Space::new_id(network_ids::GEO, &space_created.dao_address); + let maybe_initial_proposal = edits_published.iter().find(|proposal| { + checksum_address(&proposal.plugin_address) + == checksum_address(&space_created.space_address) + }); + + let maybe_existing_space_id = match maybe_initial_proposal { + Some(initial_proposal) => { + let bytes = self + .ipfs + .get_bytes(&initial_proposal.content_uri.replace("ipfs://", ""), true) + .await?; + + if let Ok(metadata) = ipfs::deserialize::(&bytes) { + match metadata.r#type() { + pb::ipfs::ActionType::ImportSpace => { + let import = ipfs::deserialize::(&bytes)?; + + tracing::info!( + "Block #{} ({}): Found import for space {} (derived id: {})", + block.block_number, + block.timestamp, + checksum_address(&space_created.space_address), + Space::new_id( + &import.previous_network, + &import.previous_contract_address, + ) + ); + + Some(Space::new_id( + &import.previous_network, + &import.previous_contract_address, + )) + } + _ => None, + } + } else { + None + } + } + None => None, + }; + + let space_id = maybe_existing_space_id + .unwrap_or_else(|| Space::new_id(network_ids::GEO, &space_created.dao_address)); tracing::info!( "Block #{} ({}): Creating space {}", @@ -64,7 +71,7 @@ impl EventHandler { .network(network_ids::GEO.to_string()) .space_plugin_address(&space_created.space_address) .build() - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Create the spaces @@ -89,7 +96,7 @@ impl EventHandler { personal_space_created: &geo::GeoPersonalSpaceAdminPluginCreated, block: &models::BlockMetadata, ) -> Result<(), HandlerError> { - let space = Space::find_by_dao_address(&self.kg.neo4j, &personal_space_created.dao_address) + let space = Space::find_by_dao_address(&self.neo4j, &personal_space_created.dao_address) .await .map_err(|e| HandlerError::Other(format!("{e:?}").into()))?; // TODO: Convert anyhow::Error to HandlerError properly @@ -98,13 +105,13 @@ impl EventHandler { .r#type(SpaceType::Personal) .personal_space_admin_plugin(&personal_space_created.personal_admin_address) .build() - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; // Add initial editors to the personal space let editor = GeoAccount::new(personal_space_created.initial_editor.clone(), block); - editor.upsert(&self.kg.neo4j).await?; + editor.upsert(&self.neo4j).await?; tracing::info!( "Block #{} ({}): Creating personal admin space plugin for space {} with initial editor {}", @@ -118,7 +125,7 @@ impl EventHandler { "Block #{} ({}): Could not create personal admin space plugin for unknown space with dao_address = {}", block.block_number, block.timestamp, - checksum_address(&personal_space_created.dao_address, None) + checksum_address(&personal_space_created.dao_address) ); } @@ -131,8 +138,7 @@ impl EventHandler { block: &models::BlockMetadata, ) -> Result<(), HandlerError> { let space = - Space::find_by_dao_address(&self.kg.neo4j, &governance_plugin_created.dao_address) - .await?; + Space::find_by_dao_address(&self.neo4j, &governance_plugin_created.dao_address).await?; if let Some(space) = space { tracing::info!( @@ -146,14 +152,14 @@ impl EventHandler { .voting_plugin_address(&governance_plugin_created.main_voting_address) .member_access_plugin(&governance_plugin_created.member_access_address) .build() - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; } else { tracing::warn!( "Block #{} ({}): Could not create governance plugin for unknown space with dao_address = {}", block.block_number, block.timestamp, - checksum_address(&governance_plugin_created.dao_address, None) + checksum_address(&governance_plugin_created.dao_address) ); } diff --git a/sink/src/events/subspace_added.rs b/sink/src/events/subspace_added.rs index ae76871..e3d9615 100644 --- a/sink/src/events/subspace_added.rs +++ b/sink/src/events/subspace_added.rs @@ -15,14 +15,14 @@ impl EventHandler { ) -> Result<(), HandlerError> { match join!( models::Space::find_by_space_plugin_address( - &self.kg.neo4j, + &self.neo4j, &subspace_added.plugin_address ), - models::Space::find_by_dao_address(&self.kg.neo4j, &subspace_added.subspace) + models::Space::find_by_dao_address(&self.neo4j, &subspace_added.subspace) ) { (Ok(Some(parent_space)), Ok(Some(subspace))) => { ParentSpace::new(subspace.id(), parent_space.id(), block) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; } (Ok(None), Ok(_)) => { @@ -30,7 +30,7 @@ impl EventHandler { "Block #{} ({}): Could not create subspace: parent space with plugin_address = {} not found", block.block_number, block.timestamp, - checksum_address(&subspace_added.plugin_address, None) + checksum_address(&subspace_added.plugin_address) ); } (Ok(Some(_)), Ok(None)) => { @@ -38,7 +38,7 @@ impl EventHandler { "Block #{} ({}): Could not create subspace: space with dao_address = {} not found", block.block_number, block.timestamp, - checksum_address(&subspace_added.plugin_address, None) + checksum_address(&subspace_added.plugin_address) ); } (Err(e), _) | (_, Err(e)) => { diff --git a/sink/src/events/subspace_removed.rs b/sink/src/events/subspace_removed.rs index f84ccc4..f0b359d 100644 --- a/sink/src/events/subspace_removed.rs +++ b/sink/src/events/subspace_removed.rs @@ -9,14 +9,14 @@ impl EventHandler { block: &models::BlockMetadata, ) -> Result<(), HandlerError> { let space = models::Space::find_by_space_plugin_address( - &self.kg.neo4j, + &self.neo4j, &subspace_removed.plugin_address, ) .await .map_err(|e| HandlerError::Other(format!("{e:?}").into()))?; // TODO: Convert anyhow::Error to HandlerError properly if let Some(space) = space { - self.kg.neo4j + self.neo4j .run(neo4rs::query(&format!( "MATCH (subspace:`{INDEXED_SPACE}` {{parent_space: $space_id}}) DELETE subspace", INDEXED_SPACE = system_ids::SPACE_TYPE, diff --git a/sink/src/events/vote_cast.rs b/sink/src/events/vote_cast.rs index 242b47f..93bf91e 100644 --- a/sink/src/events/vote_cast.rs +++ b/sink/src/events/vote_cast.rs @@ -16,20 +16,20 @@ impl EventHandler { block: &models::BlockMetadata, ) -> Result<(), HandlerError> { match join!( - Space::find_by_voting_plugin_address(&self.kg.neo4j, &vote.plugin_address), - Space::find_by_member_access_plugin(&self.kg.neo4j, &vote.plugin_address) + Space::find_by_voting_plugin_address(&self.neo4j, &vote.plugin_address), + Space::find_by_member_access_plugin(&self.neo4j, &vote.plugin_address) ) { // Space found (Ok(Some(_space)), Ok(_)) | (Ok(None), Ok(Some(_space))) => { let maybe_proposal = models::Proposal::find_by_id_and_address( - &self.kg.neo4j, + &self.neo4j, &vote.onchain_proposal_id, &vote.plugin_address, ) .await?; let account = Entity::::find_by_id( - &self.kg.neo4j, + &self.neo4j, &models::GeoAccount::new_id(&vote.voter), indexer_ids::INDEXER_SPACE_ID, ) @@ -45,7 +45,7 @@ impl EventHandler { .map_err(|e| HandlerError::Other(format!("{e:?}").into()))?, block, ) - .upsert(&self.kg.neo4j) + .upsert(&self.neo4j) .await?; } // Proposal or account not found @@ -71,7 +71,7 @@ impl EventHandler { "Block #{} ({}): Matching space in Proposal not found for plugin address = {}", block.block_number, block.timestamp, - checksum_address(&vote.plugin_address, None), + checksum_address(&vote.plugin_address), ); } // Errors diff --git a/sink/src/kg/client.rs b/sink/src/kg/client.rs deleted file mode 100644 index a650a93..0000000 --- a/sink/src/kg/client.rs +++ /dev/null @@ -1,154 +0,0 @@ -use crate::bootstrap::constants; - -use sdk::{ - error::DatabaseError, - mapping::{Entity, Relation}, - models::{self, BlockMetadata}, - pb, -}; - -#[derive(Clone)] -pub struct Client { - pub neo4j: neo4rs::Graph, -} - -impl Client { - pub async fn new(uri: &str, user: &str, pass: &str) -> anyhow::Result { - let neo4j = neo4rs::Graph::new(uri, user, pass).await?; - Ok(Self { neo4j }) - } - - /// Bootstrap the database with the initial data - pub async fn bootstrap(&self, _rollup: bool) -> Result<(), DatabaseError> { - models::Space::builder( - constants::ROOT_SPACE_ID, - constants::ROOT_SPACE_DAO_ADDRESS, - &BlockMetadata::default(), - ) - .space_plugin_address(constants::ROOT_SPACE_PLUGIN_ADDRESS) - .voting_plugin_address(constants::ROOT_SPACE_MAIN_VOTING_ADDRESS) - .member_access_plugin(constants::ROOT_SPACE_MEMBER_ACCESS_ADDRESS) - .build() - .upsert(&self.neo4j) - .await - - // self.process_ops( - // &BlockMetadata::default(), - // constants::ROOT_SPACE_ID, - // bootstrap::bootstrap(), - // ) - // .await - } - - /// Reset the database by deleting all nodes and relations and re-bootstrapping it - pub async fn reset_db(&self, rollup: bool) -> anyhow::Result<()> { - // Delete all nodes and relations - let mut txn = self.neo4j.start_txn().await?; - txn.run(neo4rs::query("MATCH (n) DETACH DELETE n")).await?; - txn.commit().await?; - - // Re-bootstrap the database - self.bootstrap(rollup).await?; - - Ok(()) - } - - pub async fn process_ops( - &self, - block: &models::BlockMetadata, - space_id: &str, - ops: impl IntoIterator, - ) -> Result<(), DatabaseError> { - for op in ops { - match (op.r#type(), op) { - ( - pb::ipfs::OpType::SetTriple, - pb::ipfs::Op { - triple: - Some(pb::ipfs::Triple { - entity, - attribute, - value: Some(value), - }), - .. - }, - ) => { - tracing::info!("SetTriple: {}, {}, {:?}", entity, attribute, value,); - - Entity::<()>::set_triple( - &self.neo4j, - block, - space_id, - &entity, - &attribute, - &value, - ) - .await? - } - ( - pb::ipfs::OpType::DeleteTriple, - pb::ipfs::Op { - triple: Some(triple), - .. - }, - ) => { - tracing::info!( - "DeleteTriple: {}, {}, {:?}", - triple.entity, - triple.attribute, - triple.value, - ); - - Entity::<()>::delete_triple(&self.neo4j, block, space_id, triple).await? - } - // TODO: Handle these cases - // (pb::ipfs::OpType::SetTripleBatch, op) => { - // } - // (pb::ipfs::OpType::DeleteEntity, op) => { - // } - ( - pb::ipfs::OpType::CreateRelation, - pb::ipfs::Op { - relation: Some(relation), - .. - }, - ) => { - tracing::info!( - "CreateRelation: {}, {}, {}, {}", - relation.id, - relation.r#type, - relation.from_entity, - relation.to_entity, - ); - - Relation::<()>::new( - &relation.id, - space_id, - &relation.r#type, - &relation.from_entity, - &relation.to_entity, - block, - (), - ) - .upsert(&self.neo4j) - .await? - } - ( - pb::ipfs::OpType::DeleteRelation, - pb::ipfs::Op { - relation: Some(relation), - .. - }, - ) => { - tracing::info!("DeleteRelation: {}", relation.id); - Entity::<()>::delete(&self.neo4j, block, &relation.id, space_id).await? - } - (typ, maybe_triple) => { - tracing::warn!("Unhandled case: {:?} {:?}", typ, maybe_triple); - } - } - } - - Ok(()) - } -} diff --git a/sink/src/kg/mod.rs b/sink/src/kg/mod.rs deleted file mode 100644 index be467d5..0000000 --- a/sink/src/kg/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod client; - -pub use client::Client; diff --git a/sink/src/lib.rs b/sink/src/lib.rs index cbaa10a..0cb98d2 100644 --- a/sink/src/lib.rs +++ b/sink/src/lib.rs @@ -1,4 +1,3 @@ pub mod bootstrap; pub mod events; -pub mod kg; pub mod metrics; diff --git a/sink/src/main.rs b/sink/src/main.rs index afbee4a..3582f71 100644 --- a/sink/src/main.rs +++ b/sink/src/main.rs @@ -3,8 +3,9 @@ use std::env; use anyhow::Error; use axum::{response::Json, routing::get, Router}; use clap::{Args, Parser}; -use sink::{events::EventHandler, kg, metrics}; +use sink::{events::EventHandler, metrics}; use substreams_utils::Sink; +use tracing::Level; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -17,8 +18,6 @@ const DEFAULT_HTTP_PORT: u16 = 8081; #[tokio::main] async fn main() -> Result<(), Error> { - set_log_level(); - init_tracing(); let endpoint_url = env::var("SUBSTREAMS_ENDPOINT_URL").expect("SUBSTREAMS_ENDPOINT_URL not set"); let start_block = env::var("SUBSTREAMS_START_BLOCK").unwrap_or_else(|_| { @@ -38,7 +37,10 @@ async fn main() -> Result<(), Error> { let args = AppArgs::parse(); - let kg_client = kg::Client::new( + set_log_level(); + init_tracing(args.log_file); + + let neo4j = neo4rs::Graph::new( &args.neo4j_args.neo4j_uri, &args.neo4j_args.neo4j_user, &args.neo4j_args.neo4j_pass, @@ -46,10 +48,10 @@ async fn main() -> Result<(), Error> { .await?; if args.reset_db { - kg_client.reset_db(args.rollup).await?; + reset_db(&neo4j).await?; }; - let sink = EventHandler::new(kg_client); + let sink = EventHandler::new(neo4j); start_http_server().await; @@ -82,6 +84,10 @@ struct AppArgs { /// Whether or not to reset the database #[arg(long)] reset_db: bool, + + /// Log file path + #[arg(long)] + log_file: Option, } #[derive(Debug, Args)] @@ -99,14 +105,38 @@ struct Neo4jArgs { neo4j_pass: String, } -fn init_tracing() { - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "stdout=info".into()), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); +pub async fn reset_db(neo4j: &neo4rs::Graph) -> anyhow::Result<()> { + // Delete all nodes and relations + let mut txn = neo4j.start_txn().await?; + txn.run(neo4rs::query("MATCH (n) DETACH DELETE n")).await?; + txn.commit().await?; + + Ok(()) +} + +fn init_tracing(log_file: Option) { + if let Some(log_file) = log_file { + // Set the path of the log file + let now = chrono::Utc::now(); + let file_appender = tracing_appender::rolling::never( + ".", + format!("{}-{log_file}", now.format("%Y-%m-%d-%H-%M-%S")), + ); + let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender); + + tracing_subscriber::fmt::fmt() + .with_max_level(Level::INFO) + .with_writer(non_blocking) + .init(); + } else { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "stdout=info".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + } } fn set_log_level() { diff --git a/web3-utils/src/lib.rs b/web3-utils/src/lib.rs index c9bf515..29b443e 100644 --- a/web3-utils/src/lib.rs +++ b/web3-utils/src/lib.rs @@ -1,19 +1,13 @@ use sha3::{Digest, Keccak256}; -pub fn checksum_address(address: &str, chain_id: Option) -> String { - let hex_address = match chain_id { - Some(id) => format!("{}{}", id, address.to_lowercase()), - None => address[2..].to_lowercase(), - }; +pub fn checksum_address(address: &str) -> String { + let input_address = address.to_lowercase().replace("0x", ""); let mut hasher = Keccak256::new(); - hasher.update(hex_address.as_bytes()); + hasher.update(input_address.as_bytes()); let hash = hasher.finalize(); - let mut address_chars: Vec = match chain_id { - Some(id) => hex_address[id.to_string().len() + 2..].chars().collect(), - None => hex_address.chars().collect(), - }; + let mut address_chars: Vec = input_address.chars().collect(); for i in (0..40).step_by(2) { if (hash[i / 2] >> 4) >= 8 && address_chars[i].is_ascii() { @@ -34,20 +28,16 @@ mod tests { #[test] fn test_checksum_address() { assert_eq!( - checksum_address("0x5a0b54d5dc17e0aadc383d2db43b0a0d3e029c4c", None), + checksum_address("0x5a0b54d5dc17e0aadc383d2db43b0a0d3e029c4c"), "0x5A0b54D5dc17e0AadC383d2db43B0a0D3E029c4c" ); assert_eq!( - checksum_address("0x5A0b54D5dc17e0AadC383d2db43B0a0D3E029c4c", None), + checksum_address("0x5A0b54D5dc17e0AadC383d2db43B0a0D3E029c4c"), "0x5A0b54D5dc17e0AadC383d2db43B0a0D3E029c4c" ); assert_eq!( - checksum_address("0x5a0b54d5dc17e0aadc383d2db43b0a0d3e029c4c", Some(1)), - "0x5A0B54d5dC17e0AAdC383d2db43b0a0d3E029C4c" - ); - assert_eq!( - checksum_address("0x5a0b54d5dc17e0aadc383d2db43b0a0d3e029c4c", Some(4)), - "0x5A0B54D5dC17e0AaDC383D2DB43b0A0d3e029C4c" + checksum_address("0xfb6916095ca1df60bb79ce92ce3ea74c37c5d359"), + "0xfB6916095ca1df60bB79Ce92cE3Ea74c37c5d359" ); } }