Skip to content

Commit

Permalink
store: Create views in the primary that union the tables from shards
Browse files Browse the repository at this point in the history
Fixes #4824
Fixes #4825
  • Loading branch information
lutter committed Feb 14, 2025
1 parent ca40ded commit 2a9a6b7
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 18 deletions.
52 changes: 52 additions & 0 deletions store/postgres/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,58 @@ pub fn create_foreign_table(
Ok(query)
}

/// Create a SQL statement unioning imported tables from all shards,
/// something like
///
/// ```sql
/// create view "dst_nsp"."src_table" as
/// select 'shard1' as shard, "col1", "col2" from "shard_shard1_subgraphs"."src_table"
/// union all
/// ...
/// ```
///
/// The column list is looked up through `conn` for the table
/// `src_nsp.src_table`; the view is created as `dst_nsp.src_table`.
///
/// The list `shard_nsps` consists of pairs `(name, namespace)` where `name`
/// is the name of the shard and `namespace` is the namespace where the
/// `src_table` is mapped
///
/// The statement is only generated and returned, not executed.
///
/// # Errors
///
/// Returns an error if the columns of the source table cannot be
/// determined or if formatting the statement fails.
pub fn create_cross_shard_view(
    conn: &mut PgConnection,
    src_nsp: &str,
    src_table: &str,
    dst_nsp: &str,
    shard_nsps: &[(&str, String)],
) -> Result<String, StoreError> {
    /// Assemble the `create view` statement; one `select` per shard,
    /// joined by `union all`.
    fn build_query(
        columns: &[table_schema::Column],
        table_name: &str,
        dst_nsp: &str,
        shard_nsps: &[(&str, String)],
    ) -> Result<String, std::fmt::Error> {
        let mut query = String::new();
        write!(query, "create view \"{}\".\"{}\" as ", dst_nsp, table_name)?;
        for (idx, (name, nsp)) in shard_nsps.iter().enumerate() {
            if idx > 0 {
                write!(query, " union all ")?;
            }
            // The shard name becomes a literal column so that rows can be
            // traced back to the shard they came from
            write!(query, "select '{name}' as shard")?;
            for column in columns {
                write!(query, ", \"{}\"", column.column_name)?;
            }
            writeln!(query, " from \"{}\".\"{}\"", nsp, table_name)?;
        }
        Ok(query)
    }

    let columns = table_schema::columns(conn, src_nsp, src_table)?;
    let query = build_query(&columns, src_table, dst_nsp, shard_nsps).map_err(|_| {
        anyhow!(
            "failed to generate 'create view' query for {}.{}",
            dst_nsp,
            src_table
        )
    })?;
    Ok(query)
}

/// Checks in the database if a given index is valid.
pub(crate) fn check_index_is_valid(
conn: &mut PgConnection,
Expand Down
113 changes: 95 additions & 18 deletions store/postgres/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ use crate::primary::{self, NAMESPACE_PUBLIC};
use crate::{advisory_lock, catalog};
use crate::{Shard, PRIMARY_SHARD};

/// Tables that are handled per shard, as pairs `(namespace, tables)` of a
/// source namespace and the names of the tables in it.
///
/// `ForeignServer::map_metadata` creates a foreign table for each entry,
/// and `PoolInner::create_cross_shard_views` creates a view for each entry
/// in the `ForeignServer::CROSS_SHARD_NSP` namespace.
const SHARDED_TABLES: [(&str, &[&str]); 2] = [
("public", &["ethereum_networks"]),
(
"subgraphs",
&[
"copy_state",
"copy_table_state",
"dynamic_ethereum_contract_data_source",
"subgraph_deployment",
"subgraph_error",
"subgraph_features",
"subgraph_manifest",
"table_stats",
],
),
];

pub struct ForeignServer {
pub name: String,
pub shard: Shard,
Expand All @@ -49,6 +66,7 @@ pub struct ForeignServer {

impl ForeignServer {
pub(crate) const PRIMARY_PUBLIC: &'static str = "primary_public";
pub(crate) const CROSS_SHARD_NSP: &'static str = "sharded";

/// The name of the foreign server under which data for `shard` is
/// accessible
Expand Down Expand Up @@ -206,26 +224,10 @@ impl ForeignServer {
/// Map the `subgraphs` schema from the foreign server `self` into the
/// database accessible through `conn`
fn map_metadata(&self, conn: &mut PgConnection) -> Result<(), StoreError> {
const MAP_TABLES: [(&str, &[&str]); 2] = [
("public", &["ethereum_networks"]),
(
"subgraphs",
&[
"copy_state",
"copy_table_state",
"dynamic_ethereum_contract_data_source",
"subgraph_deployment",
"subgraph_error",
"subgraph_features",
"subgraph_manifest",
"table_stats",
],
),
];
let nsp = Self::metadata_schema(&self.shard);
catalog::recreate_schema(conn, &nsp)?;
let mut query = String::new();
for (src_nsp, src_tables) in MAP_TABLES {
for (src_nsp, src_tables) in SHARDED_TABLES {
for src_table in src_tables {
let create_stmt =
catalog::create_foreign_table(conn, src_nsp, src_table, &nsp, &self.name)?;
Expand Down Expand Up @@ -1024,7 +1026,12 @@ impl PoolInner {
// in the database instead of just in memory
let result = pool
.configure_fdw(coord.servers.as_ref())
.and_then(|()| migrate_schema(&pool.logger, &mut conn));
.and_then(|()| pool.drop_cross_shard_views())
.and_then(|()| migrate_schema(&pool.logger, &mut conn))
.and_then(|count| {
pool.create_cross_shard_views(coord.servers.as_ref())
.map(|()| count)
});
debug!(&pool.logger, "Release migration lock");
advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| {
die(&pool.logger, "failed to release migration lock", &err);
Expand Down Expand Up @@ -1077,6 +1084,76 @@ impl PoolInner {
})
}

/// Drop the namespace `CROSS_SHARD_NSP` together with everything in it.
///
/// This only does something on the primary shard; for any other shard
/// it returns `Ok(())` without touching the database.
fn drop_cross_shard_views(&self) -> Result<(), StoreError> {
    if self.shard != *PRIMARY_SHARD {
        return Ok(());
    }

    info!(&self.logger, "Dropping cross-shard views");
    let mut conn = self.get()?;
    // `cascade` also removes the views that live in the namespace
    let drop_schema = format!(
        "drop schema if exists {} cascade",
        ForeignServer::CROSS_SHARD_NSP
    );
    conn.transaction(|conn| {
        conn.batch_execute(&drop_schema)?;
        Ok(())
    })
}

/// Create the namespace `CROSS_SHARD_NSP` and fill it with one view per
/// entry in `SHARDED_TABLES`, each view being the union of that table
/// across all `servers`.
///
/// This only does something on the primary shard; for any other shard
/// it returns `Ok(())` without touching the database. All statements
/// are run inside a single transaction.
fn create_cross_shard_views(&self, servers: &[ForeignServer]) -> Result<(), StoreError> {
    /// For every server, determine the namespace under which its copy
    /// of a table is visible: `local_nsp` for the `current` shard, and
    /// the metadata schema of the server's shard otherwise.
    fn namespaces_by_shard<'a>(
        current: &Shard,
        local_nsp: &str,
        servers: &'a [ForeignServer],
    ) -> Vec<(&'a str, String)> {
        let mut pairs = Vec::with_capacity(servers.len());
        for server in servers {
            let nsp = if &server.shard == current {
                local_nsp.to_string()
            } else {
                ForeignServer::metadata_schema(&server.shard)
            };
            pairs.push((server.shard.as_str(), nsp));
        }
        pairs
    }

    if self.shard != *PRIMARY_SHARD {
        return Ok(());
    }

    info!(&self.logger, "Creating cross-shard views");
    let mut conn = self.get()?;
    let create_schema = format!(
        "create schema if not exists {}",
        ForeignServer::CROSS_SHARD_NSP
    );

    conn.transaction(|conn| {
        conn.batch_execute(&create_schema)?;
        for (src_nsp, src_tables) in SHARDED_TABLES {
            // The (shard, namespace) pairs only depend on the source
            // namespace, so compute them once per namespace
            let shard_nsps = namespaces_by_shard(&self.shard, src_nsp, servers);
            for src_table in src_tables {
                let view_stmt = catalog::create_cross_shard_view(
                    conn,
                    src_nsp,
                    src_table,
                    ForeignServer::CROSS_SHARD_NSP,
                    &shard_nsps,
                )?;
                conn.batch_execute(&view_stmt)?;
            }
        }
        Ok(())
    })
}

/// Copy the data from key tables in the primary into our local schema
/// so it can be used as a fallback when the primary goes down
pub async fn mirror_primary_tables(&self) -> Result<(), StoreError> {
Expand Down

0 comments on commit 2a9a6b7

Please sign in to comment.