Skip to content

Commit

Permalink
adapter: Remove Canceled execute response (#25095)
Browse files Browse the repository at this point in the history
Previously, the adapter could either send the client an
Ok(ExecuteResponse::Canceled) response or an
Err(AdapterError::Canceled) response. After
aa34c5b the
Ok(ExecuteResponse::Canceled) is never used so this commit removes it.

Looking through the code it seems like Ok(ExecuteResponse::Canceled) and
Err(AdapterError::Canceled) are handled identically. However, when
querying Materialize through rust-postgres, when Materialize sends an
Ok(ExecuteResponse::Canceled) response the rust client receives an Ok(_)
result. When Materialize sends an Err(AdapterError::Canceled) response
the rust client receives an Err(_) result. Err(_) is the correct result
since canceling a statement causes it to have no effect and matches the
PostgreSQL result. For example, imagine a user executes `let res =
rust_client.batch_execute("INSERT INTO t VALUES (1)")`. When Materialize
would send Ok(ExecuteResponse::Canceled) in response to a cancel, then
`res` would be `Ok(())`, even though the `INSERT` was cancelled and had
no effect. Now that Err(AdapterError::Canceled) is sent, `res` is
`Err(_)`, which exactly matches the behavior of PostgreSQL.

Why the difference existed previously is not understood by the author of
this commit.
  • Loading branch information
jkosh44 authored Feb 9, 2024
1 parent 7bb335f commit 10800f8
Show file tree
Hide file tree
Showing 6 changed files with 2 additions and 21 deletions.
4 changes: 0 additions & 4 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,6 @@ pub enum ExecuteResponse {
AlteredRole,
/// The system configuration was altered.
AlteredSystemConfiguration,
/// The query was canceled.
Canceled,
/// The requested cursor was closed.
ClosedCursor,
/// The provided comment was created.
Expand Down Expand Up @@ -447,7 +445,6 @@ impl TryInto<ExecuteResponse> for ExecuteResponseKind {
ExecuteResponseKind::AlteredSystemConfiguration => {
Ok(ExecuteResponse::AlteredSystemConfiguration)
}
ExecuteResponseKind::Canceled => Ok(ExecuteResponse::Canceled),
ExecuteResponseKind::ClosedCursor => Ok(ExecuteResponse::ClosedCursor),
ExecuteResponseKind::Comment => Ok(ExecuteResponse::Comment),
ExecuteResponseKind::Copied => Err(()),
Expand Down Expand Up @@ -511,7 +508,6 @@ impl ExecuteResponse {
AlteredIndexLogicalCompaction => Some("ALTER INDEX".into()),
AlteredRole => Some("ALTER ROLE".into()),
AlteredSystemConfiguration => Some("ALTER SYSTEM".into()),
Canceled => None,
ClosedCursor => Some("CLOSE CURSOR".into()),
Comment => Some("COMMENT".into()),
Copied(n) => Some(format!("COPY {}", n)),
Expand Down
5 changes: 2 additions & 3 deletions src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::error::AdapterError;
use crate::notice::AdapterNotice;
use crate::session::{EndTransactionAction, Session, TransactionOps, TransactionStatus, WriteOp};
use crate::util::ClientTransmitter;
use crate::{ExecuteContext, ExecuteResponseKind};
use crate::ExecuteContext;

// DO NOT make this visible in any way, i.e. do not add any version of
// `pub` to this mod. The inner `sequence_X` methods are hidden in this
Expand Down Expand Up @@ -74,8 +74,7 @@ impl Coordinator {
) -> LocalBoxFuture<'_, ()> {
async move {
event!(Level::TRACE, plan = format!("{:?}", plan));
let mut responses = ExecuteResponse::generated_from(PlanKind::from(&plan));
responses.push(ExecuteResponseKind::Canceled);
let responses = ExecuteResponse::generated_from(PlanKind::from(&plan));
ctx.tx_mut().set_allowed(responses);

// Scope the borrow of the Catalog because we need to mutate the Coordinator state below.
Expand Down
4 changes: 0 additions & 4 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2503,10 +2503,6 @@ impl Coordinator {
}
}
ExecuteResponse::SendingRowsImmediate { rows } => make_diffs(rows),
resp @ ExecuteResponse::Canceled => {
ctx.retire(Ok(resp));
return;
}
resp => Err(AdapterError::Unstructured(anyhow!(
"unexpected peek response: {resp:?}"
))),
Expand Down
1 change: 0 additions & 1 deletion src/adapter/src/statement_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ impl From<&ExecuteResponse> for StatementEndedExecutionReason {
execution_strategy: Some(StatementExecutionStrategy::Constant),
}
}
ExecuteResponse::Canceled => StatementEndedExecutionReason::Canceled,

ExecuteResponse::AlteredDefaultPrivileges
| ExecuteResponse::AlteredObject(_)
Expand Down
1 change: 0 additions & 1 deletion src/environmentd/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,6 @@ async fn execute_stmt<S: ResultSender>(
let tag = res.tag();

Ok(match res {
ExecuteResponse::Canceled => SqlResult::err(client, AdapterError::Canceled).into(),
ExecuteResponse::CreatedConnection { .. }
| ExecuteResponse::CreatedDatabase { .. }
| ExecuteResponse::CreatedSchema { .. }
Expand Down
8 changes: 0 additions & 8 deletions src/pgwire/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1412,14 +1412,6 @@ where
}

let r = match response {
ExecuteResponse::Canceled => {
return self
.error(ErrorResponse::error(
SqlState::QUERY_CANCELED,
"canceling statement due to user request",
))
.await;
}
ExecuteResponse::ClosedCursor => {
self.complete_portal(&portal_name);
command_complete!()
Expand Down

0 comments on commit 10800f8

Please sign in to comment.