Skip to content

Commit

Permalink
start, proper cancelation for Oneshot Ingestions
Browse files Browse the repository at this point in the history
* add CancelOneshotIngestion message to the storage controller
* handle new message in 'reduce' and 'reconcile' in the storage-controller
* emit a CancelOntshotIngestion whenever an ingestion completes
  • Loading branch information
ParkMyCar committed Jan 21, 2025
1 parent 2b6a8f1 commit ffede29
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 76 deletions.
11 changes: 10 additions & 1 deletion src/adapter/src/active_compute_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use crate::coord::peek::PeekResponseUnary;
use crate::{AdapterError, ExecuteResponse};
use crate::{AdapterError, ExecuteContext, ExecuteResponse};

#[derive(Debug)]
/// A description of an active compute sink from the coordinator's perspective.
Expand Down Expand Up @@ -435,3 +435,12 @@ impl ActiveCopyTo {
let _ = self.tx.send(message);
}
}

/// State we keep in the [`Coordinator`] to track active `COPY FROM` statements.
#[derive(Debug)]
pub(crate) struct ActiveCopyFrom {
/// ID of the ingestion running in clusterd.
pub ingestion_id: uuid::Uuid,
/// Context of the SQL session that ran the statement.
pub ctx: ExecuteContext,
}
4 changes: 2 additions & 2 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ use tracing::{debug, info, info_span, span, warn, Instrument, Level, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::ActiveComputeSink;
use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
Expand Down Expand Up @@ -1667,7 +1667,7 @@ pub struct Coordinator {
active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
/// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
/// to stage Batches in Persist that we will then link into the shard.
active_copies: BTreeMap<ConnectionId, ExecuteContext>,
active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

/// A map from connection ids to a watch channel that is set to `true` if the connection
/// received a cancel request.
Expand Down
24 changes: 19 additions & 5 deletions src/adapter/src/coord/sequencer/inner/copy_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use url::Url;
use uuid::Uuid;

use crate::coord::sequencer::inner::return_if_err;
use crate::coord::{Coordinator, TargetCluster};
use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
use crate::optimize::dataflows::{prep_scalar_expr, EvalTime, ExprPrepStyle};
use crate::session::{TransactionOps, WriteOp};
use crate::{AdapterError, ExecuteContext, ExecuteResponse};
Expand Down Expand Up @@ -108,8 +108,9 @@ impl Coordinator {
});
});
// Stash the execute context so we can cancel the COPY.
let conn_id = ctx.session().conn_id().clone();
self.active_copies
.insert(ctx.session().conn_id().clone(), ctx);
.insert(conn_id, ActiveCopyFrom { ingestion_id, ctx });

let _result = self
.controller
Expand All @@ -124,11 +125,17 @@ impl Coordinator {
table_id: CatalogItemId,
batches: Vec<Result<ProtoBatch, String>>,
) {
let Some(mut ctx) = self.active_copies.remove(&conn_id) else {
let Some(active_copy) = self.active_copies.remove(&conn_id) else {
tracing::warn!(?conn_id, "got response for canceled COPY FROM");
return;
};

let ActiveCopyFrom {
ingestion_id,
mut ctx,
} = active_copy;
tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");

let mut all_batches = SmallVec::with_capacity(batches.len());
let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
let mut row_count = 0u64;
Expand Down Expand Up @@ -179,8 +186,15 @@ impl Coordinator {
/// Cancel any active `COPY FROM` statements/oneshot ingestions.
#[mz_ore::instrument(level = "debug")]
pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
// TODO(cf1): Also cancel the dataflow running on clusterd.
if let Some(ctx) = self.active_copies.remove(conn_id) {
if let Some(ActiveCopyFrom { ingestion_id, ctx }) = self.active_copies.remove(conn_id) {
let cancel_result = self
.controller
.storage
.cancel_oneshot_ingestion(ingestion_id);
if let Err(err) = cancel_result {
tracing::error!(?err, "failed to cancel OneshotIngestion");
}

ctx.retire(Err(AdapterError::Canceled));
}
}
Expand Down
13 changes: 11 additions & 2 deletions src/storage-client/src/client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,21 @@ message ProtoRunIngestionCommand {
mz_storage_types.sources.ProtoIngestionDescription description = 2;
}

message ProtoRunOneshotIngestionCommand {
message ProtoRunOneshotIngestion {
mz_proto.ProtoU128 ingestion_id = 1;
mz_repr.global_id.ProtoGlobalId collection_id = 2;
mz_storage_types.controller.ProtoCollectionMetadata storage_metadata = 3;
mz_storage_types.oneshot_sources.ProtoOneshotIngestionRequest request = 4;
}

message ProtoRunOneshotIngestionsCommand {
repeated ProtoRunOneshotIngestion ingestions = 1;
}

message ProtoCancelOneshotIngestionsCommand {
repeated mz_proto.ProtoU128 ingestions = 1;
}

message ProtoCreateSources {
repeated ProtoRunIngestionCommand sources = 1;
}
Expand Down Expand Up @@ -94,7 +102,8 @@ message ProtoStorageCommand {
google.protobuf.Empty allow_writes = 7;
ProtoRunSinks run_sinks = 4;
mz_storage_types.parameters.ProtoStorageParameters update_configuration = 5;
ProtoRunOneshotIngestionCommand oneshot_ingestion = 10;
ProtoRunOneshotIngestionsCommand run_oneshot_ingestions = 10;
ProtoCancelOneshotIngestionsCommand cancel_oneshot_ingestions = 11;
}
}

Expand Down
80 changes: 57 additions & 23 deletions src/storage-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use smallvec::SmallVec;
use timely::progress::frontier::{Antichain, MutableAntichain};
use timely::PartialOrder;
use tonic::{Request, Status as TonicStatus, Streaming};
use uuid::Uuid;

use crate::client::proto_storage_server::ProtoStorage;
use crate::metrics::ReplicaMetrics;
Expand Down Expand Up @@ -123,18 +124,29 @@ pub enum StorageCommand<T = mz_repr::Timestamp> {
UpdateConfiguration(StorageParameters),
/// Run the enumerated sources, each associated with its identifier.
RunIngestions(Vec<RunIngestionCommand>),
/// Run a dataflow which will ingest data from an external source and only __stage__ it in
/// Persist.
///
/// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
/// responsible for linking the staged data into a shard.
RunOneshotIngestion(RunOneshotIngestionCommand),
/// Enable compaction in storage-managed collections.
///
/// Each entry in the vector names a collection and provides a frontier after which
/// accumulations must be correct.
AllowCompaction(Vec<(GlobalId, Antichain<T>)>),
RunSinks(Vec<RunSinkCommand<T>>),
/// Run a dataflow which will ingest data from an external source and only __stage__ it in
/// Persist.
///
/// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
/// responsible for linking the staged data into a shard.
RunOneshotIngestion(Vec<RunOneshotIngestion>),
/// `CancelOneshotIngestion` instructs the replica to cancel the identified oneshot ingestions.
///
/// It is invalid to send a [`CancelOneshotIngestion`] command that references a oneshot
/// ingestion that was not created by a corresponding [`RunOneshotIngestion`] command before.
/// Doing so may cause the replica to exhibit undefined behavior.
///
/// [`CancelOneshotIngestion`]: crate::client::StorageCommand::CancelOneshotIngestion
/// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
CancelOneshotIngestion {
ingestions: Vec<Uuid>,
},
}

impl<T> StorageCommand<T> {
Expand All @@ -146,7 +158,8 @@ impl<T> StorageCommand<T> {
| InitializationComplete
| AllowWrites
| UpdateConfiguration(_)
| AllowCompaction(_) => false,
| AllowCompaction(_)
| CancelOneshotIngestion { .. } => false,
// TODO(cf2): multi-replica oneshot ingestions. At the moment returning
// true here means we can't run `COPY FROM` on multi-replica clusters, this
// should be easy enough to support though.
Expand Down Expand Up @@ -199,7 +212,7 @@ impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {

/// A command that starts ingesting the given ingestion description
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestionCommand {
pub struct RunOneshotIngestion {
/// The ID of the ingestion dataflow.
pub ingestion_id: uuid::Uuid,
/// The ID of collection we'll stage batches for.
Expand All @@ -210,30 +223,30 @@ pub struct RunOneshotIngestionCommand {
pub request: OneshotIngestionRequest,
}

impl RustType<ProtoRunOneshotIngestionCommand> for RunOneshotIngestionCommand {
fn into_proto(&self) -> ProtoRunOneshotIngestionCommand {
ProtoRunOneshotIngestionCommand {
impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
fn into_proto(&self) -> ProtoRunOneshotIngestion {
ProtoRunOneshotIngestion {
ingestion_id: Some(self.ingestion_id.into_proto()),
collection_id: Some(self.collection_id.into_proto()),
storage_metadata: Some(self.collection_meta.into_proto()),
request: Some(self.request.into_proto()),
}
}

fn from_proto(proto: ProtoRunOneshotIngestionCommand) -> Result<Self, TryFromProtoError> {
Ok(RunOneshotIngestionCommand {
fn from_proto(proto: ProtoRunOneshotIngestion) -> Result<Self, TryFromProtoError> {
Ok(RunOneshotIngestion {
ingestion_id: proto
.ingestion_id
.into_rust_if_some("ProtoRunOneshotIngestionCommand::ingestion_id")?,
.into_rust_if_some("ProtoRunOneshotIngestion::ingestion_id")?,
collection_id: proto
.collection_id
.into_rust_if_some("ProtoRunOneshotIngestionCommand::collection_id")?,
.into_rust_if_some("ProtoRunOneshotIngestion::collection_id")?,
collection_meta: proto
.storage_metadata
.into_rust_if_some("ProtoRunOneshotIngestionCommand::storage_metadata")?,
.into_rust_if_some("ProtoRunOneshotIngestion::storage_metadata")?,
request: proto
.request
.into_rust_if_some("ProtoRunOneshotIngestionCommand::request")?,
.into_rust_if_some("ProtoRunOneshotIngestion::request")?,
})
}
}
Expand Down Expand Up @@ -300,12 +313,19 @@ impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
StorageCommand::RunIngestions(sources) => CreateSources(ProtoCreateSources {
sources: sources.into_proto(),
}),
StorageCommand::RunOneshotIngestion(oneshot) => {
OneshotIngestion(oneshot.into_proto())
}
StorageCommand::RunSinks(sinks) => RunSinks(ProtoRunSinks {
sinks: sinks.into_proto(),
}),
StorageCommand::RunOneshotIngestion(ingestions) => {
RunOneshotIngestions(ProtoRunOneshotIngestionsCommand {
ingestions: ingestions.iter().map(|cmd| cmd.into_proto()).collect(),
})
}
StorageCommand::CancelOneshotIngestion { ingestions } => {
CancelOneshotIngestions(ProtoCancelOneshotIngestionsCommand {
ingestions: ingestions.iter().map(|uuid| uuid.into_proto()).collect(),
})
}
}),
}
}
Expand Down Expand Up @@ -334,8 +354,21 @@ impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
Some(RunSinks(ProtoRunSinks { sinks })) => {
Ok(StorageCommand::RunSinks(sinks.into_rust()?))
}
Some(OneshotIngestion(oneshot)) => {
Ok(StorageCommand::RunOneshotIngestion(oneshot.into_rust()?))
Some(RunOneshotIngestions(oneshot)) => {
let ingestions = oneshot
.ingestions
.into_iter()
.map(|cmd| cmd.into_rust())
.collect::<Result<_, _>>()?;
Ok(StorageCommand::RunOneshotIngestion(ingestions))
}
Some(CancelOneshotIngestions(oneshot)) => {
let ingestions = oneshot
.ingestions
.into_iter()
.map(|uuid| uuid.into_rust())
.collect::<Result<_, _>>()?;
Ok(StorageCommand::CancelOneshotIngestion { ingestions })
}
None => Err(TryFromProtoError::missing_field(
"ProtoStorageCommand::kind",
Expand Down Expand Up @@ -802,7 +835,8 @@ where
| StorageCommand::AllowWrites
| StorageCommand::UpdateConfiguration(_)
| StorageCommand::AllowCompaction(_)
| StorageCommand::RunOneshotIngestion(_) => {}
| StorageCommand::RunOneshotIngestion(_)
| StorageCommand::CancelOneshotIngestion { .. } => {}
};
}

Expand Down
6 changes: 6 additions & 0 deletions src/storage-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,12 @@ pub trait StorageController: Debug {
result_tx: OneshotResultCallback<ProtoBatch>,
) -> Result<(), StorageError<Self::Timestamp>>;

/// Cancel a oneshot ingestion.
fn cancel_oneshot_ingestion(
&mut self,
ingestion_id: uuid::Uuid,
) -> Result<(), StorageError<Self::Timestamp>>;

/// Alter the sink identified by the given id to match the provided `ExportDescription`.
async fn alter_export(
&mut self,
Expand Down
34 changes: 18 additions & 16 deletions src/storage-controller/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
RunIngestions(x) => metrics.run_ingestions_count.add(x.len().cast_into()),
RunSinks(x) => metrics.run_sinks_count.add(x.len().cast_into()),
AllowCompaction(x) => metrics.allow_compaction_count.add(x.len().cast_into()),
RunOneshotIngestion(_) => {
RunOneshotIngestion(_) | CancelOneshotIngestion { .. } => {
// TODO(cf2): Add metrics for oneshot ingestions.
}
}
Expand Down Expand Up @@ -115,15 +115,21 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
final_sinks.extend(cmds.into_iter().map(|c| (c.id, c)));
}
AllowCompaction(updates) => final_compactions.extend(updates),
RunOneshotIngestion(oneshot) => {
final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
RunOneshotIngestion(oneshots) => {
for oneshot in oneshots {
final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
}
}
CancelOneshotIngestion { ingestions } => {
for ingestion in ingestions {
final_oneshot_ingestions.remove(&ingestion);
}
}
}
}

let mut run_ingestions = Vec::new();
let mut run_sinks = Vec::new();
let mut run_oneshot_ingestions = Vec::new();
let mut allow_compaction = Vec::new();

// Discard ingestions that have been dropped, keep the rest.
Expand Down Expand Up @@ -155,10 +161,6 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
run_sinks.push(sink);
}

// TODO(cf1): Add a CancelOneshotIngestion command similar to CancelPeek
// that will compact/reduce away the RunOneshotIngestion.
run_oneshot_ingestions.extend(final_oneshot_ingestions.into_values());

// Reconstitute the commands as a compact history.
//
// When we update `metrics`, we need to be careful to not transiently report incorrect
Expand Down Expand Up @@ -192,14 +194,14 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
self.commands.push(StorageCommand::RunSinks(run_sinks));
}

// TODO(cf1): Add a CancelOneshotIngestion command, make sure we prevent
// re-sending commands for ingestions that we've already responded to.
if !run_oneshot_ingestions.is_empty() {
self.commands.extend(
run_oneshot_ingestions
.into_iter()
.map(|oneshot| StorageCommand::RunOneshotIngestion(oneshot)),
);
// Note: RunOneshotIngestion commands are reduced, as we receive
// CancelOneshotIngestion commands.
//
// TODO(cf2): Record metrics on the number of OneshotIngestion commands.
if !final_oneshot_ingestions.is_empty() {
let oneshots = final_oneshot_ingestions.into_values().collect();
self.commands
.push(StorageCommand::RunOneshotIngestion(oneshots));
}

let count = u64::cast_from(allow_compaction.len());
Expand Down
Loading

0 comments on commit ffede29

Please sign in to comment.