From 0cdd85fb2a55990dead3e3c900b42f43fe607fdf Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Thu, 30 May 2024 10:23:08 +0200 Subject: [PATCH 1/4] expose commit stats --- spanner/src/client.rs | 38 +++++++++++++++++++++++++++++++++-- spanner/src/transaction_rw.rs | 34 ++++++++++++++++++++++++++++--- 2 files changed, 67 insertions(+), 5 deletions(-) diff --git a/spanner/src/client.rs b/spanner/src/client.rs index b0f0e9d5..81cf5b6c 100644 --- a/spanner/src/client.rs +++ b/spanner/src/client.rs @@ -17,7 +17,7 @@ use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager use crate::statement::Statement; use crate::transaction::{CallOptions, QueryOptions}; use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction}; -use crate::transaction_rw::{commit, CommitOptions, ReadWriteTransaction}; +use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction}; use crate::value::{Timestamp, TimestampBound}; #[derive(Clone, Default)] @@ -513,6 +513,38 @@ impl Client { f: F, options: ReadWriteTransactionOption, ) -> Result<(Option, T), E> + where + E: TryAs + From + From, + F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin> + Send + 'tx>>, + { + self.read_write_transaction_with_stats(f, options) + .await + .map(|(r, v)| (r.and_then(|r| r.timestamp), v)) + } + + /// ReadWriteTransaction executes a read-write transaction, with retries as + /// necessary. + /// + /// The function f will be called one or more times. It must not maintain + /// any state between calls. + /// + /// If the transaction cannot be committed or if f returns an ABORTED error, + /// ReadWriteTransaction will call f again. It will continue to call f until the + /// transaction can be committed or the Context times out or is cancelled. If f + /// returns an error other than ABORTED, ReadWriteTransaction will abort the + /// transaction and return the error. + /// + /// To limit the number of retries, set a deadline on the Context rather than + /// using a fixed limit on the number of attempts. ReadWriteTransaction will + /// retry as needed until that deadline is met. + /// + /// See for + /// more details. + pub async fn read_write_transaction_with_stats<'a, T, E, F>( + &'a self, + f: F, + options: ReadWriteTransactionOption, + ) -> Result<(Option, T), E> where E: TryAs + From + From, F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin> + Send + 'tx>>, @@ -606,7 +638,9 @@ impl Client { |session| async { let mut tx = self.create_read_write_transaction::(session, bo.clone()).await?; let result = f(&mut tx); - tx.finish(result, Some(co.clone())).await + tx.finish(result, Some(co.clone())) + .await + .map(|(r, v)| (r.and_then(|r| r.timestamp), v)) }, session, ) diff --git a/spanner/src/transaction_rw.rs b/spanner/src/transaction_rw.rs index 8681d423..6c9e816e 100644 --- a/spanner/src/transaction_rw.rs +++ b/spanner/src/transaction_rw.rs @@ -24,6 +24,21 @@ pub struct CommitOptions { pub call_options: CallOptions, } +#[derive(Clone)] +pub struct CommitResult { + pub timestamp: Option, + pub mutation_count: Option, +} + +impl From for CommitResult { + fn from(value: CommitResponse) -> Self { + Self { + timestamp: value.commit_timestamp.map(|v| v.into()), + mutation_count: value.commit_stats.map(|s| s.mutation_count as u64), + } + } +} + /// ReadWriteTransaction provides a locking read-write transaction. /// /// This type of transaction is the only way to write data into Cloud Spanner; @@ -234,6 +249,19 @@ impl ReadWriteTransaction { result: Result, options: Option, ) -> Result<(Option, S), E> + where + E: TryAs + From, + { + self.end_with_stats(result, options) + .await + .map(|(r, v)| (r.and_then(|r| r.timestamp), v)) + } + + pub async fn end_with_stats( + &mut self, + result: Result, + options: Option, + ) -> Result<(Option, S), E> where E: TryAs + From, { @@ -241,7 +269,7 @@ impl ReadWriteTransaction { match result { Ok(success) => { let cr = self.commit(opt).await?; - Ok((cr.commit_timestamp.map(|e| e.into()), success)) + Ok((Some(cr.into()), success)) } Err(err) => { if let Some(status) = err.try_as() { @@ -260,7 +288,7 @@ impl ReadWriteTransaction { &mut self, result: Result, options: Option, - ) -> Result<(Option, T), (E, Option)> + ) -> Result<(Option, T), (E, Option)> where E: TryAs + From, { @@ -268,7 +296,7 @@ impl ReadWriteTransaction { match result { Ok(s) => match self.commit(opt).await { - Ok(c) => Ok((c.commit_timestamp.map(|ts| ts.into()), s)), + Ok(c) => Ok((Some(c.into()), s)), // Retry the transaction using the same session on ABORT error. // Cloud Spanner will create the new transaction with the previous // one's wound-wait priority. From 57a984b719bc04e1c30a3ebd506a5fe5c34d40ee Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Fri, 31 Jan 2025 20:56:43 +0100 Subject: [PATCH 2/4] let read_write_transaction{,_with_option} return CommitResult --- spanner/src/client.rs | 34 +--------------------------------- spanner/src/transaction_rw.rs | 13 ------------- 2 files changed, 1 insertion(+), 46 deletions(-) diff --git a/spanner/src/client.rs b/spanner/src/client.rs index 81cf5b6c..b9d1df73 100644 --- a/spanner/src/client.rs +++ b/spanner/src/client.rs @@ -481,7 +481,7 @@ impl Client { /// }) /// }).await /// } - pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(Option, T), E> + pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(Option, T), E> where E: TryAs + From + From, F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin> + Send + 'tx>>, @@ -512,38 +512,6 @@ impl Client { &'a self, f: F, options: ReadWriteTransactionOption, - ) -> Result<(Option, T), E> - where - E: TryAs + From + From, - F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin> + Send + 'tx>>, - { - self.read_write_transaction_with_stats(f, options) - .await - .map(|(r, v)| (r.and_then(|r| r.timestamp), v)) - } - - /// ReadWriteTransaction executes a read-write transaction, with retries as - /// necessary. - /// - /// The function f will be called one or more times. It must not maintain - /// any state between calls. - /// - /// If the transaction cannot be committed or if f returns an ABORTED error, - /// ReadWriteTransaction will call f again. It will continue to call f until the - /// transaction can be committed or the Context times out or is cancelled. If f - /// returns an error other than ABORTED, ReadWriteTransaction will abort the - /// transaction and return the error. - /// - /// To limit the number of retries, set a deadline on the Context rather than - /// using a fixed limit on the number of attempts. ReadWriteTransaction will - /// retry as needed until that deadline is met. - /// - /// See for - /// more details. - pub async fn read_write_transaction_with_stats<'a, T, E, F>( - &'a self, - f: F, - options: ReadWriteTransactionOption, ) -> Result<(Option, T), E> where E: TryAs + From + From, diff --git a/spanner/src/transaction_rw.rs b/spanner/src/transaction_rw.rs index 6c9e816e..49ee5aeb 100644 --- a/spanner/src/transaction_rw.rs +++ b/spanner/src/transaction_rw.rs @@ -248,19 +248,6 @@ impl ReadWriteTransaction { &mut self, result: Result, options: Option, - ) -> Result<(Option, S), E> - where - E: TryAs + From, - { - self.end_with_stats(result, options) - .await - .map(|(r, v)| (r.and_then(|r| r.timestamp), v)) - } - - pub async fn end_with_stats( - &mut self, - result: Result, - options: Option, ) -> Result<(Option, S), E> where E: TryAs + From, From b5e6e11840e4dcec27a65ecf26c084c9a57cd078 Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Fri, 31 Jan 2025 20:59:15 +0100 Subject: [PATCH 3/4] also make apply{,_with_option} return CommitResult --- spanner/src/client.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/spanner/src/client.rs b/spanner/src/client.rs index b9d1df73..0997161e 100644 --- a/spanner/src/client.rs +++ b/spanner/src/client.rs @@ -410,7 +410,7 @@ impl Client { /// Ok(()) /// } /// ``` - pub async fn apply(&self, ms: Vec) -> Result, Error> { + pub async fn apply(&self, ms: Vec) -> Result, Error> { self.apply_with_option(ms, ReadWriteTransactionOption::default()).await } @@ -419,8 +419,8 @@ impl Client { &self, ms: Vec, options: ReadWriteTransactionOption, - ) -> Result, Error> { - let result: Result<(Option, ()), Error> = self + ) -> Result, Error> { + let result: Result<(Option, ()), Error> = self .read_write_transaction_sync_with_option( |tx| { tx.buffer_write(ms.to_vec()); @@ -591,7 +591,7 @@ impl Client { &self, f: impl Fn(&mut ReadWriteTransaction) -> Result, options: ReadWriteTransactionOption, - ) -> Result<(Option, T), E> + ) -> Result<(Option, T), E> where E: TryAs + From + From, { @@ -606,9 +606,7 @@ impl Client { |session| async { let mut tx = self.create_read_write_transaction::(session, bo.clone()).await?; let result = f(&mut tx); - tx.finish(result, Some(co.clone())) - .await - .map(|(r, v)| (r.and_then(|r| r.timestamp), v)) + tx.finish(result, Some(co.clone())).await }, session, ) From 51d3ea9a74f7e0868531973d6601b1f2e0b2746d Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Fri, 31 Jan 2025 21:00:46 +0100 Subject: [PATCH 4/4] also apply_at_least_once{,_with_option} --- spanner/src/client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spanner/src/client.rs b/spanner/src/client.rs index 0997161e..cc2e08c0 100644 --- a/spanner/src/client.rs +++ b/spanner/src/client.rs @@ -356,7 +356,7 @@ impl Client { /// apply's default replay protection may require an additional RPC. So this /// method may be appropriate for latency sensitive and/or high throughput blind /// writing. - pub async fn apply_at_least_once(&self, ms: Vec) -> Result, Error> { + pub async fn apply_at_least_once(&self, ms: Vec) -> Result, Error> { self.apply_at_least_once_with_option(ms, CommitOptions::default()).await } @@ -373,7 +373,7 @@ impl Client { &self, ms: Vec, options: CommitOptions, - ) -> Result, Error> { + ) -> Result, Error> { let ro = TransactionRetrySetting::default(); let mut session = self.get_session().await?; @@ -385,7 +385,7 @@ impl Client { mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())), }); match commit(session, ms.clone(), tx, options.clone()).await { - Ok(s) => Ok(s.commit_timestamp.map(|s| s.into())), + Ok(s) => Ok(s.into()), Err(e) => Err((Error::GRPC(e), session)), } },