diff --git a/spanner/src/client.rs b/spanner/src/client.rs index b0f0e9d5..cc2e08c0 100644 --- a/spanner/src/client.rs +++ b/spanner/src/client.rs @@ -17,7 +17,7 @@ use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager use crate::statement::Statement; use crate::transaction::{CallOptions, QueryOptions}; use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction}; -use crate::transaction_rw::{commit, CommitOptions, ReadWriteTransaction}; +use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction}; use crate::value::{Timestamp, TimestampBound}; #[derive(Clone, Default)] @@ -356,7 +356,7 @@ impl Client { /// apply's default replay protection may require an additional RPC. So this /// method may be appropriate for latency sensitive and/or high throughput blind /// writing. - pub async fn apply_at_least_once(&self, ms: Vec) -> Result, Error> { + pub async fn apply_at_least_once(&self, ms: Vec) -> Result, Error> { self.apply_at_least_once_with_option(ms, CommitOptions::default()).await } @@ -373,7 +373,7 @@ impl Client { &self, ms: Vec, options: CommitOptions, - ) -> Result, Error> { + ) -> Result, Error> { let ro = TransactionRetrySetting::default(); let mut session = self.get_session().await?; @@ -385,7 +385,7 @@ impl Client { mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())), }); match commit(session, ms.clone(), tx, options.clone()).await { - Ok(s) => Ok(s.commit_timestamp.map(|s| s.into())), + Ok(s) => Ok(s.into()), Err(e) => Err((Error::GRPC(e), session)), } }, @@ -410,7 +410,7 @@ impl Client { /// Ok(()) /// } /// ``` - pub async fn apply(&self, ms: Vec) -> Result, Error> { + pub async fn apply(&self, ms: Vec) -> Result, Error> { self.apply_with_option(ms, ReadWriteTransactionOption::default()).await } @@ -419,8 +419,8 @@ impl Client { &self, ms: Vec, options: ReadWriteTransactionOption, - ) -> Result, Error> { - let result: Result<(Option, ()), Error> = self + ) -> Result, Error> { + let result: Result<(Option, ()), Error> = self .read_write_transaction_sync_with_option( |tx| { tx.buffer_write(ms.to_vec()); @@ -481,7 +481,7 @@ impl Client { /// }) /// }).await /// } - pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(Option, T), E> + pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(Option, T), E> where E: TryAs + From + From, F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin> + Send + 'tx>>, @@ -512,7 +512,7 @@ impl Client { &'a self, f: F, options: ReadWriteTransactionOption, - ) -> Result<(Option, T), E> + ) -> Result<(Option, T), E> where E: TryAs + From + From, F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin> + Send + 'tx>>, @@ -591,7 +591,7 @@ impl Client { &self, f: impl Fn(&mut ReadWriteTransaction) -> Result, options: ReadWriteTransactionOption, - ) -> Result<(Option, T), E> + ) -> Result<(Option, T), E> where E: TryAs + From + From, { diff --git a/spanner/src/transaction_rw.rs b/spanner/src/transaction_rw.rs index 8681d423..49ee5aeb 100644 --- a/spanner/src/transaction_rw.rs +++ b/spanner/src/transaction_rw.rs @@ -24,6 +24,21 @@ pub struct CommitOptions { pub call_options: CallOptions, } +#[derive(Clone)] +pub struct CommitResult { + pub timestamp: Option, + pub mutation_count: Option, +} + +impl From for CommitResult { + fn from(value: CommitResponse) -> Self { + Self { + timestamp: value.commit_timestamp.map(|v| v.into()), + mutation_count: value.commit_stats.map(|s| s.mutation_count as u64), + } + } +} + /// ReadWriteTransaction provides a locking read-write transaction. /// /// This type of transaction is the only way to write data into Cloud Spanner; @@ -233,7 +248,7 @@ impl ReadWriteTransaction { &mut self, result: Result, options: Option, - ) -> Result<(Option, S), E> + ) -> Result<(Option, S), E> where E: TryAs + From, { @@ -241,7 +256,7 @@ impl ReadWriteTransaction { match result { Ok(success) => { let cr = self.commit(opt).await?; - Ok((cr.commit_timestamp.map(|e| e.into()), success)) + Ok((Some(cr.into()), success)) } Err(err) => { if let Some(status) = err.try_as() { @@ -260,7 +275,7 @@ impl ReadWriteTransaction { &mut self, result: Result, options: Option, - ) -> Result<(Option, T), (E, Option)> + ) -> Result<(Option, T), (E, Option)> where E: TryAs + From, { @@ -268,7 +283,7 @@ impl ReadWriteTransaction { match result { Ok(s) => match self.commit(opt).await { - Ok(c) => Ok((c.commit_timestamp.map(|ts| ts.into()), s)), + Ok(c) => Ok((Some(c.into()), s)), // Retry the transaction using the same session on ABORT error. // Cloud Spanner will create the new transaction with the previous // one's wound-wait priority.