storage controller: API + CLI for migrating secondary locations #10284

Open · wants to merge 5 commits into main
24 changes: 24 additions & 0 deletions control_plane/storcon_cli/src/main.rs
@@ -112,6 +112,13 @@ enum Command {
#[arg(long)]
node: NodeId,
},
/// Migrate the secondary location for a tenant shard to a specific pageserver.
TenantShardMigrateSecondary {
#[arg(long)]
tenant_shard_id: TenantShardId,
#[arg(long)]
node: NodeId,
},
/// Cancel any ongoing reconciliation for this shard
TenantShardCancelReconcile {
#[arg(long)]
@@ -553,6 +560,23 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
}
Command::TenantShardMigrateSecondary {
tenant_shard_id,
node,
} => {
let req = TenantShardMigrateRequest {
tenant_shard_id,
node_id: node,
};

storcon_client
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
Method::PUT,
format!("control/v1/tenant/{tenant_shard_id}/migrate_secondary"),
Some(req),
)
.await?;
}
Command::TenantShardCancelReconcile { tenant_shard_id } => {
storcon_client
.dispatch::<(), ()>(
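For readers unfamiliar with the CLI plumbing: the new subcommand builds a `TenantShardMigrateRequest` and PUTs it to the controller's `migrate_secondary` endpoint, exactly like the existing `tenant-shard-migrate` does for attached locations. A minimal sketch of invoking it from a script via Python's `subprocess`; the binary path, API URL, tenant shard id, and node id below are placeholders:

```python
import subprocess

# Placeholder values for illustration; adjust to your environment.
STORCON_CLI = "./target/release/storcon_cli"
API_URL = "http://127.0.0.1:1234"
TENANT_SHARD_ID = "3fa85f6457174562b3fc2c963f66afa6-0004"  # shard 0 of a 4-shard tenant
DEST_NODE_ID = "3"

# Invoke the new subcommand the same way the regression test below does.
subprocess.run(
    [
        STORCON_CLI,
        "--api", API_URL,
        "tenant-shard-migrate-secondary",
        "--tenant-shard-id", TENANT_SHARD_ID,
        "--node", DEST_NODE_ID,
    ],
    check=True,
)
```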
10 changes: 8 additions & 2 deletions storage_controller/src/compute_hook.rs
@@ -124,7 +124,10 @@ impl ComputeHookTenant {
if let Some(shard_idx) = shard_idx {
sharded.shards.remove(shard_idx);
} else {
tracing::warn!("Shard not found while handling detach")
// This is a valid but niche case, where the tenant was previously configured
// as a Secondary location and then detached, so it has no previously notified
// state.
tracing::info!("Shard not found while handling detach")
}
}
ComputeHookTenant::Unsharded(_) => {
@@ -761,7 +764,10 @@ impl ComputeHook {
let mut state_locked = self.state.lock().unwrap();
match state_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(_) => {
tracing::warn!("Compute hook tenant not found for detach");
// This is a valid but niche case, where the tenant was previously configured
// as a Secondary location and then detached, so it has no previously notified
// state.
tracing::info!("Compute hook tenant not found for detach");
}
Entry::Occupied(mut e) => {
let sharded = e.get().is_sharded();
36 changes: 35 additions & 1 deletion storage_controller/src/http.rs
@@ -690,7 +690,8 @@ async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError
};

let state = get_state(&req);
let nodes = state.service.node_list().await?;
let mut nodes = state.service.node_list().await?;
[Author comment] Deterministic order, to enable the storcon CLI test assertions to run reliably with multiple nodes.

nodes.sort_by_key(|n| n.get_id());
let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();

json_response(StatusCode::OK, api_nodes)
@@ -1005,6 +1006,29 @@ async fn handle_tenant_shard_migrate(
)
}

async fn handle_tenant_shard_migrate_secondary(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;

let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};

let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
json_response(
StatusCode::OK,
service
.tenant_shard_migrate_secondary(tenant_shard_id, migrate_req)
.await?,
)
}

async fn handle_tenant_shard_cancel_reconcile(
service: Arc<Service>,
req: Request<Body>,
@@ -1855,6 +1879,16 @@ pub fn make_router(
RequestName("control_v1_tenant_migrate"),
)
})
.put(
"/control/v1/tenant/:tenant_shard_id/migrate_secondary",
|r| {
tenant_service_handler(
r,
handle_tenant_shard_migrate_secondary,
RequestName("control_v1_tenant_migrate_secondary"),
)
},
)
.put(
"/control/v1/tenant/:tenant_shard_id/cancel_reconcile",
|r| {
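The new route accepts the same `TenantShardMigrateRequest` body as the attached-location migration and is gated on an admin-scoped token. A minimal sketch of calling it directly over HTTP with Python's `requests`; the controller address and JWT are placeholders:

```python
import requests

STORCON_API = "http://127.0.0.1:1234"  # placeholder controller address
ADMIN_TOKEN = "<admin-scoped JWT>"     # placeholder token
tenant_shard_id = "3fa85f6457174562b3fc2c963f66afa6-0004"
dest_node_id = 3

# PUT control/v1/tenant/:tenant_shard_id/migrate_secondary with a
# TenantShardMigrateRequest body, mirroring what the CLI and the test send.
resp = requests.put(
    f"{STORCON_API}/control/v1/tenant/{tenant_shard_id}/migrate_secondary",
    headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
    json={"tenant_shard_id": tenant_shard_id, "node_id": dest_node_id},
    timeout=120,
)
resp.raise_for_status()
print(resp.json())  # TenantShardMigrateResponse is currently an empty object
```

The handler waits on the resulting reconciliation (up to the reconcile timeout), so a successful response means the secondary has moved or nothing needed to change.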
63 changes: 63 additions & 0 deletions storage_controller/src/service.rs
@@ -5055,6 +5055,69 @@ impl Service {
Ok(TenantShardMigrateResponse {})
}

pub(crate) async fn tenant_shard_migrate_secondary(
&self,
tenant_shard_id: TenantShardId,
migrate_req: TenantShardMigrateRequest,
) -> Result<TenantShardMigrateResponse, ApiError> {
let waiter = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();

let Some(node) = nodes.get(&migrate_req.node_id) else {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Node {} not found",
migrate_req.node_id
)));
};

if !node.is_available() {
// Warn but proceed: the caller may intend to manually adjust the placement of
// a shard even if the node is down, e.g. if intervening during an incident.
tracing::warn!("Migrating to unavailable node {node}");
}

let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant shard not found").into(),
));
};

if shard.intent.get_secondary().len() == 1
&& shard.intent.get_secondary()[0] == migrate_req.node_id
{
tracing::info!(
"Migrating secondary to {node}: intent is unchanged {:?}",
shard.intent
);
} else if shard.intent.get_attached() == &Some(migrate_req.node_id) {
tracing::info!("Migrating secondary to {node}: already attached where we were asked to create a secondary");
} else {
let old_secondaries = shard.intent.get_secondary().clone();
for secondary in old_secondaries {
shard.intent.remove_secondary(scheduler, secondary);
}

shard.intent.push_secondary(scheduler, migrate_req.node_id);
shard.sequence = shard.sequence.next();
tracing::info!(
"Migrating secondary to {node}: new intent {:?}",
shard.intent
);
}

self.maybe_reconcile_shard(shard, nodes)
};

if let Some(waiter) = waiter {
waiter.wait_timeout(RECONCILE_TIMEOUT).await?;
} else {
tracing::info!("Migration is a no-op");
}

Ok(TenantShardMigrateResponse {})
}

/// 'cancel' in this context means cancel any ongoing reconcile
pub(crate) async fn tenant_shard_cancel_reconcile(
&self,
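To summarize the branching in `tenant_shard_migrate_secondary`: a request naming the shard's current sole secondary leaves the intent unchanged, a request naming the currently attached node is likewise left alone, and anything else replaces the existing secondaries with the requested node and bumps the sequence so a reconcile runs. A compact sketch of that decision in Python, purely for illustration (the function and parameter names are hypothetical; the authoritative logic is the Rust above):

```python
def plan_secondary_migration(secondaries, attached, dest_node_id):
    """Return (new_secondaries, intent_changed) for a migrate_secondary request.

    `secondaries` is the shard's current list of secondary node ids and
    `attached` its currently attached node id (or None), mirroring the
    intent state the Rust handler inspects.
    """
    if secondaries == [dest_node_id]:
        return secondaries, False  # intent already matches the request
    if attached == dest_node_id:
        return secondaries, False  # destination already holds the attached location
    # Otherwise drop all existing secondaries and place one on the destination.
    return [dest_node_id], True


# Example: attached on node 1, secondary on node 2, migrating the secondary to node 3.
print(plan_secondary_migration([2], attached=1, dest_node_id=3))  # ([3], True)
```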
96 changes: 80 additions & 16 deletions test_runner/regress/test_storage_controller.py
@@ -936,7 +936,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
that just hits the endpoints to check that they don't bitrot.
"""

neon_env_builder.num_pageservers = 2
neon_env_builder.num_pageservers = 3
[Author comment] Three pageservers because we need a node that holds neither an attached nor a secondary location, in order to migrate a secondary there.

env = neon_env_builder.init_start()

tenant_id = TenantId.generate()
@@ -961,7 +961,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
"GET", f"{env.storage_controller_api}/debug/v1/scheduler"
)
# Three nodes, in a dict of node_id->node
assert len(response.json()["nodes"]) == 2
assert len(response.json()["nodes"]) == 3
assert sum(v["shard_count"] for v in response.json()["nodes"].values()) == 3
assert all(v["may_schedule"] for v in response.json()["nodes"].values())

@@ -972,13 +972,25 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
headers=env.storage_controller.headers(TokenScope.ADMIN),
)

# Secondary migration API: superficial check that it migrates
secondary_dest = env.pageservers[2].id
env.storage_controller.request(
"PUT",
f"{env.storage_controller_api}/control/v1/tenant/{tenant_id}-0002/migrate_secondary",
headers=env.storage_controller.headers(TokenScope.ADMIN),
json={"tenant_shard_id": f"{tenant_id}-0002", "node_id": secondary_dest},
)
assert env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_secondary"] == [
secondary_dest
]

# Node unclean drop API
response = env.storage_controller.request(
"POST",
f"{env.storage_controller_api}/debug/v1/node/{env.pageservers[1].id}/drop",
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
assert len(env.storage_controller.node_list()) == 1
assert len(env.storage_controller.node_list()) == 2

# Tenant unclean drop API
response = env.storage_controller.request(
@@ -1696,7 +1708,13 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
"""
output_dir = neon_env_builder.test_output_dir
shard_count = 4
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()

tenant_id = TenantId.generate()
env.create_tenant(tenant_id, placement_policy='{"Attached":1}', shard_count=shard_count)
[Author comment] Switch to explicit tenant creation because the default tenant in NeonEnv doesn't have secondaries.

base_args = [env.neon_binpath / "storcon_cli", "--api", env.storage_controller_api]

def storcon_cli(args):
@@ -1725,7 +1743,7 @@ def storcon_cli(args):
# List nodes
node_lines = storcon_cli(["nodes"])
# Table header, footer, and one line of data per node
assert len(node_lines) == 5
assert len(node_lines) == 7
assert "localhost" in node_lines[3]

# Pause scheduling onto a node
@@ -1743,42 +1761,88 @@ def storcon_cli(args):
storcon_cli(["node-configure", "--node-id", "1", "--availability", "offline"])
assert "Offline" in storcon_cli(["nodes"])[3]

# Restore node, verify status changes in CLI output
[Author comment] This is necessary because we now have two nodes, so making one unavailable kicks off migrations.

env.pageservers[0].start()

def is_online():
assert "Offline" not in storcon_cli(["nodes"])

wait_until(is_online)

# Let everything stabilize after node failure to avoid interfering with subsequent steps
env.storage_controller.reconcile_until_idle(timeout_secs=10)

# List tenants
tenant_lines = storcon_cli(["tenants"])
assert len(tenant_lines) == 5
assert str(env.initial_tenant) in tenant_lines[3]
assert str(tenant_id) in tenant_lines[3]

# Setting scheduling policies intentionally results in warnings; they're for rare use.
env.storage_controller.allowed_errors.extend(
[".*Skipping reconcile for policy.*", ".*Scheduling is disabled by policy.*"]
)

# Describe a tenant
tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(env.initial_tenant)])
tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(tenant_id)])
assert len(tenant_lines) >= 3 + shard_count * 2
assert str(env.initial_tenant) in tenant_lines[0]
assert str(tenant_id) in tenant_lines[0]

# Migrate an attached location
def other_ps_id(current_ps_id):
return (
env.pageservers[0].id
if current_ps_id == env.pageservers[1].id
else env.pageservers[1].id
)

storcon_cli(
[
"tenant-shard-migrate",
"--tenant-shard-id",
f"{tenant_id}-0004",
"--node",
str(
other_ps_id(
env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_attached"]
)
),
]
)

# Migrate a secondary location
storcon_cli(
[
"tenant-shard-migrate-secondary",
"--tenant-shard-id",
f"{tenant_id}-0004",
"--node",
str(
other_ps_id(
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"node_secondary"
][0]
)
),
]
)

# Pause changes on a tenant
storcon_cli(["tenant-policy", "--tenant-id", str(env.initial_tenant), "--scheduling", "stop"])
storcon_cli(["tenant-policy", "--tenant-id", str(tenant_id), "--scheduling", "stop"])
assert "Stop" in storcon_cli(["tenants"])[3]

# Cancel ongoing reconcile on a tenant
storcon_cli(
["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{env.initial_tenant}-0104"]
)
storcon_cli(["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{tenant_id}-0104"])

# Change a tenant's placement
storcon_cli(
["tenant-policy", "--tenant-id", str(env.initial_tenant), "--placement", "secondary"]
)
storcon_cli(["tenant-policy", "--tenant-id", str(tenant_id), "--placement", "secondary"])
assert "Secondary" in storcon_cli(["tenants"])[3]

# Modify a tenant's config
storcon_cli(
[
"patch-tenant-config",
"--tenant-id",
str(env.initial_tenant),
str(tenant_id),
"--config",
json.dumps({"pitr_interval": "1m"}),
]