Skip to content

Commit

Permalink
DmoOperation wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
Flowneee committed Sep 4, 2023
1 parent 3915948 commit b2675d0
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 30 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [0.0.8] - 2023-09-04
## [Unreleased] - 2023-09-04
### Added
- Data-manipulation operations (insert, update, upsert, replace, delete) now return `DmoResponse` with row, returned by operation (#7).
- Data-manipulation operations (insert, update, upsert, replace, delete) now return `DmoResponse` with row, returned by operation ([#7](https://github.com/Flowneee/tarantool-rs/issues/7));
- `TupleElement` type, which allow to write type into `Tuple` without having `serde::Serialize` implemented for it;
- `DmoOperation` for constructing operations in `update` and `upsert` calls.

### Changed
- `TupleResponse` renamed to `CallResponse`.
Expand Down
19 changes: 11 additions & 8 deletions examples/schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use rmpv::Value;
use tarantool_rs::{Connection, Executor, ExecutorExt, IteratorType};
use tarantool_rs::{Connection, DmoOperation, Executor, ExecutorExt, IteratorType};
use tracing::info;

#[tokio::main]
Expand All @@ -18,17 +17,21 @@ async fn main() -> Result<(), anyhow::Error> {
.select::<(i64, String), _>(None, None, Some(IteratorType::All), ())
.await?
);
info!("UPSERT: {:?}", space.upsert((0, "Name"), ("=",)).await?);
info!(
"UPSERT: {:?}",
space
.upsert(
(0, "Name"),
DmoOperation::string_splice("name", 2, 2, "!!".into()),
)
.await?
);
info!(
"UPDATE: {:?}",
space
.update(
(0,),
(rmpv::Value::Array(vec![
"=".into(),
1.into(),
"Second".into()
]),),
(DmoOperation::string_splice("name", 2, 2, "!!".into()),)
)
.await?
);
Expand Down
4 changes: 4 additions & 0 deletions src/client/dmo/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub use self::{operation::DmoOperation, response::DmoResponse};

mod operation;
mod response;
173 changes: 173 additions & 0 deletions src/client/dmo/operation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
use std::io::Write;

use rmpv::ValueRef;

use crate::{errors::EncodingError, Tuple, TupleElement};

/// Key of index in operation.
///
/// Can be string, unsigned or signed number ([docs](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/update/#box-space-update)).
#[derive(Debug)]
pub struct DmoOperationFieldKey<'a>(ValueRef<'a>);

impl<'a> From<&'a str> for DmoOperationFieldKey<'a> {
fn from(value: &'a str) -> Self {
Self(value.into())
}
}

impl<'a> From<u32> for DmoOperationFieldKey<'a> {
fn from(value: u32) -> Self {
Self(value.into())
}
}

impl<'a> From<i32> for DmoOperationFieldKey<'a> {
fn from(value: i32) -> Self {
Self(value.into())
}
}

// TODO: docs and doctests
/// Operation in `upsert` or `update` request.
#[derive(Debug)]
pub struct DmoOperation<'a> {
// TODO: id support
// TODO: negative id support
operation: &'static str,
field_name: ValueRef<'a>,
args: Args<'a>,
}

impl<'a> DmoOperation<'a> {
fn new(
operation: &'static str,
field_name: impl Into<DmoOperationFieldKey<'a>>,
args: Args<'a>,
) -> Self {
Self {
operation,
field_name: field_name.into().0,
args,
}
}

pub fn add(
field_name: impl Into<DmoOperationFieldKey<'a>>,
value: impl Into<ValueRef<'a>>,
) -> Self {
Self::new(ops::ADD, field_name, Args::One(value.into()))
}

pub fn sub(
field_name: impl Into<DmoOperationFieldKey<'a>>,
value: impl Into<ValueRef<'a>>,
) -> Self {
Self::new(ops::SUB, field_name, Args::One(value.into()))
}

pub fn and(
field_name: impl Into<DmoOperationFieldKey<'a>>,
value: impl Into<ValueRef<'a>>,
) -> Self {
Self::new(ops::AND, field_name, Args::One(value.into()))
}

pub fn or(
field_name: impl Into<DmoOperationFieldKey<'a>>,
value: impl Into<ValueRef<'a>>,
) -> Self {
Self::new(ops::OR, field_name, Args::One(value.into()))
}

pub fn xor(
field_name: impl Into<DmoOperationFieldKey<'a>>,
value: impl Into<ValueRef<'a>>,
) -> Self {
Self::new(ops::XOR, field_name, Args::One(value.into()))
}

pub fn string_splice(
field_name: impl Into<DmoOperationFieldKey<'a>>,
from: usize,
len: usize,
value: &'a str,
) -> Self {
Self::new(
ops::STRING_SPLICE,
field_name,
Args::Three(from.into(), len.into(), value.into()),
)
}

pub fn insert(
field_name: impl Into<DmoOperationFieldKey<'a>>,
value: impl Into<ValueRef<'a>>,
) -> Self {
Self::new(ops::INSERT, field_name, Args::One(value.into()))
}

pub fn assign(
field_name: impl Into<DmoOperationFieldKey<'a>>,
value: impl Into<ValueRef<'a>>,
) -> Self {
Self::new(ops::ASSIGN, field_name, Args::One(value.into()))
}

pub fn delete(field_name: impl Into<DmoOperationFieldKey<'a>>) -> Self {
Self::new(ops::DEL, field_name, Args::None)
}
}

impl<'a> TupleElement for DmoOperation<'a> {
fn encode_into_writer<W: Write>(&self, mut buf: W) -> Result<(), EncodingError> {
let arr_len = 2 + match self.args {
Args::None => 0,
Args::One(_) => 1,
Args::Three(_, _, _) => 3,
};
rmp::encode::write_array_len(&mut buf, arr_len)?;
rmp::encode::write_str(&mut buf, self.operation)?;
rmpv::encode::write_value_ref(&mut buf, &self.field_name)?;
match &self.args {
Args::None => {}
Args::One(x) => {
rmpv::encode::write_value_ref(&mut buf, x)?;
}
Args::Three(x, y, z) => {
rmpv::encode::write_value_ref(&mut buf, x)?;
rmpv::encode::write_value_ref(&mut buf, y)?;
rmpv::encode::write_value_ref(&mut buf, z)?;
}
}
Ok(())
}
}

/// Implementation for allow single operation to be used as argument for `update` and `upsert`.
impl<'a> Tuple for DmoOperation<'a> {
fn encode_into_writer<W: Write>(&self, mut buf: W) -> Result<(), EncodingError> {
rmp::encode::write_array_len(&mut buf, 1)?;
TupleElement::encode_into_writer(self, &mut buf)?;
Ok(())
}
}

#[derive(Debug)]
enum Args<'a> {
None,
One(rmpv::ValueRef<'a>),
Three(rmpv::ValueRef<'a>, rmpv::ValueRef<'a>, rmpv::ValueRef<'a>),
}

mod ops {
pub(super) const ADD: &str = "+";
pub(super) const SUB: &str = "-";
pub(super) const AND: &str = "&";
pub(super) const OR: &str = "|";
pub(super) const XOR: &str = "^";
pub(super) const STRING_SPLICE: &str = ":";
pub(super) const INSERT: &str = "|";
pub(super) const DEL: &str = "#";
pub(super) const ASSIGN: &str = "=";
}
File renamed without changes.
6 changes: 2 additions & 4 deletions src/client/executor_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use futures::{future::BoxFuture, FutureExt};
use rmpv::Value;
use serde::de::DeserializeOwned;

use super::{prepared_sql_statement::PreparedSqlStatement, Executor};
use crate::{
codec::request::{
Call, Delete, EncodedRequest, Eval, Execute, Insert, Ping, Prepare, Replace, Request,
Expand All @@ -12,7 +11,7 @@ use crate::{
schema::{SchemaEntityKey, Space},
tuple::Tuple,
utils::extract_and_deserialize_iproto_data,
CallResponse, DmoResponse, IteratorType, Result, SqlResponse,
CallResponse, DmoResponse, Executor, IteratorType, PreparedSqlStatement, Result, SqlResponse,
};

/// Helper trait around [`Executor`] trait, which allows to send specific requests
Expand Down Expand Up @@ -93,6 +92,7 @@ pub trait ExecutorExt: Executor {
))
}

// TODO: docs and doctests for DmoOperation
/// Update tuple.
async fn update<K, O>(
&self,
Expand Down Expand Up @@ -122,7 +122,6 @@ pub trait ExecutorExt: Executor {
))
}

// TODO: structured tuple
/// Insert a tuple into a space. If a tuple with the same primary key already exists,
/// replaces the existing tuple with a new one.
async fn replace<T>(&self, space_id: u32, tuple: T) -> Result<DmoResponse>
Expand All @@ -134,7 +133,6 @@ pub trait ExecutorExt: Executor {
))
}

// TODO: structured tuple
/// Delete a tuple identified by the primary key.
async fn delete<T>(&self, space_id: u32, index_id: u32, keys: T) -> Result<DmoResponse>
where
Expand Down
11 changes: 5 additions & 6 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
pub use self::{
call_response::CallResponse,
connection::Connection,
dmo_response::DmoResponse,
dmo::{DmoOperation, DmoResponse},
executor::Executor,
executor_ext::ExecutorExt,
prepared_sql_statement::PreparedSqlStatement,
sql_response::SqlResponse,
sql::{PreparedSqlStatement, SqlResponse},
stream::Stream,
transaction::{Transaction, TransactionBuilder},
};

// TODO: either reimport everything from schema or add dmo and sql mods
pub mod schema;

mod call_response;
mod connection;
mod dmo_response;
mod dmo;
mod executor;
mod executor_ext;
mod prepared_sql_statement;
mod sql_response;
mod sql;
mod stream;
mod transaction;

Expand Down
4 changes: 4 additions & 0 deletions src/client/sql/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub use self::{prepared_statement::PreparedSqlStatement, response::SqlResponse};

mod prepared_statement;
mod response;
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub use self::{
client::*,
codec::consts::{IteratorType, TransactionIsolationLevel},
errors::Error,
tuple::Tuple,
tuple::{Tuple, TupleElement},
};

pub mod errors;
Expand Down
37 changes: 28 additions & 9 deletions src/tuple.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
use std::io::Write;

use serde::Serialize;

use crate::errors::EncodingError;

pub trait TupleElement {
fn encode_into_writer<W: Write>(&self, buf: W) -> Result<(), EncodingError>;
}

impl<T: serde::Serialize> TupleElement for T {
fn encode_into_writer<W: Write>(&self, mut buf: W) -> Result<(), EncodingError> {
rmp_serde::encode::write(&mut buf, self)?;
Ok(())
}
}

/// Trait, describing type, which can be encoded into
/// MessagePack tuple.
///
Expand All @@ -13,11 +22,21 @@ pub trait Tuple {
fn encode_into_writer<W: Write>(&self, buf: W) -> Result<(), EncodingError>;
}

impl<T: Serialize> Tuple for Vec<T> {
impl<T: TupleElement> Tuple for Vec<T> {
fn encode_into_writer<W: Write>(&self, mut buf: W) -> Result<(), EncodingError> {
rmp::encode::write_array_len(&mut buf, self.len() as u32)?;
for x in self.iter() {
x.encode_into_writer(&mut buf)?;
}
Ok(())
}
}

impl<T: TupleElement> Tuple for &[T] {
fn encode_into_writer<W: Write>(&self, mut buf: W) -> Result<(), EncodingError> {
rmp::encode::write_array_len(&mut buf, self.len() as u32)?;
for x in self.iter() {
rmp_serde::encode::write(&mut buf, &x)?;
x.encode_into_writer(&mut buf)?;
}
Ok(())
}
Expand All @@ -39,26 +58,26 @@ impl<T: Tuple> Tuple for &T {
// `= self` idea is from https://stackoverflow.com/a/56700760/5033855
macro_rules! impl_tuple_for_tuple {
( $param:tt ) => {
impl<$param : serde::Serialize> Tuple for ($param,) {
impl<$param : $crate::TupleElement> Tuple for ($param,) {
fn encode_into_writer<W: Write>(&self, mut buf: W) -> Result<(), EncodingError> {
rmp::encode::write_array_len(&mut buf, 1)?;
rmp_serde::encode::write(&mut buf, &self.0)?;
self.0.encode_into_writer(&mut buf)?;
Ok(())
}
}
};
( $param:tt, $($params:tt),* ) => {
impl<$param : serde::Serialize, $($params : serde::Serialize,)*> Tuple for ($param, $($params,)*) {
impl<$param : $crate::TupleElement , $($params : $crate::TupleElement,)*> Tuple for ($param, $($params,)*) {
#[allow(non_snake_case)]
fn encode_into_writer<W: Write>(&self, mut buf: W) -> Result<(), EncodingError> {
rmp::encode::write_array_len(&mut buf, count_tts!($param $($params)+) as u32)?;

let ($param, $($params,)+) = self;

rmp_serde::encode::write(&mut buf, $param)?;
$param.encode_into_writer(&mut buf)?;

$(
rmp_serde::encode::write(&mut buf, $params)?;
$params.encode_into_writer(&mut buf)?;
)+

Ok(())
Expand Down

0 comments on commit b2675d0

Please sign in to comment.