From d4952114c21bdbb9f537d010c3f78aa568468cf3 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 13 Feb 2025 13:17:50 -0800 Subject: [PATCH 1/3] store: Defer remapping until the source shard has finished migrations Without this, starting `graph-node` on an empty database can cause an error that the fdw namespace does not exist --- store/postgres/src/connection_pool.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index c2795fca5db..ab44d956026 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -1015,12 +1015,12 @@ impl PoolInner { // in the database instead of just in memory let result = pool .configure_fdw(coord.servers.as_ref()) - .and_then(|()| migrate_schema(&pool.logger, &mut conn)) - .and_then(|count| coord.propagate(&pool, count)); + .and_then(|()| migrate_schema(&pool.logger, &mut conn)); debug!(&pool.logger, "Release migration lock"); advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| { die(&pool.logger, "failed to release migration lock", &err); }); + let result = result.and_then(|count| coord.propagate(&pool, count)); result.unwrap_or_else(|err| die(&pool.logger, "migrations failed", &err)); // Locale check @@ -1233,6 +1233,10 @@ impl PoolCoordinator { /// other pools will then recreate any tables that they imported from /// `shard`. If `pool` is a new shard, we also map all other shards into /// it. + /// + /// This tries to take the migration lock and must therefore be run from + /// code that does _not_ hold the migration lock as it will otherwise + /// deadlock fn propagate(&self, pool: &PoolInner, count: MigrationCount) -> Result<(), StoreError> { // pool is a new shard, map all other shards into it if count.is_new() { @@ -1244,7 +1248,14 @@ impl PoolCoordinator { if count.had_migrations() { let server = self.server(&pool.shard)?; for pool in self.pools.lock().unwrap().values() { - if let Err(e) = pool.remap(server) { + let mut conn = pool.get()?; + let remap_res = { + advisory_lock::lock_migration(&mut conn)?; + let res = pool.remap(server); + advisory_lock::unlock_migration(&mut conn)?; + res + }; + if let Err(e) = remap_res { error!(pool.logger, "Failed to map imports from {}", server.shard; "error" => e.to_string()); return Err(e); } From 8b828f78d14df33f796f8b7d709437fed5e59b7a Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 13 Feb 2025 14:06:09 -0800 Subject: [PATCH 2/3] store: Map more tables across shards This now includes ethereum_networks, copy_state, copy_table_state, and subgraph_features. We no longer map subgraph, subgraph_version, and subgraph_deployment_assignment into the shard_* namespace since these tables are only maintained in the primary, and are mapped in the primary_public namespace. 
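For orientation, each table listed here ends up as a postgres_fdw foreign table in the per-shard metadata namespace (`shard_<name>_subgraphs`, as in the rustdoc example below), so queries against it are forwarded to the shard that owns the data. The following is only a rough sketch of the DDL that `catalog::create_foreign_table` emits for one of the newly mapped tables; the shard name `shard_a`, the server name, and the column list are illustrative placeholders, not taken from the actual schema:

```sql
-- Hypothetical mapping of subgraphs.copy_state from a shard named "shard_a";
-- server name and column list are placeholders for illustration only.
create foreign table "shard_shard_a_subgraphs"."copy_state" (
    "dst" integer not null,
    "queued_at" timestamptz not null
)
server "shard_a"
options (schema_name 'subgraphs', table_name 'copy_state');
```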
--- store/postgres/src/connection_pool.rs | 35 +++++++++++++++++---------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index ab44d956026..00158cb29d7 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -206,22 +206,31 @@ impl ForeignServer { /// Map the `subgraphs` schema from the foreign server `self` into the /// database accessible through `conn` fn map_metadata(&self, conn: &mut PgConnection) -> Result<(), StoreError> { + const MAP_TABLES: [(&str, &[&str]); 2] = [ + ("public", &["ethereum_networks"]), + ( + "subgraphs", + &[ + "copy_state", + "copy_table_state", + "dynamic_ethereum_contract_data_source", + "subgraph_deployment", + "subgraph_error", + "subgraph_features", + "subgraph_manifest", + "table_stats", + ], + ), + ]; let nsp = Self::metadata_schema(&self.shard); catalog::recreate_schema(conn, &nsp)?; let mut query = String::new(); - for table_name in [ - "subgraph_error", - "dynamic_ethereum_contract_data_source", - "table_stats", - "subgraph_deployment_assignment", - "subgraph", - "subgraph_version", - "subgraph_deployment", - "subgraph_manifest", - ] { - let create_stmt = - catalog::create_foreign_table(conn, "subgraphs", table_name, &nsp, &self.name)?; - write!(query, "{}", create_stmt)?; + for (src_nsp, src_tables) in MAP_TABLES { + for src_table in src_tables { + let create_stmt = + catalog::create_foreign_table(conn, src_nsp, src_table, &nsp, &self.name)?; + write!(query, "{}", create_stmt)?; + } } Ok(conn.batch_execute(&query)?) } From 0e7b3bc3584fda39e3977395cf7bbe3cb8f235e2 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 13 Feb 2025 16:56:00 -0800 Subject: [PATCH 3/3] store: Create views in the primary that union the tables from shards Fixes https://github.com/graphprotocol/graph-node/issues/4824 Fixes https://github.com/graphprotocol/graph-node/issues/4825 --- store/postgres/src/catalog.rs | 52 ++++++++++++ store/postgres/src/connection_pool.rs | 113 ++++++++++++++++++++++---- 2 files changed, 147 insertions(+), 18 deletions(-) diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs index 8e988e31522..1524a768acc 100644 --- a/store/postgres/src/catalog.rs +++ b/store/postgres/src/catalog.rs @@ -626,6 +626,58 @@ pub fn create_foreign_table( Ok(query) } +/// Create a SQL statement unioning imported tables from all shards, +/// something like +/// +/// ```sql +/// create view "dst_nsp"."src_table" as +/// select 'shard1' as shard, "col1", "col2" from "shard_shard1_subgraphs"."table_name" +/// union all +/// ... 
+/// ````
+///
+/// The list `shard_nsps` consists of pairs `(name, namespace)` where `name`
+/// is the name of the shard and `namespace` is the namespace where the
+/// `src_table` is mapped
+pub fn create_cross_shard_view(
+    conn: &mut PgConnection,
+    src_nsp: &str,
+    src_table: &str,
+    dst_nsp: &str,
+    shard_nsps: &[(&str, String)],
+) -> Result<String, StoreError> {
+    fn build_query(
+        columns: &[table_schema::Column],
+        table_name: &str,
+        dst_nsp: &str,
+        shard_nsps: &[(&str, String)],
+    ) -> Result<String, std::fmt::Error> {
+        let mut query = String::new();
+        write!(query, "create view \"{}\".\"{}\" as ", dst_nsp, table_name)?;
+        for (idx, (name, nsp)) in shard_nsps.into_iter().enumerate() {
+            if idx > 0 {
+                write!(query, " union all ")?;
+            }
+            write!(query, "select '{name}' as shard")?;
+            for column in columns {
+                write!(query, ", \"{}\"", column.column_name)?;
+            }
+            writeln!(query, " from \"{}\".\"{}\"", nsp, table_name)?;
+        }
+        Ok(query)
+    }
+
+    let columns = table_schema::columns(conn, src_nsp, src_table)?;
+    let query = build_query(&columns, src_table, dst_nsp, shard_nsps).map_err(|_| {
+        anyhow!(
+            "failed to generate 'create cross-shard view' query for {}.{}",
+            dst_nsp,
+            src_table
+        )
+    })?;
+    Ok(query)
+}
+
 /// Checks in the database if a given index is valid.
 pub(crate) fn check_index_is_valid(
     conn: &mut PgConnection,
diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs
index 00158cb29d7..ae8ab3e71b6 100644
--- a/store/postgres/src/connection_pool.rs
+++ b/store/postgres/src/connection_pool.rs
@@ -37,6 +37,23 @@ use crate::primary::{self, NAMESPACE_PUBLIC};
 use crate::{advisory_lock, catalog};
 use crate::{Shard, PRIMARY_SHARD};
 
+const SHARDED_TABLES: [(&str, &[&str]); 2] = [
+    ("public", &["ethereum_networks"]),
+    (
+        "subgraphs",
+        &[
+            "copy_state",
+            "copy_table_state",
+            "dynamic_ethereum_contract_data_source",
+            "subgraph_deployment",
+            "subgraph_error",
+            "subgraph_features",
+            "subgraph_manifest",
+            "table_stats",
+        ],
+    ),
+];
+
 pub struct ForeignServer {
     pub name: String,
     pub shard: Shard,
@@ -49,6 +66,7 @@ pub struct ForeignServer {
 
 impl ForeignServer {
     pub(crate) const PRIMARY_PUBLIC: &'static str = "primary_public";
+    pub(crate) const CROSS_SHARD_NSP: &'static str = "sharded";
 
     /// The name of the foreign server under which data for `shard` is
     /// accessible
@@ -206,26 +224,10 @@ impl ForeignServer {
     /// Map the `subgraphs` schema from the foreign server `self` into the
     /// database accessible through `conn`
     fn map_metadata(&self, conn: &mut PgConnection) -> Result<(), StoreError> {
-        const MAP_TABLES: [(&str, &[&str]); 2] = [
-            ("public", &["ethereum_networks"]),
-            (
-                "subgraphs",
-                &[
-                    "copy_state",
-                    "copy_table_state",
-                    "dynamic_ethereum_contract_data_source",
-                    "subgraph_deployment",
-                    "subgraph_error",
-                    "subgraph_features",
-                    "subgraph_manifest",
-                    "table_stats",
-                ],
-            ),
-        ];
         let nsp = Self::metadata_schema(&self.shard);
         catalog::recreate_schema(conn, &nsp)?;
         let mut query = String::new();
-        for (src_nsp, src_tables) in MAP_TABLES {
+        for (src_nsp, src_tables) in SHARDED_TABLES {
             for src_table in src_tables {
                 let create_stmt =
                     catalog::create_foreign_table(conn, src_nsp, src_table, &nsp, &self.name)?;
@@ -1024,7 +1026,12 @@ impl PoolInner {
         // in the database instead of just in memory
         let result = pool
             .configure_fdw(coord.servers.as_ref())
-            .and_then(|()| migrate_schema(&pool.logger, &mut conn));
+            .and_then(|()| pool.drop_cross_shard_views())
+            .and_then(|()| migrate_schema(&pool.logger, &mut conn))
+            .and_then(|count| {
+                pool.create_cross_shard_views(coord.servers.as_ref())
+                    .map(|()| count)
+            });
         debug!(&pool.logger, "Release migration lock");
         advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| {
             die(&pool.logger, "failed to release migration lock", &err);
@@ -1077,6 +1084,76 @@ impl PoolInner {
         })
     }
 
+    /// If this is the primary shard, drop the namespace `CROSS_SHARD_NSP`
+    fn drop_cross_shard_views(&self) -> Result<(), StoreError> {
+        if self.shard != *PRIMARY_SHARD {
+            return Ok(());
+        }
+
+        info!(&self.logger, "Dropping cross-shard views");
+        let mut conn = self.get()?;
+        conn.transaction(|conn| {
+            let query = format!(
+                "drop schema if exists {} cascade",
+                ForeignServer::CROSS_SHARD_NSP
+            );
+            conn.batch_execute(&query)?;
+            Ok(())
+        })
+    }
+
+    /// If this is the primary shard, create the namespace `CROSS_SHARD_NSP`
+    /// and populate it with tables that union various imported tables
+    fn create_cross_shard_views(&self, servers: &[ForeignServer]) -> Result<(), StoreError> {
+        fn shard_nsp_pairs<'a>(
+            current: &Shard,
+            local_nsp: &str,
+            servers: &'a [ForeignServer],
+        ) -> Vec<(&'a str, String)> {
+            servers
+                .into_iter()
+                .map(|server| {
+                    let nsp = if &server.shard == current {
+                        local_nsp.to_string()
+                    } else {
+                        ForeignServer::metadata_schema(&server.shard)
+                    };
+                    (server.shard.as_str(), nsp)
+                })
+                .collect::<Vec<_>>()
+        }
+
+        if self.shard != *PRIMARY_SHARD {
+            return Ok(());
+        }
+
+        info!(&self.logger, "Creating cross-shard views");
+        let mut conn = self.get()?;
+
+        conn.transaction(|conn| {
+            let query = format!(
+                "create schema if not exists {}",
+                ForeignServer::CROSS_SHARD_NSP
+            );
+            conn.batch_execute(&query)?;
+            for (src_nsp, src_tables) in SHARDED_TABLES {
+                // Pairs of (shard, nsp) for all servers
+                let nsps = shard_nsp_pairs(&self.shard, src_nsp, servers);
+                for src_table in src_tables {
+                    let create_view = catalog::create_cross_shard_view(
+                        conn,
+                        src_nsp,
+                        src_table,
+                        ForeignServer::CROSS_SHARD_NSP,
+                        &nsps,
+                    )?;
+                    conn.batch_execute(&create_view)?;
+                }
+            }
+            Ok(())
+        })
+    }
+
     /// Copy the data from key tables in the primary into our local schema
     /// so it can be used as a fallback when the primary goes down
     pub async fn mirror_primary_tables(&self) -> Result<(), StoreError> {
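Taken together, the last commit gives the primary a `sharded` namespace whose views union each mapped table across every shard. As a minimal sketch of what `create_cross_shard_view` generates, assuming only the primary plus one hypothetical shard named `shard_a` and an abbreviated column list, the statement for `subgraphs.copy_state` would look roughly like this; the primary reads its own table directly while other shards are reached through their foreign-table mappings:

```sql
-- Sketch only: shard names and columns are illustrative.
create view "sharded"."copy_state" as
select 'primary' as shard, "dst", "queued_at" from "subgraphs"."copy_state"
union all
select 'shard_a' as shard, "dst", "queued_at" from "shard_shard_a_subgraphs"."copy_state";

-- One query now covers every shard:
select shard, dst, queued_at from "sharded"."copy_state";
```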