Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add views across all shards #5820

Merged
merged 3 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions store/postgres/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,58 @@ pub fn create_foreign_table(
Ok(query)
}

/// Create a SQL statement unioning imported tables from all shards,
/// something like
///
/// ```sql
/// create view "dst_nsp"."src_table" as
/// select 'shard1' as shard, "col1", "col2" from "shard_shard1_subgraphs"."table_name"
/// union all
/// ...
/// ```
///
/// The list `shard_nsps` consists of pairs `(name, namespace)` where `name`
/// is the name of the shard and `namespace` is the namespace where the
/// `src_table` is mapped
pub fn create_cross_shard_view(
    conn: &mut PgConnection,
    src_nsp: &str,
    src_table: &str,
    dst_nsp: &str,
    shard_nsps: &[(&str, String)],
) -> Result<String, StoreError> {
    // Build the `create view .. union all ..` statement; writing into a
    // `String` can only fail with `std::fmt::Error`, which the caller maps
    // to a `StoreError`
    fn build_query(
        columns: &[table_schema::Column],
        table_name: &str,
        dst_nsp: &str,
        shard_nsps: &[(&str, String)],
    ) -> Result<String, std::fmt::Error> {
        let mut query = String::new();
        write!(query, "create view \"{}\".\"{}\" as ", dst_nsp, table_name)?;
        for (idx, (name, nsp)) in shard_nsps.iter().enumerate() {
            if idx > 0 {
                write!(query, " union all ")?;
            }
            // Tag every row with the shard it came from so consumers of the
            // view can tell the sources apart
            write!(query, "select '{name}' as shard")?;
            for column in columns {
                write!(query, ", \"{}\"", column.column_name)?;
            }
            writeln!(query, " from \"{}\".\"{}\"", nsp, table_name)?;
        }
        Ok(query)
    }

    let columns = table_schema::columns(conn, src_nsp, src_table)?;
    let query = build_query(&columns, src_table, dst_nsp, shard_nsps).map_err(|_| {
        anyhow!(
            "failed to generate 'create view' query for {}.{}",
            dst_nsp,
            src_table
        )
    })?;
    Ok(query)
}

/// Checks in the database if a given index is valid.
pub(crate) fn check_index_is_valid(
conn: &mut PgConnection,
Expand Down
127 changes: 112 additions & 15 deletions store/postgres/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ use crate::primary::{self, NAMESPACE_PUBLIC};
use crate::{advisory_lock, catalog};
use crate::{Shard, PRIMARY_SHARD};

/// The metadata tables that exist in every shard, keyed by the namespace
/// (schema) in which they live. These are the tables that get imported as
/// foreign tables from each shard and unioned into cross-shard views.
const SHARDED_TABLES: [(&str, &[&str]); 2] = [
    ("public", &["ethereum_networks"]),
    (
        "subgraphs",
        &[
            "copy_state",
            "copy_table_state",
            "dynamic_ethereum_contract_data_source",
            "subgraph_deployment",
            "subgraph_error",
            "subgraph_features",
            "subgraph_manifest",
            "table_stats",
        ],
    ),
];

pub struct ForeignServer {
pub name: String,
pub shard: Shard,
Expand All @@ -49,6 +66,7 @@ pub struct ForeignServer {

impl ForeignServer {
pub(crate) const PRIMARY_PUBLIC: &'static str = "primary_public";
pub(crate) const CROSS_SHARD_NSP: &'static str = "sharded";

/// The name of the foreign server under which data for `shard` is
/// accessible
Expand Down Expand Up @@ -209,19 +227,12 @@ impl ForeignServer {
let nsp = Self::metadata_schema(&self.shard);
catalog::recreate_schema(conn, &nsp)?;
let mut query = String::new();
for table_name in [
"subgraph_error",
"dynamic_ethereum_contract_data_source",
"table_stats",
"subgraph_deployment_assignment",
"subgraph",
"subgraph_version",
"subgraph_deployment",
"subgraph_manifest",
] {
let create_stmt =
catalog::create_foreign_table(conn, "subgraphs", table_name, &nsp, &self.name)?;
write!(query, "{}", create_stmt)?;
for (src_nsp, src_tables) in SHARDED_TABLES {
for src_table in src_tables {
let create_stmt =
catalog::create_foreign_table(conn, src_nsp, src_table, &nsp, &self.name)?;
write!(query, "{}", create_stmt)?;
}
}
Ok(conn.batch_execute(&query)?)
}
Expand Down Expand Up @@ -1015,12 +1026,17 @@ impl PoolInner {
// in the database instead of just in memory
let result = pool
.configure_fdw(coord.servers.as_ref())
.and_then(|()| pool.drop_cross_shard_views())
.and_then(|()| migrate_schema(&pool.logger, &mut conn))
.and_then(|count| coord.propagate(&pool, count));
.and_then(|count| {
pool.create_cross_shard_views(coord.servers.as_ref())
.map(|()| count)
});
debug!(&pool.logger, "Release migration lock");
advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| {
die(&pool.logger, "failed to release migration lock", &err);
});
let result = result.and_then(|count| coord.propagate(&pool, count));
result.unwrap_or_else(|err| die(&pool.logger, "migrations failed", &err));

// Locale check
Expand Down Expand Up @@ -1068,6 +1084,76 @@ impl PoolInner {
})
}

/// Remove the schema holding the cross-shard views (`CROSS_SHARD_NSP`),
/// including everything in it. Only the primary shard carries these views;
/// on any other shard this is a no-op.
fn drop_cross_shard_views(&self) -> Result<(), StoreError> {
    if self.shard != *PRIMARY_SHARD {
        // The views only ever exist in the primary; nothing to drop here
        return Ok(());
    }

    info!(&self.logger, "Dropping cross-shard views");
    // Build the statement up front; the transaction closure only executes it
    let query = format!(
        "drop schema if exists {} cascade",
        ForeignServer::CROSS_SHARD_NSP
    );
    let mut conn = self.get()?;
    conn.transaction(|conn| {
        conn.batch_execute(&query)?;
        Ok(())
    })
}

/// If this is the primary shard, create the namespace `CROSS_SHARD_NSP`
/// and populate it with views that union the corresponding table from
/// every shard (see `SHARDED_TABLES`). On non-primary shards this is a
/// no-op.
fn create_cross_shard_views(&self, servers: &[ForeignServer]) -> Result<(), StoreError> {
    // Pair each shard's name with the namespace holding its copy of the
    // tables: `local_nsp` for the current shard itself, and the imported
    // metadata schema for every other shard
    fn shard_nsp_pairs<'a>(
        current: &Shard,
        local_nsp: &str,
        servers: &'a [ForeignServer],
    ) -> Vec<(&'a str, String)> {
        servers
            .iter()
            .map(|server| {
                let nsp = if &server.shard == current {
                    local_nsp.to_string()
                } else {
                    ForeignServer::metadata_schema(&server.shard)
                };
                (server.shard.as_str(), nsp)
            })
            .collect::<Vec<_>>()
    }

    if self.shard != *PRIMARY_SHARD {
        return Ok(());
    }

    info!(&self.logger, "Creating cross-shard views");
    let mut conn = self.get()?;

    conn.transaction(|conn| {
        let query = format!(
            "create schema if not exists {}",
            ForeignServer::CROSS_SHARD_NSP
        );
        conn.batch_execute(&query)?;
        for (src_nsp, src_tables) in SHARDED_TABLES {
            // Pairs of (shard, nsp) for all servers
            let nsps = shard_nsp_pairs(&self.shard, src_nsp, servers);
            for src_table in src_tables {
                let create_view = catalog::create_cross_shard_view(
                    conn,
                    src_nsp,
                    src_table,
                    ForeignServer::CROSS_SHARD_NSP,
                    &nsps,
                )?;
                conn.batch_execute(&create_view)?;
            }
        }
        Ok(())
    })
}

/// Copy the data from key tables in the primary into our local schema
/// so it can be used as a fallback when the primary goes down
pub async fn mirror_primary_tables(&self) -> Result<(), StoreError> {
Expand Down Expand Up @@ -1233,6 +1319,10 @@ impl PoolCoordinator {
/// other pools will then recreate any tables that they imported from
/// `shard`. If `pool` is a new shard, we also map all other shards into
/// it.
///
/// This tries to take the migration lock and must therefore be run from
/// code that does _not_ hold the migration lock as it will otherwise
/// deadlock
fn propagate(&self, pool: &PoolInner, count: MigrationCount) -> Result<(), StoreError> {
// pool is a new shard, map all other shards into it
if count.is_new() {
Expand All @@ -1244,7 +1334,14 @@ impl PoolCoordinator {
if count.had_migrations() {
let server = self.server(&pool.shard)?;
for pool in self.pools.lock().unwrap().values() {
if let Err(e) = pool.remap(server) {
let mut conn = pool.get()?;
let remap_res = {
advisory_lock::lock_migration(&mut conn)?;
let res = pool.remap(server);
advisory_lock::unlock_migration(&mut conn)?;
res
};
if let Err(e) = remap_res {
error!(pool.logger, "Failed to map imports from {}", server.shard; "error" => e.to_string());
return Err(e);
}
Expand Down
Loading