From 0ff780d1a3822c2f89f42f4aba375d47551bd011 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Mon, 6 Jan 2025 12:05:43 +0000
Subject: [PATCH 1/4] storcon: secondary migrate API

---
 storage_controller/src/http.rs    | 36 +++++++++++++++++-
 storage_controller/src/service.rs | 63 +++++++++++++++++++++++++++++++
 2 files changed, 98 insertions(+), 1 deletion(-)

diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 24fd4c341a86..c9bbc3a4f7ff 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -690,7 +690,8 @@ async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError>
     };
 
     let state = get_state(&req);
-    let nodes = state.service.node_list().await?;
+    let mut nodes = state.service.node_list().await?;
+    nodes.sort_by_key(|n| n.get_id());
     let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
 
     json_response(StatusCode::OK, api_nodes)
@@ -1005,6 +1006,29 @@ async fn handle_tenant_shard_migrate(
     )
 }
 
+async fn handle_tenant_shard_migrate_secondary(
+    service: Arc<Service>,
+    req: Request<Body>,
+) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let mut req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
+    let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
+    json_response(
+        StatusCode::OK,
+        service
+            .tenant_shard_migrate_secondary(tenant_shard_id, migrate_req)
+            .await?,
+    )
+}
+
 async fn handle_tenant_shard_cancel_reconcile(
     service: Arc<Service>,
     req: Request<Body>,
@@ -1855,6 +1879,16 @@ pub fn make_router(
                 RequestName("control_v1_tenant_migrate"),
             )
         })
+        .put(
+            "/control/v1/tenant/:tenant_shard_id/migrate_secondary",
+            |r| {
+                tenant_service_handler(
+                    r,
+                    handle_tenant_shard_migrate_secondary,
+                    RequestName("control_v1_tenant_migrate_secondary"),
+                )
+            },
+        )
         .put(
             "/control/v1/tenant/:tenant_shard_id/cancel_reconcile",
             |r| {
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 222cb9fdd409..20a666565b13 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -4928,6 +4928,69 @@ impl Service {
         Ok(TenantShardMigrateResponse {})
     }
 
+    pub(crate) async fn tenant_shard_migrate_secondary(
+        &self,
+        tenant_shard_id: TenantShardId,
+        migrate_req: TenantShardMigrateRequest,
+    ) -> Result<TenantShardMigrateResponse, ApiError> {
+        let waiter = {
+            let mut locked = self.inner.write().unwrap();
+            let (nodes, tenants, scheduler) = locked.parts_mut();
+
+            let Some(node) = nodes.get(&migrate_req.node_id) else {
+                return Err(ApiError::BadRequest(anyhow::anyhow!(
+                    "Node {} not found",
+                    migrate_req.node_id
+                )));
+            };
+
+            if !node.is_available() {
+                // Warn but proceed: the caller may intend to manually adjust the placement of
+                // a shard even if the node is down, e.g. if intervening during an incident.
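+                //
+                // (Illustrative example, not part of the original comment: an operator might
+                // deliberately point the secondary at a node that is currently marked offline,
+                // e.g. with the CLI command added later in this series:
+                //   storcon_cli tenant-shard-migrate-secondary --tenant-shard-id <id> --node <offline node id>
+                // so that the location is ready as soon as that node returns to service.)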
+ tracing::warn!("Migrating to unavailable node {node}"); + } + + let Some(shard) = tenants.get_mut(&tenant_shard_id) else { + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant shard not found").into(), + )); + }; + + if shard.intent.get_secondary().len() == 1 + && shard.intent.get_secondary()[0] == migrate_req.node_id + { + tracing::info!( + "Migrating secondary to {node}: intent is unchanged {:?}", + shard.intent + ); + } else if shard.intent.get_attached() == &Some(migrate_req.node_id) { + tracing::info!("Migrating secondary to {node}: already attached where we were asked to create a secondary"); + } else { + let old_secondaries = shard.intent.get_secondary().clone(); + for secondary in old_secondaries { + shard.intent.remove_secondary(scheduler, secondary); + } + + shard.intent.push_secondary(scheduler, migrate_req.node_id); + shard.sequence = shard.sequence.next(); + tracing::info!( + "Migrating secondary to {node}: new intent {:?}", + shard.intent + ); + } + + self.maybe_reconcile_shard(shard, nodes) + }; + + if let Some(waiter) = waiter { + waiter.wait_timeout(RECONCILE_TIMEOUT).await?; + } else { + tracing::info!("Migration is a no-op"); + } + + Ok(TenantShardMigrateResponse {}) + } + /// 'cancel' in this context means cancel any ongoing reconcile pub(crate) async fn tenant_shard_cancel_reconcile( &self, From a5ef8c3d0bde483adac42dbb5a6f8d0789aff072 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 6 Jan 2025 12:05:53 +0000 Subject: [PATCH 2/4] storcon_cli: secondary migrate command --- control_plane/storcon_cli/src/main.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 6ee1044c1839..c077a128d84e 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -112,6 +112,13 @@ enum Command { #[arg(long)] node: NodeId, }, + /// Migrate the secondary location for a tenant shard to a specific pageserver. 
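+    ///
+    /// Illustrative invocation (a sketch, not part of the original patch; the flag names
+    /// mirror the fields declared below):
+    ///   storcon_cli tenant-shard-migrate-secondary --tenant-shard-id <tenant_shard_id> --node <pageserver id>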
+    TenantShardMigrateSecondary {
+        #[arg(long)]
+        tenant_shard_id: TenantShardId,
+        #[arg(long)]
+        node: NodeId,
+    },
     /// Cancel any ongoing reconciliation for this shard
     TenantShardCancelReconcile {
         #[arg(long)]
@@ -553,6 +560,23 @@ async fn main() -> anyhow::Result<()> {
                 )
                 .await?;
         }
+        Command::TenantShardMigrateSecondary {
+            tenant_shard_id,
+            node,
+        } => {
+            let req = TenantShardMigrateRequest {
+                tenant_shard_id,
+                node_id: node,
+            };
+
+            storcon_client
+                .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
+                    Method::PUT,
+                    format!("control/v1/tenant/{tenant_shard_id}/migrate_secondary"),
+                    Some(req),
+                )
+                .await?;
+        }
         Command::TenantShardCancelReconcile { tenant_shard_id } => {
             storcon_client
                 .dispatch::<(), ()>(

From 49a683ff631be0f3defab725055a7e7d5946effe Mon Sep 17 00:00:00 2001
From: John Spray
Date: Mon, 6 Jan 2025 12:21:12 +0000
Subject: [PATCH 3/4] storcon: downgrade a log to info, it can happen legitimately

---
 storage_controller/src/compute_hook.rs | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/storage_controller/src/compute_hook.rs b/storage_controller/src/compute_hook.rs
index 69db48f8d18c..3884a6df4601 100644
--- a/storage_controller/src/compute_hook.rs
+++ b/storage_controller/src/compute_hook.rs
@@ -124,7 +124,10 @@ impl ComputeHookTenant {
                 if let Some(shard_idx) = shard_idx {
                     sharded.shards.remove(shard_idx);
                 } else {
-                    tracing::warn!("Shard not found while handling detach")
+                    // This is a valid but niche case, where the tenant was previously attached
+                    // as a Secondary location and then detached, so has no previously notified
+                    // state.
+                    tracing::info!("Shard not found while handling detach")
                 }
             }
             ComputeHookTenant::Unsharded(_) => {
@@ -761,7 +764,10 @@ impl ComputeHook {
         let mut state_locked = self.state.lock().unwrap();
         match state_locked.entry(tenant_shard_id.tenant_id) {
             Entry::Vacant(_) => {
-                tracing::warn!("Compute hook tenant not found for detach");
+                // This is a valid but niche case, where the tenant was previously attached
+                // as a Secondary location and then detached, so has no previously notified
+                // state.
+                tracing::info!("Compute hook tenant not found for detach");
             }
             Entry::Occupied(mut e) => {
                 let sharded = e.get().is_sharded();

From 39a98f7baaca86827eab6e2df5f86279463efdda Mon Sep 17 00:00:00 2001
From: John Spray
Date: Mon, 6 Jan 2025 12:21:36 +0000
Subject: [PATCH 4/4] tests: cover secondary migration

---
 .../regress/test_storage_controller.py | 96 +++++++++++++++----
 1 file changed, 80 insertions(+), 16 deletions(-)

diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 7062c35e05ab..023b52e6b33a 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -935,7 +935,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
     that just hits the endpoints to check that they don't bitrot.
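+    It now also gives the new tenant shard secondary-migration endpoint a superficial
+    check (see below).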
""" - neon_env_builder.num_pageservers = 2 + neon_env_builder.num_pageservers = 3 env = neon_env_builder.init_start() tenant_id = TenantId.generate() @@ -960,7 +960,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder): "GET", f"{env.storage_controller_api}/debug/v1/scheduler" ) # Two nodes, in a dict of node_id->node - assert len(response.json()["nodes"]) == 2 + assert len(response.json()["nodes"]) == 3 assert sum(v["shard_count"] for v in response.json()["nodes"].values()) == 3 assert all(v["may_schedule"] for v in response.json()["nodes"].values()) @@ -971,13 +971,25 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder): headers=env.storage_controller.headers(TokenScope.ADMIN), ) + # Secondary migration API: superficial check that it migrates + secondary_dest = env.pageservers[2].id + env.storage_controller.request( + "PUT", + f"{env.storage_controller_api}/control/v1/tenant/{tenant_id}-0002/migrate_secondary", + headers=env.storage_controller.headers(TokenScope.ADMIN), + json={"tenant_shard_id": f"{tenant_id}-0002", "node_id": secondary_dest}, + ) + assert env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_secondary"] == [ + secondary_dest + ] + # Node unclean drop API response = env.storage_controller.request( "POST", f"{env.storage_controller_api}/debug/v1/node/{env.pageservers[1].id}/drop", headers=env.storage_controller.headers(TokenScope.ADMIN), ) - assert len(env.storage_controller.node_list()) == 1 + assert len(env.storage_controller.node_list()) == 2 # Tenant unclean drop API response = env.storage_controller.request( @@ -1695,7 +1707,13 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder): """ output_dir = neon_env_builder.test_output_dir shard_count = 4 - env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count) + neon_env_builder.num_pageservers = 2 + env = neon_env_builder.init_configs() + env.start() + + tenant_id = TenantId.generate() + env.create_tenant(tenant_id, placement_policy='{"Attached":1}', shard_count=shard_count) + base_args = [env.neon_binpath / "storcon_cli", "--api", env.storage_controller_api] def storcon_cli(args): @@ -1724,7 +1742,7 @@ def storcon_cli(args): # List nodes node_lines = storcon_cli(["nodes"]) # Table header, footer, and one line of data - assert len(node_lines) == 5 + assert len(node_lines) == 7 assert "localhost" in node_lines[3] # Pause scheduling onto a node @@ -1742,10 +1760,21 @@ def storcon_cli(args): storcon_cli(["node-configure", "--node-id", "1", "--availability", "offline"]) assert "Offline" in storcon_cli(["nodes"])[3] + # Restore node, verify status changes in CLI output + env.pageservers[0].start() + + def is_online(): + assert "Offline" not in storcon_cli(["nodes"]) + + wait_until(is_online) + + # Let everything stabilize after node failure to avoid interfering with subsequent steps + env.storage_controller.reconcile_until_idle(timeout_secs=10) + # List tenants tenant_lines = storcon_cli(["tenants"]) assert len(tenant_lines) == 5 - assert str(env.initial_tenant) in tenant_lines[3] + assert str(tenant_id) in tenant_lines[3] # Setting scheduling policies intentionally result in warnings, they're for rare use. 
env.storage_controller.allowed_errors.extend( @@ -1753,23 +1782,58 @@ def storcon_cli(args): ) # Describe a tenant - tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(env.initial_tenant)]) + tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(tenant_id)]) assert len(tenant_lines) >= 3 + shard_count * 2 - assert str(env.initial_tenant) in tenant_lines[0] + assert str(tenant_id) in tenant_lines[0] + + # Migrate an attached location + def other_ps_id(current_ps_id): + return ( + env.pageservers[0].id + if current_ps_id == env.pageservers[1].id + else env.pageservers[1].id + ) + + storcon_cli( + [ + "tenant-shard-migrate", + "--tenant-shard-id", + f"{tenant_id}-0004", + "--node", + str( + other_ps_id( + env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_attached"] + ) + ), + ] + ) + + # Migrate a secondary location + storcon_cli( + [ + "tenant-shard-migrate-secondary", + "--tenant-shard-id", + f"{tenant_id}-0004", + "--node", + str( + other_ps_id( + env.storage_controller.tenant_describe(tenant_id)["shards"][0][ + "node_secondary" + ][0] + ) + ), + ] + ) # Pause changes on a tenant - storcon_cli(["tenant-policy", "--tenant-id", str(env.initial_tenant), "--scheduling", "stop"]) + storcon_cli(["tenant-policy", "--tenant-id", str(tenant_id), "--scheduling", "stop"]) assert "Stop" in storcon_cli(["tenants"])[3] # Cancel ongoing reconcile on a tenant - storcon_cli( - ["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{env.initial_tenant}-0104"] - ) + storcon_cli(["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{tenant_id}-0104"]) # Change a tenant's placement - storcon_cli( - ["tenant-policy", "--tenant-id", str(env.initial_tenant), "--placement", "secondary"] - ) + storcon_cli(["tenant-policy", "--tenant-id", str(tenant_id), "--placement", "secondary"]) assert "Secondary" in storcon_cli(["tenants"])[3] # Modify a tenant's config @@ -1777,7 +1841,7 @@ def storcon_cli(args): [ "patch-tenant-config", "--tenant-id", - str(env.initial_tenant), + str(tenant_id), "--config", json.dumps({"pitr_interval": "1m"}), ]