Skip to content

Commit

Permalink
Merge pull request #300 from meilisearch/parametric-send-rotxn
Browse files Browse the repository at this point in the history
Parametric TLS for read transactions
  • Loading branch information
Kerollmops authored Dec 20, 2024
2 parents b7cb164 + 66af90d commit f76bfc6
Show file tree
Hide file tree
Showing 14 changed files with 438 additions and 274 deletions.
15 changes: 0 additions & 15 deletions heed/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,6 @@ url = "2.5.4"
default = ["serde", "serde-bincode", "serde-json"]
serde = ["bitflags/serde", "dep:serde"]

# The #MDB_NOTLS flag is automatically set on Env opening,
# RoTxn and RoCursors implements the Send trait. This allows the
# user to move RoTxns and RoCursors between threads as read transactions
# will no more use thread local storage and will tie reader locktable
# slots to #MDB_txn objects instead of to threads.
#
# According to the LMDB documentation, when this feature is not enabled:
# A thread can only use one transaction at a time, plus any child
# transactions. Each transaction belongs to one thread. [...]
# The #MDB_NOTLS flag changes this for read-only transactions.
#
# And a #MDB_BAD_RSLOT error will be thrown when multiple read
# transactions exists on the same thread
read-txn-no-tls = []

# Enable the serde en/decoders for bincode, serde_json, or rmp_serde
serde-bincode = ["heed-types/serde-bincode"]
serde-json = ["heed-types/serde-json"]
Expand Down
2 changes: 1 addition & 1 deletion heed/src/cookbook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@
//! }
//!
//! impl<'t> ImmutableMap<'t> {
//! fn from_db(rtxn: &'t RoTxn, db: Database<Str, Str>) -> heed::Result<Self> {
//! fn from_db<T>(rtxn: &'t RoTxn<T>, db: Database<Str, Str>) -> heed::Result<Self> {
//! let mut map = HashMap::new();
//! for result in db.iter(rtxn)? {
//! let (k, v) = result?;
Expand Down
16 changes: 8 additions & 8 deletions heed/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use crate::mdb::error::mdb_result;
use crate::mdb::ffi;
use crate::*;

pub struct RoCursor<'txn> {
pub struct RoCursor<'txn, T> {
cursor: *mut ffi::MDB_cursor,
_marker: marker::PhantomData<&'txn ()>,
_marker: marker::PhantomData<&'txn T>,
}

impl<'txn> RoCursor<'txn> {
// TODO should I ask for a &mut RoTxn, here?
pub(crate) fn new(txn: &'txn RoTxn, dbi: ffi::MDB_dbi) -> Result<RoCursor<'txn>> {
impl<'txn, T> RoCursor<'txn, T> {
// TODO should I ask for a &mut RoTxn<'_, T>, here?
pub(crate) fn new(txn: &'txn RoTxn<'_, T>, dbi: ffi::MDB_dbi) -> Result<RoCursor<'txn, T>> {
let mut cursor: *mut ffi::MDB_cursor = ptr::null_mut();
let mut txn = txn.txn.unwrap();
unsafe { mdb_result(ffi::mdb_cursor_open(txn.as_mut(), dbi, &mut cursor))? }
Expand Down Expand Up @@ -237,14 +237,14 @@ impl<'txn> RoCursor<'txn> {
}
}

impl Drop for RoCursor<'_> {
impl<T> Drop for RoCursor<'_, T> {
fn drop(&mut self) {
unsafe { ffi::mdb_cursor_close(self.cursor) }
}
}

pub struct RwCursor<'txn> {
cursor: RoCursor<'txn>,
cursor: RoCursor<'txn, WithoutTls>,
}

impl<'txn> RwCursor<'txn> {
Expand Down Expand Up @@ -404,7 +404,7 @@ impl<'txn> RwCursor<'txn> {
}

impl<'txn> Deref for RwCursor<'txn> {
type Target = RoCursor<'txn>;
type Target = RoCursor<'txn, WithoutTls>;

fn deref(&self) -> &Self::Target {
&self.cursor
Expand Down
87 changes: 46 additions & 41 deletions heed/src/databases/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ use crate::*;
/// # Ok(()) }
/// ```
#[derive(Debug)]
pub struct DatabaseOpenOptions<'e, 'n, KC, DC, C = DefaultComparator> {
env: &'e Env,
pub struct DatabaseOpenOptions<'e, 'n, T, KC, DC, C = DefaultComparator> {
env: &'e Env<T>,
types: marker::PhantomData<(KC, DC, C)>,
name: Option<&'n str>,
flags: AllDatabaseFlags,
}

impl<'e> DatabaseOpenOptions<'e, 'static, Unspecified, Unspecified> {
impl<'e, T> DatabaseOpenOptions<'e, 'static, T, Unspecified, Unspecified> {
/// Create an options struct to open/create a database with specific flags.
pub fn new(env: &'e Env) -> Self {
pub fn new(env: &'e Env<T>) -> Self {
DatabaseOpenOptions {
env,
types: Default::default(),
Expand All @@ -74,23 +74,24 @@ impl<'e> DatabaseOpenOptions<'e, 'static, Unspecified, Unspecified> {
}
}

impl<'e, 'n, KC, DC, C> DatabaseOpenOptions<'e, 'n, KC, DC, C> {
impl<'e, 'n, T, KC, DC, C> DatabaseOpenOptions<'e, 'n, T, KC, DC, C> {
/// Change the type of the database.
///
/// The default types are [`Unspecified`] and require a call to [`Database::remap_types`]
/// to use the [`Database`].
pub fn types<NKC, NDC>(self) -> DatabaseOpenOptions<'e, 'n, NKC, NDC> {
pub fn types<NKC, NDC>(self) -> DatabaseOpenOptions<'e, 'n, T, NKC, NDC> {
DatabaseOpenOptions {
env: self.env,
types: Default::default(),
name: self.name,
flags: self.flags,
}
}

/// Change the customized key compare function of the database.
///
/// By default no customized compare function will be set when opening a database.
pub fn key_comparator<NC>(self) -> DatabaseOpenOptions<'e, 'n, KC, DC, NC> {
pub fn key_comparator<NC>(self) -> DatabaseOpenOptions<'e, 'n, T, KC, DC, NC> {
DatabaseOpenOptions {
env: self.env,
types: Default::default(),
Expand Down Expand Up @@ -131,7 +132,7 @@ impl<'e, 'n, KC, DC, C> DatabaseOpenOptions<'e, 'n, KC, DC, C> {
///
/// If not done, you might raise `Io(Os { code: 22, kind: InvalidInput, message: "Invalid argument" })`
/// known as `EINVAL`.
pub fn open(&self, rtxn: &RoTxn) -> Result<Option<Database<KC, DC, C>>>
pub fn open(&self, rtxn: &RoTxn<T>) -> Result<Option<Database<KC, DC, C>>>
where
KC: 'static,
DC: 'static,
Expand Down Expand Up @@ -171,13 +172,13 @@ impl<'e, 'n, KC, DC, C> DatabaseOpenOptions<'e, 'n, KC, DC, C> {
}
}

impl<KC, DC, C> Clone for DatabaseOpenOptions<'_, '_, KC, DC, C> {
impl<T, KC, DC, C> Clone for DatabaseOpenOptions<'_, '_, T, KC, DC, C> {
fn clone(&self) -> Self {
*self
}
}

impl<KC, DC, C> Copy for DatabaseOpenOptions<'_, '_, KC, DC, C> {}
impl<T, KC, DC, C> Copy for DatabaseOpenOptions<'_, '_, T, KC, DC, C> {}

/// A typed database that accepts only the types it was created with.
///
Expand Down Expand Up @@ -339,7 +340,11 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn get<'a, 'txn>(&self, txn: &'txn RoTxn, key: &'a KC::EItem) -> Result<Option<DC::DItem>>
pub fn get<'a, 'txn, T>(
&self,
txn: &'txn RoTxn<T>,
key: &'a KC::EItem,
) -> Result<Option<DC::DItem>>
where
KC: BytesEncode<'a>,
DC: BytesDecode<'txn>,
Expand Down Expand Up @@ -423,11 +428,11 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn get_duplicates<'a, 'txn>(
pub fn get_duplicates<'a, 'txn, T>(
&self,
txn: &'txn RoTxn,
txn: &'txn RoTxn<T>,
key: &'a KC::EItem,
) -> Result<Option<RoIter<'txn, KC, DC, MoveOnCurrentKeyDuplicates>>>
) -> Result<Option<RoIter<'txn, T, KC, DC, MoveOnCurrentKeyDuplicates>>>
where
KC: BytesEncode<'a>,
{
Expand Down Expand Up @@ -486,9 +491,9 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn get_lower_than<'a, 'txn>(
pub fn get_lower_than<'a, 'txn, T>(
&self,
txn: &'txn RoTxn,
txn: &'txn RoTxn<T>,
key: &'a KC::EItem,
) -> Result<Option<(KC::DItem, DC::DItem)>>
where
Expand Down Expand Up @@ -555,9 +560,9 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn get_lower_than_or_equal_to<'a, 'txn>(
pub fn get_lower_than_or_equal_to<'a, 'txn, T>(
&self,
txn: &'txn RoTxn,
txn: &'txn RoTxn<T>,
key: &'a KC::EItem,
) -> Result<Option<(KC::DItem, DC::DItem)>>
where
Expand Down Expand Up @@ -628,9 +633,9 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn get_greater_than<'a, 'txn>(
pub fn get_greater_than<'a, 'txn, T>(
&self,
txn: &'txn RoTxn,
txn: &'txn RoTxn<T>,
key: &'a KC::EItem,
) -> Result<Option<(KC::DItem, DC::DItem)>>
where
Expand Down Expand Up @@ -700,9 +705,9 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn get_greater_than_or_equal_to<'a, 'txn>(
pub fn get_greater_than_or_equal_to<'a, 'txn, T>(
&self,
txn: &'txn RoTxn,
txn: &'txn RoTxn<T>,
key: &'a KC::EItem,
) -> Result<Option<(KC::DItem, DC::DItem)>>
where
Expand Down Expand Up @@ -759,7 +764,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn first<'txn>(&self, txn: &'txn RoTxn) -> Result<Option<(KC::DItem, DC::DItem)>>
pub fn first<'txn, T>(&self, txn: &'txn RoTxn<T>) -> Result<Option<(KC::DItem, DC::DItem)>>
where
KC: BytesDecode<'txn>,
DC: BytesDecode<'txn>,
Expand Down Expand Up @@ -813,7 +818,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn last<'txn>(&self, txn: &'txn RoTxn) -> Result<Option<(KC::DItem, DC::DItem)>>
pub fn last<'txn, T>(&self, txn: &'txn RoTxn<T>) -> Result<Option<(KC::DItem, DC::DItem)>>
where
KC: BytesDecode<'txn>,
DC: BytesDecode<'txn>,
Expand Down Expand Up @@ -870,7 +875,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn len(&self, txn: &RoTxn) -> Result<u64> {
pub fn len<T>(&self, txn: &RoTxn<T>) -> Result<u64> {
self.stat(txn).map(|stat| stat.entries as u64)
}

Expand Down Expand Up @@ -913,7 +918,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn is_empty(&self, txn: &RoTxn) -> Result<bool> {
pub fn is_empty<T>(&self, txn: &RoTxn<T>) -> Result<bool> {
self.len(txn).map(|l| l == 0)
}

Expand Down Expand Up @@ -955,7 +960,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn stat(&self, txn: &RoTxn) -> Result<DatabaseStat> {
pub fn stat<T>(&self, txn: &RoTxn<T>) -> Result<DatabaseStat> {
assert_eq_env_db_txn!(self, txn);

let mut db_stat = mem::MaybeUninit::uninit();
Expand Down Expand Up @@ -1019,7 +1024,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn iter<'txn>(&self, txn: &'txn RoTxn) -> Result<RoIter<'txn, KC, DC>> {
pub fn iter<'txn, T>(&self, txn: &'txn RoTxn<T>) -> Result<RoIter<'txn, T, KC, DC>> {
assert_eq_env_db_txn!(self, txn);
RoCursor::new(txn, self.dbi).map(|cursor| RoIter::new(cursor))
}
Expand Down Expand Up @@ -1120,7 +1125,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn rev_iter<'txn>(&self, txn: &'txn RoTxn) -> Result<RoRevIter<'txn, KC, DC>> {
pub fn rev_iter<'txn, T>(&self, txn: &'txn RoTxn<T>) -> Result<RoRevIter<'txn, T, KC, DC>> {
assert_eq_env_db_txn!(self, txn);

RoCursor::new(txn, self.dbi).map(|cursor| RoRevIter::new(cursor))
Expand Down Expand Up @@ -1226,11 +1231,11 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn range<'a, 'txn, R>(
pub fn range<'a, 'txn, R, T>(
&self,
txn: &'txn RoTxn,
txn: &'txn RoTxn<T>,
range: &'a R,
) -> Result<RoRange<'txn, KC, DC, C>>
) -> Result<RoRange<'txn, T, KC, DC, C>>
where
KC: BytesEncode<'a>,
R: RangeBounds<KC::EItem>,
Expand Down Expand Up @@ -1398,11 +1403,11 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn rev_range<'a, 'txn, R>(
pub fn rev_range<'a, 'txn, R, T>(
&self,
txn: &'txn RoTxn,
txn: &'txn RoTxn<T>,
range: &'a R,
) -> Result<RoRevRange<'txn, KC, DC, C>>
) -> Result<RoRevRange<'txn, T, KC, DC, C>>
where
KC: BytesEncode<'a>,
R: RangeBounds<KC::EItem>,
Expand Down Expand Up @@ -1572,11 +1577,11 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn prefix_iter<'a, 'txn>(
pub fn prefix_iter<'a, 'txn, T>(
&self,
txn: &'txn RoTxn,
txn: &'txn RoTxn<T>,
prefix: &'a KC::EItem,
) -> Result<RoPrefix<'txn, KC, DC, C>>
) -> Result<RoPrefix<'txn, T, KC, DC, C>>
where
KC: BytesEncode<'a>,
C: LexicographicComparator,
Expand Down Expand Up @@ -1704,11 +1709,11 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn rev_prefix_iter<'a, 'txn>(
pub fn rev_prefix_iter<'a, 'txn, T>(
&self,
txn: &'txn RoTxn,
txn: &'txn RoTxn<'_, T>,
prefix: &'a KC::EItem,
) -> Result<RoRevPrefix<'txn, KC, DC, C>>
) -> Result<RoRevPrefix<'txn, T, KC, DC, C>>
where
KC: BytesEncode<'a>,
C: LexicographicComparator,
Expand Down
Loading

0 comments on commit f76bfc6

Please sign in to comment.