From 5a0d1e07977ce14af4985b08b0c3299d0ea163c1 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Fri, 27 May 2022 15:14:30 -0400 Subject: [PATCH] [nexus] Re-implement disk attach/detach (#1106) --- nexus/src/app/disk.rs | 5 + nexus/src/app/instance.rs | 293 +---- nexus/src/db/collection_attach.rs | 1494 ++++++++++++++++++++++++ nexus/src/db/collection_detach.rs | 1117 ++++++++++++++++++ nexus/src/db/collection_detach_many.rs | 1193 +++++++++++++++++++ nexus/src/db/cte_utils.rs | 86 ++ nexus/src/db/datastore.rs | 355 +++++- nexus/src/db/mod.rs | 5 +- nexus/src/db/model/instance.rs | 16 +- nexus/tests/integration_tests/disks.rs | 181 +-- 10 files changed, 4344 insertions(+), 401 deletions(-) create mode 100644 nexus/src/db/collection_attach.rs create mode 100644 nexus/src/db/collection_detach.rs create mode 100644 nexus/src/db/collection_detach_many.rs create mode 100644 nexus/src/db/cte_utils.rs diff --git a/nexus/src/app/disk.rs b/nexus/src/app/disk.rs index 52492443c59..5a0f0ee4117 100644 --- a/nexus/src/app/disk.rs +++ b/nexus/src/app/disk.rs @@ -163,6 +163,11 @@ impl super::Nexus { /// Modifies the runtime state of the Disk as requested. This generally /// means attaching or detaching the disk. + // TODO(https://github.com/oxidecomputer/omicron/issues/811): + // This will be unused until we implement hot-plug support. + // However, it has been left for reference until then, as it will + // likely be needed once that feature is implemented. + #[allow(dead_code)] pub(crate) async fn disk_set_runtime( &self, opctx: &OpContext, diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index 0caf3680baf..6ea7a022ced 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -19,7 +19,6 @@ use omicron_common::api::external; use omicron_common::api::external::CreateResult; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::DeleteResult; -use omicron_common::api::external::DiskState; use omicron_common::api::external::Error; use omicron_common::api::external::InstanceState; use omicron_common::api::external::ListResultVec; @@ -152,17 +151,9 @@ impl super::Nexus { Ok(db_instance) } - // TODO-correctness It's not totally clear what the semantics and behavior - // should be here. It might be nice to say that you can only do this - // operation if the Instance is already stopped, in which case we can - // execute this immediately by just removing it from the database, with the - // same race we have with disk delete (i.e., if someone else is requesting - // an instance boot, we may wind up in an inconsistent state). On the other - // hand, we could always allow this operation, issue the request to the SA - // to destroy the instance (not just stop it), and proceed with deletion - // when that finishes. But in that case, although the HTTP DELETE request - // completed, the object will still appear for a little while, which kind of - // sucks. + // This operation may only occur on stopped instances, which implies that + // the attached disks do not have any running "upstairs" process running + // within the sled. pub async fn project_destroy_instance( &self, opctx: &OpContext, @@ -173,7 +164,7 @@ impl super::Nexus { // TODO-robustness We need to figure out what to do with Destroyed // instances? Presumably we need to clean them up at some point, but // not right away so that callers can see that they've been destroyed. 
- let (.., authz_instance, db_instance) = + let (.., authz_instance, _) = LookupPath::new(opctx, &self.db_datastore) .organization_name(organization_name) .project_name(project_name) @@ -181,50 +172,6 @@ impl super::Nexus { .fetch() .await?; - opctx.authorize(authz::Action::Delete, &authz_instance).await?; - - match db_instance.runtime_state.state.state() { - InstanceState::Stopped | InstanceState::Failed => { - // ok - } - - state => { - return Err(Error::InvalidRequest { - message: format!( - "instance cannot be deleted in state \"{}\"", - state, - ), - }); - } - } - - // Detach all attached disks - let disks = self - .instance_list_disks( - opctx, - organization_name, - project_name, - instance_name, - &DataPageParams { - marker: None, - direction: dropshot::PaginationOrder::Ascending, - limit: std::num::NonZeroU32::new(MAX_DISKS_PER_INSTANCE) - .unwrap(), - }, - ) - .await?; - - for disk in &disks { - self.instance_detach_disk( - opctx, - organization_name, - project_name, - instance_name, - &disk.name(), - ) - .await?; - } - self.db_datastore.project_delete_instance(opctx, &authz_instance).await } @@ -586,144 +533,43 @@ impl super::Nexus { instance_name: &Name, disk_name: &Name, ) -> UpdateResult { - let (.., authz_project, authz_disk, db_disk) = + let (.., authz_project, authz_disk, _) = LookupPath::new(opctx, &self.db_datastore) .organization_name(organization_name) .project_name(project_name) .disk_name(disk_name) .fetch() .await?; - let (.., authz_instance, db_instance) = + let (.., authz_instance, _) = LookupPath::new(opctx, &self.db_datastore) .project_id(authz_project.id()) .instance_name(instance_name) .fetch() .await?; - let instance_id = &authz_instance.id(); - // Enforce attached disks limit - let attached_disks = self - .instance_list_disks( - opctx, - organization_name, - project_name, - instance_name, - &DataPageParams { - marker: None, - direction: dropshot::PaginationOrder::Ascending, - limit: std::num::NonZeroU32::new(MAX_DISKS_PER_INSTANCE) - .unwrap(), - }, + // TODO(https://github.com/oxidecomputer/omicron/issues/811): + // Disk attach is only implemented for instances that are not + // currently running. This operation therefore can operate exclusively + // on database state. + // + // To implement hot-plug support, we should do the following in a saga: + // - Update the state to "Attaching", rather than "Attached". + // - If the instance is running... + // - Issue a request to "disk attach" to the associated sled agent, + // using the "state generation" value from the moment we attached. + // - Update the DB if the request succeeded (hopefully to "Attached"). + // - If the instance is not running... + // - Update the disk state in the DB to "Attached". 
+ let (_instance, disk) = self + .db_datastore + .instance_attach_disk( + &opctx, + &authz_instance, + &authz_disk, + MAX_DISKS_PER_INSTANCE, ) .await?; - - if attached_disks.len() == MAX_DISKS_PER_INSTANCE as usize { - return Err(Error::invalid_request(&format!( - "cannot attach more than {} disks to instance!", - MAX_DISKS_PER_INSTANCE - ))); - } - - fn disk_attachment_error( - disk: &db::model::Disk, - ) -> CreateResult { - let disk_status = match disk.runtime().state().into() { - DiskState::Destroyed => "disk is destroyed", - DiskState::Faulted => "disk is faulted", - DiskState::Creating => "disk is detached", - DiskState::Detached => "disk is detached", - - // It would be nice to provide a more specific message here, but - // the appropriate identifier to provide the user would be the - // other instance's name. Getting that would require another - // database hit, which doesn't seem worth it for this. - DiskState::Attaching(_) => { - "disk is attached to another instance" - } - DiskState::Attached(_) => { - "disk is attached to another instance" - } - DiskState::Detaching(_) => { - "disk is attached to another instance" - } - }; - let message = format!( - "cannot attach disk \"{}\": {}", - disk.name().as_str(), - disk_status - ); - Err(Error::InvalidRequest { message }) - } - - match &db_disk.state().into() { - // If we're already attaching or attached to the requested instance, - // there's nothing else to do. - // TODO-security should it be an error if you're not authorized to - // do this and we did not actually have to do anything? - DiskState::Attached(id) if id == instance_id => return Ok(db_disk), - - // If the disk is currently attaching or attached to another - // instance, fail this request. Users must explicitly detach first - // if that's what they want. If it's detaching, they have to wait - // for it to become detached. - // TODO-debug: the error message here could be better. We'd have to - // look up the other instance by id (and gracefully handle it not - // existing). - DiskState::Attached(id) => { - assert_ne!(id, instance_id); - return disk_attachment_error(&db_disk); - } - DiskState::Detaching(_) => { - return disk_attachment_error(&db_disk); - } - DiskState::Attaching(id) if id != instance_id => { - return disk_attachment_error(&db_disk); - } - DiskState::Destroyed => { - return disk_attachment_error(&db_disk); - } - DiskState::Faulted => { - return disk_attachment_error(&db_disk); - } - - DiskState::Creating => (), - DiskState::Detached => (), - DiskState::Attaching(id) => { - assert_eq!(id, instance_id); - } - } - - match &db_instance.runtime_state.state.state() { - // If there's a propolis zone for this instance, ask the Sled Agent - // to hot-plug the disk. - // - // TODO this will probably involve volume construction requests as - // well! - InstanceState::Running | InstanceState::Starting => { - self.disk_set_runtime( - opctx, - &authz_disk, - &db_disk, - self.instance_sled(&db_instance).await?, - sled_agent_client::types::DiskStateRequested::Attached( - *instance_id, - ), - ) - .await?; - } - - _ => { - // If there is not a propolis zone, then disk attach only occurs - // in the DB. - let new_runtime = db_disk.runtime().attach(*instance_id); - - self.db_datastore - .disk_update_runtime(opctx, &authz_disk, &new_runtime) - .await?; - } - } - - self.db_datastore.disk_refetch(opctx, &authz_disk).await + Ok(disk) } /// Detach a disk from an instance. 
@@ -735,83 +581,38 @@ impl super::Nexus { instance_name: &Name, disk_name: &Name, ) -> UpdateResult { - let (.., authz_project, authz_disk, db_disk) = + let (.., authz_project, authz_disk, _) = LookupPath::new(opctx, &self.db_datastore) .organization_name(organization_name) .project_name(project_name) .disk_name(disk_name) .fetch() .await?; - let (.., authz_instance, db_instance) = + let (.., authz_instance, _) = LookupPath::new(opctx, &self.db_datastore) .project_id(authz_project.id()) .instance_name(instance_name) .fetch() .await?; - let instance_id = &authz_instance.id(); - - match &db_disk.state().into() { - // This operation is a noop if the disk is not attached or already - // detaching from the same instance. - // TODO-security should it be an error if you're not authorized to - // do this and we did not actually have to do anything? - DiskState::Creating => return Ok(db_disk), - DiskState::Detached => return Ok(db_disk), - DiskState::Destroyed => return Ok(db_disk), - DiskState::Faulted => return Ok(db_disk), - DiskState::Detaching(id) if id == instance_id => { - return Ok(db_disk) - } - - // This operation is not allowed if the disk is attached to some - // other instance. - DiskState::Attaching(id) if id != instance_id => { - return Err(Error::InvalidRequest { - message: String::from("disk is attached elsewhere"), - }); - } - DiskState::Attached(id) if id != instance_id => { - return Err(Error::InvalidRequest { - message: String::from("disk is attached elsewhere"), - }); - } - DiskState::Detaching(_) => { - return Err(Error::InvalidRequest { - message: String::from("disk is attached elsewhere"), - }); - } - - // These are the cases where we have to do something. - DiskState::Attaching(_) => (), - DiskState::Attached(_) => (), - } - - // If there's a propolis zone for this instance, ask the Sled - // Agent to hot-remove the disk. - match &db_instance.runtime_state.state.state() { - InstanceState::Running | InstanceState::Starting => { - self.disk_set_runtime( - opctx, - &authz_disk, - &db_disk, - self.instance_sled(&db_instance).await?, - sled_agent_client::types::DiskStateRequested::Detached, - ) - .await?; - } - - _ => { - // If there is not a propolis zone, then disk detach only occurs - // in the DB. - let new_runtime = db_disk.runtime().detach(); - - self.db_datastore - .disk_update_runtime(opctx, &authz_disk, &new_runtime) - .await?; - } - } - self.db_datastore.disk_refetch(opctx, &authz_disk).await + // TODO(https://github.com/oxidecomputer/omicron/issues/811): + // Disk detach is only implemented for instances that are not + // currently running. This operation therefore can operate exclusively + // on database state. + // + // To implement hot-unplug support, we should do the following in a saga: + // - Update the state to "Detaching", rather than "Detached". + // - If the instance is running... + // - Issue a request to "disk detach" to the associated sled agent, + // using the "state generation" value from the moment we attached. + // - Update the DB if the request succeeded (hopefully to "Detached"). + // - If the instance is not running... + // - Update the disk state in the DB to "Detached". + let disk = self + .db_datastore + .instance_detach_disk(&opctx, &authz_instance, &authz_disk) + .await?; + Ok(disk) } /// Create a network interface attached to the provided instance. 
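Before the new CTE modules, it may help to see the attach statement end to end. The sketch below is illustrative only: it reuses the `test_schema` tables and the `Collection`/`Resource` test types defined near the bottom of `collection_attach.rs` (it is not the `datastore.rs` hunk from this patch, which is only summarized above), and the error strings and module paths are assumptions. It shows how a caller builds the statement and distinguishes the `AttachError` variants:

```rust
// Illustrative sketch only. Assumes the `Collection`/`Resource` test types and
// the `collection`/`resource` table! definitions from collection_attach.rs are
// in scope; this is not the datastore.rs change from this patch.
use diesel::prelude::*;
use omicron_nexus::db::collection_attach::{AttachError, DatastoreAttachTarget};
use uuid::Uuid;

async fn attach_sketch(
    pool: &omicron_nexus::db::Pool,
    collection_id: Uuid,
    resource_id: Uuid,
) -> Result<(Collection, Resource), String> {
    Collection::attach_resource(
        collection_id,
        resource_id,
        // Caller-supplied constraints; the CTE adds the id and time_deleted
        // filters automatically.
        collection::table.into_boxed(),
        resource::table.into_boxed(),
        // Permit at most 10 attached resources per collection.
        10,
        // How the resource row is mutated when the attach is allowed.
        diesel::update(resource::table)
            .set(resource::dsl::collection_id.eq(collection_id)),
    )
    .attach_and_get_result_async(pool.pool())
    .await
    .map_err(|e| match e {
        // The collection or resource is missing or soft-deleted.
        AttachError::CollectionNotFound => "no such collection".to_string(),
        AttachError::ResourceNotFound => "no such resource".to_string(),
        // Everything exists but no update happened; inspect the returned rows
        // to distinguish "already attached", "at capacity", or a
        // caller-supplied filter that did not match.
        AttachError::NoUpdate { attached_count, resource, collection: _ } => {
            format!(
                "not attached ({} already attached, collection_id = {:?})",
                attached_count, resource.collection_id
            )
        }
        AttachError::DatabaseError(e) => format!("database error: {:?}", e),
    })
}
```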
diff --git a/nexus/src/db/collection_attach.rs b/nexus/src/db/collection_attach.rs new file mode 100644 index 00000000000..e008a06c8a8 --- /dev/null +++ b/nexus/src/db/collection_attach.rs @@ -0,0 +1,1494 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! CTE for attaching a resource to a collection. +//! +//! This atomically: +//! - Checks if the collection exists and is not soft deleted +//! - Checks if the resource exists and is not soft deleted +//! - Validates conditions on both the collection and resource +//! - Ensures the number of attached resources does not exceed +//! a provided threshold +//! - Updates the resource row + +use super::cte_utils::{ + BoxableTable, BoxableUpdateStatement, BoxedQuery, ExprSqlType, FilterBy, + QueryFromClause, QuerySqlType, TableDefaultWhereClause, +}; +use super::pool::DbConnection; +use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager, PoolError}; +use diesel::associations::HasTable; +use diesel::expression::{AsExpression, Expression}; +use diesel::helper_types::*; +use diesel::pg::Pg; +use diesel::prelude::*; +use diesel::query_builder::*; +use diesel::query_dsl::methods as query_methods; +use diesel::query_source::Table; +use diesel::sql_types::{BigInt, Nullable, SingleValue}; +use std::fmt::Debug; + +/// A collection of type aliases particularly relevant to collection-based CTEs. +pub(crate) mod aliases { + use super::{ + Column, DatastoreAttachTarget, Table, TableDefaultWhereClause, + }; + + /// The table representing the collection. The resource references + /// this table. + pub type CollectionTable = <>::CollectionIdColumn as Column>::Table; + /// The table representing the resource. This table contains an + /// ID acting as a foreign key into the collection table. + pub type ResourceTable = <>::ResourceIdColumn as Column>::Table; + + /// The default WHERE clause of the collection table. + pub type CollectionTableDefaultWhereClause = + TableDefaultWhereClause>; + /// The default WHERE clause of the resource table. + pub type ResourceTableDefaultWhereClause = + TableDefaultWhereClause>; + + pub type CollectionIdColumn = + >::CollectionIdColumn; + pub type ResourceIdColumn = + >::ResourceIdColumn; + + /// Representation of Primary Key in Rust. + pub type CollectionPrimaryKey = + as Table>::PrimaryKey; + pub type ResourcePrimaryKey = + as Table>::PrimaryKey; + pub type ResourceForeignKey = + >::ResourceCollectionIdColumn; + + /// Representation of Primary Key in SQL. + pub type SerializedCollectionPrimaryKey = + as diesel::Expression>::SqlType; + pub type SerializedResourcePrimaryKey = + as diesel::Expression>::SqlType; + pub type SerializedResourceForeignKey = + as diesel::Expression>::SqlType; +} + +use aliases::*; + +/// Trait to be implemented by structs representing an attachable collection. +/// +/// For example, since Instances have a one-to-many relationship with +/// Disks, the Instance datatype should implement this trait. +/// ``` +/// # use diesel::prelude::*; +/// # use omicron_nexus::db::collection_attach::DatastoreAttachTarget; +/// # +/// # table! { +/// # test_schema.instance (id) { +/// # id -> Uuid, +/// # time_deleted -> Nullable, +/// # } +/// # } +/// # +/// # table! 
{ +/// # test_schema.disk (id) { +/// # id -> Uuid, +/// # time_deleted -> Nullable, +/// # instance_id -> Nullable, +/// # } +/// # } +/// +/// #[derive(Queryable, Debug, Selectable)] +/// #[diesel(table_name = disk)] +/// struct Disk { +/// pub id: uuid::Uuid, +/// pub time_deleted: Option>, +/// pub instance_id: Option, +/// } +/// +/// #[derive(Queryable, Debug, Selectable)] +/// #[diesel(table_name = instance)] +/// struct Instance { +/// pub id: uuid::Uuid, +/// pub time_deleted: Option>, +/// } +/// +/// impl DatastoreAttachTarget for Instance { +/// // Type of instance::id and disk::id. +/// type Id = uuid::Uuid; +/// +/// type CollectionIdColumn = instance::dsl::id; +/// type CollectionTimeDeletedColumn = instance::dsl::time_deleted; +/// +/// type ResourceIdColumn = disk::dsl::id; +/// type ResourceCollectionIdColumn = disk::dsl::instance_id; +/// type ResourceTimeDeletedColumn = disk::dsl::time_deleted; +/// } +/// ``` +pub trait DatastoreAttachTarget: Selectable + Sized { + /// The Rust type of the collection and resource ids (typically Uuid). + type Id: Copy + Debug + PartialEq + Send + 'static; + + /// The primary key column of the collection. + type CollectionIdColumn: Column; + + /// The time deleted column in the CollectionTable + type CollectionTimeDeletedColumn: Column::Table> + + Default + + ExpressionMethods; + + /// The primary key column of the resource + type ResourceIdColumn: Column; + + /// The column in the resource acting as a foreign key into the Collection + type ResourceCollectionIdColumn: Column
::Table> + + Default + + ExpressionMethods; + + /// The time deleted column in the ResourceTable + type ResourceTimeDeletedColumn: Column
::Table> + + Default + + ExpressionMethods; + + /// Creates a statement for attaching a resource to the given collection. + /// + /// This statement allows callers to atomically check the state of a + /// collection and a resource while attaching a resource to a collection. + /// + /// - `collection_id`: Primary key of the collection being inserted into. + /// - `resource_id`: Primary key of the resource being attached. + /// - `collection_query`: An optional query for collection state. The + /// CTE will automatically filter this query to `collection_id`, and + /// validate that the "time deleted" column is NULL. + /// - `resource_query`: An optional query for the resource state. The + /// CTE will automatically filter this query to `resource_id`, + /// validate that the "time deleted" column is NULL, and validate that the + /// "collection_id" column is NULL. + /// - `max_attached_resources`: The maximum number of non-deleted + /// resources which are permitted to have their "collection_id" column + /// set to the value of `collection_id`. If attaching `resource_id` would + /// cross this threshold, the update is aborted. + /// - `update`: An update statement, identifying how the resource object + /// should be modified to be attached. + /// + /// The V type refers to the "update target" of the UpdateStatement, + /// and should generally be inferred rather than explicitly specified. + fn attach_resource( + collection_id: Self::Id, + resource_id: Self::Id, + + collection_query: BoxedQuery>, + resource_query: BoxedQuery>, + + max_attached_resources: u32, + + // We are intentionally picky about this update statement: + // - The second argument - the WHERE clause - must match the default + // for the table. This encourages the "resource_query" filter to be + // used instead, and makes it possible for the CTE to modify the + // filter here (ensuring "resource_id" is selected). + // - Additionally, UpdateStatement's fourth argument defaults to Ret = + // NoReturningClause. This enforces that the given input statement does + // not have a RETURNING clause, and also lets the CTE control this + // value. + update: UpdateStatement< + ResourceTable, + ResourceTableDefaultWhereClause, + V, + >, + ) -> AttachToCollectionStatement + where + // Treat the collection and resource as boxed tables. + CollectionTable: BoxableTable, + ResourceTable: BoxableTable, + + // Allows treating "collection_exists_query" as a boxed "dyn QueryFragment". + QueryFromClause>: + QueryFragment + Send, + // Allows treating "resource_exists_query" as a boxed "dyn QueryFragment". + QueryFromClause>: + QueryFragment + Send, + // Allows sending "collection_exists_query" between threads. + QuerySqlType>: Send, + // Allows sending "resource_exists_query" between threads. + QuerySqlType>: Send, + // Allows calling ".filter()" on the boxed collection table. + BoxedQuery>: FilterBy, Self::Id>> + + FilterBy>, + // Allows calling ".filter()" on the boxed resource table. + BoxedQuery>: FilterBy, Self::Id>> + + FilterBy> + + FilterBy> + + FilterBy>, + + // Allows calling "update.into_boxed()" + UpdateStatement< + ResourceTable, + ResourceTableDefaultWhereClause, + V, + >: BoxableUpdateStatement, V>, + // Allows calling + // ".filter(resource_table().primary_key().eq(resource_id)" on the + // boxed update statement. + BoxedUpdateStatement<'static, Pg, ResourceTable, V>: + FilterBy, Self::Id>>, + + // Allows using "id" in expressions (e.g. ".eq(...)") with... + Self::Id: AsExpression< + // ... 
The Collection table's PK + SerializedCollectionPrimaryKey, + > + AsExpression< + // ... The Resource table's PK + SerializedResourcePrimaryKey, + > + AsExpression< + // ... The Resource table's FK to the Collection table + SerializedResourceForeignKey, + >, + ExprSqlType>: SingleValue, + ExprSqlType>: SingleValue, + ExprSqlType: SingleValue, + + // Necessary to actually select the resource in the output type. + ResourceType: Selectable, + { + let collection_table = + || as HasTable>::table(); + let resource_table = + || as HasTable>::table(); + + // Create new queries to determine if the collection and resources + // already exist. + let collection_exists_query = Box::new( + collection_table() + .into_boxed() + .filter(collection_table().primary_key().eq(collection_id)) + .filter(Self::CollectionTimeDeletedColumn::default().is_null()), + ); + let resource_exists_query = Box::new( + resource_table() + .into_boxed() + .filter(resource_table().primary_key().eq(resource_id)) + .filter(Self::ResourceTimeDeletedColumn::default().is_null()), + ); + + // Additionally, construct a new query to count the number of + // already attached resources. + let resource_count_query = Box::new( + resource_table() + .into_boxed() + .filter( + Self::ResourceCollectionIdColumn::default() + .eq(collection_id), + ) + .filter(Self::ResourceTimeDeletedColumn::default().is_null()) + .count(), + ); + + // For the queries which decide whether or not we'll perform the update, + // extend the user-provided arguments. + // + // We force these queries to: + // - Check against the primary key of the target objects + // - Ensure the objects are not deleted + // - (for the resource) Ensure it is not already attached + // - (for the update) Ensure that only the resource with "resource_id" + // is modified. + let collection_query = Box::new( + collection_query + .filter(collection_table().primary_key().eq(collection_id)) + .filter(Self::CollectionTimeDeletedColumn::default().is_null()), + ); + let resource_query = Box::new( + resource_query + .filter(resource_table().primary_key().eq(resource_id)) + .filter(Self::ResourceTimeDeletedColumn::default().is_null()) + .filter(Self::ResourceCollectionIdColumn::default().is_null()), + ); + + let update_resource_statement = update + .into_boxed() + .filter(resource_table().primary_key().eq(resource_id)); + + let resource_returning_clause = ResourceType::as_returning(); + AttachToCollectionStatement { + collection_exists_query, + resource_exists_query, + resource_count_query, + collection_query, + resource_query, + max_attached_resources, + update_resource_statement, + resource_returning_clause, + } + } +} + +/// The CTE described in the module docs +#[must_use = "Queries must be executed"] +pub struct AttachToCollectionStatement +where + ResourceType: Selectable, + C: DatastoreAttachTarget, +{ + // Query which answers: "Does the collection exist?" + collection_exists_query: Box + Send>, + // Query which answers: "Does the resource exist?" + resource_exists_query: Box + Send>, + // Query which answers: "How many resources are associated with the + // collection?" + resource_count_query: Box + Send>, + // A (mostly) user-provided query for validating the collection. + collection_query: Box + Send>, + // A (mostly) user-provided query for validating the resource. + resource_query: Box + Send>, + // The maximum number of resources which may be attached to the collection. + max_attached_resources: u32, + + // Update statement for the resource. 
+ update_resource_statement: + BoxedUpdateStatement<'static, Pg, ResourceTable, V>, + // Describes what should be returned after UPDATE-ing the resource. + resource_returning_clause: AsSelect, +} + +impl QueryId + for AttachToCollectionStatement +where + ResourceType: Selectable, + C: DatastoreAttachTarget, +{ + type QueryId = (); + const HAS_STATIC_QUERY_ID: bool = false; +} + +/// Result of [`AttachToCollectionStatement`] when executed asynchronously +pub type AsyncAttachToCollectionResult = + Result<(C, ResourceType), AttachError>; + +/// Result of [`AttachToCollectionStatement`] when executed synchronously +pub type SyncAttachToCollectionResult = Result< + (C, ResourceType), + AttachError, +>; + +/// Errors returned by [`AttachToCollectionStatement`]. +#[derive(Debug)] +pub enum AttachError { + /// The collection that the query was inserting into does not exist + CollectionNotFound, + /// The resource being attached does not exist + ResourceNotFound, + /// Although the resource and collection exist, the update did not occur + /// + /// The unchanged resource and collection are returned as a part of this + /// error; it is the responsibility of the caller to determine which + /// condition was not met. + NoUpdate { attached_count: i64, resource: ResourceType, collection: C }, + /// Other database error + DatabaseError(E), +} + +/// Describes the type returned from the actual CTE, which is parsed +/// and interpreted before propagating it to users of the Rust API. +pub type RawOutput = + (i64, Option, Option, Option); + +impl AttachToCollectionStatement +where + ResourceType: 'static + Debug + Send + Selectable, + C: 'static + Debug + DatastoreAttachTarget + Send, + ResourceTable: 'static + Table + Send + Copy + Debug, + V: 'static + Send, + AttachToCollectionStatement: Send, +{ + /// Issues the CTE asynchronously and parses the result. + pub async fn attach_and_get_result_async( + self, + pool: &bb8::Pool>, + ) -> AsyncAttachToCollectionResult + where + // We require this bound to ensure that "Self" is runnable as query. + Self: query_methods::LoadQuery< + 'static, + DbConnection, + RawOutput, + >, + { + self.get_result_async::>(pool) + .await + // If the database returns an error, propagate it right away. + .map_err(AttachError::DatabaseError) + // Otherwise, parse the output to determine if the CTE succeeded. + .and_then(Self::parse_result) + } + + /// Issues the CTE synchronously and parses the result. + pub fn attach_and_get_result( + self, + conn: &mut DbConnection, + ) -> SyncAttachToCollectionResult + where + // We require this bound to ensure that "Self" is runnable as query. 
+ Self: query_methods::LoadQuery< + 'static, + DbConnection, + RawOutput, + >, + { + self.get_result::>(conn) + .map_err(AttachError::DatabaseError) + .and_then(Self::parse_result) + } + + fn parse_result( + result: RawOutput, + ) -> Result<(C, ResourceType), AttachError> { + let ( + attached_count, + collection_before_update, + resource_before_update, + resource_after_update, + ) = result; + + let collection_before_update = collection_before_update + .ok_or_else(|| AttachError::CollectionNotFound)?; + + let resource_before_update = resource_before_update + .ok_or_else(|| AttachError::ResourceNotFound)?; + + match resource_after_update { + Some(resource) => Ok((collection_before_update, resource)), + None => Err(AttachError::NoUpdate { + attached_count, + resource: resource_before_update, + collection: collection_before_update, + }), + } + } +} + +type SelectableSqlType = + <>::SelectExpression as Expression>::SqlType; + +impl Query + for AttachToCollectionStatement +where + ResourceType: Selectable, + C: DatastoreAttachTarget, +{ + type SqlType = ( + // The number of resources attached to the collection before update. + BigInt, + // If the collection exists, the value before update. + Nullable>, + // If the resource exists, the value before update. + Nullable>, + // If the resource was updated, the new value. + Nullable>, + ); +} + +impl RunQueryDsl + for AttachToCollectionStatement +where + ResourceType: Selectable, + C: DatastoreAttachTarget, +{ +} + +/// This implementation uses a CTE which attempts to do the following: +/// +/// 1. (collection_by_id, resource_by_id): Identify if the collection and +/// resource objects exist at all. +/// 2. (resource_count): Identify if the number of resources already attached to +/// the collection exceeds a threshold. +/// 3. (collection_info, resource_info): Checks for arbitrary user-provided +/// constraints on the collection and resource objects. +/// 4. (do_update): IFF all previous checks succeeded, make a decision to perfom +/// an update. +/// 5. (updated_resource): Apply user-provided updates on the resource - +/// presumably, setting the collection ID value. +/// +/// This is implemented as follows: +/// +/// ```text +/// // WITH +/// // /* Look up the collection - Check for existence only! */ +/// // collection_by_id AS ( +/// // SELECT * FROM C +/// // WHERE = AND IS NULL +/// // FOR UPDATE +/// // ), +/// // /* Look up the resource - Check for existence only! 
*/ +/// // resource_by_id AS ( +/// // SELECT * FROM R +/// // WHERE = AND IS NULL +/// // FOR UPDATE +/// // ), +/// // /* Count the number of attached resources */ +/// // resource_count AS ( +/// // SELECT COUNT(*) FROM R +/// // WHERE = AND IS NULL +/// // ), +/// // /* Look up the collection - Check for additional constraints */ +/// // collection_info AS ( +/// // SELECT * FROM C +/// // WHERE = AND IS NULL AND +/// // +/// // FOR UPDATE +/// // ), +/// // /* Look up the resource - Check for additional constraints */ +/// // resource_info AS ( +/// // SELECT * FROM R +/// // WHERE = AND IS NULL AND +/// // IS NULL AND +/// // FOR UPDATE +/// // ), +/// // /* Make a decision on whether or not to apply ANY updates */ +/// // do_update AS ( +/// // SELECT IF( +/// // EXISTS(SELECT id FROM collection_info) AND +/// // EXISTS(SELECT id FROM resource_info) AND +/// // (SELECT * FROM resource_count) < , +/// // TRUE, FALSE), +/// // ), +/// // /* Update the resource */ +/// // updated_resource AS ( +/// // UPDATE R SET +/// // WHERE IN (SELECT FROM resource_info) AND (SELECT * FROM do_update) +/// // RETURNING * +/// // ) +/// // SELECT * FROM +/// // (SELECT * FROM resource_count) +/// // LEFT JOIN (SELECT * FROM collection_by_id) ON TRUE +/// // LEFT JOIN (SELECT * FROM resource_by_id) ON TRUE +/// // LEFT JOIN (SELECT * FROM updated_resource) ON TRUE; +/// ``` +impl QueryFragment + for AttachToCollectionStatement +where + ResourceType: Selectable, + C: DatastoreAttachTarget, + CollectionPrimaryKey: diesel::Column, + // Necessary to "walk_ast" over "self.update_resource_statement". + BoxedUpdateStatement<'static, Pg, ResourceTable, V>: + QueryFragment, + // Necessary to "walk_ast" over "self.resource_returning_clause". + AsSelect: QueryFragment, +{ + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { + out.unsafe_to_cache_prepared(); + out.push_sql("WITH collection_by_id AS ("); + self.collection_exists_query.walk_ast(out.reborrow())?; + out.push_sql(" FOR UPDATE), "); + + out.push_sql("resource_by_id AS ("); + self.resource_exists_query.walk_ast(out.reborrow())?; + out.push_sql(" FOR UPDATE), "); + + out.push_sql("resource_count AS ("); + self.resource_count_query.walk_ast(out.reborrow())?; + out.push_sql("), "); + + out.push_sql("collection_info AS ("); + self.collection_query.walk_ast(out.reborrow())?; + out.push_sql(" FOR UPDATE), "); + + out.push_sql("resource_info AS ("); + self.resource_query.walk_ast(out.reborrow())?; + out.push_sql(" FOR UPDATE), "); + + out.push_sql("do_update AS (SELECT IF(EXISTS(SELECT "); + out.push_identifier(CollectionIdColumn::::NAME)?; + out.push_sql(" FROM collection_info) AND EXISTS(SELECT "); + out.push_identifier(ResourceIdColumn::::NAME)?; + out.push_sql( + &format!(" FROM resource_info) AND (SELECT * FROM resource_count) < {}, TRUE,FALSE)), ", + self.max_attached_resources) + ); + + out.push_sql("updated_resource AS ("); + self.update_resource_statement.walk_ast(out.reborrow())?; + // NOTE: It is safe to start with "AND" - we forced the update statement + // to have a WHERE clause on the primary key of the resource. + out.push_sql(" AND (SELECT * FROM do_update)"); + out.push_sql(" RETURNING "); + self.resource_returning_clause.walk_ast(out.reborrow())?; + out.push_sql(") "); + + // Why do all these LEFT JOINs here? In short, to ensure that we are + // always returning a constant number of columns. + // + // Diesel parses output "one column at a time", mapping to structs or + // tuples. 
For example, when deserializing an "Option<(A, B, C)>" object, + // Diesel checks nullability of the "A", "B", and "C" columns. + // If any of those columns unexpectedly return NULL, the entire object is + // treated as "None". + // + // In summary: + // - Without the LEFT JOINs, we'd occassionally be returning "zero + // rows", which would make the output entirely unparseable. + // - If we used an operation like COALESCE (which attempts to map the + // result of an expression to either "NULL" or a single tuple column), + // Diesel struggles to map the result back to a structure. + // + // By returning a static number of columns, each component of the + // "RawOutput" tuple can be parsed, regardless of nullability, without + // preventing later portions of the result from being parsed. + out.push_sql( + "SELECT * FROM \ + (SELECT * FROM resource_count) \ + LEFT JOIN (SELECT * FROM collection_by_id) ON TRUE \ + LEFT JOIN (SELECT * FROM resource_by_id) ON TRUE \ + LEFT JOIN (SELECT * FROM updated_resource) ON TRUE;", + ); + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::{AttachError, DatastoreAttachTarget}; + use crate::db::{ + self, error::TransactionError, identity::Resource as IdentityResource, + }; + use async_bb8_diesel::{ + AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection, + }; + use chrono::Utc; + use db_macros::Resource; + use diesel::expression_methods::ExpressionMethods; + use diesel::pg::Pg; + use diesel::QueryDsl; + use diesel::SelectableHelper; + use nexus_test_utils::db::test_setup_database; + use omicron_common::api::external::{IdentityMetadataCreateParams, Name}; + use omicron_test_utils::dev; + use uuid::Uuid; + + table! { + test_schema.collection (id) { + id -> Uuid, + name -> Text, + description -> Text, + time_created -> Timestamptz, + time_modified -> Timestamptz, + time_deleted -> Nullable, + } + } + + table! { + test_schema.resource (id) { + id -> Uuid, + name -> Text, + description -> Text, + time_created -> Timestamptz, + time_modified -> Timestamptz, + time_deleted -> Nullable, + collection_id -> Nullable, + } + } + + async fn setup_db(pool: &crate::db::Pool) { + let connection = pool.pool().get().await.unwrap(); + (*connection) + .batch_execute_async( + "CREATE SCHEMA IF NOT EXISTS test_schema; \ + CREATE TABLE IF NOT EXISTS test_schema.collection ( \ + id UUID PRIMARY KEY, \ + name STRING(63) NOT NULL, \ + description STRING(512) NOT NULL, \ + time_created TIMESTAMPTZ NOT NULL, \ + time_modified TIMESTAMPTZ NOT NULL, \ + time_deleted TIMESTAMPTZ); \ + CREATE TABLE IF NOT EXISTS test_schema.resource( \ + id UUID PRIMARY KEY, \ + name STRING(63) NOT NULL, \ + description STRING(512) NOT NULL, \ + time_created TIMESTAMPTZ NOT NULL, \ + time_modified TIMESTAMPTZ NOT NULL, \ + time_deleted TIMESTAMPTZ, \ + collection_id UUID); \ + CREATE INDEX IF NOT EXISTS collection_index ON test_schema.resource ( \ + collection_id \ + ) WHERE collection_id IS NOT NULL AND time_deleted IS NULL; \ + TRUNCATE test_schema.collection; \ + TRUNCATE test_schema.resource", + ) + .await + .unwrap(); + } + + /// Describes a resource within the database. 
+ #[derive( + Clone, Queryable, Insertable, Debug, Resource, Selectable, PartialEq, + )] + #[diesel(table_name = resource)] + struct Resource { + #[diesel(embed)] + pub identity: ResourceIdentity, + pub collection_id: Option, + } + + #[derive( + Clone, Queryable, Insertable, Debug, Resource, Selectable, PartialEq, + )] + #[diesel(table_name = collection)] + struct Collection { + #[diesel(embed)] + pub identity: CollectionIdentity, + } + + impl DatastoreAttachTarget for Collection { + type Id = uuid::Uuid; + + type CollectionIdColumn = collection::dsl::id; + type CollectionTimeDeletedColumn = collection::dsl::time_deleted; + + type ResourceIdColumn = resource::dsl::id; + type ResourceCollectionIdColumn = resource::dsl::collection_id; + type ResourceTimeDeletedColumn = resource::dsl::time_deleted; + } + + async fn insert_collection( + id: Uuid, + name: &str, + pool: &db::Pool, + ) -> Collection { + let create_params = IdentityMetadataCreateParams { + name: Name::try_from(name.to_string()).unwrap(), + description: "description".to_string(), + }; + let c = + Collection { identity: CollectionIdentity::new(id, create_params) }; + + diesel::insert_into(collection::table) + .values(c) + .execute_async(pool.pool()) + .await + .unwrap(); + + get_collection(id, &pool).await + } + + async fn get_collection(id: Uuid, pool: &db::Pool) -> Collection { + collection::table + .find(id) + .select(Collection::as_select()) + .first_async(pool.pool()) + .await + .unwrap() + } + + async fn insert_resource( + id: Uuid, + name: &str, + pool: &db::Pool, + ) -> Resource { + let create_params = IdentityMetadataCreateParams { + name: Name::try_from(name.to_string()).unwrap(), + description: "description".to_string(), + }; + let r = Resource { + identity: ResourceIdentity::new(id, create_params), + collection_id: None, + }; + + diesel::insert_into(resource::table) + .values(r) + .execute_async(pool.pool()) + .await + .unwrap(); + + get_resource(id, &pool).await + } + + async fn get_resource(id: Uuid, pool: &db::Pool) -> Resource { + resource::table + .find(id) + .select(Resource::as_select()) + .first_async(pool.pool()) + .await + .unwrap() + } + + #[test] + fn test_verify_query() { + let collection_id = + uuid::Uuid::parse_str("cccccccc-cccc-cccc-cccc-cccccccccccc") + .unwrap(); + let resource_id = + uuid::Uuid::parse_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa") + .unwrap(); + let attach = Collection::attach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + 12345, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ); + let query = diesel::debug_query::(&attach).to_string(); + + let expected_query = "WITH \ + collection_by_id AS (\ + SELECT \ + \"test_schema\".\"collection\".\"id\", \ + \"test_schema\".\"collection\".\"name\", \ + \"test_schema\".\"collection\".\"description\", \ + \"test_schema\".\"collection\".\"time_created\", \ + \"test_schema\".\"collection\".\"time_modified\", \ + \"test_schema\".\"collection\".\"time_deleted\" \ + FROM \"test_schema\".\"collection\" \ + WHERE (\ + (\"test_schema\".\"collection\".\"id\" = $1) AND \ + (\"test_schema\".\"collection\".\"time_deleted\" IS NULL)\ + ) FOR UPDATE\ + ), \ + resource_by_id AS (\ + SELECT \ + \"test_schema\".\"resource\".\"id\", \ + \"test_schema\".\"resource\".\"name\", \ + \"test_schema\".\"resource\".\"description\", \ + \"test_schema\".\"resource\".\"time_created\", \ + \"test_schema\".\"resource\".\"time_modified\", \ + 
\"test_schema\".\"resource\".\"time_deleted\", \ + \"test_schema\".\"resource\".\"collection_id\" \ + FROM \"test_schema\".\"resource\" \ + WHERE (\ + (\"test_schema\".\"resource\".\"id\" = $2) AND \ + (\"test_schema\".\"resource\".\"time_deleted\" IS NULL)\ + ) FOR UPDATE\ + ), \ + resource_count AS (\ + SELECT COUNT(*) \ + FROM \"test_schema\".\"resource\" \ + WHERE (\ + (\"test_schema\".\"resource\".\"collection_id\" = $3) AND \ + (\"test_schema\".\"resource\".\"time_deleted\" IS NULL)\ + )\ + ), \ + collection_info AS (\ + SELECT \ + \"test_schema\".\"collection\".\"id\", \ + \"test_schema\".\"collection\".\"name\", \ + \"test_schema\".\"collection\".\"description\", \ + \"test_schema\".\"collection\".\"time_created\", \ + \"test_schema\".\"collection\".\"time_modified\", \ + \"test_schema\".\"collection\".\"time_deleted\" \ + FROM \"test_schema\".\"collection\" \ + WHERE (\ + (\"test_schema\".\"collection\".\"id\" = $4) AND \ + (\"test_schema\".\"collection\".\"time_deleted\" IS NULL)\ + ) FOR UPDATE\ + ), \ + resource_info AS (\ + SELECT \ + \"test_schema\".\"resource\".\"id\", \ + \"test_schema\".\"resource\".\"name\", \ + \"test_schema\".\"resource\".\"description\", \ + \"test_schema\".\"resource\".\"time_created\", \ + \"test_schema\".\"resource\".\"time_modified\", \ + \"test_schema\".\"resource\".\"time_deleted\", \ + \"test_schema\".\"resource\".\"collection_id\" \ + FROM \"test_schema\".\"resource\" \ + WHERE ((\ + (\"test_schema\".\"resource\".\"id\" = $5) AND \ + (\"test_schema\".\"resource\".\"time_deleted\" IS NULL)) AND \ + (\"test_schema\".\"resource\".\"collection_id\" IS NULL)\ + ) FOR UPDATE\ + ), \ + do_update AS (\ + SELECT IF(\ + EXISTS(SELECT \"id\" FROM collection_info) AND \ + EXISTS(SELECT \"id\" FROM resource_info) AND \ + (SELECT * FROM resource_count) < 12345, \ + TRUE,\ + FALSE)\ + ), \ + updated_resource AS (\ + UPDATE \ + \"test_schema\".\"resource\" \ + SET \ + \"collection_id\" = $6 \ + WHERE \ + (\"test_schema\".\"resource\".\"id\" = $7) AND \ + (SELECT * FROM do_update) \ + RETURNING \ + \"test_schema\".\"resource\".\"id\", \ + \"test_schema\".\"resource\".\"name\", \ + \"test_schema\".\"resource\".\"description\", \ + \"test_schema\".\"resource\".\"time_created\", \ + \"test_schema\".\"resource\".\"time_modified\", \ + \"test_schema\".\"resource\".\"time_deleted\", \ + \"test_schema\".\"resource\".\"collection_id\"\ + ) \ + SELECT * FROM \ + (SELECT * FROM resource_count) \ + LEFT JOIN (SELECT * FROM collection_by_id) ON TRUE \ + LEFT JOIN (SELECT * FROM resource_by_id) ON TRUE \ + LEFT JOIN (SELECT * FROM updated_resource) ON TRUE; -- binds: [cccccccc-cccc-cccc-cccc-cccccccccccc, aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa, cccccccc-cccc-cccc-cccc-cccccccccccc, cccccccc-cccc-cccc-cccc-cccccccccccc, aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa, cccccccc-cccc-cccc-cccc-cccccccccccc, aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa]"; + assert_eq!(query, expected_query); + } + + #[tokio::test] + async fn test_attach_missing_collection_fails() { + let logctx = + dev::test_setup_log("test_attach_missing_collection_fails"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + let attach = Collection::attach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + 10, + diesel::update(resource::table) + 
.set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await; + + assert!(matches!(attach, Err(AttachError::CollectionNotFound))); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_attach_missing_resource_fails() { + let logctx = dev::test_setup_log("test_attach_missing_resource_fails"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + + // Create the collection + let collection = + insert_collection(collection_id, "collection", &pool).await; + + // Attempt to attach - even though the resource does not exist. + let attach = Collection::attach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + 10, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await; + + assert!(matches!(attach, Err(AttachError::ResourceNotFound))); + // The collection should remain unchanged. + assert_eq!(collection, get_collection(collection_id, &pool).await); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_attach_once() { + let logctx = dev::test_setup_log("test_attach_once"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + + // Create the collection and resource. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let _resource = insert_resource(resource_id, "resource", &pool).await; + + // Attach the resource to the collection. + let attach = Collection::attach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + 10, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await; + + // "attach_and_get_result_async" should return the "attached" resource. + let (returned_collection, returned_resource) = + attach.expect("Attach should have worked"); + assert_eq!( + returned_resource.collection_id.expect("Expected a collection ID"), + collection_id + ); + // The returned value should be the latest value in the DB. + assert_eq!( + returned_collection, + get_collection(collection_id, &pool).await + ); + assert_eq!(returned_resource, get_resource(resource_id, &pool).await); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_attach_once_synchronous() { + let logctx = dev::test_setup_log("test_attach_once_synchronous"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + + // Create the collection and resource. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let _resource = insert_resource(resource_id, "resource", &pool).await; + + // Attach the resource to the collection. 
+ let attach_query = Collection::attach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + 10, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ); + + type TxnError = TransactionError< + AttachError, + >; + let result = pool + .pool() + .transaction(move |conn| { + attach_query.attach_and_get_result(conn).map_err(|e| match e { + AttachError::DatabaseError(e) => TxnError::from(e), + e => TxnError::CustomError(e), + }) + }) + .await; + + // "attach_and_get_result" should return the "attached" resource. + let (returned_collection, returned_resource) = + result.expect("Attach should have worked"); + assert_eq!( + returned_resource.collection_id.expect("Expected a collection ID"), + collection_id + ); + // The returned values should be the latest value in the DB. + assert_eq!( + returned_collection, + get_collection(collection_id, &pool).await + ); + assert_eq!(returned_resource, get_resource(resource_id, &pool).await); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_attach_multiple_times() { + let logctx = dev::test_setup_log("test_attach_multiple_times"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + const RESOURCE_COUNT: u32 = 5; + + let collection_id = uuid::Uuid::new_v4(); + + // Create the collection. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + + // Create each resource, attaching them to the collection. + for i in 0..RESOURCE_COUNT { + let resource_id = uuid::Uuid::new_v4(); + insert_resource(resource_id, &format!("resource{}", i), &pool) + .await; + + // Attach the resource to the collection. + let attach = Collection::attach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + RESOURCE_COUNT, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await; + + // "attach_and_get_result_async" should return the "attached" resource. + let (_, returned_resource) = + attach.expect("Attach should have worked"); + assert_eq!( + returned_resource + .collection_id + .expect("Expected a collection ID"), + collection_id + ); + // The returned resource value should be the latest value in the DB. + assert_eq!( + returned_resource, + get_resource(resource_id, &pool).await + ); + } + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_attach_beyond_capacity_fails() { + let logctx = dev::test_setup_log("test_attach_beyond_capacity_fails"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + + // Attach a resource to a collection, as usual. 
+ let _collection = + insert_collection(collection_id, "collection", &pool).await; + let resource_id1 = uuid::Uuid::new_v4(); + let _resource = insert_resource(resource_id1, "resource1", &pool).await; + let attach = Collection::attach_resource( + collection_id, + resource_id1, + collection::table.into_boxed(), + resource::table.into_boxed(), + 1, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await; + assert_eq!( + attach.expect("Attach should have worked").1.id(), + resource_id1 + ); + + // Let's try attaching a second resource, now that we're at capacity. + let resource_id2 = uuid::Uuid::new_v4(); + let _resource = insert_resource(resource_id2, "resource2", &pool).await; + let attach = Collection::attach_resource( + collection_id, + resource_id2, + collection::table.into_boxed(), + resource::table.into_boxed(), + 1, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await; + + let err = attach.expect_err("Should have failed to attach"); + match err { + AttachError::NoUpdate { attached_count, resource, collection } => { + assert_eq!(attached_count, 1); + assert_eq!(resource, get_resource(resource_id2, &pool).await); + assert_eq!( + collection, + get_collection(collection_id, &pool).await + ); + } + _ => panic!("Unexpected error: {:?}", err), + }; + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_attach_while_already_attached() { + let logctx = dev::test_setup_log("test_attach_while_already_attached"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + + // Attach a resource to a collection, as usual. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let resource_id = uuid::Uuid::new_v4(); + let _resource = insert_resource(resource_id, "resource", &pool).await; + let attach = Collection::attach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + 10, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await; + assert_eq!( + attach.expect("Attach should have worked").1.id(), + resource_id + ); + + // Try attaching when well below the capacity. + let attach = Collection::attach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + 10, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await; + let err = attach.expect_err("Should have failed to attach"); + + // A caller should be able to inspect this result, see that the count of + // attached devices is below capacity, and that resource.collection_id + // is already set. This should provide enough context to identify "the + // resource is already attached". 
+ match err { + AttachError::NoUpdate { attached_count, resource, collection } => { + assert_eq!(attached_count, 1); + assert_eq!( + *resource + .collection_id + .as_ref() + .expect("Should already be attached"), + collection_id + ); + assert_eq!(resource, get_resource(resource_id, &pool).await); + assert_eq!( + collection, + get_collection(collection_id, &pool).await + ); + } + _ => panic!("Unexpected error: {:?}", err), + }; + + // Let's try attaching the same resource again - while at capacity. + let attach = Collection::attach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + 1, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await; + let err = attach.expect_err("Should have failed to attach"); + // Even when at capacity, the same information should be propagated back + // to the caller. + match err { + AttachError::NoUpdate { attached_count, resource, collection } => { + assert_eq!(attached_count, 1); + assert_eq!( + *resource + .collection_id + .as_ref() + .expect("Should already be attached"), + collection_id + ); + assert_eq!(resource, get_resource(resource_id, &pool).await); + assert_eq!( + collection, + get_collection(collection_id, &pool).await + ); + } + _ => panic!("Unexpected error: {:?}", err), + }; + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_attach_with_filters() { + let logctx = dev::test_setup_log("test_attach_once"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + + // Create the collection and resource. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let _resource = insert_resource(resource_id, "resource", &pool).await; + + // Attach the resource to the collection. + // + // Note that we are also filtering for specific conditions on the + // collection and resource - admittedly, just the name, but this could + // also be used to check the state of a disk, instance, etc. + let attach = Collection::attach_resource( + collection_id, + resource_id, + collection::table + .filter(collection::name.eq("collection")) + .into_boxed(), + resource::table.filter(resource::name.eq("resource")).into_boxed(), + 10, + // When actually performing the update, update the collection ID + // as well as an auxiliary field - the description. + // + // This provides an example of how one could attach an ID and update + // the state of a resource simultaneously. 
+ diesel::update(resource::table).set(( + resource::dsl::collection_id.eq(collection_id), + resource::dsl::description.eq("new description".to_string()), + )), + ) + .attach_and_get_result_async(pool.pool()) + .await; + + let (_, returned_resource) = attach.expect("Attach should have worked"); + assert_eq!( + returned_resource.collection_id.expect("Expected a collection ID"), + collection_id + ); + assert_eq!(returned_resource, get_resource(resource_id, &pool).await); + assert_eq!(returned_resource.description(), "new description"); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_attach_deleted_resource_fails() { + let logctx = dev::test_setup_log("test_attach_deleted_resource_fails"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + + // Create the collection and resource. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let _resource = insert_resource(resource_id, "resource", &pool).await; + + // Immediately soft-delete the resource. + diesel::update( + resource::table.filter(resource::dsl::id.eq(resource_id)), + ) + .set(resource::dsl::time_deleted.eq(Utc::now())) + .execute_async(pool.pool()) + .await + .unwrap(); + + // Attach the resource to the collection. Observe a failure which is + // indistinguishable from the resource not existing. + let attach = Collection::attach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + 10, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await; + assert!(matches!(attach, Err(AttachError::ResourceNotFound))); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_attach_without_update_filter() { + let logctx = dev::test_setup_log("test_attach_without_update_filter"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + + // Create the collection and some resources. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let resource_id1 = uuid::Uuid::new_v4(); + let resource_id2 = uuid::Uuid::new_v4(); + let _resource1 = + insert_resource(resource_id1, "resource1", &pool).await; + let _resource2 = + insert_resource(resource_id2, "resource2", &pool).await; + + // Attach the resource to the collection. + // + // NOTE: In the update statement, we aren't filtering by resource ID, + // even though we explicitly have two "live" resources". + let attach = Collection::attach_resource( + collection_id, + resource_id1, + collection::table.into_boxed(), + resource::table.into_boxed(), + 10, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await; + + let (_, returned_resource) = attach.expect("Attach should have worked"); + assert_eq!(returned_resource.id(), resource_id1); + + // Note that only "resource1" should be attached. + // "resource2" should have automatically been filtered away from the + // update statement, regardless of user input. 
+ assert_eq!( + get_resource(resource_id1, &pool).await.collection_id.unwrap(), + collection_id + ); + assert!(get_resource(resource_id2, &pool) + .await + .collection_id + .is_none()); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } +} diff --git a/nexus/src/db/collection_detach.rs b/nexus/src/db/collection_detach.rs new file mode 100644 index 00000000000..62974c41593 --- /dev/null +++ b/nexus/src/db/collection_detach.rs @@ -0,0 +1,1117 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! CTE for detaching a resource from a collection. +//! +//! This atomically: +//! - Checks if the collection exists and is not soft deleted +//! - Checks if the resource exists and is not soft deleted +//! - Validates conditions on both the collection and resource +//! - Updates the resource row + +use super::collection_attach::aliases::*; +use super::cte_utils::{ + BoxableTable, BoxableUpdateStatement, BoxedQuery, ExprSqlType, FilterBy, + QueryFromClause, QuerySqlType, +}; +use super::pool::DbConnection; +use crate::db::collection_attach::DatastoreAttachTarget; +use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager, PoolError}; +use diesel::associations::HasTable; +use diesel::expression::{AsExpression, Expression}; +use diesel::helper_types::*; +use diesel::pg::Pg; +use diesel::prelude::*; +use diesel::query_builder::*; +use diesel::query_dsl::methods as query_methods; +use diesel::query_source::Table; +use diesel::sql_types::{Nullable, SingleValue}; +use std::fmt::Debug; + +/// Trait to be implemented by structs representing a detachable collection. +/// +/// A blanket implementation is provided for traits that implement +/// [`DatastoreAttachTarget`]. +pub trait DatastoreDetachTarget: + DatastoreAttachTarget +{ + /// Creates a statement for detaching a resource from the given collection. + /// + /// This statement allows callers to atomically check the state of a + /// collection and a resource while detaching a resource. + /// + /// - `collection_id`: Primary key of the collection being removed from. + /// - `resource_id`: Primary key of the resource being detached. + /// - `collection_query`: An optional query for collection state. The + /// CTE will automatically filter this query to `collection_id`, and + /// validate that the "time deleted" column is NULL. + /// - `resource_query`: An optional query for the resource state. The + /// CTE will automatically filter this query to `resource_id`, + /// validate that the "time deleted" column is NULL, and validate that the + /// "collection_id" column points to `collection_id`. + /// - `update`: An update statement, identifying how the resource object + /// should be modified to be detached + /// + /// The V type refers to the "update target" of the UpdateStatement, + /// and should generally be inferred rather than explicitly specified. + fn detach_resource( + collection_id: Self::Id, + resource_id: Self::Id, + + collection_query: BoxedQuery>, + resource_query: BoxedQuery>, + + // We are intentionally picky about this update statement: + // - The second argument - the WHERE clause - must match the default + // for the table. This encourages the "resource_query" filter to be + // used instead, and makes it possible for the CTE to modify the + // filter here (ensuring "resource_id" is selected). 
+ // - Additionally, UpdateStatement's fourth argument defaults to Ret = + // NoReturningClause. This enforces that the given input statement does + // not have a RETURNING clause, and also lets the CTE control this + // value. + update: UpdateStatement< + ResourceTable, + ResourceTableDefaultWhereClause, + V, + >, + ) -> DetachFromCollectionStatement + where + // Treat the collection and resource as boxed tables. + CollectionTable: BoxableTable, + ResourceTable: BoxableTable, + + // Allows treating "collection_exists_query" as a boxed "dyn QueryFragment". + QueryFromClause>: + QueryFragment + Send, + QuerySqlType>: Send, + // Allows treating "resource_exists_query" as a boxed "dyn QueryFragment". + QueryFromClause>: + QueryFragment + Send, + QuerySqlType>: Send, + + // Allows calling ".filter()" on the boxed collection table. + BoxedQuery>: FilterBy, Self::Id>> + + FilterBy>, + // Allows calling ".filter()" on the boxed resource table. + BoxedQuery>: FilterBy, Self::Id>> + + FilterBy> + + FilterBy>, + + // Allows calling "update.into_boxed()" + UpdateStatement< + ResourceTable, + ResourceTableDefaultWhereClause, + V, + >: BoxableUpdateStatement, V>, + + // Allows calling + // ".filter(resource_table().primary_key().eq(resource_id)" on the + // boxed update statement. + BoxedUpdateStatement<'static, Pg, ResourceTable, V>: + FilterBy, Self::Id>>, + + // Allows using "id" in expressions (e.g. ".eq(...)") with... + Self::Id: AsExpression< + // ... The Collection table's PK + SerializedCollectionPrimaryKey, + > + AsExpression< + // ... The Resource table's PK + SerializedResourcePrimaryKey, + > + AsExpression< + // ... The Resource table's FK to the Collection table + SerializedResourceForeignKey, + >, + ExprSqlType>: SingleValue, + ExprSqlType>: SingleValue, + ExprSqlType: SingleValue, + + // Necessary to actually select the resource in the output type. + ResourceType: Selectable, + { + let collection_table = + || as HasTable>::table(); + let resource_table = + || as HasTable>::table(); + + // Create new queries to determine if the collection and resources + // already exist. + let collection_exists_query = Box::new( + collection_table() + .into_boxed() + .filter(collection_table().primary_key().eq(collection_id)) + .filter(Self::CollectionTimeDeletedColumn::default().is_null()), + ); + let resource_exists_query = Box::new( + resource_table() + .into_boxed() + .filter(resource_table().primary_key().eq(resource_id)) + .filter(Self::ResourceTimeDeletedColumn::default().is_null()), + ); + + // For the queries which decide whether or not we'll perform the update, + // extend the user-provided arguments. + // + // We force these queries to: + // - Check against the primary key of the target objects + // - Ensure the objects are not deleted + // - (for the resource) Ensure it is attached + // - (for the update) Ensure that only the resource with "resource_id" + // is modified. 
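+        //
+        // Concretely, after this rewriting the pieces that gate the update
+        // look like:
+        //   collection_query: WHERE <collection PK> = collection_id
+        //                       AND <time deleted> IS NULL
+        //                       AND <any caller-supplied filters>
+        //   resource_query:   WHERE <resource PK> = resource_id
+        //                       AND <time deleted> IS NULL
+        //                       AND <collection id column> = collection_id
+        //                       AND <any caller-supplied filters>
+        //   update:           ... WHERE <resource PK> = resource_id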
+ let collection_query = Box::new( + collection_query + .filter(collection_table().primary_key().eq(collection_id)) + .filter(Self::CollectionTimeDeletedColumn::default().is_null()), + ); + let resource_query = Box::new( + resource_query + .filter(resource_table().primary_key().eq(resource_id)) + .filter(Self::ResourceTimeDeletedColumn::default().is_null()) + .filter( + Self::ResourceCollectionIdColumn::default() + .eq(collection_id), + ), + ); + + let update_resource_statement = update + .into_boxed() + .filter(resource_table().primary_key().eq(resource_id)); + + let resource_returning_clause = ResourceType::as_returning(); + DetachFromCollectionStatement { + collection_exists_query, + resource_exists_query, + collection_query, + resource_query, + update_resource_statement, + resource_returning_clause, + } + } +} + +impl DatastoreDetachTarget for T where + T: DatastoreAttachTarget +{ +} + +/// The CTE described in the module docs +#[must_use = "Queries must be executed"] +pub struct DetachFromCollectionStatement +where + ResourceType: Selectable, + C: DatastoreDetachTarget, +{ + // Query which answers: "Does the collection exist?" + collection_exists_query: Box + Send>, + // Query which answers: "Does the resource exist?" + resource_exists_query: Box + Send>, + // A (mostly) user-provided query for validating the collection. + collection_query: Box + Send>, + // A (mostly) user-provided query for validating the resource. + resource_query: Box + Send>, + + // Update statement for the resource. + update_resource_statement: + BoxedUpdateStatement<'static, Pg, ResourceTable, V>, + // Describes what should be returned after UPDATE-ing the resource. + resource_returning_clause: AsSelect, +} + +impl QueryId + for DetachFromCollectionStatement +where + ResourceType: Selectable, + C: DatastoreDetachTarget, +{ + type QueryId = (); + const HAS_STATIC_QUERY_ID: bool = false; +} + +/// Result of [`DetachFromCollectionStatement`] when executed asynchronously +pub type AsyncDetachFromCollectionResult = + Result>; + +/// Result of [`DetachFromCollectionStatement`] when executed synchronously +pub type SyncDetachFromCollectionResult = + Result>; + +/// Errors returned by [`DetachFromCollectionStatement`]. +#[derive(Debug)] +pub enum DetachError { + /// The collection that the query was removing from does not exist + CollectionNotFound, + /// The resource being detached does not exist + ResourceNotFound, + /// Although the resource and collection exist, the update did not occur + /// + /// The unchanged resource and collection are returned as a part of this + /// error; it is the responsibility of the caller to determine which + /// condition was not met. + NoUpdate { resource: ResourceType, collection: C }, + /// Other database error + DatabaseError(E), +} + +/// Describes the type returned from the actual CTE, which is parsed +/// and interpreted before propagating it to users of the Rust API. +pub type RawOutput = + (i64, Option, Option, Option); + +impl DetachFromCollectionStatement +where + ResourceType: 'static + Debug + Send + Selectable, + C: 'static + Debug + DatastoreDetachTarget + Send, + ResourceTable: 'static + Table + Send + Copy + Debug, + V: 'static + Send, + DetachFromCollectionStatement: Send, +{ + /// Issues the CTE asynchronously and parses the result. + pub async fn detach_and_get_result_async( + self, + pool: &bb8::Pool>, + ) -> AsyncDetachFromCollectionResult + where + // We require this bound to ensure that "Self" is runnable as query. 
+ Self: query_methods::LoadQuery< + 'static, + DbConnection, + RawOutput, + >, + { + self.get_result_async::>(pool) + .await + // If the database returns an error, propagate it right away. + .map_err(DetachError::DatabaseError) + // Otherwise, parse the output to determine if the CTE succeeded. + .and_then(Self::parse_result) + } + + /// Issues the CTE synchronously and parses the result. + pub fn detach_and_get_result( + self, + conn: &mut DbConnection, + ) -> SyncDetachFromCollectionResult + where + // We require this bound to ensure that "Self" is runnable as query. + Self: query_methods::LoadQuery< + 'static, + DbConnection, + RawOutput, + >, + { + self.get_result::>(conn) + .map_err(DetachError::DatabaseError) + .and_then(Self::parse_result) + } + + fn parse_result( + result: RawOutput, + ) -> Result> { + let ( + _, + collection_before_update, + resource_before_update, + resource_after_update, + ) = result; + + let collection_before_update = collection_before_update + .ok_or_else(|| DetachError::CollectionNotFound)?; + + let resource_before_update = resource_before_update + .ok_or_else(|| DetachError::ResourceNotFound)?; + + match resource_after_update { + Some(resource) => Ok(resource), + None => Err(DetachError::NoUpdate { + resource: resource_before_update, + collection: collection_before_update, + }), + } + } +} + +type SelectableSqlType = + <>::SelectExpression as Expression>::SqlType; + +impl Query + for DetachFromCollectionStatement +where + ResourceType: Selectable, + C: DatastoreDetachTarget, +{ + type SqlType = ( + // Ignored "SELECT 1" value + diesel::sql_types::BigInt, + // If the collection exists, the value before update. + Nullable>, + // If the resource exists, the value before update. + Nullable>, + // If the resource was updated, the new value. + Nullable>, + ); +} + +impl RunQueryDsl + for DetachFromCollectionStatement +where + ResourceType: Selectable, + C: DatastoreDetachTarget, +{ +} + +/// This implementation uses a CTE which attempts to do the following: +/// +/// 1. (collection_by_id, resource_by_id): Identify if the collection and +/// resource objects exist at all. +/// 2. (collection_info, resource_info): Checks for arbitrary user-provided +/// constraints on the collection and resource objects. +/// 3. (do_update): IFF all previous checks succeeded, make a decision to perfom +/// an update. +/// 4. (updated_resource): Apply user-provided updates on the resource - +/// presumably, setting the collection ID value. +/// +/// This is implemented as follows: +/// +/// ```text +/// // WITH +/// // /* Look up the collection - Check for existence only! */ +/// // collection_by_id AS ( +/// // SELECT * FROM C +/// // WHERE = AND IS NULL +/// // FOR UPDATE +/// // ), +/// // /* Look up the resource - Check for existence only! 
*/ +/// // resource_by_id AS ( +/// // SELECT * FROM R +/// // WHERE = AND IS NULL +/// // FOR UPDATE +/// // ), +/// // /* Look up the collection - Check for additional constraints */ +/// // collection_info AS ( +/// // SELECT * FROM C +/// // WHERE = AND IS NULL AND +/// // +/// // FOR UPDATE +/// // ), +/// // /* Look up the resource - Check for additional constraints */ +/// // resource_info AS ( +/// // SELECT * FROM R +/// // WHERE = AND IS NULL AND +/// // = AND +/// // FOR UPDATE +/// // ), +/// // /* Make a decision on whether or not to apply ANY updates */ +/// // do_update AS ( +/// // SELECT IF( +/// // EXISTS(SELECT id FROM collection_info) AND +/// // EXISTS(SELECT id FROM resource_info), +/// // TRUE, FALSE), +/// // ), +/// // /* Update the resource */ +/// // updated_resource AS ( +/// // UPDATE R SET +/// // WHERE IN (SELECT FROM resource_info) AND (SELECT * FROM do_update) +/// // RETURNING * +/// // ) +/// // SELECT * FROM +/// // (SELECT 1) +/// // LEFT JOIN (SELECT * FROM collection_by_id) ON TRUE +/// // LEFT JOIN (SELECT * FROM resource_by_id) ON TRUE +/// // LEFT JOIN (SELECT * FROM updated_resource) ON TRUE; +/// ``` +impl QueryFragment + for DetachFromCollectionStatement +where + ResourceType: Selectable, + C: DatastoreDetachTarget, + CollectionPrimaryKey: diesel::Column, + // Necessary to "walk_ast" over "self.update_resource_statement". + BoxedUpdateStatement<'static, Pg, ResourceTable, V>: + QueryFragment, + // Necessary to "walk_ast" over "self.resource_returning_clause". + AsSelect: QueryFragment, + // Necessary to "walk_ast" over "self.collection_returning_clause". + AsSelect: QueryFragment, +{ + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { + out.unsafe_to_cache_prepared(); + out.push_sql("WITH collection_by_id AS ("); + self.collection_exists_query.walk_ast(out.reborrow())?; + out.push_sql(" FOR UPDATE), "); + + out.push_sql("resource_by_id AS ("); + self.resource_exists_query.walk_ast(out.reborrow())?; + out.push_sql(" FOR UPDATE), "); + + out.push_sql("collection_info AS ("); + self.collection_query.walk_ast(out.reborrow())?; + out.push_sql(" FOR UPDATE), "); + + out.push_sql("resource_info AS ("); + self.resource_query.walk_ast(out.reborrow())?; + out.push_sql(" FOR UPDATE), "); + + out.push_sql("do_update AS (SELECT IF(EXISTS(SELECT "); + out.push_identifier(CollectionIdColumn::::NAME)?; + out.push_sql(" FROM collection_info) AND EXISTS(SELECT "); + out.push_identifier(ResourceIdColumn::::NAME)?; + out.push_sql(" FROM resource_info), TRUE,FALSE)), "); + + out.push_sql("updated_resource AS ("); + self.update_resource_statement.walk_ast(out.reborrow())?; + // NOTE: It is safe to start with "AND" - we forced the update statement + // to have a WHERE clause on the primary key of the resource. + out.push_sql(" AND (SELECT * FROM do_update)"); + out.push_sql(" RETURNING "); + self.resource_returning_clause.walk_ast(out.reborrow())?; + out.push_sql(") "); + + // Why do all these LEFT JOINs here? In short, to ensure that we are + // always returning a constant number of columns. + // + // Diesel parses output "one column at a time", mapping to structs or + // tuples. For example, when deserializing an "Option<(A, B, C)>" object, + // Diesel checks nullability of the "A", "B", and "C" columns. + // If any of those columns unexpectedly return NULL, the entire object is + // treated as "None". 
+ // + // In summary: + // - Without the LEFT JOINs, we'd occassionally be returning "zero + // rows", which would make the output entirely unparseable. + // - If we used an operation like COALESCE (which attempts to map the + // result of an expression to either "NULL" or a single tuple column), + // Diesel struggles to map the result back to a structure. + // + // By returning a static number of columns, each component of the + // "RawOutput" tuple can be parsed, regardless of nullability, without + // preventing later portions of the result from being parsed. + out.push_sql( + "SELECT * FROM \ + (SELECT 1) \ + LEFT JOIN (SELECT * FROM collection_by_id) ON TRUE \ + LEFT JOIN (SELECT * FROM resource_by_id) ON TRUE \ + LEFT JOIN (SELECT * FROM updated_resource) ON TRUE;", + ); + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::{DatastoreDetachTarget, DetachError}; + use crate::db::collection_attach::DatastoreAttachTarget; + use crate::db::{ + self, error::TransactionError, identity::Resource as IdentityResource, + }; + use async_bb8_diesel::{ + AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection, + }; + use chrono::Utc; + use db_macros::Resource; + use diesel::expression_methods::ExpressionMethods; + use diesel::pg::Pg; + use diesel::QueryDsl; + use diesel::SelectableHelper; + use nexus_test_utils::db::test_setup_database; + use omicron_common::api::external::{IdentityMetadataCreateParams, Name}; + use omicron_test_utils::dev; + use uuid::Uuid; + + table! { + test_schema.collection (id) { + id -> Uuid, + name -> Text, + description -> Text, + time_created -> Timestamptz, + time_modified -> Timestamptz, + time_deleted -> Nullable, + } + } + + table! { + test_schema.resource (id) { + id -> Uuid, + name -> Text, + description -> Text, + time_created -> Timestamptz, + time_modified -> Timestamptz, + time_deleted -> Nullable, + collection_id -> Nullable, + } + } + + async fn setup_db(pool: &crate::db::Pool) { + let connection = pool.pool().get().await.unwrap(); + (*connection) + .batch_execute_async( + "CREATE SCHEMA IF NOT EXISTS test_schema; \ + CREATE TABLE IF NOT EXISTS test_schema.collection ( \ + id UUID PRIMARY KEY, \ + name STRING(63) NOT NULL, \ + description STRING(512) NOT NULL, \ + time_created TIMESTAMPTZ NOT NULL, \ + time_modified TIMESTAMPTZ NOT NULL, \ + time_deleted TIMESTAMPTZ); \ + CREATE TABLE IF NOT EXISTS test_schema.resource( \ + id UUID PRIMARY KEY, \ + name STRING(63) NOT NULL, \ + description STRING(512) NOT NULL, \ + time_created TIMESTAMPTZ NOT NULL, \ + time_modified TIMESTAMPTZ NOT NULL, \ + time_deleted TIMESTAMPTZ, \ + collection_id UUID); \ + CREATE INDEX IF NOT EXISTS collection_index ON test_schema.resource ( \ + collection_id \ + ) WHERE collection_id IS NOT NULL AND time_deleted IS NULL; \ + TRUNCATE test_schema.collection; \ + TRUNCATE test_schema.resource", + ) + .await + .unwrap(); + } + + /// Describes a resource within the database. 
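+    ///
+    /// This is the minimal shape the detach CTE needs: an identity (which
+    /// carries "time_deleted" for soft deletion) plus a nullable
+    /// "collection_id" acting as the attachment pointer.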
+ #[derive( + Clone, Queryable, Insertable, Debug, Resource, Selectable, PartialEq, + )] + #[diesel(table_name = resource)] + struct Resource { + #[diesel(embed)] + pub identity: ResourceIdentity, + pub collection_id: Option, + } + + #[derive( + Clone, Queryable, Insertable, Debug, Resource, Selectable, PartialEq, + )] + #[diesel(table_name = collection)] + struct Collection { + #[diesel(embed)] + pub identity: CollectionIdentity, + } + + impl DatastoreAttachTarget for Collection { + type Id = uuid::Uuid; + + type CollectionIdColumn = collection::dsl::id; + type CollectionTimeDeletedColumn = collection::dsl::time_deleted; + + type ResourceIdColumn = resource::dsl::id; + type ResourceCollectionIdColumn = resource::dsl::collection_id; + type ResourceTimeDeletedColumn = resource::dsl::time_deleted; + } + + async fn insert_collection( + id: Uuid, + name: &str, + pool: &db::Pool, + ) -> Collection { + let create_params = IdentityMetadataCreateParams { + name: Name::try_from(name.to_string()).unwrap(), + description: "description".to_string(), + }; + let c = + Collection { identity: CollectionIdentity::new(id, create_params) }; + + diesel::insert_into(collection::table) + .values(c) + .execute_async(pool.pool()) + .await + .unwrap(); + + get_collection(id, &pool).await + } + + async fn get_collection(id: Uuid, pool: &db::Pool) -> Collection { + collection::table + .find(id) + .select(Collection::as_select()) + .first_async(pool.pool()) + .await + .unwrap() + } + + async fn insert_resource( + id: Uuid, + name: &str, + pool: &db::Pool, + ) -> Resource { + let create_params = IdentityMetadataCreateParams { + name: Name::try_from(name.to_string()).unwrap(), + description: "description".to_string(), + }; + let r = Resource { + identity: ResourceIdentity::new(id, create_params), + collection_id: None, + }; + + diesel::insert_into(resource::table) + .values(r) + .execute_async(pool.pool()) + .await + .unwrap(); + + get_resource(id, &pool).await + } + + async fn attach_resource( + collection_id: Uuid, + resource_id: Uuid, + pool: &db::Pool, + ) { + Collection::attach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + 100, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await + .unwrap(); + } + + async fn get_resource(id: Uuid, pool: &db::Pool) -> Resource { + resource::table + .find(id) + .select(Resource::as_select()) + .first_async(pool.pool()) + .await + .unwrap() + } + + #[test] + fn test_verify_query() { + let collection_id = + uuid::Uuid::parse_str("cccccccc-cccc-cccc-cccc-cccccccccccc") + .unwrap(); + let resource_id = + uuid::Uuid::parse_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa") + .unwrap(); + let detach = Collection::detach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ); + let query = diesel::debug_query::(&detach).to_string(); + + let expected_query = "WITH \ + collection_by_id AS (\ + SELECT \ + \"test_schema\".\"collection\".\"id\", \ + \"test_schema\".\"collection\".\"name\", \ + \"test_schema\".\"collection\".\"description\", \ + \"test_schema\".\"collection\".\"time_created\", \ + \"test_schema\".\"collection\".\"time_modified\", \ + \"test_schema\".\"collection\".\"time_deleted\" \ + FROM \"test_schema\".\"collection\" \ + WHERE (\ + (\"test_schema\".\"collection\".\"id\" = $1) AND \ 
+ (\"test_schema\".\"collection\".\"time_deleted\" IS NULL)\ + ) FOR UPDATE\ + ), \ + resource_by_id AS (\ + SELECT \ + \"test_schema\".\"resource\".\"id\", \ + \"test_schema\".\"resource\".\"name\", \ + \"test_schema\".\"resource\".\"description\", \ + \"test_schema\".\"resource\".\"time_created\", \ + \"test_schema\".\"resource\".\"time_modified\", \ + \"test_schema\".\"resource\".\"time_deleted\", \ + \"test_schema\".\"resource\".\"collection_id\" \ + FROM \"test_schema\".\"resource\" \ + WHERE (\ + (\"test_schema\".\"resource\".\"id\" = $2) AND \ + (\"test_schema\".\"resource\".\"time_deleted\" IS NULL)\ + ) FOR UPDATE\ + ), \ + collection_info AS (\ + SELECT \ + \"test_schema\".\"collection\".\"id\", \ + \"test_schema\".\"collection\".\"name\", \ + \"test_schema\".\"collection\".\"description\", \ + \"test_schema\".\"collection\".\"time_created\", \ + \"test_schema\".\"collection\".\"time_modified\", \ + \"test_schema\".\"collection\".\"time_deleted\" \ + FROM \"test_schema\".\"collection\" \ + WHERE (\ + (\"test_schema\".\"collection\".\"id\" = $3) AND \ + (\"test_schema\".\"collection\".\"time_deleted\" IS NULL)\ + ) FOR UPDATE\ + ), \ + resource_info AS (\ + SELECT \ + \"test_schema\".\"resource\".\"id\", \ + \"test_schema\".\"resource\".\"name\", \ + \"test_schema\".\"resource\".\"description\", \ + \"test_schema\".\"resource\".\"time_created\", \ + \"test_schema\".\"resource\".\"time_modified\", \ + \"test_schema\".\"resource\".\"time_deleted\", \ + \"test_schema\".\"resource\".\"collection_id\" \ + FROM \"test_schema\".\"resource\" \ + WHERE ((\ + (\"test_schema\".\"resource\".\"id\" = $4) AND \ + (\"test_schema\".\"resource\".\"time_deleted\" IS NULL)) AND \ + (\"test_schema\".\"resource\".\"collection_id\" = $5)\ + ) FOR UPDATE\ + ), \ + do_update AS (\ + SELECT IF(\ + EXISTS(SELECT \"id\" FROM collection_info) AND \ + EXISTS(SELECT \"id\" FROM resource_info), \ + TRUE,\ + FALSE)\ + ), \ + updated_resource AS (\ + UPDATE \ + \"test_schema\".\"resource\" \ + SET \ + \"collection_id\" = $6 \ + WHERE \ + (\"test_schema\".\"resource\".\"id\" = $7) AND \ + (SELECT * FROM do_update) \ + RETURNING \ + \"test_schema\".\"resource\".\"id\", \ + \"test_schema\".\"resource\".\"name\", \ + \"test_schema\".\"resource\".\"description\", \ + \"test_schema\".\"resource\".\"time_created\", \ + \"test_schema\".\"resource\".\"time_modified\", \ + \"test_schema\".\"resource\".\"time_deleted\", \ + \"test_schema\".\"resource\".\"collection_id\"\ + ) \ + SELECT * FROM \ + (SELECT 1) \ + LEFT JOIN (SELECT * FROM collection_by_id) ON TRUE \ + LEFT JOIN (SELECT * FROM resource_by_id) ON TRUE \ + LEFT JOIN (SELECT * FROM updated_resource) ON TRUE; -- binds: [cccccccc-cccc-cccc-cccc-cccccccccccc, aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa, cccccccc-cccc-cccc-cccc-cccccccccccc, aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa, cccccccc-cccc-cccc-cccc-cccccccccccc, None, aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa]"; + assert_eq!(query, expected_query); + } + + #[tokio::test] + async fn test_detach_missing_collection_fails() { + let logctx = + dev::test_setup_log("test_detach_missing_collection_fails"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + let detach = Collection::detach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(resource::table) 
+ .set(resource::dsl::collection_id.eq(Option::::None)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + + assert!(matches!(detach, Err(DetachError::CollectionNotFound))); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_detach_missing_resource_fails() { + let logctx = dev::test_setup_log("test_detach_missing_resource_fails"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + + // Create the collection + let collection = + insert_collection(collection_id, "collection", &pool).await; + + // Attempt to detach - even though the resource does not exist. + let detach = Collection::detach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + + assert!(matches!(detach, Err(DetachError::ResourceNotFound))); + // The collection should remain unchanged. + assert_eq!(collection, get_collection(collection_id, &pool).await); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_detach_once() { + let logctx = dev::test_setup_log("test_detach_once"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + + // Create the collection and resource. Attach them. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let _resource = insert_resource(resource_id, "resource", &pool).await; + attach_resource(collection_id, resource_id, &pool).await; + + // Detach the resource from the collection. + let detach = Collection::detach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + + // "detach_and_get_result_async" should return the "detached" resource. + let returned_resource = detach.expect("Detach should have worked"); + assert!(returned_resource.collection_id.is_none(),); + // The returned value should be the latest value in the DB. + assert_eq!(returned_resource, get_resource(resource_id, &pool).await); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_detach_once_synchronous() { + let logctx = dev::test_setup_log("test_detach_once_synchronous"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + + // Create the collection and resource. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let _resource = insert_resource(resource_id, "resource", &pool).await; + attach_resource(collection_id, resource_id, &pool).await; + + // Detach the resource from the collection. 
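+        //
+        // The synchronous variant needs an explicit connection, so run it
+        // inside a transaction and fold the CTE's own errors (e.g. NoUpdate)
+        // into TransactionError::CustomError, while plain database errors
+        // convert via From.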
+ let detach_query = Collection::detach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ); + + type TxnError = TransactionError< + DetachError, + >; + let result = pool + .pool() + .transaction(move |conn| { + detach_query.detach_and_get_result(conn).map_err(|e| match e { + DetachError::DatabaseError(e) => TxnError::from(e), + e => TxnError::CustomError(e), + }) + }) + .await; + + // "detach_and_get_result" should return the "detached" resource. + let returned_resource = result.expect("Detach should have worked"); + assert!(returned_resource.collection_id.is_none()); + // The returned values should be the latest value in the DB. + assert_eq!(returned_resource, get_resource(resource_id, &pool).await); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_detach_while_already_detached() { + let logctx = dev::test_setup_log("test_detach_while_already_detached"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let resource_id = uuid::Uuid::new_v4(); + let _resource = insert_resource(resource_id, "resource", &pool).await; + attach_resource(collection_id, resource_id, &pool).await; + + // Detach a resource from a collection, as usual. + let detach = Collection::detach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + assert_eq!( + detach.expect("Detach should have worked").id(), + resource_id + ); + + // Try detaching once more + let detach = Collection::detach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + let err = detach.expect_err("Should have failed to detach"); + + // A caller should be able to inspect this result, the resource is + // already detached. + match err { + DetachError::NoUpdate { resource, collection } => { + assert!(resource.collection_id.as_ref().is_none()); + assert_eq!(resource, get_resource(resource_id, &pool).await); + assert_eq!( + collection, + get_collection(collection_id, &pool).await + ); + } + _ => panic!("Unexpected error: {:?}", err), + }; + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_detach_deleted_resource_fails() { + let logctx = dev::test_setup_log("test_detach_deleted_resource_fails"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + + // Create the collection and resource. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let _resource = insert_resource(resource_id, "resource", &pool).await; + + // Immediately soft-delete the resource. 
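+        // Setting "time_deleted" is how rows are soft-deleted here; since
+        // every CTE lookup of the resource filters on "time_deleted IS NULL",
+        // the resource should now be invisible to the detach below.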
+ diesel::update( + resource::table.filter(resource::dsl::id.eq(resource_id)), + ) + .set(resource::dsl::time_deleted.eq(Utc::now())) + .execute_async(pool.pool()) + .await + .unwrap(); + + // Detach the resource from the collection. Observe a failure which is + // indistinguishable from the resource not existing. + let detach = Collection::detach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + assert!(matches!(detach, Err(DetachError::ResourceNotFound))); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_detach_without_update_filter() { + let logctx = dev::test_setup_log("test_detach_without_update_filter"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + + // Create the collection and some resources. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let resource_id1 = uuid::Uuid::new_v4(); + let resource_id2 = uuid::Uuid::new_v4(); + let _resource1 = + insert_resource(resource_id1, "resource1", &pool).await; + attach_resource(collection_id, resource_id1, &pool).await; + let _resource2 = + insert_resource(resource_id2, "resource2", &pool).await; + attach_resource(collection_id, resource_id2, &pool).await; + + // Detach the resource from the collection. + // + // NOTE: In the update statement, we aren't filtering by resource ID, + // even though we explicitly have two "live" resources". + let detach = Collection::detach_resource( + collection_id, + resource_id1, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + + let returned_resource = detach.expect("Detach should have worked"); + assert_eq!(returned_resource.id(), resource_id1); + + // Note that only "resource1" should be detached. + // "resource2" should have automatically been filtered away from the + // update statement, regardless of user input. + assert!(get_resource(resource_id1, &pool) + .await + .collection_id + .is_none()); + assert!(get_resource(resource_id2, &pool) + .await + .collection_id + .is_some()); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } +} diff --git a/nexus/src/db/collection_detach_many.rs b/nexus/src/db/collection_detach_many.rs new file mode 100644 index 00000000000..31addf386f2 --- /dev/null +++ b/nexus/src/db/collection_detach_many.rs @@ -0,0 +1,1193 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! CTE for detaching multiple resources from a collection. +//! +//! This atomically: +//! - Checks if the collection exists and is not soft deleted +//! - Validates conditions on both the collection and resources +//! - Updates the collection row +//! 
- Updates the resource rows + +use super::collection_attach::aliases::*; +use super::cte_utils::{ + BoxableTable, BoxableUpdateStatement, BoxedQuery, ExprSqlType, FilterBy, + QueryFromClause, QuerySqlType, +}; +use super::pool::DbConnection; +use crate::db::collection_attach::DatastoreAttachTarget; +use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager, PoolError}; +use diesel::associations::HasTable; +use diesel::expression::{AsExpression, Expression}; +use diesel::helper_types::*; +use diesel::pg::Pg; +use diesel::prelude::*; +use diesel::query_builder::*; +use diesel::query_dsl::methods as query_methods; +use diesel::query_source::Table; +use diesel::sql_types::{Nullable, SingleValue}; +use std::fmt::Debug; + +/// Trait to be implemented by structs representing a detachable collection. +/// +/// A blanket implementation is provided for traits that implement +/// [`DatastoreAttachTarget`]. +pub trait DatastoreDetachManyTarget: + DatastoreAttachTarget +{ + /// Creates a statement for detaching a resource from the given collection. + /// + /// This statement allows callers to atomically check the state of a + /// collection and a resource while detaching a resource. + /// + /// - `collection_id`: Primary key of the collection being removed from. + /// - `collection_query`: An optional query for collection state. The + /// CTE will automatically filter this query to `collection_id`, and + /// validate that the "time deleted" column is NULL. + /// - `resource_query`: An optional query for the resource state. The + /// CTE will automatically filter this query to non-deleted resources. + /// validate that the "time deleted" column is NULL, and validate that the + /// "collection_id" column points to `collection_id`. + /// - `update_collection`: An update statement, identifying how the + /// collection object should be modified as associated resources are + /// detached. + /// - `update_resource`: An update statement, identifying how the resource + /// objects should be modified to be detached + /// + /// The VC, VR types refer to the "update target" of the UpdateStatements, + /// and should generally be inferred rather than explicitly specified. + fn detach_resources( + collection_id: Self::Id, + + collection_query: BoxedQuery>, + resource_query: BoxedQuery>, + + update_collection: UpdateStatement< + CollectionTable, + CollectionTableDefaultWhereClause, + VC, + >, + update_resource: UpdateStatement< + ResourceTable, + ResourceTableDefaultWhereClause, + VR, + >, + ) -> DetachManyFromCollectionStatement + where + // Treat the collection and resource as boxed tables. + CollectionTable: BoxableTable, + ResourceTable: BoxableTable, + + // Allows treating "collection_exists_query" as a boxed "dyn QueryFragment". + QueryFromClause>: + QueryFragment + Send, + QuerySqlType>: Send, + // Allows treating "resource_exists_query" as a boxed "dyn QueryFragment". + QueryFromClause>: + QueryFragment + Send, + QuerySqlType>: Send, + + // Allows calling ".filter()" on the boxed collection table. + BoxedQuery>: FilterBy, Self::Id>> + + FilterBy>, + // Allows calling ".filter()" on the boxed resource table. 
+ BoxedQuery>: FilterBy> + + FilterBy>, + + // Allows calling "update.into_boxed()" + UpdateStatement< + CollectionTable, + CollectionTableDefaultWhereClause, + VC, + >: BoxableUpdateStatement, VC>, + UpdateStatement< + ResourceTable, + ResourceTableDefaultWhereClause, + VR, + >: BoxableUpdateStatement, VR>, + + // Allows calling + // ".filter(collection_table().primary_key().eq(collection_id)" on the + // boxed update statement. + BoxedUpdateStatement< + 'static, + Pg, + CollectionTable, + VC, + >: FilterBy, Self::Id>>, + // Allows calling + // ".filter(Self::ResourceTimeDeletedColumn::default().is_null())" + BoxedUpdateStatement< + 'static, + Pg, + ResourceTable, + VR, + >: FilterBy> + + FilterBy>, + + // Allows using "id" in expressions (e.g. ".eq(...)") with... + Self::Id: AsExpression< + // ... The Collection table's PK + SerializedCollectionPrimaryKey, + > + AsExpression< + // ... The Resource table's PK + SerializedResourcePrimaryKey, + > + AsExpression< + // ... The Resource table's FK to the Collection table + SerializedResourceForeignKey, + >, + ExprSqlType>: SingleValue, + ExprSqlType>: SingleValue, + ExprSqlType: SingleValue, + + ResourceType: Selectable, + { + let collection_table = + || as HasTable>::table(); + + // Create new queries to determine if the collection exists. + let collection_exists_query = Box::new( + collection_table() + .into_boxed() + .filter(collection_table().primary_key().eq(collection_id)) + .filter(Self::CollectionTimeDeletedColumn::default().is_null()), + ); + + // For the queries which decide whether or not we'll perform the update, + // extend the user-provided arguments. + // + // We force these queries to: + // - Check against the primary key of the target objects + // - Ensure the objects are not deleted + // - (for the resources) Ensure they are attached + // - (for the update) Ensure that only the collection with "collection_id" + // is modified. + let collection_query = Box::new( + collection_query + .filter(collection_table().primary_key().eq(collection_id)) + .filter(Self::CollectionTimeDeletedColumn::default().is_null()), + ); + let resource_query = Box::new( + resource_query + .filter(Self::ResourceTimeDeletedColumn::default().is_null()) + .filter( + Self::ResourceCollectionIdColumn::default() + .eq(collection_id), + ), + ); + + let update_collection_statement = update_collection + .into_boxed() + .filter(collection_table().primary_key().eq(collection_id)); + + let update_resource_statement = update_resource + .into_boxed() + .filter(Self::ResourceTimeDeletedColumn::default().is_null()) + .filter( + Self::ResourceCollectionIdColumn::default().eq(collection_id), + ); + + let collection_returning_clause = Self::as_returning(); + DetachManyFromCollectionStatement { + collection_exists_query, + collection_query, + resource_query, + update_collection_statement, + update_resource_statement, + collection_returning_clause, + } + } +} + +impl DatastoreDetachManyTarget for T where + T: DatastoreAttachTarget +{ +} + +/// The CTE described in the module docs +#[must_use = "Queries must be executed"] +pub struct DetachManyFromCollectionStatement +where + ResourceType: Selectable, + C: DatastoreDetachManyTarget, +{ + // Query which answers: "Does the collection exist?" + collection_exists_query: Box + Send>, + // A (mostly) user-provided query for validating the collection. + collection_query: Box + Send>, + // A (mostly) user-provided query for validating the resource. + resource_query: Box + Send>, + + // Update statement for the collection. 
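+    // (Unlike the single-resource detach CTE, this variant also mutates the
+    // collection row itself; the tests below use it to update the
+    // collection's description.)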
+    update_collection_statement:
+        BoxedUpdateStatement<'static, Pg, CollectionTable, VC>,
+    // Update statement for the resource.
+    update_resource_statement:
+        BoxedUpdateStatement<'static, Pg, ResourceTable, VR>,
+    // Describes what should be returned after UPDATE-ing the collection.
+    collection_returning_clause: AsSelect,
+}
+
+impl QueryId
+    for DetachManyFromCollectionStatement
+where
+    ResourceType: Selectable,
+    C: DatastoreDetachManyTarget,
+{
+    type QueryId = ();
+    const HAS_STATIC_QUERY_ID: bool = false;
+}
+
+/// Result of [`DetachManyFromCollectionStatement`] when executed asynchronously
+pub type AsyncDetachManyFromCollectionResult =
+    Result>;
+
+/// Result of [`DetachManyFromCollectionStatement`] when executed synchronously
+pub type SyncDetachManyFromCollectionResult =
+    Result>;
+
+/// Errors returned by [`DetachManyFromCollectionStatement`].
+#[derive(Debug)]
+pub enum DetachManyError {
+    /// The collection that the query was removing from does not exist
+    CollectionNotFound,
+    /// Although the collection exists, the update did not occur
+    ///
+    /// The unchanged collection is returned as a part of this error; it is
+    /// the responsibility of the caller to determine which condition was
+    /// not met.
+    NoUpdate { collection: C },
+    /// Other database error
+    DatabaseError(E),
+}
+
+/// Describes the type returned from the actual CTE, which is parsed
+/// and interpreted before propagating it to users of the Rust API.
+pub type RawOutput = (i64, Option, Option);
+
+impl
+    DetachManyFromCollectionStatement
+where
+    ResourceType: 'static + Debug + Send + Selectable,
+    C: 'static + Debug + DatastoreDetachManyTarget + Send,
+    ResourceTable: 'static + Table + Send + Copy + Debug,
+    VC: 'static + Send,
+    VR: 'static + Send,
+    DetachManyFromCollectionStatement: Send,
+{
+    /// Issues the CTE asynchronously and parses the result.
+    pub async fn detach_and_get_result_async(
+        self,
+        pool: &bb8::Pool>,
+    ) -> AsyncDetachManyFromCollectionResult
+    where
+        // We require this bound to ensure that "Self" is runnable as query.
+ Self: query_methods::LoadQuery<'static, DbConnection, RawOutput>, + { + self.get_result::>(conn) + .map_err(DetachManyError::DatabaseError) + .and_then(Self::parse_result) + } + + fn parse_result( + result: RawOutput, + ) -> Result> { + let (_, collection_before_update, collection_after_update) = result; + + let collection_before_update = collection_before_update + .ok_or_else(|| DetachManyError::CollectionNotFound)?; + + match collection_after_update { + Some(collection) => Ok(collection), + None => Err(DetachManyError::NoUpdate { + collection: collection_before_update, + }), + } + } +} + +type SelectableSqlType = + <>::SelectExpression as Expression>::SqlType; + +impl Query + for DetachManyFromCollectionStatement +where + ResourceType: Selectable, + C: DatastoreDetachManyTarget, +{ + type SqlType = ( + // Ignored "SELECT 1" value + diesel::sql_types::BigInt, + // If the collection exists, the value before update. + Nullable>, + // If the collection was updated, the new value. + Nullable>, + ); +} + +impl RunQueryDsl + for DetachManyFromCollectionStatement +where + ResourceType: Selectable, + C: DatastoreDetachManyTarget, +{ +} + +/// This implementation uses a CTE which attempts to do the following: +/// +/// 1. (collection_by_id): Identify if the collection exists at all. +/// 2. (collection_info, resource_info): Checks for arbitrary user-provided +/// constraints on the collection and resource objects. +/// 3. (do_update): IFF all previous checks succeeded, make a decision to perfom +/// an update. +/// 4. (updated_collection, updated_resource): Apply user-provided updates on +/// the collection and resource - presumably, setting the collection ID +/// value. +/// +/// This is implemented as follows: +/// +/// ```text +/// // WITH +/// // /* Look up the collection - Check for existence only! */ +/// // collection_by_id AS ( +/// // SELECT * FROM C +/// // WHERE = AND IS NULL +/// // FOR UPDATE +/// // ), +/// // /* Look up the collection - Check for additional constraints */ +/// // collection_info AS ( +/// // SELECT * FROM C +/// // WHERE = AND IS NULL AND +/// // +/// // FOR UPDATE +/// // ), +/// // /* Look up the resource - Check for additional constraints */ +/// // resource_info AS ( +/// // SELECT * FROM R +/// // WHERE IS NULL AND +/// // = AND +/// // FOR UPDATE +/// // ), +/// // /* Make a decision on whether or not to apply ANY updates */ +/// // do_update AS ( +/// // SELECT IF( +/// // EXISTS(SELECT id FROM collection_info) +/// // TRUE, FALSE), +/// // ), +/// // /* Update the collection */ +/// // updated_collection AS ( +/// // UPDATE C SET +/// // WHERE IN (SELECT FROM collection_info) AND (SELECT * FROM do_update) +/// // RETURNING * +/// // ), +/// // /* Update the resource */ +/// // updated_resource AS ( +/// // UPDATE R SET +/// // WHERE (id IN (SELECT id FROM resource_info)) AND (SELECT * FROM do_update) +/// // RETURNING 1 +/// // ) +/// // SELECT * FROM +/// // (SELECT 1) +/// // LEFT JOIN (SELECT * FROM collection_by_id) ON TRUE +/// // LEFT JOIN (SELECT * FROM updated_collection) ON TRUE; +/// ``` +impl QueryFragment + for DetachManyFromCollectionStatement +where + ResourceType: Selectable, + C: DatastoreDetachManyTarget, + ResourcePrimaryKey: diesel::Column, + // Necessary to "walk_ast" over "self.update_collection_statement". + BoxedUpdateStatement<'static, Pg, CollectionTable, VC>: + QueryFragment, + // Necessary to "walk_ast" over "self.update_resource_statement". 
+ BoxedUpdateStatement<'static, Pg, ResourceTable, VR>: + QueryFragment, + // Necessary to "walk_ast" over "self.collection_returning_clause". + AsSelect: QueryFragment, +{ + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { + out.unsafe_to_cache_prepared(); + out.push_sql("WITH collection_by_id AS ("); + self.collection_exists_query.walk_ast(out.reborrow())?; + out.push_sql(" FOR UPDATE), "); + + out.push_sql("collection_info AS ("); + self.collection_query.walk_ast(out.reborrow())?; + out.push_sql(" FOR UPDATE), "); + + out.push_sql("resource_info AS ("); + self.resource_query.walk_ast(out.reborrow())?; + out.push_sql(" FOR UPDATE), "); + + out.push_sql("do_update AS (SELECT IF(EXISTS(SELECT "); + out.push_identifier(CollectionIdColumn::::NAME)?; + out.push_sql(" FROM collection_info), TRUE,FALSE)), "); + + out.push_sql("updated_collection AS ("); + self.update_collection_statement.walk_ast(out.reborrow())?; + // NOTE: It is safe to start with "AND" - we forced the update statement + // to have a WHERE clause on the primary key of the resource. + out.push_sql(" AND (SELECT * FROM do_update)"); + out.push_sql(" RETURNING "); + self.collection_returning_clause.walk_ast(out.reborrow())?; + out.push_sql("), "); + + out.push_sql("updated_resource AS ("); + self.update_resource_statement.walk_ast(out.reborrow())?; + // NOTE: It is safe to start with "AND" - we forced the update statement + // to have a WHERE clause on the time deleted column. + out.push_sql(" AND ("); + out.push_identifier(ResourcePrimaryKey::::NAME)?; + out.push_sql(" IN (SELECT "); + out.push_identifier(ResourcePrimaryKey::::NAME)?; + out.push_sql(" FROM resource_info))"); + out.push_sql(" AND (SELECT * FROM do_update) RETURNING 1) "); + + // Why do all these LEFT JOINs here? In short, to ensure that we are + // always returning a constant number of columns. + // + // Diesel parses output "one column at a time", mapping to structs or + // tuples. For example, when deserializing an "Option<(A, B, C)>" object, + // Diesel checks nullability of the "A", "B", and "C" columns. + // If any of those columns unexpectedly return NULL, the entire object is + // treated as "None". + // + // In summary: + // - Without the LEFT JOINs, we'd occassionally be returning "zero + // rows", which would make the output entirely unparseable. + // - If we used an operation like COALESCE (which attempts to map the + // result of an expression to either "NULL" or a single tuple column), + // Diesel struggles to map the result back to a structure. + // + // By returning a static number of columns, each component of the + // "RawOutput" tuple can be parsed, regardless of nullability, without + // preventing later portions of the result from being parsed. 
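To make that guarantee concrete: the final query always yields exactly one row shaped like `RawOutput`, and its three possible nullability patterns map one-to-one onto the public result. A small self-contained sketch of that mapping, mirroring `parse_result` above; the `Outcome` enum and `interpret` function are illustrative stand-ins, not part of this module:

```rust
// Illustrative stand-in for the real error/result types.
enum Outcome<C> {
    // Row was (1, NULL, NULL): the collection does not exist (or is deleted).
    CollectionNotFound,
    // Row was (1, <collection>, NULL): it exists, but a predicate failed.
    NoUpdate { collection: C },
    // Row was (1, <collection>, <collection>): the update was applied.
    Detached(C),
}

fn interpret<C>(row: (i64, Option<C>, Option<C>)) -> Outcome<C> {
    match row {
        (_, None, _) => Outcome::CollectionNotFound,
        (_, Some(before), None) => Outcome::NoUpdate { collection: before },
        (_, Some(_), Some(after)) => Outcome::Detached(after),
    }
}
```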
+ out.push_sql( + "SELECT * FROM \ + (SELECT 1) \ + LEFT JOIN (SELECT * FROM collection_by_id) ON TRUE \ + LEFT JOIN (SELECT * FROM updated_collection) ON TRUE;", + ); + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::{DatastoreDetachManyTarget, DetachManyError}; + use crate::db::collection_attach::DatastoreAttachTarget; + use crate::db::{ + self, error::TransactionError, identity::Resource as IdentityResource, + }; + use async_bb8_diesel::{ + AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection, + }; + use chrono::Utc; + use db_macros::Resource; + use diesel::expression_methods::ExpressionMethods; + use diesel::pg::Pg; + use diesel::QueryDsl; + use diesel::SelectableHelper; + use nexus_test_utils::db::test_setup_database; + use omicron_common::api::external::{IdentityMetadataCreateParams, Name}; + use omicron_test_utils::dev; + use uuid::Uuid; + + table! { + test_schema.collection (id) { + id -> Uuid, + name -> Text, + description -> Text, + time_created -> Timestamptz, + time_modified -> Timestamptz, + time_deleted -> Nullable, + } + } + + table! { + test_schema.resource (id) { + id -> Uuid, + name -> Text, + description -> Text, + time_created -> Timestamptz, + time_modified -> Timestamptz, + time_deleted -> Nullable, + collection_id -> Nullable, + } + } + + async fn setup_db(pool: &crate::db::Pool) { + let connection = pool.pool().get().await.unwrap(); + (*connection) + .batch_execute_async( + "CREATE SCHEMA IF NOT EXISTS test_schema; \ + CREATE TABLE IF NOT EXISTS test_schema.collection ( \ + id UUID PRIMARY KEY, \ + name STRING(63) NOT NULL, \ + description STRING(512) NOT NULL, \ + time_created TIMESTAMPTZ NOT NULL, \ + time_modified TIMESTAMPTZ NOT NULL, \ + time_deleted TIMESTAMPTZ); \ + CREATE TABLE IF NOT EXISTS test_schema.resource( \ + id UUID PRIMARY KEY, \ + name STRING(63) NOT NULL, \ + description STRING(512) NOT NULL, \ + time_created TIMESTAMPTZ NOT NULL, \ + time_modified TIMESTAMPTZ NOT NULL, \ + time_deleted TIMESTAMPTZ, \ + collection_id UUID); \ + CREATE INDEX IF NOT EXISTS collection_index ON test_schema.resource ( \ + collection_id \ + ) WHERE collection_id IS NOT NULL AND time_deleted IS NULL; \ + TRUNCATE test_schema.collection; \ + TRUNCATE test_schema.resource", + ) + .await + .unwrap(); + } + + /// Describes a resource within the database. 
+ #[derive( + Clone, Queryable, Insertable, Debug, Resource, Selectable, PartialEq, + )] + #[diesel(table_name = resource)] + struct Resource { + #[diesel(embed)] + pub identity: ResourceIdentity, + pub collection_id: Option, + } + + #[derive( + Clone, Queryable, Insertable, Debug, Resource, Selectable, PartialEq, + )] + #[diesel(table_name = collection)] + struct Collection { + #[diesel(embed)] + pub identity: CollectionIdentity, + } + + impl DatastoreAttachTarget for Collection { + type Id = uuid::Uuid; + + type CollectionIdColumn = collection::dsl::id; + type CollectionTimeDeletedColumn = collection::dsl::time_deleted; + + type ResourceIdColumn = resource::dsl::id; + type ResourceCollectionIdColumn = resource::dsl::collection_id; + type ResourceTimeDeletedColumn = resource::dsl::time_deleted; + } + + async fn insert_collection( + id: Uuid, + name: &str, + pool: &db::Pool, + ) -> Collection { + let create_params = IdentityMetadataCreateParams { + name: Name::try_from(name.to_string()).unwrap(), + description: "description".to_string(), + }; + let c = + Collection { identity: CollectionIdentity::new(id, create_params) }; + + diesel::insert_into(collection::table) + .values(c) + .execute_async(pool.pool()) + .await + .unwrap(); + + get_collection(id, &pool).await + } + + async fn get_collection(id: Uuid, pool: &db::Pool) -> Collection { + collection::table + .find(id) + .select(Collection::as_select()) + .first_async(pool.pool()) + .await + .unwrap() + } + + async fn insert_resource( + id: Uuid, + name: &str, + pool: &db::Pool, + ) -> Resource { + let create_params = IdentityMetadataCreateParams { + name: Name::try_from(name.to_string()).unwrap(), + description: "description".to_string(), + }; + let r = Resource { + identity: ResourceIdentity::new(id, create_params), + collection_id: None, + }; + + diesel::insert_into(resource::table) + .values(r) + .execute_async(pool.pool()) + .await + .unwrap(); + + get_resource(id, &pool).await + } + + async fn attach_resource( + collection_id: Uuid, + resource_id: Uuid, + pool: &db::Pool, + ) { + Collection::attach_resource( + collection_id, + resource_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + 100, + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .attach_and_get_result_async(pool.pool()) + .await + .unwrap(); + } + + async fn get_resource(id: Uuid, pool: &db::Pool) -> Resource { + resource::table + .find(id) + .select(Resource::as_select()) + .first_async(pool.pool()) + .await + .unwrap() + } + + #[test] + fn test_verify_query() { + let collection_id = + uuid::Uuid::parse_str("cccccccc-cccc-cccc-cccc-cccccccccccc") + .unwrap(); + let _resource_id = + uuid::Uuid::parse_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa") + .unwrap(); + let detach = Collection::detach_resources( + collection_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(collection::table) + .set(collection::dsl::description.eq("Updated desc")), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ); + let query = diesel::debug_query::(&detach).to_string(); + + let expected_query = "WITH \ + collection_by_id AS (\ + SELECT \ + \"test_schema\".\"collection\".\"id\", \ + \"test_schema\".\"collection\".\"name\", \ + \"test_schema\".\"collection\".\"description\", \ + \"test_schema\".\"collection\".\"time_created\", \ + \"test_schema\".\"collection\".\"time_modified\", \ + \"test_schema\".\"collection\".\"time_deleted\" \ + FROM 
\"test_schema\".\"collection\" \ + WHERE (\ + (\"test_schema\".\"collection\".\"id\" = $1) AND \ + (\"test_schema\".\"collection\".\"time_deleted\" IS NULL)\ + ) FOR UPDATE\ + ), \ + collection_info AS (\ + SELECT \ + \"test_schema\".\"collection\".\"id\", \ + \"test_schema\".\"collection\".\"name\", \ + \"test_schema\".\"collection\".\"description\", \ + \"test_schema\".\"collection\".\"time_created\", \ + \"test_schema\".\"collection\".\"time_modified\", \ + \"test_schema\".\"collection\".\"time_deleted\" \ + FROM \"test_schema\".\"collection\" \ + WHERE (\ + (\"test_schema\".\"collection\".\"id\" = $2) AND \ + (\"test_schema\".\"collection\".\"time_deleted\" IS NULL)\ + ) FOR UPDATE\ + ), \ + resource_info AS (\ + SELECT \ + \"test_schema\".\"resource\".\"id\", \ + \"test_schema\".\"resource\".\"name\", \ + \"test_schema\".\"resource\".\"description\", \ + \"test_schema\".\"resource\".\"time_created\", \ + \"test_schema\".\"resource\".\"time_modified\", \ + \"test_schema\".\"resource\".\"time_deleted\", \ + \"test_schema\".\"resource\".\"collection_id\" \ + FROM \"test_schema\".\"resource\" \ + WHERE (\ + (\"test_schema\".\"resource\".\"time_deleted\" IS NULL) AND \ + (\"test_schema\".\"resource\".\"collection_id\" = $3)\ + ) FOR UPDATE\ + ), \ + do_update AS (\ + SELECT IF(\ + EXISTS(SELECT \"id\" FROM collection_info), \ + TRUE,\ + FALSE)\ + ), \ + updated_collection AS (\ + UPDATE \ + \"test_schema\".\"collection\" \ + SET \ + \"description\" = $4 \ + WHERE \ + (\"test_schema\".\"collection\".\"id\" = $5) AND \ + (SELECT * FROM do_update) \ + RETURNING \ + \"test_schema\".\"collection\".\"id\", \ + \"test_schema\".\"collection\".\"name\", \ + \"test_schema\".\"collection\".\"description\", \ + \"test_schema\".\"collection\".\"time_created\", \ + \"test_schema\".\"collection\".\"time_modified\", \ + \"test_schema\".\"collection\".\"time_deleted\"\ + ), \ + updated_resource AS (\ + UPDATE \ + \"test_schema\".\"resource\" \ + SET \ + \"collection_id\" = $6 \ + WHERE \ + ((\"test_schema\".\"resource\".\"time_deleted\" IS NULL) AND \ + (\"test_schema\".\"resource\".\"collection_id\" = $7)) AND \ + (\"id\" IN (SELECT \"id\" FROM resource_info)) AND \ + (SELECT * FROM do_update) \ + RETURNING 1\ + ) \ + SELECT * FROM \ + (SELECT 1) \ + LEFT JOIN (SELECT * FROM collection_by_id) ON TRUE \ + LEFT JOIN (SELECT * FROM updated_collection) ON TRUE; -- binds: [cccccccc-cccc-cccc-cccc-cccccccccccc, cccccccc-cccc-cccc-cccc-cccccccccccc, cccccccc-cccc-cccc-cccc-cccccccccccc, \"Updated desc\", cccccccc-cccc-cccc-cccc-cccccccccccc, None, cccccccc-cccc-cccc-cccc-cccccccccccc]"; + assert_eq!(query, expected_query); + } + + #[tokio::test] + async fn test_detach_missing_collection_fails() { + let logctx = + dev::test_setup_log("test_detach_missing_collection_fails"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let _resource_id = uuid::Uuid::new_v4(); + let detach = Collection::detach_resources( + collection_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(collection::table) + .set(collection::dsl::description.eq("Updated desc")), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + + assert!(matches!(detach, Err(DetachManyError::CollectionNotFound))); + + db.cleanup().await.unwrap(); + 
logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_detach_missing_resource_succeeds() { + let logctx = + dev::test_setup_log("test_detach_missing_resource_succeeds"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let _resource_id = uuid::Uuid::new_v4(); + + // Create the collection + let _collection = + insert_collection(collection_id, "collection", &pool).await; + + // Attempt to detach - even though the resource does not exist. + let detach = Collection::detach_resources( + collection_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(collection::table) + .set(collection::dsl::description.eq("Updated desc")), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + + let returned_collection = detach.expect("Detach should have worked"); + assert_eq!(returned_collection.description(), "Updated desc"); + // The collection should still be updated. + assert_eq!( + returned_collection, + get_collection(collection_id, &pool).await + ); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_detach_once() { + let logctx = dev::test_setup_log("test_detach_once"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + + // Create the collection and resource. Attach them. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let _resource = insert_resource(resource_id, "resource", &pool).await; + attach_resource(collection_id, resource_id, &pool).await; + + // Detach the resource from the collection. + let detach = Collection::detach_resources( + collection_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(collection::table) + .set(collection::dsl::description.eq("Updated desc")), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + + // "detach_and_get_result_async" should return the updated collection. + let returned_collection = detach.expect("Detach should have worked"); + // The returned value should be the latest value in the DB. + assert_eq!( + returned_collection, + get_collection(collection_id, &pool).await + ); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_detach_once_synchronous() { + let logctx = dev::test_setup_log("test_detach_once_synchronous"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + + // Create the collection and resource. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let _resource = insert_resource(resource_id, "resource", &pool).await; + attach_resource(collection_id, resource_id, &pool).await; + + // Detach the resource from the collection. 
+        let detach_query = Collection::detach_resources(
+            collection_id,
+            collection::table.into_boxed(),
+            resource::table.into_boxed(),
+            diesel::update(collection::table)
+                .set(collection::dsl::description.eq("Updated desc")),
+            diesel::update(resource::table)
+                .set(resource::dsl::collection_id.eq(Option::<Uuid>::None)),
+        );
+
+        type TxnError = TransactionError<DetachManyError<Collection>>;
+        let result = pool
+            .pool()
+            .transaction(move |conn| {
+                detach_query.detach_and_get_result(conn).map_err(|e| match e {
+                    DetachManyError::DatabaseError(e) => TxnError::from(e),
+                    e => TxnError::CustomError(e),
+                })
+            })
+            .await;
+
+        // "detach_and_get_result" should return the updated collection.
+        let returned_collection = result.expect("Detach should have worked");
+        // The returned value should be the latest value in the DB.
+        assert_eq!(
+            returned_collection,
+            get_collection(collection_id, &pool).await
+        );
+
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
+
+    #[tokio::test]
+    async fn test_detach_while_already_detached() {
+        let logctx = dev::test_setup_log("test_detach_while_already_detached");
+        let mut db = test_setup_database(&logctx.log).await;
+        let cfg = db::Config { url: db.pg_config().clone() };
+        let pool = db::Pool::new(&cfg);
+
+        setup_db(&pool).await;
+
+        let collection_id = uuid::Uuid::new_v4();
+
+        let _collection =
+            insert_collection(collection_id, "collection", &pool).await;
+        let resource_id = uuid::Uuid::new_v4();
+        let _resource = insert_resource(resource_id, "resource", &pool).await;
+        attach_resource(collection_id, resource_id, &pool).await;
+
+        // Detach a resource from a collection, as usual.
+        let detach = Collection::detach_resources(
+            collection_id,
+            collection::table.into_boxed(),
+            resource::table.into_boxed(),
+            diesel::update(collection::table)
+                .set(collection::dsl::description.eq("Updated desc")),
+            diesel::update(resource::table)
+                .set(resource::dsl::collection_id.eq(Option::<Uuid>::None)),
+        )
+        .detach_and_get_result_async(pool.pool())
+        .await;
+        assert_eq!(
+            detach.expect("Detach should have worked").description(),
+            "Updated desc"
+        );
+
+        // Try detaching once more. This one won't detach anything, but
+        // we still expect it to succeed.
+        let detach = Collection::detach_resources(
+            collection_id,
+            collection::table.into_boxed(),
+            resource::table.into_boxed(),
+            diesel::update(collection::table)
+                .set(collection::dsl::description.eq("... and again!")),
+            diesel::update(resource::table)
+                .set(resource::dsl::collection_id.eq(Option::<Uuid>::None)),
+        )
+        .detach_and_get_result_async(pool.pool())
+        .await;
+        assert_eq!(
+            detach.expect("Detach should have worked").description(),
+            "... and again!"
+        );
+
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
+
+    #[tokio::test]
+    async fn test_detach_filter_collection() {
+        let logctx = dev::test_setup_log("test_detach_filter_collection");
+        let mut db = test_setup_database(&logctx.log).await;
+        let cfg = db::Config { url: db.pg_config().clone() };
+        let pool = db::Pool::new(&cfg);
+
+        setup_db(&pool).await;
+
+        let collection_id = uuid::Uuid::new_v4();
+
+        let _collection =
+            insert_collection(collection_id, "collection", &pool).await;
+        let resource_id = uuid::Uuid::new_v4();
+        let _resource = insert_resource(resource_id, "resource", &pool).await;
+        attach_resource(collection_id, resource_id, &pool).await;
+
+        // Detach a resource from a collection, but do so with a picky filter
+        // on the collection.
+ let detach = Collection::detach_resources( + collection_id, + collection::table + .into_boxed() + .filter(collection::dsl::name.eq("This name will not match")), + resource::table.into_boxed(), + diesel::update(collection::table) + .set(collection::dsl::description.eq("Updated desc")), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + + let err = detach.expect_err("Expected this detach to fail"); + + // A caller should be able to inspect this result; the collection + // exists but has a different name than requested. + match err { + DetachManyError::NoUpdate { collection } => { + assert_eq!( + collection, + get_collection(collection_id, &pool).await + ); + } + _ => panic!("Unexpected error: {:?}", err), + }; + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_detach_deleted_resource() { + let logctx = dev::test_setup_log("test_detach_deleted_resource"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + let collection_id = uuid::Uuid::new_v4(); + let resource_id = uuid::Uuid::new_v4(); + + // Create the collection and resource. + let _collection = + insert_collection(collection_id, "collection", &pool).await; + let _resource = insert_resource(resource_id, "resource", &pool).await; + attach_resource(collection_id, resource_id, &pool).await; + + // Immediately soft-delete the resource. + diesel::update( + resource::table.filter(resource::dsl::id.eq(resource_id)), + ) + .set(resource::dsl::time_deleted.eq(Utc::now())) + .execute_async(pool.pool()) + .await + .unwrap(); + + // Detach the resource from the collection. Observe a failure which is + // indistinguishable from the resource not existing. + let detach = Collection::detach_resources( + collection_id, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(collection::table) + .set(collection::dsl::description.eq("Updated desc")), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(collection_id)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + + assert_eq!( + detach.expect("Detach should have worked").description(), + "Updated desc" + ); + assert_eq!( + get_resource(resource_id, &pool) + .await + .collection_id + .as_ref() + .expect("Should be deleted, but still attached"), + &collection_id, + ); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_detach_many() { + let logctx = dev::test_setup_log("test_detach_many"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + + setup_db(&pool).await; + + // Create the collection and some resources. + let collection_id1 = uuid::Uuid::new_v4(); + let _collection1 = + insert_collection(collection_id1, "collection", &pool).await; + let resource_id1 = uuid::Uuid::new_v4(); + let resource_id2 = uuid::Uuid::new_v4(); + let _resource1 = + insert_resource(resource_id1, "resource1", &pool).await; + attach_resource(collection_id1, resource_id1, &pool).await; + let _resource2 = + insert_resource(resource_id2, "resource2", &pool).await; + attach_resource(collection_id1, resource_id2, &pool).await; + + // Create a separate collection with a resource. 
+ // + // We will check that this resource is untouched after operating + // on "collection_id1". + let collection_id2 = uuid::Uuid::new_v4(); + let _collection2 = + insert_collection(collection_id2, "collection2", &pool).await; + let resource_id3 = uuid::Uuid::new_v4(); + let _resource3 = + insert_resource(resource_id3, "resource3", &pool).await; + attach_resource(collection_id2, resource_id3, &pool).await; + + // Detach the resource from the collection. + let detach = Collection::detach_resources( + collection_id1, + collection::table.into_boxed(), + resource::table.into_boxed(), + diesel::update(collection::table) + .set(collection::dsl::description.eq("Updated desc")), + diesel::update(resource::table) + .set(resource::dsl::collection_id.eq(Option::::None)), + ) + .detach_and_get_result_async(pool.pool()) + .await; + + let returned_resource = detach.expect("Detach should have worked"); + assert_eq!(returned_resource.id(), collection_id1); + assert_eq!(returned_resource.description(), "Updated desc"); + + // Note that only "resource1" and "resource2" should be detached. + assert!(get_resource(resource_id1, &pool) + .await + .collection_id + .is_none()); + assert!(get_resource(resource_id2, &pool) + .await + .collection_id + .is_none()); + + // "resource3" should have been left alone. + assert_eq!( + get_resource(resource_id3, &pool) + .await + .collection_id + .as_ref() + .expect("Should still be attached"), + &collection_id2 + ); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } +} diff --git a/nexus/src/db/cte_utils.rs b/nexus/src/db/cte_utils.rs new file mode 100644 index 00000000000..42743647f7e --- /dev/null +++ b/nexus/src/db/cte_utils.rs @@ -0,0 +1,86 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Utilities for writing CTEs. + +use diesel::associations::HasTable; +use diesel::expression::Expression; +use diesel::pg::Pg; +use diesel::prelude::*; +use diesel::query_builder::*; +use diesel::query_dsl::methods as query_methods; +use diesel::query_source::Table; + +/// The default WHERE clause of a table, when treated as an UPDATE target. +pub(crate) type TableDefaultWhereClause
<Table> =
+    <Table as IntoUpdateTarget>::WhereClause;
+
+// Short-hand type accessors.
+pub(crate) type QueryFromClause<T> = <T as QuerySource>::FromClause;
+pub(crate) type QuerySqlType<T> = <T as AsQuery>::SqlType;
+pub(crate) type ExprSqlType<T> = <T as Expression>::SqlType;
+type TableSqlType<T> = <T as AsQuery>::SqlType;
+
+pub(crate) type BoxedQuery<T> = diesel::helper_types::IntoBoxed<'static, T, Pg>;
+pub(crate) type BoxedDslOutput<T> =
+    diesel::internal::table_macro::BoxedSelectStatement<
+        'static,
+        TableSqlType<T>,
+        diesel::internal::table_macro::FromClause<T>,
+        Pg,
+    >;
+
+/// Ensures that the type is a Diesel table, and that we can call ".table" and
+/// ".into_boxed()" on it.
+pub trait BoxableTable:
+    HasTable<Table = Self>
+    + 'static
+    + Send
+    + Table
+    + IntoUpdateTarget
+    + query_methods::BoxedDsl<'static, Pg, Output = BoxedDslOutput<Self>>
+{
+}
+impl<T> BoxableTable for T where
+    T: HasTable<Table = T>
+        + 'static
+        + Send
+        + Table
+        + IntoUpdateTarget
+        + query_methods::BoxedDsl<'static, Pg, Output = BoxedDslOutput<T>>
+{
+}
+
+/// Ensures that calling ".filter(predicate)" on this type is possible, and
+/// does not change the underlying type.
+pub trait FilterBy<Predicate>:
+    query_methods::FilterDsl<Predicate, Output = Self>
+{
+}
+impl<T, Predicate> FilterBy<Predicate> for T where
+    T: query_methods::FilterDsl<Predicate, Output = T>
+{
+}
+
+/// Allows calling ".into_boxed" on an update statement.
+pub trait BoxableUpdateStatement<Table, V>:
+    query_methods::BoxedDsl<
+    'static,
+    Pg,
+    Output = BoxedUpdateStatement<'static, Pg, Table, V>,
+>
+where
+    Table: QuerySource,
+{
+}
+impl<T, Table, V> BoxableUpdateStatement<Table, V> for T
+where
+    T: query_methods::BoxedDsl<
+        'static,
+        Pg,
+        Output = BoxedUpdateStatement<'static, Pg, Table, V>,
+    >,
+    Table: QuerySource,
+{
+}
diff --git a/nexus/src/db/datastore.rs b/nexus/src/db/datastore.rs
index c80d3c81e89..047df522949 100644
--- a/nexus/src/db/datastore.rs
+++ b/nexus/src/db/datastore.rs
@@ -28,6 +28,11 @@ use super::Pool;
 use crate::authn;
 use crate::authz::{self, ApiResource};
 use crate::context::OpContext;
+use crate::db::collection_attach::{AttachError, DatastoreAttachTarget};
+use crate::db::collection_detach::{DatastoreDetachTarget, DetachError};
+use crate::db::collection_detach_many::{
+    DatastoreDetachManyTarget, DetachManyError,
+};
 use crate::db::fixed_data::role_assignment::BUILTIN_ROLE_ASSIGNMENTS;
 use crate::db::fixed_data::role_builtin::BUILTIN_ROLES;
 use crate::db::fixed_data::silo::DEFAULT_SILO;
@@ -996,47 +1001,65 @@ impl DataStore {
         // This is subject to change, but for now we're going to say that an
         // instance must be "stopped" or "failed" in order to delete it. The
         // delete operation sets "time_deleted" (just like with other objects)
-        // and also sets the state to "destroyed". By virtue of being
-        // "stopped", we assume there are no dependencies on this instance
-        // (e.g., disk attachments). If that changes, we'll want to check for
-        // such dependencies here.
+        // and also sets the state to "destroyed".
use api::external::InstanceState as ApiInstanceState; use db::model::InstanceState as DbInstanceState; - use db::schema::instance::dsl; - - let now = Utc::now(); + use db::schema::{disk, instance}; - let destroyed = DbInstanceState::new(ApiInstanceState::Destroyed); let stopped = DbInstanceState::new(ApiInstanceState::Stopped); let failed = DbInstanceState::new(ApiInstanceState::Failed); - - let instance_id = authz_instance.id(); - let result = diesel::update(dsl::instance) - .filter(dsl::time_deleted.is_null()) - .filter(dsl::id.eq(instance_id)) - .filter(dsl::state.eq_any(vec![stopped, failed])) - .set((dsl::state.eq(destroyed), dsl::time_deleted.eq(now))) - .check_if_exists::(instance_id) - .execute_and_check(self.pool()) - .await - .map_err(|e| { - public_error_from_diesel_pool( - e, - ErrorHandler::NotFoundByResource(authz_instance), - ) - })?; - - match result.status { - UpdateStatus::Updated => Ok(()), - UpdateStatus::NotUpdatedButExists => { - return Err(Error::InvalidRequest { - message: format!( + let destroyed = DbInstanceState::new(ApiInstanceState::Destroyed); + let ok_to_delete_instance_states = vec![stopped, failed]; + + let detached_label = api::external::DiskState::Detached.label(); + let ok_to_detach_disk_states = + vec![api::external::DiskState::Attached(authz_instance.id())]; + let ok_to_detach_disk_state_labels: Vec<_> = + ok_to_detach_disk_states.iter().map(|s| s.label()).collect(); + + let _instance = Instance::detach_resources( + authz_instance.id(), + instance::table.into_boxed().filter( + instance::dsl::state.eq_any(ok_to_delete_instance_states), + ), + disk::table.into_boxed().filter( + disk::dsl::disk_state.eq_any(ok_to_detach_disk_state_labels), + ), + diesel::update(instance::dsl::instance).set(( + instance::dsl::state.eq(destroyed), + instance::dsl::time_deleted.eq(Utc::now()), + )), + diesel::update(disk::dsl::disk).set(( + disk::dsl::disk_state.eq(detached_label), + disk::dsl::attach_instance_id.eq(Option::::None), + )), + ) + .detach_and_get_result_async(self.pool_authorized(opctx).await?) 
+ .await + .map_err(|e| match e { + DetachManyError::CollectionNotFound => Error::not_found_by_id( + ResourceType::Instance, + &authz_instance.id(), + ), + DetachManyError::NoUpdate { collection } => { + let instance_state = collection.runtime_state.state.state(); + match instance_state { + api::external::InstanceState::Stopped + | api::external::InstanceState::Failed => { + Error::internal_error("cannot delete instance") + } + _ => Error::invalid_request(&format!( "instance cannot be deleted in state \"{}\"", - result.found.runtime_state.state.state() - ), - }); + instance_state, + )), + } } - } + DetachManyError::DatabaseError(e) => { + public_error_from_diesel_pool(e, ErrorHandler::Server) + } + })?; + + Ok(()) } // Disks @@ -1112,6 +1135,270 @@ impl DataStore { .map_err(|e| public_error_from_diesel_pool(e, ErrorHandler::Server)) } + /// Attaches a disk to an instance, if both objects: + /// - Exist + /// - Are in valid states + /// - Are under the maximum "attach count" threshold + pub async fn instance_attach_disk( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + authz_disk: &authz::Disk, + max_disks: u32, + ) -> Result<(Instance, Disk), Error> { + use db::schema::{disk, instance}; + + opctx.authorize(authz::Action::Modify, authz_instance).await?; + opctx.authorize(authz::Action::Modify, authz_disk).await?; + + let ok_to_attach_disk_states = vec![ + api::external::DiskState::Creating, + api::external::DiskState::Detached, + ]; + let ok_to_attach_disk_state_labels: Vec<_> = + ok_to_attach_disk_states.iter().map(|s| s.label()).collect(); + + // TODO(https://github.com/oxidecomputer/omicron/issues/811): + // This list of instance attach states is more restrictive than it + // plausibly could be. + // + // We currently only permit attaching disks to stopped instances. + let ok_to_attach_instance_states = vec![ + db::model::InstanceState(api::external::InstanceState::Creating), + db::model::InstanceState(api::external::InstanceState::Stopped), + ]; + + let attached_label = + api::external::DiskState::Attached(authz_instance.id()).label(); + + let (instance, disk) = Instance::attach_resource( + authz_instance.id(), + authz_disk.id(), + instance::table + .into_boxed() + .filter(instance::dsl::state.eq_any(ok_to_attach_instance_states)), + disk::table + .into_boxed() + .filter(disk::dsl::disk_state.eq_any(ok_to_attach_disk_state_labels)), + max_disks, + diesel::update(disk::dsl::disk) + .set(( + disk::dsl::disk_state.eq(attached_label), + disk::dsl::attach_instance_id.eq(authz_instance.id()) + )) + ) + .attach_and_get_result_async(self.pool_authorized(opctx).await?) + .await + .or_else(|e| { + match e { + AttachError::CollectionNotFound => { + Err(Error::not_found_by_id( + ResourceType::Instance, + &authz_instance.id(), + )) + }, + AttachError::ResourceNotFound => { + Err(Error::not_found_by_id( + ResourceType::Disk, + &authz_disk.id(), + )) + }, + AttachError::NoUpdate { attached_count, resource, collection } => { + let disk_state = resource.state().into(); + match disk_state { + // Idempotent errors: We did not perform an update, + // because we're already in the process of attaching. + api::external::DiskState::Attached(id) if id == authz_instance.id() => { + return Ok((collection, resource)); + } + api::external::DiskState::Attaching(id) if id == authz_instance.id() => { + return Ok((collection, resource)); + } + // Ok-to-attach disk states: Inspect the state to infer + // why we did not attach. 
+ api::external::DiskState::Creating | + api::external::DiskState::Detached => { + match collection.runtime_state.state.state() { + // Ok-to-be-attached instance states: + api::external::InstanceState::Creating | + api::external::InstanceState::Stopped => { + // The disk is ready to be attached, and the + // instance is ready to be attached. Perhaps + // we are at attachment capacity? + if attached_count == i64::from(max_disks) { + return Err(Error::invalid_request(&format!( + "cannot attach more than {} disks to instance", + max_disks + ))); + } + + // We can't attach, but the error hasn't + // helped us infer why. + return Err(Error::internal_error( + "cannot attach disk" + )); + } + // Not okay-to-be-attached instance states: + _ => { + Err(Error::invalid_request(&format!( + "cannot attach disk to instance in {} state", + collection.runtime_state.state.state(), + ))) + } + } + }, + // Not-okay-to-attach disk states: The disk is attached elsewhere. + api::external::DiskState::Attached(_) | + api::external::DiskState::Attaching(_) | + api::external::DiskState::Detaching(_) => { + Err(Error::invalid_request(&format!( + "cannot attach disk \"{}\": disk is attached to another instance", + resource.name().as_str(), + ))) + } + _ => { + Err(Error::invalid_request(&format!( + "cannot attach disk \"{}\": invalid state {}", + resource.name().as_str(), + disk_state, + ))) + } + } + }, + AttachError::DatabaseError(e) => { + Err(public_error_from_diesel_pool(e, ErrorHandler::Server)) + }, + } + })?; + + Ok((instance, disk)) + } + + pub async fn instance_detach_disk( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + authz_disk: &authz::Disk, + ) -> Result { + use db::schema::{disk, instance}; + + opctx.authorize(authz::Action::Modify, authz_instance).await?; + opctx.authorize(authz::Action::Modify, authz_disk).await?; + + let ok_to_detach_disk_states = + vec![api::external::DiskState::Attached(authz_instance.id())]; + let ok_to_detach_disk_state_labels: Vec<_> = + ok_to_detach_disk_states.iter().map(|s| s.label()).collect(); + + // TODO(https://github.com/oxidecomputer/omicron/issues/811): + // This list of instance detach states is more restrictive than it + // plausibly could be. + // + // We currently only permit detaching disks from stopped instances. + let ok_to_detach_instance_states = vec![ + db::model::InstanceState(api::external::InstanceState::Creating), + db::model::InstanceState(api::external::InstanceState::Stopped), + ]; + + let detached_label = api::external::DiskState::Detached.label(); + + let disk = Instance::detach_resource( + authz_instance.id(), + authz_disk.id(), + instance::table + .into_boxed() + .filter(instance::dsl::state.eq_any(ok_to_detach_instance_states)), + disk::table + .into_boxed() + .filter(disk::dsl::disk_state.eq_any(ok_to_detach_disk_state_labels)), + diesel::update(disk::dsl::disk) + .set(( + disk::dsl::disk_state.eq(detached_label), + disk::dsl::attach_instance_id.eq(Option::::None) + )) + ) + .detach_and_get_result_async(self.pool_authorized(opctx).await?) 
+ .await + .or_else(|e| { + match e { + DetachError::CollectionNotFound => { + Err(Error::not_found_by_id( + ResourceType::Instance, + &authz_instance.id(), + )) + }, + DetachError::ResourceNotFound => { + Err(Error::not_found_by_id( + ResourceType::Disk, + &authz_disk.id(), + )) + }, + DetachError::NoUpdate { resource, collection } => { + let disk_state = resource.state().into(); + match disk_state { + // Idempotent errors: We did not perform an update, + // because we're already in the process of detaching. + api::external::DiskState::Detached => { + return Ok(resource); + } + api::external::DiskState::Detaching(id) if id == authz_instance.id() => { + return Ok(resource); + } + // Ok-to-detach disk states: Inspect the state to infer + // why we did not detach. + api::external::DiskState::Attached(id) if id == authz_instance.id() => { + match collection.runtime_state.state.state() { + // Ok-to-be-detached instance states: + api::external::InstanceState::Creating | + api::external::InstanceState::Stopped => { + // We can't detach, but the error hasn't + // helped us infer why. + return Err(Error::internal_error( + "cannot detach disk" + )); + } + // Not okay-to-be-detached instance states: + _ => { + Err(Error::invalid_request(&format!( + "cannot detach disk from instance in {} state", + collection.runtime_state.state.state(), + ))) + } + } + }, + api::external::DiskState::Attaching(id) if id == authz_instance.id() => { + Err(Error::invalid_request(&format!( + "cannot detach disk \"{}\": disk is currently being attached", + resource.name().as_str(), + ))) + }, + // Not-okay-to-detach disk states: The disk is attached elsewhere. + api::external::DiskState::Attached(_) | + api::external::DiskState::Attaching(_) | + api::external::DiskState::Detaching(_) => { + Err(Error::invalid_request(&format!( + "cannot detach disk \"{}\": disk is attached to another instance", + resource.name().as_str(), + ))) + } + _ => { + Err(Error::invalid_request(&format!( + "cannot detach disk \"{}\": invalid state {}", + resource.name().as_str(), + disk_state, + ))) + } + } + }, + DetachError::DatabaseError(e) => { + Err(public_error_from_diesel_pool(e, ErrorHandler::Server)) + }, + } + })?; + + Ok(disk) + } + pub async fn disk_update_runtime( &self, opctx: &OpContext, diff --git a/nexus/src/db/mod.rs b/nexus/src/db/mod.rs index 50f69fab08f..c70fcdf0f07 100644 --- a/nexus/src/db/mod.rs +++ b/nexus/src/db/mod.rs @@ -6,9 +6,12 @@ // This is not intended to be public, but this is necessary to use it from // doctests +pub mod collection_attach; +pub mod collection_detach; +pub mod collection_detach_many; pub mod collection_insert; mod config; - +mod cte_utils; // This is marked public for use by the integration tests pub mod datastore; mod error; diff --git a/nexus/src/db/model/instance.rs b/nexus/src/db/model/instance.rs index eed074052f2..02f20451023 100644 --- a/nexus/src/db/model/instance.rs +++ b/nexus/src/db/model/instance.rs @@ -2,9 +2,10 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. 
-use super::{ByteCount, Generation, InstanceCpuCount, InstanceState}; +use super::{ByteCount, Disk, Generation, InstanceCpuCount, InstanceState}; +use crate::db::collection_attach::DatastoreAttachTarget; use crate::db::identity::Resource; -use crate::db::schema::instance; +use crate::db::schema::{disk, instance}; use crate::external_api::params; use chrono::{DateTime, Utc}; use db_macros::Resource; @@ -68,6 +69,17 @@ impl Into for Instance { } } +impl DatastoreAttachTarget for Instance { + type Id = Uuid; + + type CollectionIdColumn = instance::dsl::id; + type CollectionTimeDeletedColumn = instance::dsl::time_deleted; + + type ResourceIdColumn = disk::dsl::id; + type ResourceCollectionIdColumn = disk::dsl::attach_instance_id; + type ResourceTimeDeletedColumn = disk::dsl::time_deleted; +} + /// Runtime state of the Instance, including the actual running state and minimal /// metadata /// diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs index a68504681b6..9c8a8910db5 100644 --- a/nexus/tests/integration_tests/disks.rs +++ b/nexus/tests/integration_tests/disks.rs @@ -24,6 +24,7 @@ use omicron_common::api::external::ByteCount; use omicron_common::api::external::Disk; use omicron_common::api::external::DiskState; use omicron_common::api::external::IdentityMetadataCreateParams; +use omicron_common::api::external::Instance; use omicron_common::api::external::Name; use omicron_nexus::TestInterfaces as _; use omicron_nexus::{external_api::params, Nexus}; @@ -113,6 +114,30 @@ async fn test_disk_not_found_before_creation( ); } +async fn set_instance_state( + client: &ClientTestContext, + instance_url: &str, + state: &str, +) -> Instance { + let url = format!("{}/{}", instance_url, state); + NexusRequest::new( + RequestBuilder::new(client, Method::POST, &url) + .body(None as Option<&serde_json::Value>) + .expect_status(Some(StatusCode::ACCEPTED)), + ) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .unwrap() + .parsed_body() + .unwrap() +} + +async fn instance_simulate(nexus: &Arc, id: &Uuid) { + let sa = nexus.instance_sled_by_id(id).await.unwrap(); + sa.instance_finish_transition(id.clone()).await; +} + #[nexus_test] async fn test_disk_create_attach_detach_delete( cptestctx: &ControlPlaneTestContext, @@ -157,6 +182,18 @@ async fn test_disk_create_attach_detach_delete( let instance = create_instance(&client, ORG_NAME, PROJECT_NAME, INSTANCE_NAME).await; + // TODO(https://github.com/oxidecomputer/omicron/issues/811): + // + // Instances must be stopped before disks can be attached - this + // is an artificial limitation without hotplug support. + let instance1_url = format!( + "/organizations/{}/projects/{}/instances/{}", + ORG_NAME, PROJECT_NAME, INSTANCE_NAME + ); + let instance_next = + set_instance_state(&client, &instance1_url, "stop").await; + instance_simulate(nexus, &instance_next.identity.id).await; + // Verify that there are no disks attached to the instance, and specifically // that our disk is not attached to this instance. let url_instance_disks = @@ -179,14 +216,6 @@ async fn test_disk_create_attach_detach_delete( let instance_id = &instance.identity.id; assert_eq!(attached_disk.identity.name, disk.identity.name); assert_eq!(attached_disk.identity.id, disk.identity.id); - assert_eq!(attached_disk.state, DiskState::Attaching(instance_id.clone())); - - // Finish simulation of the attachment and verify the new state, both on the - // attachment and the disk itself. 
- disk_simulate(nexus, &disk.identity.id).await; - let attached_disk: Disk = disk_get(&client, &disk_url).await; - assert_eq!(attached_disk.identity.name, disk.identity.name); - assert_eq!(attached_disk.identity.id, disk.identity.id); assert_eq!(attached_disk.state, DiskState::Attached(instance_id.clone())); // Attach the disk to the same instance. This should complete immediately @@ -194,8 +223,6 @@ async fn test_disk_create_attach_detach_delete( let disk = disk_post(client, &url_instance_attach_disk, disk.identity.name).await; assert_eq!(disk.state, DiskState::Attached(instance_id.clone())); - let disk = disk_get(&client, &disk_url).await; - assert_eq!(disk.state, DiskState::Attached(instance_id.clone())); // Begin detaching the disk. let disk = disk_post( @@ -204,13 +231,6 @@ async fn test_disk_create_attach_detach_delete( disk.identity.name.clone(), ) .await; - assert_eq!(disk.state, DiskState::Detaching(instance_id.clone())); - let disk: Disk = disk_get(&client, &disk_url).await; - assert_eq!(disk.state, DiskState::Detaching(instance_id.clone())); - - // Finish the detachment. - disk_simulate(nexus, &disk.identity.id).await; - let disk = disk_get(&client, &disk_url).await; assert_eq!(disk.state, DiskState::Detached); // Since detach is idempotent, we can detach it again. @@ -314,6 +334,17 @@ async fn test_disk_move_between_instances(cptestctx: &ControlPlaneTestContext) { // Create an instance to attach the disk. let instance = create_instance(&client, ORG_NAME, PROJECT_NAME, INSTANCE_NAME).await; + // TODO(https://github.com/oxidecomputer/omicron/issues/811): + // + // Instances must be stopped before disks can be attached - this + // is an artificial limitation without hotplug support. + let instance_url = format!( + "/organizations/{}/projects/{}/instances/{}", + ORG_NAME, PROJECT_NAME, INSTANCE_NAME + ); + let instance_next = + set_instance_state(&client, &instance_url, "stop").await; + instance_simulate(nexus, &instance_next.identity.id).await; // Verify that there are no disks attached to the instance, and specifically // that our disk is not attached to this instance. @@ -337,14 +368,6 @@ async fn test_disk_move_between_instances(cptestctx: &ControlPlaneTestContext) { let instance_id = &instance.identity.id; assert_eq!(attached_disk.identity.name, disk.identity.name); assert_eq!(attached_disk.identity.id, disk.identity.id); - assert_eq!(attached_disk.state, DiskState::Attaching(instance_id.clone())); - - // Finish simulation of the attachment and verify the new state, both on the - // attachment and the disk itself. - disk_simulate(nexus, &disk.identity.id).await; - let attached_disk: Disk = disk_get(&client, &disk_url).await; - assert_eq!(attached_disk.identity.name, disk.identity.name); - assert_eq!(attached_disk.identity.id, disk.identity.id); assert_eq!(attached_disk.state, DiskState::Attached(instance_id.clone())); // Attach the disk to the same instance. This should complete immediately @@ -357,6 +380,14 @@ async fn test_disk_move_between_instances(cptestctx: &ControlPlaneTestContext) { // fail and the disk should remain attached to the first instance. 
let instance2 = create_instance(&client, ORG_NAME, PROJECT_NAME, "instance2").await; + let instance2_url = format!( + "/organizations/{}/projects/{}/instances/{}", + ORG_NAME, PROJECT_NAME, "instance2" + ); + let instance_next = + set_instance_state(&client, &instance2_url, "stop").await; + instance_simulate(nexus, &instance_next.identity.id).await; + let url_instance2_attach_disk = get_disk_attach_url(instance2.identity.name.as_str()); let url_instance2_detach_disk = @@ -389,64 +420,11 @@ async fn test_disk_move_between_instances(cptestctx: &ControlPlaneTestContext) { // Begin detaching the disk. let disk = disk_post(client, &url_instance_detach_disk, disk.identity.name).await; - assert_eq!(disk.state, DiskState::Detaching(instance_id.clone())); - let disk = disk_get(&client, &disk_url).await; - assert_eq!(disk.state, DiskState::Detaching(instance_id.clone())); - - // It's still illegal to attach this disk elsewhere. - let error: HttpErrorResponseBody = NexusRequest::new( - RequestBuilder::new(client, Method::POST, &url_instance2_attach_disk) - .body(Some(¶ms::DiskIdentifier { - name: disk.identity.name.clone(), - })) - .expect_status(Some(StatusCode::BAD_REQUEST)), - ) - .authn_as(AuthnMode::PrivilegedUser) - .execute() - .await - .unwrap() - .parsed_body() - .unwrap(); - assert_eq!( - error.message, - format!( - "cannot attach disk \"{}\": disk is attached to another instance", - DISK_NAME - ) - ); - - // It's even illegal to attach this disk back to the same instance. - let error: HttpErrorResponseBody = NexusRequest::new( - RequestBuilder::new(client, Method::POST, &url_instance_attach_disk) - .body(Some(¶ms::DiskIdentifier { - name: disk.identity.name.clone(), - })) - .expect_status(Some(StatusCode::BAD_REQUEST)), - ) - .authn_as(AuthnMode::PrivilegedUser) - .execute() - .await - .unwrap() - .parsed_body() - .unwrap(); - assert_eq!( - error.message, - format!( - "cannot attach disk \"{}\": disk is attached to another instance", - DISK_NAME - ) - ); + assert_eq!(disk.state, DiskState::Detached); - // However, there's no problem attempting to detach it again. + // There's no problem attempting to detach it again. let disk = disk_post(client, &url_instance_detach_disk, disk.identity.name).await; - assert_eq!(disk.state, DiskState::Detaching(instance_id.clone())); - let disk = disk_get(&client, &disk_url).await; - assert_eq!(disk.state, DiskState::Detaching(instance_id.clone())); - - // Finish the detachment. - disk_simulate(nexus, &disk.identity.id).await; - let disk = disk_get(&client, &disk_url).await; assert_eq!(disk.state, DiskState::Detached); // Since delete is idempotent, we can detach it again -- from either one. @@ -467,10 +445,7 @@ async fn test_disk_move_between_instances(cptestctx: &ControlPlaneTestContext) { let instance2_id = &instance2.identity.id; assert_eq!(attached_disk.identity.name, disk.identity.name); assert_eq!(attached_disk.identity.id, disk.identity.id); - assert_eq!(attached_disk.state, DiskState::Attaching(instance2_id.clone())); - - let disk = disk_get(&client, &disk_url).await; - assert_eq!(disk.state, DiskState::Attaching(instance2_id.clone())); + assert_eq!(attached_disk.state, DiskState::Attached(instance2_id.clone())); // At this point, it's not legal to attempt to attach it to a different // instance (the first one). 
@@ -502,11 +477,9 @@ async fn test_disk_move_between_instances(cptestctx: &ControlPlaneTestContext) { disk.identity.name.clone(), ) .await; - assert_eq!(disk.state, DiskState::Attaching(instance2_id.clone())); - let disk = disk_get(&client, &disk_url).await; - assert_eq!(disk.state, DiskState::Attaching(instance2_id.clone())); + assert_eq!(disk.state, DiskState::Attached(instance2_id.clone())); - // It's not allowed to delete a disk that's attaching. + // It's not allowed to delete a disk that's attached. let error = NexusRequest::expect_failure( client, StatusCode::BAD_REQUEST, @@ -519,37 +492,15 @@ async fn test_disk_move_between_instances(cptestctx: &ControlPlaneTestContext) { .expect("expected request to fail") .parsed_body::() .expect("cannot parse"); - assert_eq!(error.message, "disk cannot be deleted in state \"attaching\""); + assert_eq!(error.message, "disk cannot be deleted in state \"attached\""); - // Now, begin a detach while the disk is still being attached. + // Now, begin a detach. let disk = disk_post( client, &url_instance2_detach_disk, disk.identity.name.clone(), ) .await; - assert_eq!(disk.state, DiskState::Detaching(instance2_id.clone())); - let disk: Disk = disk_get(&client, &disk_url).await; - assert_eq!(disk.state, DiskState::Detaching(instance2_id.clone())); - - // It's not allowed to delete a disk that's detaching, either. - let error = NexusRequest::expect_failure( - client, - StatusCode::BAD_REQUEST, - Method::DELETE, - &disk_url, - ) - .authn_as(AuthnMode::PrivilegedUser) - .execute() - .await - .expect("expected request to fail") - .parsed_body::() - .expect("cannot parse"); - assert_eq!(error.message, "disk cannot be deleted in state \"detaching\""); - - // Finish detachment. - disk_simulate(nexus, &disk.identity.id).await; - let disk = disk_get(&client, &disk_url).await; assert_eq!(disk.state, DiskState::Detached); // Now we can delete the disk. @@ -826,9 +777,3 @@ fn disks_eq(disk1: &Disk, disk2: &Disk) { assert_eq!(disk1.state, disk2.state); assert_eq!(disk1.device_path, disk2.device_path); } - -/// Simulate completion of an ongoing disk state transition. -async fn disk_simulate(nexus: &Arc, id: &Uuid) { - let sa = nexus.disk_sled_by_id(id).await.unwrap(); - sa.disk_finish_transition(id.clone()).await; -}
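For orientation only, a rough sketch (not part of the patch above) of how a caller is expected to drive the new datastore methods once authorization objects for the instance and disk are in hand. The bindings `datastore`, `opctx`, `authz_instance`, `authz_disk`, and the constant `MAX_DISKS_PER_INSTANCE` are assumed from the surrounding Nexus code; only `instance_attach_disk` and `instance_detach_disk` come from this patch.

    // Hypothetical call site; attach the disk to a stopped instance,
    // checking the resulting state, then detach it again.
    let (_instance, disk) = datastore
        .instance_attach_disk(
            opctx,
            &authz_instance,
            &authz_disk,
            MAX_DISKS_PER_INSTANCE,
        )
        .await?;
    let disk_state: omicron_common::api::external::DiskState =
        disk.state().into();
    assert!(matches!(
        disk_state,
        omicron_common::api::external::DiskState::Attached(_)
    ));

    // Detaching is idempotent: repeating this call succeeds and leaves the
    // disk in the "Detached" state.
    let _detached_disk = datastore
        .instance_detach_disk(opctx, &authz_instance, &authz_disk)
        .await?;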