diff --git a/src/insert.rs b/src/insert.rs index 42208a3..b52035b 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -37,6 +37,7 @@ const_assert!(BUFFER_SIZE.is_power_of_two()); // to use the whole buffer's capac pub struct Insert { state: InsertState, buffer: BytesMut, + written_bytes: usize, #[cfg(feature = "lz4")] compression: Compression, send_timeout: Option, @@ -137,6 +138,7 @@ impl Insert { sql, }, buffer: BytesMut::with_capacity(BUFFER_SIZE), + written_bytes: 0, #[cfg(feature = "lz4")] compression: client.compression, send_timeout: None, @@ -222,7 +224,7 @@ impl Insert { } #[inline(always)] - pub(crate) fn do_write(&mut self, row: &T) -> Result + pub(crate) fn do_write(&mut self, row: &T) -> Result<()> where T: Serialize, { @@ -240,7 +242,8 @@ impl Insert { self.abort(); } - result.and(Ok(written)) + self.written_bytes += written; + result.and(Ok(())) } /// Ends `INSERT`, the server starts processing the data. @@ -256,6 +259,12 @@ impl Insert { self.state.terminated(); self.wait_handle().await } + + /// Returns the number of serialized bytes that have been written because of + /// write operations. + pub fn written_bytes(&self) -> usize { + self.written_bytes + } async fn send_chunk(&mut self) -> Result<()> { debug_assert!(matches!(self.state, InsertState::Active { .. })); diff --git a/src/inserter.rs b/src/inserter.rs index 7597409..8d791c9 100644 --- a/src/inserter.rs +++ b/src/inserter.rs @@ -224,8 +224,8 @@ where } match self.insert.as_mut().unwrap().do_write(row) { - Ok(bytes) => { - self.pending.bytes += bytes as u64; + Ok(()) => { + self.pending.bytes = self.insert.as_ref().unwrap().written_bytes() as u64; self.pending.rows += 1; if !self.in_transaction {