From 434d3bf8ae41c74975f2eec132443a431422abee Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Sun, 24 Dec 2023 19:57:23 +0530 Subject: [PATCH 01/30] chore: remove redundant doc links --- src/common/mod.rs | 2 +- src/storage/commit_log/segmented_log/index.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/common/mod.rs b/src/common/mod.rs index 226363658..2cbfd5f2b 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -18,7 +18,7 @@ pub mod stream { } pub mod ref_ops { - //! Module providing utilities for [`Deref`](Deref) and [`AsRef`] interop. + //! Module providing utilities for [`Deref`] and [`AsRef`] interop. use std::ops::Deref; diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index fcab4aae4..6884361a1 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -9,7 +9,7 @@ use std::{ ops::Deref, }; -/// Extension used by backing files for [`Index`](Index) instances. +/// Extension used by backing files for [`Index`] instances. pub const INDEX_FILE_EXTENSION: &str = "index"; /// Number of bytes required for storing the base marker. From e3061d59af04d2e3e1973537b60a04e95af0e647 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Sun, 24 Dec 2023 20:51:35 +0530 Subject: [PATCH 02/30] doc: adds documentation for top-level modules under crate::storage --- src/common/mod.rs | 6 ++++++ src/lib.rs | 8 ++++++++ src/storage/commit_log/mod.rs | 14 ++++++++++++++ src/storage/common.rs | 2 ++ src/storage/impls/mod.rs | 2 ++ src/storage/mod.rs | 1 + 6 files changed, 33 insertions(+) diff --git a/src/common/mod.rs b/src/common/mod.rs index 2cbfd5f2b..d0d271821 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -43,22 +43,28 @@ pub mod ref_ops { } pub mod cache { + //! Module providing [`Cache`] implementation adapters for using with `laminarmq` + pub use generational_cache::{ cache::lru_cache::LRUCacheBlockArenaEntry, prelude::{ AllocBTreeMap, AllocVec, Cache, Eviction, LRUCache, LRUCacheError, Link, Lookup, Map, }, }; + use std::{fmt::Display, marker::PhantomData}; + /// A [`LRUCache`] using an [`AllocVec`] and [`AllocBTreeMap`]. pub type AllocLRUCache = LRUCache>, K, T, AllocBTreeMap>; + /// A [`Cache`] that does a no-op on every cache operation and returns an error instead. #[derive(Debug, Default)] pub struct NoOpCache { _phantom_data: PhantomData<(K, V)>, } + /// Error type used by [`NoOpCache`]. #[derive(Debug, Default)] pub struct UnsupportedOp; diff --git a/src/lib.rs b/src/lib.rs index 3203baa3f..4800aa889 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,4 +6,12 @@ pub mod storage; pub mod prelude { //! Prelude module for [`laminarmq`](super) with common exports for convenience. + + pub use crate::storage::{ + commit_log::{ + segmented_log::{index::Index, segment::Segment, SegmentedLog}, + CommitLog, + }, + AsyncConsume, AsyncIndexedRead, AsyncTruncate, Storage, + }; } diff --git a/src/storage/commit_log/mod.rs b/src/storage/commit_log/mod.rs index 93de7138f..2391ad804 100644 --- a/src/storage/commit_log/mod.rs +++ b/src/storage/commit_log/mod.rs @@ -1,10 +1,14 @@ +//! Module providing abstractions for modelling an ordered, persistent sequence of records. + use super::{AsyncConsume, AsyncIndexedRead, AsyncTruncate, Sizable}; +/// The unit of storage in our [`CommitLog`]. pub struct Record { pub metadata: M, pub value: T, } +/// An absrtact, append-only, ordered sequence of [`Record`] instances. #[async_trait::async_trait(?Send)] pub trait CommitLog: AsyncIndexedRead, ReadError = Self::Error> @@ -12,14 +16,24 @@ pub trait CommitLog: + AsyncConsume + Sizable { + /// Error type associated with [`CommitLog`]. type Error: std::error::Error; + /// Appends the given [`Record`] at the end of this [`CommitLog`]. + /// + /// The [`Record`] may contain a stream of byte slices. Implementations are to exhaustively + /// read the stream and append the corresponding byte slices as a single record. + /// + /// Returns the index at which the [`Record`] was appended. async fn append(&mut self, record: Record) -> Result where X: futures_lite::Stream>, X: Unpin + 'async_trait, XBuf: std::ops::Deref; + /// Removes all expired records from this [`CommitLog`]. + /// + /// Returns the number of expired records. async fn remove_expired( &mut self, _expiry_duration: std::time::Duration, diff --git a/src/storage/common.rs b/src/storage/common.rs index 0ad77802c..460360611 100644 --- a/src/storage/common.rs +++ b/src/storage/common.rs @@ -1,3 +1,5 @@ +//! Module providing common utilities to aid commit-log implementations. + use super::AsyncIndexedRead; use futures_core::Stream; use num::{CheckedSub, Unsigned}; diff --git a/src/storage/impls/mod.rs b/src/storage/impls/mod.rs index 574a6e17d..e043e1536 100644 --- a/src/storage/impls/mod.rs +++ b/src/storage/impls/mod.rs @@ -1,3 +1,5 @@ +//! Module providing different storage backend implementations. + #[cfg(target_os = "linux")] pub mod glommio; pub mod in_mem; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 3e15546f7..b68240f94 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -59,6 +59,7 @@ pub trait AsyncIndexedRead { } } +/// [`AsyncIndexedRead`] with additional APIs for providing exclusive read access to elements. #[async_trait(?Send)] pub trait AsyncIndexedExclusiveRead: AsyncIndexedRead { /// Exclusively reads the value at the given index from this abstraction. From 99d7e220c05ebed30aa8486f418ba71ba5fc6b8b Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Fri, 29 Dec 2023 22:27:09 +0530 Subject: [PATCH 03/30] doc: adds documentation for the commit-log module --- src/storage/commit_log/mod.rs | 8 +- src/storage/commit_log/segmented_log/index.rs | 5 + src/storage/commit_log/segmented_log/mod.rs | 96 +++++++++++++++++++ .../commit_log/segmented_log/segment.rs | 6 ++ src/storage/commit_log/segmented_log/store.rs | 5 + src/storage/common.rs | 6 ++ src/storage/mod.rs | 4 + 7 files changed, 129 insertions(+), 1 deletion(-) diff --git a/src/storage/commit_log/mod.rs b/src/storage/commit_log/mod.rs index 2391ad804..6333e76d2 100644 --- a/src/storage/commit_log/mod.rs +++ b/src/storage/commit_log/mod.rs @@ -1,4 +1,7 @@ //! Module providing abstractions for modelling an ordered, persistent sequence of records. +//! +//! This module forms the basis for storage in our message queue. It provides the different storage +//! components for representing, storing, indexing and retreiving our records (messages). use super::{AsyncConsume, AsyncIndexedRead, AsyncTruncate, Sizable}; @@ -8,7 +11,10 @@ pub struct Record { pub value: T, } -/// An absrtact, append-only, ordered sequence of [`Record`] instances. +/// An abstract, append-only, ordered sequence of [`Record`] instances. +/// +/// This trait acts as a generalized storage mechanism for storing our records. All our +/// message-queue server APIs can be expressed using this trait. #[async_trait::async_trait(?Send)] pub trait CommitLog: AsyncIndexedRead, ReadError = Self::Error> diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index 6884361a1..ed67d5874 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -1,3 +1,8 @@ +//! Provides components necessary for mapping record indices to store-file positions in segments. +//! +//! This module is used by the `segment`implementation to store mapping from record +//! indices to positions on the `segment-store` file. + use super::{ super::super::{AsyncConsume, AsyncIndexedRead, AsyncTruncate, Sizable, Storage}, store::common::RecordHeader, diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index 957adbedf..d4b05604a 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -1,3 +1,99 @@ +//! A [`CommitLog`] implemented as a collection of segment files. +//! +//! The segmented-log data structure for storing was originally described in the [Apache +//! Kafka](https://www.microsoft.com/en-us/research/wp-content/uploads/2017/09/Kafka.pdf) paper. +//! +//!

+//! segmented_log +//!

+//! +//! A segmented log is a collection of read segments and a single write segment. Each "segment" is +//! backed by a storage file on disk called "store". +//! +//! The log is: +//! - "immutable", since only "append", "read" and "truncate" operations are allowed. It is not possible +//! to update or delete records from the middle of the log. +//! - "segmented", since it is composed of segments, where each segment services records from a +//! particular range of offsets. +//! +//! All writes go to the write segment. A new record is written at the `highest_index` +//! in the write segment. When we max out the capacity of the write segment, we close the write segment +//! and reopen it as a read segment. The re-opened segment is added to the list of read segments. A new +//! write segment is then created with `base_index` equal to the `highest_index` of the previous write +//! segment. +//! +//! When reading from a particular index, we linearly check which segment contains the given read +//! segment. If a segment capable of servicing a read from the given index is found, we read from that +//! segment. If no such segment is found among the read segments, we default to the write segment. The +//! following scenarios may occur when reading from the write segment in this case: +//! - The write segment has synced the messages including the message at the given offset. In this case +//! the record is read successfully and returned. +//! - The write segment hasn't synced the data at the given offset. In this case the read fails with a +//! segment I/O error. +//! - If the offset is out of bounds of even the write segment, we return an "out of bounds" error. +//! +//! #### `laminarmq` specific enhancements to the `segmented_log` data structure +//! Originally, the `segmented_log` addressed individual records with "offsets" which were continous +//! accross all the segments. While the conventional `segmented_log` data structure is quite performant +//! for a `commit_log` implementation, it still requires the following properties to hold true for the +//! record being appended: +//! - We have the entire record in memory +//! - We know the record bytes' length and record bytes' checksum before the record is appended +//! +//! It's not possible to know this information when the record bytes are read from an asynchronous +//! stream of bytes. Without the enhancements, we would have to concatenate intermediate byte buffers to +//! a vector. This would not only incur more allocations, but also slow down our system. +//! +//! Hence, to accommodate this use case, we introduced an intermediate indexing layer to our design. +//! +//! ```text +//! //! Index and position invariants across segmented_log +//! +//! // segmented_log index invariants +//! segmented_log.lowest_index = segmented_log.read_segments[0].lowest_index +//! segmented_log.highest_index = segmented_log.write_segment.highest_index +//! +//! // record position invariants in store +//! records[i+1].position = records[i].position + records[i].record_header.length +//! +//! // segment index invariants in segmented_log +//! segments[i+1].base_index = segments[i].highest_index +//! = segments[i].index[index.len-1].index + 1 +//! ``` +//!

+//! Fig: Data organisation for persisting the segmented_log data structure on a +//! *nix file system. +//!

+//! +//! In the design, instead of referring to records with a raw offset, we refer to them with indices. +//! The index in each segment translates the record indices to raw file position in the segment store +//! file. +//! +//! Now, the store append operation accepts an asynchronous stream of bytes instead of a contiguously +//! laid out slice of bytes. We use this operation to write the record bytes, and at the time of writing +//! the record bytes, we calculate the record bytes' length and checksum. Once we are done writing the +//! record bytes to the store, we write it's corresponding `record_header` (containing the checksum and +//! length), position and index as an `index_record` in the segment index. +//! +//! This provides two quality of life enhancements: +//! - Allow asynchronous streaming writes, without having to concatenate intermediate byte buffers +//! - Records are accessed much more easily with easy to use indices +//! +//! Now, to prevent a malicious user from overloading our storage capacity and memory with a maliciously +//! crafted request which infinitely loops over some data and sends it to our server, we have provided +//! an optional `append_threshold` parameter to all append operations. When provided, it prevents +//! streaming append writes to write more bytes than the provided `append_threshold`. +//! +//! At the segment level, this requires us to keep a segment overflow capacity. All segment append +//! operations now use `segment_capacity - segment.size + segment_overflow_capacity` as the +//! `append_threshold` value. A good `segment_overflow_capacity` value could be `segment_capacity / 2`. +//! +//! ## Why is this nested as a submodule? +//! +//! There can be other implementations of a [`CommitLog`] which have a completely different +//! structure. So we make "segmented-log" a submodule to repreent it as one of the possivle +//! implementations. + pub mod index; pub mod segment; pub mod store; diff --git a/src/storage/commit_log/segmented_log/segment.rs b/src/storage/commit_log/segmented_log/segment.rs index d9348d442..393ba2a52 100644 --- a/src/storage/commit_log/segmented_log/segment.rs +++ b/src/storage/commit_log/segmented_log/segment.rs @@ -1,3 +1,9 @@ +//! Presents the `segment` units that a `segmented-log` is made out of. +//! +//! Each `segment` contains an `index` for addressing reocrds and a `store` for backing storage. +//! The `index` stores a mapping from record indices to positions on the `store` file. The `store` +//! file stores the actual records on the underlying storage. + use super::{ super::super::{ super::common::{serde_compat::SerializationProvider, split::SplitAt}, diff --git a/src/storage/commit_log/segmented_log/store.rs b/src/storage/commit_log/segmented_log/store.rs index a626ca0e1..e558f65b8 100644 --- a/src/storage/commit_log/segmented_log/store.rs +++ b/src/storage/commit_log/segmented_log/store.rs @@ -1,3 +1,8 @@ +//! Present the backing storge components for a `segment` in a `segmented-log`. +//! +//! This module is responsible for ultimately persisting the records in our `segmented-log` to some +//! form of [`Storage`]. + use self::common::RecordHeader; use super::super::super::{AsyncConsume, AsyncTruncate, Sizable, Storage}; use async_trait::async_trait; diff --git a/src/storage/common.rs b/src/storage/common.rs index 460360611..8a0bd0dd9 100644 --- a/src/storage/common.rs +++ b/src/storage/common.rs @@ -1,10 +1,16 @@ //! Module providing common utilities to aid commit-log implementations. +//! +//! The methods present in this module are used for testing the consistency of our different +//! storage-related trait and data-structure implementations. use super::AsyncIndexedRead; use futures_core::Stream; use num::{CheckedSub, Unsigned}; use std::{cmp, ops::RangeBounds}; +/// Constrains the given [`RangeBounds`] to the given low and high values. +/// +/// Returns the end points of the constrained range. pub fn index_bounds_for_range(index_bounds: RB, lo_min: Idx, hi_max: Idx) -> (Idx, Idx) where RB: RangeBounds, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index b68240f94..7b9fa4132 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,4 +1,8 @@ //! Module providing abstractions to store records. +//! +//! This module provides traits to abstract over different storage behaviours. These traits allow +//! us to implement our storage data-structures in a portable fashion - completely ddcoupled from +//! the underlying device, async runtime or other file APIs. use super::common::stream::StreamUnexpectedLength; use async_trait::async_trait; From 3cdae60642d5373d30ea1fc0defb43f119209b54 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Sat, 13 Jan 2024 22:04:49 +0530 Subject: [PATCH 04/30] doc: adds documentation for the index module --- src/storage/commit_log/segmented_log/index.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index ed67d5874..239967165 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -1,7 +1,7 @@ //! Provides components necessary for mapping record indices to store-file positions in segments. //! -//! This module is used by the `segment`implementation to store mapping from record -//! indices to positions on the `segment-store` file. +//! This module is used by the `segment` implementation to store mappings from record-indices to +//! positions on the `segment-store` file. use super::{ super::super::{AsyncConsume, AsyncIndexedRead, AsyncTruncate, Sizable, Storage}, @@ -14,7 +14,7 @@ use std::{ ops::Deref, }; -/// Extension used by backing files for [`Index`] instances. +/// Extension used by backing files for [Index] instances. pub const INDEX_FILE_EXTENSION: &str = "index"; /// Number of bytes required for storing the base marker. @@ -26,6 +26,8 @@ pub const INDEX_RECORD_LENGTH: usize = 16; /// Lowest underlying storage position pub const INDEX_BASE_POSITION: u64 = 0; +/// Unit of storage on an Index. Stores position, length and checksum metadata for a single +/// [`Record`](super::Record) persisted in the `segment` [`Store`](super::store::Store). #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] pub struct IndexRecord { pub checksum: u64, @@ -42,6 +44,7 @@ impl From for RecordHeader { } } +/// Marker to persist the starting `base_index` of an [`Index`] in the index [`Storage`]. pub struct IndexBaseMarker { pub base_index: u64, _padding: u64, From 0d2f3fbfc0eee5de82998a002cde7bc3d4b365fe Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Wed, 7 Feb 2024 14:01:05 +0530 Subject: [PATCH 05/30] style: rewrites SegmentedLog::new to reduce Segment::new calls --- README.md | 86 +++++++++++++-------- src/storage/commit_log/segmented_log/mod.rs | 33 ++++---- 2 files changed, 69 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index da9c89ef7..c46f0d12f 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ order they need. - [x] Locally persistent queue of records - [ ] Single node, multi threaded, eBPF based request to thread routed message queue - [ ] Service discovery with - [SWIM](https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf). + [SWIM](https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf). - [ ] Replication and consensus of replicated records with [Raft](https://raft.github.io/raft.pdf). ## Examples @@ -76,8 +76,9 @@ partition_id_1 = (topic_id_0, partition_idx_1) partition_id_2 = (topic_id_1, partition_idx_0) ``` ->The exact numerical ids don't have any pattern with respect to partition_id and topic_id; there can ->be multiple topics, each of which can have multiple partitions (identified by partition_idx). + +> The exact numerical ids don't have any pattern with respect to partition_id and topic_id; there can +> be multiple topics, each of which can have multiple partitions (identified by partition_idx). … alternatively: @@ -98,6 +99,7 @@ partition_id_2 = (topic_id_1, partition_idx_0) └── ...other nodes ``` + ```text [L] := leader; [F] := follower ``` @@ -119,6 +121,7 @@ have chosen to maintain a flat representation of topic partitions. We present an commit-log API at the partition level. Users may hence do the following: + - Directly interact with our message queue at the partition level - Use client side load balancing between topic partitions @@ -169,20 +172,21 @@ using Rendezvous hashing. From the Wikipedia [article](https://en.wikipedia.org/wiki/Rendezvous_hashing): ->Rendezvous or highest random weight (HRW) hashing is an algorithm that allows clients to achieve ->distributed agreement on a set of _k_ options out of a possible set of _n_ options. A typical ->application is when clients need to agree on which sites (or proxies) objects are assigned to. +> Rendezvous or highest random weight (HRW) hashing is an algorithm that allows clients to achieve +> distributed agreement on a set of _k_ options out of a possible set of _n_ options. A typical +> application is when clients need to agree on which sites (or proxies) objects are assigned to. In our case, we use rendezvous hashing to determine the subset of nodes to use for placing the replicas of a partition. For some hashing function `H`, some weight function `f(w, hash)` and partition id `P_x`, we proceed as follows: + - For every node `N_i` in the cluster with a weight `w_i`, we compute `R_i = f(w_i, H(concat(P_x, N_i)))` - We rank all nodes `N_i` belonging to the set of nodes `N` with respect to their rank value `R_i`. - For some replication factor `k`, we select the top `k` nodes to place the `k` replicas of the -partition with id `P_x` + partition with id `P_x` (We assume `k <= |N|`; where `|N|` is the number of nodes and `k` is the number of replicas) @@ -198,14 +202,14 @@ current leader of the replica set. ### Supported execution models `laminarmq` supports two execution models: + - General async execution model used by various async runtimes in the Rust ecosystem (e.g `tokio`) - Thread per core execution model In the thread-per-core execution model individual processor cores are limited to single threads. This model encourages design that minimizes inter-thread contention and locks, thereby improving tail latencies in software services. Read: [The Impact of Thread per Core Architecture on -Application Tail Latency.]( -https://helda.helsinki.fi//bitstream/handle/10138/313642/tpc_ancs19.pdf?sequence=1) +Application Tail Latency.](https://helda.helsinki.fi//bitstream/handle/10138/313642/tpc_ancs19.pdf?sequence=1) In the thread per core execution model, we have to leverage application level partitioning such that each individual thread is responsible for a subset of requests and/or responsibilities. We also have @@ -231,9 +235,10 @@ model.

In our cluster, we have two kinds of requests: -- __membership requests__: used by the gossip style service discovery system for maintaining cluster -membership. -- __partition requests__: used to interact with `laminarmq` topic partitions. + +- **membership requests**: used by the gossip style service discovery system for maintaining cluster + membership. +- **partition requests**: used to interact with `laminarmq` topic partitions. We use an [eBPF](https://ebpf.io/what-is-ebpf/) XDP filter to classify request packets at the socket layer into membership request packets and partition request packets. Next we use eBPF to route @@ -243,24 +248,26 @@ subsystem in that node. The partition request packets are left to flow as is. Next we have an "HTTP server", which parses the incoming partition request packets from the original socket into valid `partition::*` requests. For every `partition::*` request, the HTTP server spawns a future to handle it. This request handler future does the following: + - Create a new channel `(tx, rx)` for the request. - Send the parsed partition request along with send end of the channel `(partition::*, tx)` to the -"Request Router" over the request router's receiving channel. + "Request Router" over the request router's receiving channel. - Await on the recv. end of the channel created by this future for the response. `res = rx.await` - When we receive the response from this future's channel, we serialize it and respond back to the -socket we had received the packets from. + socket we had received the packets from. Next we have a "Request Router / Partition manager" responsible for routing various requests to the partition serving futures. The request router unit receives both `membership::*` requests from the membership subsystem and `partition::*` requests received from the "HTTP server" request handler futures (also called request poller futures from here on since they poll for the response from the channel recv. `rx` end). The request router unit routes requests as follows: + - `membership::*` requests are broadcast to all the partition serving futures - `(partition::*_request(partition_id_x, …), tx)` tuples are routed to their destination partitions -using the `partition_id`. + using the `partition_id`. - `(partition::create(partition_id_x, …), tx)` tuples are handled by the request router/ partition -manager itself. For this, the request router / partition manager creates a new partition serving -future, allocates the required storage units or it and sends and appropriate response on `tx`. + manager itself. For this, the request router / partition manager creates a new partition serving + future, allocates the required storage units or it and sends and appropriate response on `tx`. Finally, the individual partition server futures receive both `membership::*` and `(partition::*, tx)` requests as they come to our node and routed. They handle the requests as necessary and send a @@ -338,7 +345,7 @@ The partition request handler handles the different requests as follows: replicas, we initial the leadership election process with each replica as a candidate. - `membership::leave(j)`: remove {node #j} from priority queue and Raft group if present. If `{node - #j}` was not present in the Raft group no further action is necessary. If it was present in the +#j}` was not present in the Raft group no further action is necessary. If it was present in the Raft group, `pop()` another member from the priority queue, add it to the Raft group and proceed similarly as in the case of `membership::join(j)` @@ -352,6 +359,7 @@ The partition request handler handles the different requests as follows: When a node goes down the appropriate `membership::leave(i)` message (where `i` is the node that went down) is sent to all the nodes in the cluster. The partition replica controllers in each node handle the membership request accordingly. In effect: + - For every leader partition in that node: - if there are no other follower replicas in other nodes in it's Raft group, that partition goes down. @@ -369,6 +377,7 @@ In our system, we use different Raft groups for different data buckets (replica different Raft groups for different data buckets on the same node as MultiRaft. Read more here: + - - @@ -377,6 +386,7 @@ Every partition controller is backed by a `segmented_log` for persisting records ### Persistence mechanism #### `segmented_log`: Persistent data structure for storing records in a partition + The segmented-log data structure for storing was originally described in the [Apache Kafka](https://www.microsoft.com/en-us/research/wp-content/uploads/2017/09/Kafka.pdf) paper. @@ -392,6 +402,7 @@ A segmented log is a collection of read segments and a single write segment. Eac backed by a storage file on disk called "store". The log is: + - "immutable", since only "append", "read" and "truncate" operations are allowed. It is not possible to update or delete records from the middle of the log. - "segmented", since it is composed of segments, where each segment services records from a @@ -407,6 +418,7 @@ When reading from a particular offset, we linearly check which segment contains segment. If a segment capable of servicing a read from the given offset is found, we read from that segment. If no such segment is found among the read segments, we default to the write segment. The following scenarios may occur when reading from the write segment in this case: + - The write segment has synced the messages including the message at the given offset. In this case the record is read successfully and returned. - The write segment hasn't synced the data at the given offset. In this case the read fails with a @@ -414,9 +426,11 @@ following scenarios may occur when reading from the write segment in this case: - If the offset is out of bounds of even the write segment, we return an "out of bounds" error. #### `laminarmq` specific enhancements to the `segmented_log` data structure + While the conventional `segmented_log` data structure is quite performant for a `commit_log` implementation, it still requires the following properties to hold true for the record being appended: + - We have the entire record in memory - We know the record bytes' length and record bytes' checksum before the record is appended @@ -443,6 +457,7 @@ records[i+1].position = records[i].position + records[i].record_header.length // segment index invariants in segmented_log segments[i+1].base_index = segments[i].highest_index = segments[i].index[index.len-1].index + 1 ``` +

Fig: Data organisation for persisting the segmented_log data structure on a *nix file system. @@ -459,6 +474,7 @@ record bytes to the store, we write it's corresponding `record_header` (containi length), position and index as an `index_record` in the segment index. This provides two quality of life enhancements: + - Allow asynchronous streaming writes, without having to concatenate intermediate byte buffers - Records are accessed much more easily with easy to use indices @@ -486,15 +502,16 @@ This execution model is based on the executor, reactor, waker abstractions used runtimes. We don't have to specifically care about how and where a particular future is executed. The data flow in this execution model is as follows: + - A HTTP server future parses HTTP requests from the request socket - For every HTTP request it creates a new future to handle it - The HTTP handler future sends the request and a response channel tx to the request router via a channel. -It also awaits on the response rx end. + It also awaits on the response rx end. - The request router future maintains a map of partition_id to designated request channel tx for each -partition controller future. + partition controller future. - For every partition request received it forwards the request on the appropriate partition request -channel tx. If a `partition::create(...)` request is received it creates a new partition controller -future. + channel tx. If a `partition::create(...)` request is received it creates a new partition controller + future. - The partition controller future send back the response to the provided response channel tx. - The response poller future received it and responds back with a serialized response to the socket. @@ -523,6 +540,7 @@ than the one that runs tasks for persisting data to the disk. We re-use the same constructs that we use in the general async runtime execution model. The only difference being, we explicitly care about in which task queue a class of future's tasks are executed. In our case, we have the following 4 task queues: + - Request router task queue - HTTP server request parser task queue - Partition replica controller task queue @@ -573,6 +591,7 @@ ulimit -l ``` If the `memlock` resource limit (rlimit) is lesser than 512 KiB, you can increase it as follows: + ```sh sudo vi /etc/security/limits.conf * hard memlock 512 @@ -582,15 +601,18 @@ sudo vi /etc/security/limits.conf To make the new limits effective, you need to log in to the machine again. Verify whether the limits have been reflected with `ulimit` as described above. ->(On old WSL versions, you might need to spawn a login shell every time for the limits to be ->reflected: ->```sh ->su ${USER} -l ->``` ->The limits persist once inside the login shell. This is not necessary on the latest WSL2 version as ->of 22.12.2022) +> (On old WSL versions, you might need to spawn a login shell every time for the limits to be +> reflected: +> +> ```sh +> su ${USER} -l +> ``` +> +> The limits persist once inside the login shell. This is not necessary on the latest WSL2 version as +> of 22.12.2022) Finally, clone the repository and run the tests: + ```sh git clone https://github.com/arindas/laminarmq.git cd laminarmq/ @@ -611,13 +633,15 @@ cargo bench The complete latest benchmark reports are available at . All benchmarks in the reports have been run on a machine (HP Pavilion x360 Convertible 14-ba0xx) with: + - 4 core CPU (Intel(R) Core(TM) i5-7200U CPU @ 2.50GHz) - 8GB RAM (SK Hynix HMA81GS6AFR8N-UH DDR4 2133 MT/s) - 128GB SSD storage (SanDisk SD8SN8U-128G-1006) ### Selected Benchmark Reports - __Note__: We use the following names for different record sizes: +**Note**: We use the following names for different record sizes: + @@ -652,7 +676,7 @@ All benchmarks in the reports have been run on a machine (HP Pavilion x360 Conve - +
size_name
blog 11760 bytes (11.76 KB)4x linked_in_post4X linked_in_post
diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index d4b05604a..36b71d3d8 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -239,7 +239,7 @@ where config: Config, mut segment_storage_provider: SSP, ) -> Result> { - let mut segment_base_indices = segment_storage_provider + let segment_base_indices = segment_storage_provider .obtain_base_indices_of_stored_segments() .await .map_err(SegmentedLogError::StorageError)?; @@ -251,36 +251,31 @@ where _ => Ok(()), }?; - let write_segment_base_index = segment_base_indices.pop().unwrap_or(config.initial_index); + let (segment_base_indices, write_segment_base_index) = + match segment_base_indices.last().cloned() { + Some(last_index) => (segment_base_indices, last_index), + None => (vec![config.initial_index], config.initial_index), + }; - let read_segment_base_indices = segment_base_indices; + let mut segments = Vec::with_capacity(segment_base_indices.len()); - let mut read_segments = Vec::>::with_capacity( - read_segment_base_indices.len(), - ); + for segment_base_index in segment_base_indices { + let cache_index_records_flag = (segment_base_index == write_segment_base_index) + || config.num_index_cached_read_segments.is_none(); - for segment_base_index in read_segment_base_indices { - read_segments.push( + segments.push( Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( &mut segment_storage_provider, config.segment_config, segment_base_index, - config.num_index_cached_read_segments.is_none(), + cache_index_records_flag , ) .await .map_err(SegmentedLogError::SegmentError)?, ); } - let write_segment = - Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( - &mut segment_storage_provider, - config.segment_config, - write_segment_base_index, - true, // write segment is always cached - ) - .await - .map_err(SegmentedLogError::SegmentError)?; + let write_segment = segments.pop().ok_or(SegmentedLogError::WriteSegmentLost)?; let cache = match config.num_index_cached_read_segments { Some(cache_capacity) => { @@ -298,7 +293,7 @@ where Ok(Self { write_segment: Some(write_segment), - read_segments, + read_segments: segments, config, segments_with_cached_index: cache, segment_storage_provider, From 0af1217fe3d351363988de7cd96d34e9813ccdd6 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Wed, 7 Feb 2024 23:12:11 +0530 Subject: [PATCH 06/30] chore: updates generational-cache dep version --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0190e7bfc..a9d653f9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -652,9 +652,9 @@ dependencies = [ [[package]] name = "generational-cache" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60c96758dc31f4a38cf0b64f2fab050f56fa450fdefe923b6d6de147430733e4" +checksum = "2d816f5c40761e00c6fc202c12a231f740a9b8b48d885688494a9fb910bc86d2" [[package]] name = "getrandom" diff --git a/Cargo.toml b/Cargo.toml index da850a7fd..55475cc64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ tower-service = "0.3.2" num = "0.4.0" futures-time = "3.0.0" async-io = "1.13.0" -generational-cache = "0.2.0" +generational-cache = "0.2.1" [lib] name = "laminarmq" From ed3faf2f70fa34fe5b733c4263a9152efe9f2c96 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Thu, 8 Feb 2024 11:53:19 +0530 Subject: [PATCH 07/30] chore: updates generational-cache dep version --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9d653f9e..08c8f6c1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -652,9 +652,9 @@ dependencies = [ [[package]] name = "generational-cache" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d816f5c40761e00c6fc202c12a231f740a9b8b48d885688494a9fb910bc86d2" +checksum = "8f81129066835be752a1470e4b4c182f92881c11635b292c6b18594539984843" [[package]] name = "getrandom" diff --git a/Cargo.toml b/Cargo.toml index 55475cc64..bd02dd5dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ tower-service = "0.3.2" num = "0.4.0" futures-time = "3.0.0" async-io = "1.13.0" -generational-cache = "0.2.1" +generational-cache = "0.2.2" [lib] name = "laminarmq" From 0084f4aff74fd828f148f91765c09875c2031650 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Thu, 8 Feb 2024 19:16:12 +0530 Subject: [PATCH 08/30] style: conforms common::split module structure to follow repository standards --- src/common/split.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/common/split.rs b/src/common/split.rs index c9e918fd5..119b12dd8 100644 --- a/src/common/split.rs +++ b/src/common/split.rs @@ -19,19 +19,23 @@ impl SplitAt for Vec { } } -#[cfg(target_os = "linux")] -pub mod glommio_impl { - //! Module containing [`super::SplitAt`] implementations for [`glommio`] specific types. - use glommio::io::ReadResult; +pub mod impls { + //! Module providing [`SplitAt`](super::SplitAt) implementations. - use super::SplitAt; + #[cfg(target_os = "linux")] + pub mod glommio { + //! Module containing [`SplitAt`] implementations for [`glommio`](https://docs.rs/glommio) specific types. + use glommio::io::ReadResult; - impl SplitAt for ReadResult { - fn split_at(self, at: usize) -> Option<(Self, Self)> { - Some(( - ReadResult::slice(&self, 0, at)?, - ReadResult::slice(&self, at, self.len() - at)?, - )) + use super::super::SplitAt; + + impl SplitAt for ReadResult { + fn split_at(self, at: usize) -> Option<(Self, Self)> { + Some(( + ReadResult::slice(&self, 0, at)?, + ReadResult::slice(&self, at, self.len() - at)?, + )) + } } } } From 1d994b2d263c2e319348c261675c5e50ddb17ee5 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Thu, 8 Feb 2024 22:18:42 +0530 Subject: [PATCH 09/30] doc: adds documentation for Index struct --- src/storage/commit_log/segmented_log/index.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index 239967165..f1fec83f3 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -211,6 +211,20 @@ impl IndexRecord { } } +/// [`Index`] for every [`Segment`](super::Segment) in a [`SegmentedLog`](super::SegmentedLog). +/// +/// [`Index`] stores a mapping from [`Record`](super::Record) logical indices to positions on its +/// parent [`Segment`](super::Segment) instance's underlying [`Store`](super::store::Store). It +/// acts as an index to position translation-table when reading record contents from the underlying +/// [`Store`](super::store::Store) using the record's logical index. +/// +///

+/// segmented_log_segment +///

+///

+/// Fig: Segment diagram showing Index, mapping logical indices +/// toStore positions. +///

pub struct Index { index_records: Option>, base_index: Idx, From b1de935a08eae89be4a489519090f6a30d281bef Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Fri, 9 Feb 2024 14:12:46 +0530 Subject: [PATCH 10/30] doc: updates README and Index documentation --- README.md | 88 ++++++++++--------- src/storage/commit_log/segmented_log/index.rs | 26 +++++- 2 files changed, 71 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index c46f0d12f..1738b351c 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,12 @@ order they need. Find examples demonstrating different capabilities of `laminarmq` in the [examples branch](https://github.com/arindas/laminarmq/tree/examples). +## Media + +Media associated with the `laminarmq` project. + +- `[BLOG]` [Building Segmented Logs in Rust: From Theory to Production!](https://arindas.github.io/blog/segmented-log-rust/) + ## Design This section describes the internal design of `laminarmq`. @@ -640,47 +646,47 @@ All benchmarks in the reports have been run on a machine (HP Pavilion x360 Conve ### Selected Benchmark Reports -**Note**: We use the following names for different record sizes: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
size_namesizecomments
tiny12 bytesnone
tweet140 bytesnone
half_k560 bytes≈ 512 bytes
k1120 bytes≈ 1024 bytes (1 KiB)
linked_in_post2940 bytes≤ 3000 bytes (3 KB)
blog11760 bytes (11.76 KB)4X linked_in_post
- -This section presents some selected benchmark reports: +This section presents some selected benchmark reports. + +> **Note**: We use the following names for different record sizes: +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +> +>
size_namesizecomments
tiny12 bytesnone
tweet140 bytesnone
half_k560 bytes≈ 512 bytes
k1120 bytes≈ 1024 bytes (1 KiB)
linked_in_post2940 bytes≤ 3000 bytes (3 KB)
blog11760 bytes (11.76 KB)4X linked_in_post
#### `commit_log` write benchmark with 1KB messages diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index f1fec83f3..ce44c1ded 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -1,7 +1,7 @@ //! Provides components necessary for mapping record indices to store-file positions in segments. //! -//! This module is used by the `segment` implementation to store mappings from record-indices to -//! positions on the `segment-store` file. +//! This module provides [`Index`] which maintains a mapping from logical record indices to +//! positions on the [`Store`](super::store::Store) of a [`Segment`](super::Segment). use super::{ super::super::{AsyncConsume, AsyncIndexedRead, AsyncTruncate, Sizable, Storage}, @@ -51,6 +51,7 @@ pub struct IndexBaseMarker { } impl IndexBaseMarker { + /// Creates a new [`IndexBaseMarker`] with the given `base_index`. pub fn new(base_index: u64) -> Self { Self { base_index, @@ -149,6 +150,7 @@ impl SizedRecord for IndexBaseMarker { } } +/// Error type associated with operations on [`Index`]. #[derive(Debug)] pub enum IndexError { StorageError(StorageError), @@ -199,6 +201,7 @@ macro_rules! idx_as_u64 { } impl IndexRecord { + /// Creates a new [`IndexRecord`] with the given position and [`RecordHeader`]. pub fn with_position_and_record_header( position: P, record_header: RecordHeader, @@ -225,6 +228,17 @@ impl IndexRecord { /// Fig: Segment diagram showing Index, mapping logical indices /// toStore positions. ///

+/// +/// [`Index`] also stores checksum and length information for every record with [`RecordHeader`]. +/// This information is used to detect any data corruption on the underlying persistent media when +/// reading from [`Store`](super::store::Store). +/// +/// ### Type parameters +/// - `S` +/// Underlying [`Storage`] implementation for storing [`IndexRecord`] instances +/// - `Idx` +/// Type to use for representing logical indices. (Usually an unsigned integer like u32, u64 +/// usize, etc.) pub struct Index { index_records: Option>, base_index: Idx, @@ -233,10 +247,12 @@ pub struct Index { } impl Index { + /// Maps this [`Index`] to the underlying [`Storage`] implementation instance. pub fn into_storage(self) -> S { self.storage } + /// Obtains the logical index of the first record in this [`Index`]. pub fn base_index(&self) -> &Idx { &self.base_index } @@ -247,6 +263,10 @@ where S: Storage, Idx: Unsigned + FromPrimitive + Copy + Eq, { + /// Returns the estimated number of [`IndexRecord`] instances stored in the given [`Storage`]. + /// + /// This function calculates this estimate by using the [`Storage`] size and the size of a + /// single [`IndexRecord`]. pub fn estimated_index_records_len_in_storage( storage: &S, ) -> Result> { @@ -261,6 +281,8 @@ where Ok(estimated_index_records_len) } + /// Reads and returns the `base_index` of the [`Index`] persisted on the provided [`Storage`] + /// instance by reading the [`IndexBaseMarker`] at [`INDEX_BASE_POSITION`]. pub async fn base_index_from_storage(storage: &S) -> Result> { let index_base_marker = PersistentSizedRecord::::read_at( From b70ba84c6de7950b87422a29ba4c938f7670f2f3 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Mon, 26 Feb 2024 21:25:20 +0530 Subject: [PATCH 11/30] doc: documents segmented_log::MetaWithIdx --- src/storage/commit_log/segmented_log/mod.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index 36b71d3d8..3d0ca1e45 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -127,9 +127,13 @@ use std::{ time::Duration, }; +/// Represent metadata for records in the [`SegmentedLog`]. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct MetaWithIdx { + /// Generic metadata for the record as necessary pub metadata: M, + + /// Index of the record within the [`SegmentedLog`]. pub index: Option, } @@ -137,6 +141,10 @@ impl MetaWithIdx where Idx: Eq, { + /// Returns a [`Some`]`(`[`MetaWithIdx`]`)` containing this instance's `metadata` and the + /// provided `anchor_idx` if this indices match or this instance's `index` is `None`. + /// + /// Returns `None` if this instance contains an `index` and the indices mismatch. pub fn anchored_with_index(self, anchor_idx: Idx) -> Option { let index = match self.index { Some(idx) if idx != anchor_idx => None, @@ -150,6 +158,8 @@ where } } +/// Record type alias for [`SegmentedLog`] using [`MetaWithIdx`] for the generic metadata +/// parameter. pub type Record = super::Record, T>; #[derive(Debug)] From f95fb55123979dc4765014001d25a00a13ea3a80 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Tue, 27 Feb 2024 13:25:29 +0530 Subject: [PATCH 12/30] chore: adds documentation for SegmentedLogError --- src/storage/commit_log/segmented_log/mod.rs | 20 ++++++++++++++++++- src/storage/commit_log/segmented_log/store.rs | 2 +- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index 3d0ca1e45..6fca404c4 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -127,7 +127,7 @@ use std::{ time::Duration, }; -/// Represent metadata for records in the [`SegmentedLog`]. +/// Represents metadata for records in the [`SegmentedLog`]. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct MetaWithIdx { /// Generic metadata for the record as necessary @@ -162,15 +162,33 @@ where /// parameter. pub type Record = super::Record, T>; +/// Error type associated with [`SegmentedLog`] operations. #[derive(Debug)] pub enum SegmentedLogError { + /// Used to denote errors from the underlying [`Storage`] implementation. StorageError(SE), + + /// Used to denote errors from operations on [`Segment`] instances. SegmentError(segment::SegmentError), + + /// Used to denote errors from the [`SegmentedLog`] inner cache. CacheError(CE), + + /// Used when the inner cache is not configured while using APIs that expect it. CacheNotFound, + + /// Used when the resulting `base_index` of a [`Segment`] in the [`SegmentedLog`] + /// is lesser than the `initial_index` configured at the [`SegmentedLog`] level. BaseIndexLesserThanInitialIndex, + + /// Used when the _write_ [`Segment`] containing [`Option`] is set to `None` WriteSegmentLost, + + /// Used when the given index is outside the range `[lowest_index, highest_index)` IndexOutOfBounds, + + /// Used when no [`Record`] is found at a valid index inside the range + /// `[lowest_index, highest_index]` IndexGapEncountered, } diff --git a/src/storage/commit_log/segmented_log/store.rs b/src/storage/commit_log/segmented_log/store.rs index e558f65b8..d527e0ab5 100644 --- a/src/storage/commit_log/segmented_log/store.rs +++ b/src/storage/commit_log/segmented_log/store.rs @@ -1,4 +1,4 @@ -//! Present the backing storge components for a `segment` in a `segmented-log`. +//! Present the backing storage components for a `segment` in a `segmented-log`. //! //! This module is responsible for ultimately persisting the records in our `segmented-log` to some //! form of [`Storage`]. From 95732bd5a02199cbb44ad5ec056cc922ca9c0d88 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Tue, 27 Feb 2024 19:27:50 +0530 Subject: [PATCH 13/30] style: remove redundant unit return from unit closure --- src/storage/impls/in_mem/commit_log.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/storage/impls/in_mem/commit_log.rs b/src/storage/impls/in_mem/commit_log.rs index 9a3ca36f8..0bac0a25f 100644 --- a/src/storage/impls/in_mem/commit_log.rs +++ b/src/storage/impls/in_mem/commit_log.rs @@ -24,7 +24,7 @@ pub mod segmented_log { async_io::block_on(async { segmented_log::test::_test_segmented_log_remove_expired_segments( InMemSegmentStorageProvider::::default(), - |duration| async { () }.delay(Duration::from(duration)), + |duration| async {}.delay(Duration::from(duration)), PhantomData::<((), crc32fast::Hasher, bincode::BinCode)>, ) .await; @@ -36,7 +36,7 @@ pub mod segmented_log { async_io::block_on(async { segmented_log::test::_test_segmented_log_segment_index_caching( InMemSegmentStorageProvider::::default(), - |duration| async { () }.delay(Duration::from(duration)), + |duration| async {}.delay(Duration::from(duration)), true, PhantomData::<((), crc32fast::Hasher, bincode::BinCode)>, ) From ff8b4d8a2a8155fd0075828acc1ccad0006973dd Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Tue, 27 Feb 2024 20:31:48 +0530 Subject: [PATCH 14/30] doc: adds documentation for the SegmentedLog struct --- src/storage/commit_log/segmented_log/mod.rs | 85 ++++++++++++++++++++- 1 file changed, 81 insertions(+), 4 deletions(-) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index 6fca404c4..c79699bf7 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -127,7 +127,7 @@ use std::{ time::Duration, }; -/// Represents metadata for records in the [`SegmentedLog`]. +/// Represents metadata for [`Record`] instances in the [`SegmentedLog`]. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct MetaWithIdx { /// Generic metadata for the record as necessary @@ -142,7 +142,7 @@ where Idx: Eq, { /// Returns a [`Some`]`(`[`MetaWithIdx`]`)` containing this instance's `metadata` and the - /// provided `anchor_idx` if this indices match or this instance's `index` is `None`. + /// provided `anchor_idx` if the indices match or this instance's `index` is `None`. /// /// Returns `None` if this instance contains an `index` and the indices mismatch. pub fn anchored_with_index(self, anchor_idx: Idx) -> Option { @@ -158,8 +158,7 @@ where } } -/// Record type alias for [`SegmentedLog`] using [`MetaWithIdx`] for the generic metadata -/// parameter. +/// Record type alias for [`SegmentedLog`] using [`MetaWithIdx`] as the metadata. pub type Record = super::Record, T>; /// Error type associated with [`SegmentedLog`] operations. @@ -211,13 +210,91 @@ where { } +/// Configuration for [`SegmentedLog`]. +/// +/// Used to configure specific invariants of a segmented log. #[derive(Default, Debug, Clone, Copy, Serialize, Deserialize)] pub struct Config { + /// Number of [`Segment`] instances in the [`SegmentedLog`] to be _index-cached_. + /// + /// _Index-cached_ [`Segment`] instances cache their inner [`Index`](index::Index) in memory. + /// This helps to avoid I/O for reading [`Record`] persistent metadata (such as position in + /// store file or checksum) everytime the [`Record`] is read from the [`Segment`] + /// + /// This configuration has the following effects depending on it's values: + /// - [`None`]: Default, *all* [`Segment`] instances are _index-cached_ + /// - [`Some`]`(0)`: *No* [`Segment`] instances are _index-cached_ + /// - [`Some`]`()`: A *maximum of the given number* of [`Segment`] instances are + /// _index-cached_ at any time. + /// + /// >You may think of it this way -- you can opt-in to optional index-caching by specific a + /// >[`Some`]. Or, you can keep using the default setting to index-cache all segments by + /// >specifying [`None`]. + /// + /// _Optional index-caching_ is benefical in [`SegmentedLog`] with a large number of + /// [`Segment`] instances, only a few of which are actively read from at any given point of + /// time. This is beneifical when working with limited heap memory but a large amount of + /// storage. + /// + ///
pub num_index_cached_read_segments: Option, + + /// [`Segment`] specific configuration to be used for all [`Segment`] instances in the + /// [`SegmentedLog`] in question. + /// + ///
pub segment_config: segment::Config, + + /// Lowest possible record index in the [`SegmentedLog`] in question. + /// + /// `( initial_index <= read_segments[0].base_index )` pub initial_index: Idx, } +/// The [`SegmentedLog`] abstraction, implementing a [`CommitLog`] with a collection of _read_ +/// [`Segment`]`s` and a single _write_ [`Segment`]. +/// +/// Uses a [`Vec`] to store _read_ [`Segment`] instances and an [`Option`] to store the _write_ +/// [`Segment`]. The [`Option`] is used so that we can easily move out the _write_ [`Segment`] or +/// move in a new one when implementing some of the APIs. The +/// [`SegmentedLogError::WriteSegmentLost`] error is a result of this implementation decision. +/// +/// [`SegmentedLog`] also has the ability to only optionally _index-cache_ some of the [`Segment`] +/// instances. +/// +/// >_Index-cached_ [`Segment`] instances cache their inner [`Index`](index::Index) in memory. +/// >This helps to avoid I/O for reading [`Record`] persistent metadata (such as position in store +/// >file, checksum) everytime the [`Record`] is read from the [`Segment`] +/// +/// >_Optional index-caching_ is benefical in [`SegmentedLog`] with a large number of +/// >[`Segment`] instances, only a few of which are actively read from at any given point of +/// >time. This is beneifical when working with limited heap memory but a large amount of +/// >storage. +/// +/// [`SegmentedLog`] maintains a [`Cache`] to keep track of which [`Segment`] instances to +/// _index-cache_. The _index-caching_ behaviour will depend on the [`Cache`] implementation used. +/// (For instance, an `LRUCache` would cache the least recently used [`Segment`] instances.) In +/// order to enable such behaviour, we perform lookups and inserts on this inner cache when +/// referring to any [`Segment`] for any operation. +/// +/// The _write_ [`Segment`] is always _index-cached_. +/// +/// Only the metadata associated with [`Record`] instances are serialized or deserialized. The +/// reocord content bytes are always written and read from the [`Storage`] as-is. +/// +/// ### Type parameters +/// - `S`: [`Storage`] implementation to be used for [`Segment`] instances +/// - `M`: Metadata to be used for [`Record`] instances +/// - `H`: [`Hasher`] to use for computing checksums of our [`Record`] contents +/// - `Idx`: Unsigned integer type to used for represeting record indices +/// - `Size`: Unsized integer to represent record and persistent storage sizes +/// - `SERP`: [`SerializationProvider`] used for serializing and deserializing metadata associated +/// with our records. +/// - `SSP`: [`SegmentStorageProvider`] used for obtaining backing storage for our [`Segment`] +/// instances +/// - `C`: [`Cache`] implementation to use for _index-caching_ behaviour. You may use +/// [`NoOpCache`](crate::common::cache::NoOpCache) when opting out of _optional index-caching_, +/// i.e. using [`None`] for [`Config::num_index_cached_read_segments`]. pub struct SegmentedLog { write_segment: Option>, read_segments: Vec>, From 78f038e3b53adc9afc3783f89aae76fcad435fae Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Tue, 27 Feb 2024 21:11:40 +0530 Subject: [PATCH 15/30] doc: adds an example for SegmentedLog --- src/storage/commit_log/segmented_log/mod.rs | 77 ++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index c79699bf7..87e696de5 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -255,7 +255,7 @@ pub struct Config { /// [`Segment`]`s` and a single _write_ [`Segment`]. /// /// Uses a [`Vec`] to store _read_ [`Segment`] instances and an [`Option`] to store the _write_ -/// [`Segment`]. The [`Option`] is used so that we can easily move out the _write_ [`Segment`] or +/// [`Segment`]. An [`Option`] is used so that we can easily move out the _write_ [`Segment`] or /// move in a new one when implementing some of the APIs. The /// [`SegmentedLogError::WriteSegmentLost`] error is a result of this implementation decision. /// @@ -295,6 +295,80 @@ pub struct Config { /// - `C`: [`Cache`] implementation to use for _index-caching_ behaviour. You may use /// [`NoOpCache`](crate::common::cache::NoOpCache) when opting out of _optional index-caching_, /// i.e. using [`None`] for [`Config::num_index_cached_read_segments`]. +/// +/// ### Example +/// +/// Here's an example using +/// [`InMemStorage`](crate::storage::impls::in_mem::storage::InMemStorage): +/// +/// ``` +/// use futures_lite::{stream, future::block_on, StreamExt}; +/// use laminarmq::{ +/// common::{cache::NoOpCache, serde_compat::bincode}, +/// storage::{ +/// commit_log::{ +/// segmented_log::{segment::Config as SegmentConfig, Config, MetaWithIdx, SegmentedLog}, +/// CommitLog, Record, +/// }, +/// impls::{ +/// common::DiskBackedSegmentStorageProvider, +/// in_mem::{segment::InMemSegmentStorageProvider, storage::InMemStorage}, +/// }, +/// AsyncConsume, +/// }, +/// }; +/// use std::convert::Infallible; +/// +/// fn record(stream: X) -> Record, X> { +/// Record { +/// metadata: MetaWithIdx { +/// metadata: (), +/// index: None, +/// }, +/// value: stream, +/// } +/// } +/// +/// fn infallible(t: T) -> Result { +/// Ok(t) +/// } +/// +/// const IN_MEMORY_SEGMENTED_LOG_CONFIG: Config = Config { +/// segment_config: SegmentConfig { +/// max_store_size: 1048576, // = 1MiB in bytes +/// max_store_overflow: 524288, +/// max_index_size: 1048576, +/// }, +/// initial_index: 0, +/// num_index_cached_read_segments: None, +/// }; +/// +/// block_on(async { +/// let mut segmented_log = SegmentedLog::< +/// InMemStorage, +/// (), +/// crc32fast::Hasher, +/// u32, +/// usize, +/// bincode::BinCode, +/// _, +/// NoOpCache, +/// >::new( +/// IN_MEMORY_SEGMENTED_LOG_CONFIG, +/// InMemSegmentStorageProvider::::default(), +/// ) +/// .await +/// .unwrap(); +/// +/// let tiny_message = stream::once(b"Hello World!" as &[u8]) +/// .map(infallible); +/// +/// segmented_log +/// .append(record(tiny_message)) +/// .await +/// .unwrap(); +/// }); +/// ``` pub struct SegmentedLog { write_segment: Option>, read_segments: Vec>, @@ -306,6 +380,7 @@ pub struct SegmentedLog { segment_storage_provider: SSP, } +/// Type alias for [`SegmentedLogError`] with additional type parameter trait bounds. pub type LogError = SegmentedLogError< ::Error, ::Error, From 52d2ee4e26d95748c02fc6a9618836549bcf3903 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Wed, 28 Feb 2024 13:05:55 +0530 Subject: [PATCH 16/30] doc: documents SegmentedLog::new --- src/storage/commit_log/segmented_log/mod.rs | 38 +++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index 87e696de5..f2b813235 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -282,6 +282,15 @@ pub struct Config { /// Only the metadata associated with [`Record`] instances are serialized or deserialized. The /// reocord content bytes are always written and read from the [`Storage`] as-is. /// +/// Every [`Segment`] in a [`SegmentedLog`] has fixed maximum [`Storage`] size. Writes always go to +/// the current _write_ [`Segment`]. Whenever a _write_ [`Segment`] exceeds the configured storage +/// size, it is rotated back to the collection of _read_ [`Segment`] instances and a new _write_ +/// [`Segment`] is created in it's place, with it's `base_index` as the `highest_index` of the previous +/// _write_ [`Segment`]. +/// +/// Reads are serviced by both _read_ and _write_ [`Segment`] instances depending on whether the +/// [`Record`] to be read lies within their [`Record`] index range. +/// /// ### Type parameters /// - `S`: [`Storage`] implementation to be used for [`Segment`] instances /// - `M`: Metadata to be used for [`Record`] instances @@ -388,6 +397,8 @@ pub type LogError = SegmentedLogError< >; impl SegmentedLog { + /// Returns an iterator containing immutable references all the [`Segment`] instances in this + /// [`SegmentedLog`]. fn segments(&self) -> impl Iterator> { self.read_segments.iter().chain(self.write_segment.iter()) } @@ -415,6 +426,31 @@ where C: Cache + Default, C::Error: Debug, { + /// Creates a new [`SegmentedLog`] instance with the given [`Config`] and + /// [`SegmentStorageProvider`] implementation. + /// + /// This function first scans for already persisted [`Segment`] instances in the given + /// [`SegmentStorageProvider`]. The segments are already sorted by their `base_index`. Next it + /// uses the last segment in this sorted order as the `write_segment` and the remaining _n - 1_ + /// segments as the `read_segments`. + /// + /// If no segments are already persisted in the provided storage, we create a new + /// `write_segment` with the given [`Config::initial_index`] and no read segments. Read + /// segments are created when this `write_segment` is rotated back as a read segment. + /// + /// Returns a [`SegmentedLog`]. + /// + /// # Errors + /// + /// - [`SegmentedLogError::StorageError`]: if there's an error in scanning for the segments on the + /// [`SegmentStorageProvider`] + /// - [`SegmentedLogError::BaseIndexLesserThanInitialIndex`]: if the `base_index` of the first + /// segment read from the storage is lesser than the configured `initial_index`. + /// - [`SegmentedLogError::SegmentError`]: if there's an error in creating a [`Segment`]. + /// - [`SegmentedLogError::WriteSegmentLost`]: if there's an error in obtaining the _write_ + /// segment after creating all the [`Segment`] instances. + /// - [`SegmentedLogError::CacheError`]: if there's an error in initializing the inner cache + /// for _optional-index-caching_. pub async fn new( config: Config, mut segment_storage_provider: SSP, @@ -440,6 +476,8 @@ where let mut segments = Vec::with_capacity(segment_base_indices.len()); for segment_base_index in segment_base_indices { + // index-cache the current segment if this the write_segment or + // "optional-index-caching" is disabled. let cache_index_records_flag = (segment_base_index == write_segment_base_index) || config.num_index_cached_read_segments.is_none(); From a9e2017801b3320082b3191f0300fe628494f4fc Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Wed, 28 Feb 2024 13:53:11 +0530 Subject: [PATCH 17/30] doc: adds documentation for convenience macros --- src/storage/commit_log/segmented_log/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index f2b813235..be85abd44 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -519,6 +519,7 @@ where } } +/// Creates a new _write_ [`Segment`] for the given `segmented_log` wth the given `base_index`. macro_rules! new_write_segment { ($segmented_log:ident, $base_index:ident) => { Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag( @@ -532,6 +533,7 @@ macro_rules! new_write_segment { }; } +/// Consumes the given [`Segment`] with the provided [`AsyncConsume`] method. macro_rules! consume_segment { ($segment:ident, $consume_method:ident) => { $segment @@ -541,6 +543,7 @@ macro_rules! consume_segment { }; } +/// Takes the _write_ [`Segment`] from the given [`SegmentedLog`]. macro_rules! take_write_segment { ($segmented_log:ident) => { $segmented_log @@ -550,6 +553,8 @@ macro_rules! take_write_segment { }; } +/// Obtains a reference to the _write_ [`Segment`] of the given [`SegmentedLog`] using the +/// provided reference function. (can be [`Option::as_mut`] or [`Option::as_ref`]). macro_rules! write_segment_ref { ($segmented_log:ident, $ref_method:ident) => { $segmented_log From 61b83ef7ceca67d74c64b0a5ca09d5d31f5fab40 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Wed, 28 Feb 2024 20:07:04 +0530 Subject: [PATCH 18/30] doc: adds documentation for various read() APIs --- src/storage/commit_log/segmented_log/mod.rs | 71 +++++++++++++++++++-- 1 file changed, 65 insertions(+), 6 deletions(-) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index be85abd44..1d2b59b83 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -598,6 +598,22 @@ where .unwrap_or(self.config.initial_index) } + /// Reads the [`Record`] at the given `idx`. + /// + /// Note that this method is purely idempotent and doesn't trigger the _optional-index-caching_ + /// behaviour. If the [`Segment`] containing the [`Record`] is not _index-cached_, it incurs an + /// additional I/O cost to read the position and checksum metadata for the [`Record`]. + /// + /// If however all [`Segment`] instances are _index-cached_ i.e when using the default + /// configuration, no additional I/O cost is incurred. + /// + /// Returns the [`Record`] read. + /// + /// ## Errors + /// - [`SegmentedLogError::IndexOutOfBounds`]: if the provided `idx` is out of the index bounds + /// of this [`SegmentedLog`] + /// - [`SegmentedLogError::SegmentError`]: if there is any error in reading the [`Record`] from + /// the underlying [`Segment`]. async fn read(&self, idx: &Self::Idx) -> Result { if !self.has_index(idx) { return Err(SegmentedLogError::IndexOutOfBounds); @@ -742,6 +758,13 @@ where C: Cache, C::Error: Debug, { + /// Exclusively reads the [`Record`] from the [`Segment`] specified by the provided `segment_id` + /// at the provided `idx`. + /// + /// This method uses the _optional index-caching_ behaviour by using the inner cache. + /// + /// Returns a [`SegRead`] containing the [`Record`] and next index to read from, or seek + /// information containing which [`Segment`] and `idx` to read from next. pub async fn read_seq_exclusive( &mut self, segment_id: usize, @@ -775,6 +798,9 @@ where C: Cache, C::Error: Debug, { + /// Exclusively reads the [`Record`] at the given `idx` from this [`SegmentedLog`]. + /// + /// This method triggers the _optional index-caching_ behaviour by using the inner cache. async fn exclusive_read(&mut self, idx: &Self::Idx) -> Result { if !self.has_index(idx) { return Err(SegmentedLogError::IndexOutOfBounds); @@ -791,21 +817,54 @@ where } } +/// Returned by methods which allow manual resolution of which [`Segment`] to read from in a +/// [`SegmentedLog`]. +/// +/// Methods like [`SegmentedLog::read_seq`] and [`SegmentedLog::read_seq_exclusive`] allow manual +/// control over which [`Segment`] to read from by explicitly having a `segment_id` as a parameter. +/// [`SeqRead`] is used to represent the value of these operations. +/// +/// APIs like these enable avoiding searching for which [`Segment`] can service a `read()` since we +/// can explicitly pass in a `segment_id` to specify which [`Segment`] to read from. This helps us +/// avoid the cost of searching when simply contiguously iterating over all the [`Record`] +/// instances in a [`SegmentedLog`]. +/// +/// Generally, APIs using this type are meant to be used as follows: +/// +/// ```text +/// let (mut segment_id, mut idx) = (0, 0); +/// +/// while let Ok(seq_read) = segmented_log.seq_read().await { +/// match seq_read { +/// Read { record, next_idx } => { +/// // do something with record +/// idx = next_idx; +/// } +/// Seek { next_segment, next_idx } => { +/// segment_id, idx = next_segment, next_idx +/// } +/// }; +/// } +/// ``` pub enum SeqRead { + /// A valid _read_ containing the read [`Record`] and the index to the next [`Record`] Read { record: Record, next_idx: Idx, }, - Seek { - next_segment: usize, - next_idx: Idx, - }, + + /// Used when the _read_ hits the end of a [`Segment`] and the next [`Record`] can be found in + /// the next [`Segment`]. Contains the `segment_id` of the next [`Segment`] to read from and + /// the `index` to read at. + Seek { next_segment: usize, next_idx: Idx }, } -pub type ResolvedSegmentMutResult<'a, S, M, H, Idx, SERP, C> = +/// Used as the result of resolving a `segment_id` ta a mutable ref to a [`Segment`]. +type ResolvedSegmentMutResult<'a, S, M, H, Idx, SERP, C> = Result<&'a mut Segment::Size, SERP>, LogError>; -pub type ResolvedSegmentResult<'a, S, M, H, Idx, SERP, C> = +/// Used as the result of resolving a `segment_id` ta an immutable ref to a [`Segment`]. +type ResolvedSegmentResult<'a, S, M, H, Idx, SERP, C> = Result<&'a Segment::Size, SERP>, LogError>; impl SegmentedLog From 0d452c28dd0f7808d926b80a0ce294fa606d691f Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Wed, 28 Feb 2024 20:08:10 +0530 Subject: [PATCH 19/30] chore: corrects typo --- src/storage/commit_log/segmented_log/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index 1d2b59b83..847451af6 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -763,7 +763,7 @@ where /// /// This method uses the _optional index-caching_ behaviour by using the inner cache. /// - /// Returns a [`SegRead`] containing the [`Record`] and next index to read from, or seek + /// Returns a [`SeqRead`] containing the [`Record`] and next index to read from, or seek /// information containing which [`Segment`] and `idx` to read from next. pub async fn read_seq_exclusive( &mut self, From 47f92d674ca4350902957d8dcac1088a9ce8a2d1 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Sat, 2 Mar 2024 19:53:56 +0530 Subject: [PATCH 20/30] style: refactor CacheOp and CacheOpKind to single sum type SegmentIndexCacheOp --- src/storage/commit_log/segmented_log/mod.rs | 47 ++++++++------------- 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index 847451af6..f4e399e6b 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -627,22 +627,10 @@ where } #[derive(Debug)] -enum CacheOpKind { - Uncache, - Cache, - None, -} - -#[derive(Debug)] -struct CacheOp { - segment_id: usize, - kind: CacheOpKind, -} - -impl CacheOp { - fn new(segment_id: usize, kind: CacheOpKind) -> Self { - Self { segment_id, kind } - } +enum SegmentIndexCacheOp { + Drop { evicted_segment_id: usize }, + Cache { segment_id: usize }, + Nop, } impl SegmentedLog @@ -665,10 +653,7 @@ where return Ok(()); } - let mut cache_op_buf = [ - CacheOp::new(0, CacheOpKind::None), - CacheOp::new(0, CacheOpKind::None), - ]; + let mut cache_op_buf = [SegmentIndexCacheOp::Nop, SegmentIndexCacheOp::Nop]; let cache = self .segments_with_cached_index @@ -681,15 +666,15 @@ where Ok(Lookup::Hit(_)) => Ok(&cache_op_buf[..0]), Ok(Lookup::Miss) => match cache.insert(segment_id, ()) { Ok(Eviction::None) => { - cache_op_buf[0] = CacheOp::new(segment_id, CacheOpKind::Cache); + cache_op_buf[0] = SegmentIndexCacheOp::Cache { segment_id }; Ok(&cache_op_buf[..1]) } Ok(Eviction::Block { - key: evicted_id, + key: evicted_segment_id, value: _, }) => { - cache_op_buf[0] = CacheOp::new(evicted_id, CacheOpKind::Uncache); - cache_op_buf[1] = CacheOp::new(segment_id, CacheOpKind::Cache); + cache_op_buf[0] = SegmentIndexCacheOp::Drop { evicted_segment_id }; + cache_op_buf[1] = SegmentIndexCacheOp::Cache { segment_id }; Ok(&cache_op_buf[..]) } Ok(Eviction::Value(_)) => Ok(&cache_op_buf[..0]), @@ -701,15 +686,17 @@ where .map_err(SegmentedLogError::CacheError)?; for segment_cache_op in cache_ops { - let segment = self.resolve_segment_mut(Some(segment_cache_op.segment_id))?; - - match segment_cache_op.kind { - CacheOpKind::Uncache => drop(segment.take_cached_index_records()), - CacheOpKind::Cache => segment + match *segment_cache_op { + SegmentIndexCacheOp::Drop { evicted_segment_id } => drop( + self.resolve_segment_mut(Some(evicted_segment_id))? + .take_cached_index_records(), + ), + SegmentIndexCacheOp::Cache { segment_id } => self + .resolve_segment_mut(Some(segment_id))? .cache_index() .await .map_err(SegmentedLogError::SegmentError)?, - CacheOpKind::None => {} + SegmentIndexCacheOp::Nop => {} } } From 9bfbea36496093bdb7f1c5021b9d9c550a735a5c Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Sat, 2 Mar 2024 20:41:13 +0530 Subject: [PATCH 21/30] doc: adds documentation for stream_* and seq_read APIs --- src/storage/commit_log/segmented_log/mod.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index f4e399e6b..064b3600f 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -927,6 +927,11 @@ where } } + /// Reads the [`Record`] from the [`Segment`] specified by the provided `segment_id` at the + /// provided `idx`. + /// + /// Returns a [`SeqRead`] containing the [`Record`] and next index to read from, or seek + /// information containing which [`Segment`] and `idx` to read from next. pub async fn read_seq( &self, segment_id: usize, @@ -943,6 +948,8 @@ where self.read_seq_unchecked(segment_id, idx).await } + /// Returns a [`Stream`] containing [`Record`] instances within the given `index_bounds` in + /// this [`SegmentedLog`], ordered by record index. pub fn stream( &self, index_bounds: RB, @@ -971,6 +978,8 @@ where .flatten() } + /// Returns a [`Stream`] containing all [`Record`] instances in this [`SegmentedLog`], ordered + /// by record index. pub fn stream_unbounded(&self) -> impl Stream> + '_ { stream::iter(self.segments()) .map(move |segment| indexed_read_stream(segment, ..)) From c3d2e1359193f177436c07892a95f5574ec46d9d Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Wed, 6 Mar 2024 13:47:58 +0530 Subject: [PATCH 22/30] chore: fix typos --- src/storage/commit_log/segmented_log/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index 064b3600f..3f7c48073 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -91,7 +91,7 @@ //! ## Why is this nested as a submodule? //! //! There can be other implementations of a [`CommitLog`] which have a completely different -//! structure. So we make "segmented-log" a submodule to repreent it as one of the possivle +//! structure. So we make "segmented-log" a submodule to represent it as one of the possible //! implementations. pub mod index; From 3c4d0baad3b5fc5b969e4574d5b3acab08bc675b Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Wed, 6 Mar 2024 14:19:00 +0530 Subject: [PATCH 23/30] doc: documents remaining methods for SegmentedLog Documents append_record_with_contiguous_bytes, remove_expired_segments, rotate_new_write_segment and flush --- src/storage/commit_log/segmented_log/mod.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/storage/commit_log/segmented_log/mod.rs b/src/storage/commit_log/segmented_log/mod.rs index 3f7c48073..9463b8a06 100644 --- a/src/storage/commit_log/segmented_log/mod.rs +++ b/src/storage/commit_log/segmented_log/mod.rs @@ -1001,6 +1001,16 @@ where C: Cache, C::Error: Debug, { + /// Rotates the current _write_ [`Segment`] to a _read_ [`Segment`], creating a new _write_ + /// [`Segment`] in its place. + /// + /// Closes the current _write_ [`Segment`], reopens it as a new _read_ [`Segment`] and appends + /// it the the collection of _read_ [`Segment`] instances. Next, a new _write_ [`Segment`] is + /// created for this [`SegmentedLog`] with it's `base_index` set to the `highest_index` of the + /// previous _write_ [`Segment`]. + /// + /// This operations is used when the _write_ [`Segment`] exceeds the confiured [`Segment`] + /// storage size limit and needs to be rotated. pub async fn rotate_new_write_segment(&mut self) -> Result<(), LogError> { self.flush().await?; @@ -1021,6 +1031,7 @@ where Ok(()) } + /// Flushes all writes in the current _write_ [`Segment`] to persistent [`Storage`]. pub async fn flush(&mut self) -> Result<(), LogError> { let write_segment = take_write_segment!(self)?; @@ -1034,6 +1045,10 @@ where Ok(()) } + /// Removes all [`Segment`] instances that are older than the given `expiry_duration`. + /// + /// Returns the total number of [`Record`] instances removed from removing the [`Segment`] + /// instances. pub async fn remove_expired_segments( &mut self, expiry_duration: Duration, @@ -1095,6 +1110,9 @@ where C: Cache, C::Error: Debug, { + /// Appends a new [`Record`] containing value bytes laid out in a contiguous fashion. + /// + /// Returns the index of the newly appended [`Record`]. pub async fn append_record_with_contiguous_bytes( &mut self, record: &Record, From a139f281fdd5e7654354e9216aff06eebc4e0836 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Sat, 16 Mar 2024 22:51:53 +0530 Subject: [PATCH 24/30] doc: adds documentation for IndexError and Index::* functions --- src/storage/commit_log/segmented_log/index.rs | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index ce44c1ded..3fed9996e 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -153,15 +153,39 @@ impl SizedRecord for IndexBaseMarker { /// Error type associated with operations on [`Index`]. #[derive(Debug)] pub enum IndexError { + /// Used to denote errors from the backing [`Storage`] implementation. StorageError(StorageError), + + /// Used to denote I/O errors caused during serializing or deserializing [`IndexRecord`] + /// instances. IoError(std::io::Error), + + /// Used when the type used for representing positions is incompatible with [`u64`]. IncompatiblePositionType, + + /// Used when the type used for representing sizes is incompatible with [`u64`]. IncompatibleSizeType, + + /// Used when the type used for representing indices is incompatible with [`u64`]. IncompatibleIdxType, + + /// Used when the index used for an operation is outside the permissible bounds for the + /// referenced [`Index`] IndexOutOfBounds, + + /// Used when there is a range of indices the [`Index`] bounds which are not mapped to any + /// [`IndexRecord`] instances. IndexGapEncountered, + + /// Used when the `base_index` for the referenced [`Index`] cannot be inferred from the + /// provided [`Storage`], in the absence of an explicitly provided `base_index`. NoBaseIndexFound, + + /// Used when the inferred and provided `base_index` values mismatch. BaseIndexMismatch, + + /// Used when the number of [`IndexRecord`] instances read from an [`Index`] is inconsistent + /// the estimated number of [`IndexRecord`] instances based on the underlying storage size. InconsistentIndexSize, } @@ -297,6 +321,16 @@ where .and_then(|x| u64_as_idx!(x, Idx)) } + /// Reads all [`IndexRecord`] instances persisted in the provided [`Storage`] instance. + /// + /// Returns a [`Vec`] containing the read [`IndexRecord`] instances. + /// + /// # Errors + /// + /// - [`IndexError::InconsistentIndexSize`]: + /// If the number of [`IndexRecord`] instances read is inconsistent with the estimated + /// number of [`IndexRecord`] instances persisted. (Calculated with + /// [`Self::estimated_index_records_len_in_storage`]) pub async fn index_records_from_storage( storage: &S, ) -> Result, IndexError> { From 89e863c49e685dd7482108ba2feac0b32a57f9b8 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Wed, 1 May 2024 21:12:09 +0530 Subject: [PATCH 25/30] doc: adds missing documenatation for Index::* functions --- src/storage/commit_log/segmented_log/index.rs | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/storage/commit_log/segmented_log/index.rs b/src/storage/commit_log/segmented_log/index.rs index 3fed9996e..e53e1cd31 100644 --- a/src/storage/commit_log/segmented_log/index.rs +++ b/src/storage/commit_log/segmented_log/index.rs @@ -360,6 +360,19 @@ where } } + /// Returns a validated `base_index` for this [`Index`]. + /// + /// Compares the given `base_index` with the read `base_index` to obtain the `base_index` for + /// this [`Index`]. If the `base_index` could be obtained from only one of the sources, then + /// that obtained value is retured. If the `base_index` could be obtained from both the soures: + /// - If the read values mismatch across the different sources, we return + /// [`IndexError::BaseIndexMismatch`] + /// - If the read values are same, we return the provided value + /// + /// # Errors + /// + /// - [`IndexError::BaseIndexMismatch`]: if the read `base_index` and the provided + /// `base_index` mismatch. pub async fn validated_base_index( storage: &S, base_index: Option, @@ -375,6 +388,9 @@ where } } + /// Creates an [`Index`] instance from the given `storage` and an optional `base_index`. + /// + /// The created [`Index`] uses the provided `storage` instance as it's storage backend. pub async fn with_storage_and_base_index_option( storage: S, base_index: Option, @@ -395,6 +411,7 @@ where }) } + /// Creates an [`Index`] instance from the given `storage` and a `base_index`. pub async fn with_storage_and_base_index( storage: S, base_index: Idx, @@ -402,10 +419,16 @@ where Self::with_storage_and_base_index_option(storage, Some(base_index)).await } + /// Creates an [`Index`] from the given storage. pub async fn with_storage(storage: S) -> Result> { Self::with_storage_and_base_index_option(storage, None).await } + /// Creates an [`Index`] with the given `storage` buffered `index_records` and a + /// validated_base_index`. + /// + /// This method doesn't use any additional checks for the `validated_base_index` and the + /// buffered `index_records` against the `storage` provided. pub fn with_storage_index_records_option_and_validated_base_index( storage: S, index_records: Option>, @@ -422,14 +445,18 @@ where }) } + /// Takes out the cached [`IndexRecord`] instances from this [`Index`], leaving the + /// cache empty. pub fn take_cached_index_records(&mut self) -> Option> { self.index_records.take() } + /// Returns a reference to the cached [`IndexRecord`] instance. pub fn cached_index_records(&self) -> Option<&Vec> { self.index_records.as_ref() } + /// Cache all the [`IndexRecord`] instances in the underlying `storage` in this [`Index`]. pub async fn cache(&mut self) -> Result<(), IndexError> { if self.index_records.as_ref().is_some() { return Ok(()); @@ -446,6 +473,7 @@ where S: Default, Idx: Copy, { + /// Creates an empty [`Index`] with the given `base_index`. pub fn with_base_index(base_index: Idx) -> Self { Self { index_records: Some(Vec::new()), @@ -513,7 +541,7 @@ where index_records .get(normalized_index) .ok_or(IndexError::IndexGapEncountered) - .map(|&x| x) + .copied() } else { PersistentSizedRecord::::read_at( &self.storage, @@ -530,6 +558,7 @@ where S: Storage, Idx: Unsigned + ToPrimitive + Copy, { + /// Appends the given [`IndexRecord`] instance to the end of this [`Index`]. pub async fn append(&mut self, index_record: IndexRecord) -> Result> { let write_index = self.next_index; From ec7874fb9846ba15c9ca2d0cd3291f4761a83526 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Fri, 24 May 2024 19:43:25 +0530 Subject: [PATCH 26/30] doc: documents RecordHeader and Store in segmented_log::store --- src/storage/commit_log/segmented_log/store.rs | 41 +++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/src/storage/commit_log/segmented_log/store.rs b/src/storage/commit_log/segmented_log/store.rs index d527e0ab5..9030cc707 100644 --- a/src/storage/commit_log/segmented_log/store.rs +++ b/src/storage/commit_log/segmented_log/store.rs @@ -11,9 +11,11 @@ use futures_lite::StreamExt; use std::{error::Error as StdError, hash::Hasher, marker::PhantomData, ops::Deref}; pub mod common { + //! Module providing common entities for all [`Store`](super::Store) implementations. + use std::{ hash::Hasher, - io::{ErrorKind::UnexpectedEof, Read, Write}, + io::{self, ErrorKind::UnexpectedEof, Read, Write}, }; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; @@ -24,6 +26,9 @@ pub mod common { /// Number of bytes required for storing the record header. pub const RECORD_HEADER_LENGTH: usize = 16; + /// Header containing the checksum and length of the bytes contained within a Record. + /// + /// Used for maintaining data integrity of all persisted data. #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] pub struct RecordHeader { pub checksum: u64, @@ -31,7 +36,8 @@ pub mod common { } impl RecordHeader { - pub fn read(source: &mut R) -> std::io::Result { + /// Reads a [`RecordHeader`] header instance from the given [`Read`] impl. + pub fn read(source: &mut R) -> io::Result { let checksum = source.read_u64::()?; let length = source.read_u64::()?; @@ -42,13 +48,16 @@ pub mod common { } } - pub fn write(&self, dest: &mut W) -> std::io::Result<()> { + /// Writes this [`RecordHeader`] instance to the given [`Write`] impl. + pub fn write(&self, dest: &mut W) -> io::Result<()> { dest.write_u64::(self.checksum)?; dest.write_u64::(self.length)?; Ok(()) } + /// Computes and returns the [`RecordHeader`] for a record containing the + /// given `record_bytes`. pub fn compute(record_bytes: &[u8]) -> Self where H: Hasher + Default, @@ -65,6 +74,17 @@ pub mod common { } } +/// Unit of persistence within a [`Segment`](super::segment::Segment). +/// +///

+/// segmented_log_segment +///

+///

+/// Fig: Segment diagram showing Store, persisting +/// record bytes at positions mapped out by the Index records. +///

+/// +/// A [`Store`] contains a backing [`Storage`] impl instance to persist record bytes. pub struct Store { storage: S, @@ -78,6 +98,7 @@ impl Default for Store { } impl Store { + /// Creates a new [`Store`] instance from the given backing [`Storage`] instance. pub fn new(storage: S) -> Self { Self { storage, @@ -88,9 +109,17 @@ impl Store { #[derive(Debug)] pub enum StoreError { + /// Used to denote errors from the backing [`Storage`] implementation. StorageError(SE), + + /// Used when the type used for representing sizes is incompatible with [`u64`]. IncompatibleSizeType, + + /// Used in the case of a data integrity error when the computed [`RecordHeader`] + /// doesn't match the designated [`RecordHeader`] for a given record. RecordHeaderMismatch, + + /// Used when reading from an empty [`Store`]. ReadOnEmptyStore, } @@ -126,6 +155,8 @@ where S: Storage, H: Hasher + Default, { + /// Reads record bytes for a record persisted at the given `position` with the designated + /// [`RecordHeader`]. pub async fn read( &self, position: &S::Position, @@ -151,6 +182,10 @@ where Ok(record_bytes) } + /// Appends the bytes for a new record at the end of this store. + /// + /// Returns the computed [`RecordHeader`] for the provided record bytes along with the + /// position where the record was written. pub async fn append( &mut self, stream: X, From 09f67d2d5d6910c6c7a97c843fa779ba64713a4e Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Fri, 24 May 2024 19:46:14 +0530 Subject: [PATCH 27/30] doc: documents store::StoreError --- src/storage/commit_log/segmented_log/store.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/storage/commit_log/segmented_log/store.rs b/src/storage/commit_log/segmented_log/store.rs index 9030cc707..5c83983ac 100644 --- a/src/storage/commit_log/segmented_log/store.rs +++ b/src/storage/commit_log/segmented_log/store.rs @@ -107,6 +107,7 @@ impl Store { } } +/// Error type used for [`Store`] operations. #[derive(Debug)] pub enum StoreError { /// Used to denote errors from the backing [`Storage`] implementation. From e4900cd795fd2e5888d01a14d7bfe179b1c9ef0a Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Fri, 24 May 2024 20:23:04 +0530 Subject: [PATCH 28/30] doc: documents segmented_log::segment::Segment --- .../commit_log/segmented_log/segment.rs | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/src/storage/commit_log/segmented_log/segment.rs b/src/storage/commit_log/segmented_log/segment.rs index 393ba2a52..5bf9b5239 100644 --- a/src/storage/commit_log/segmented_log/segment.rs +++ b/src/storage/commit_log/segmented_log/segment.rs @@ -27,6 +27,7 @@ use std::{ time::{Duration, Instant}, }; +/// [`Store`] and [`Index`] size configuration for a [`Segment`]. #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] pub struct Config { pub max_store_size: Size, @@ -34,6 +35,17 @@ pub struct Config { pub max_index_size: Size, } +/// A segment unit in a [`SegmentedLog`](super::SegmentedLog). +/// +///

+/// segmented_log_segment +///

+///

+/// Fig: Segment diagram showing Index, mapping logical indices +/// toStore positions and a Store persisting record bytes at the +/// demarcated positions. +///

+ pub struct Segment { index: Index, store: Store, @@ -80,17 +92,39 @@ where } } +/// Error type associated with operations on [`Segment`]. #[derive(Debug)] pub enum SegmentError { + /// Used to denote errors from the backing [`Storage`] implementation. StorageError(StorageError), + + /// Used to denote errors from the underlying [`Store`]. StoreError(StoreError), + + /// Used to denote errors from the underlying [`Index`]. IndexError(IndexError), + + /// Used when the type used for representing positions is incompatible with [`u64`]. IncompatiblePositionType, + + /// Used to denote errors when serializing or deserializing data. (for instance, [`Record`] + /// metadata) SerializationError(SerDeError), + + /// Used when the metadata associated with a [`Record`] is not found. RecordMetadataNotFound, + + /// Used when the provided append index is not the hghest index of the [`Segment`]. InvalidAppendIdx, + + /// Used when the [`Segment`] is unable to regenerate an [`IndexRecord`] from the position and + /// [`RecordHeader`](super::store::common::RecordHeader). InvalidIndexRecordGenerated, + + /// Used when usize cannot be coerced to u32 and vice versa. UsizeU32Inconvertible, + + /// Used when a given [`Segment`] maxes out its capacity when we append to it. SegmentMaxed, } @@ -111,6 +145,7 @@ where { } +#[doc(hidden)] pub type SegmentOpError = SegmentError<::Error, ::Error>; @@ -220,6 +255,15 @@ where Ok(write_index) } + /// Appends a new [`Record`] to this [`Segment`]. + /// + /// Serializes the record metadata and bytes and writes them to the backing [`Store`]. Also + /// makes an [`IndexRecord`] entry in the underlying [`Index`] to keep track of the [`Record`]. + /// + /// Returns the index at which the [`Record`] was written. + /// + /// Errors out with a [`SegmentError`] when necessary. Refer to [`SegmentError`] for more info + /// about error situations and types. pub async fn append( &mut self, record: Record, @@ -291,6 +335,8 @@ where Idx: Serialize, SERP: SerializationProvider, { + /// Like [`Segment::append`] but the [`Record`] contains a contiguous slice of bytes, as + /// opposed to a stream. pub async fn append_record_with_contiguous_bytes( &mut self, record: &Record, @@ -399,31 +445,46 @@ where SERP: SerializationProvider, Idx: Unsigned + FromPrimitive + Copy + Eq, { + /// Caches the [`Index`] contents i.e [`IndexRecord`] instances in memory for fast lookup. pub async fn cache_index(&mut self) -> Result<(), SegmentError> { self.index.cache().await.map_err(SegmentError::IndexError) } + /// Takes the cached [`IndexRecord`] instances from this [`Segment`], leaving [`None`] in their + /// place. pub fn take_cached_index_records(&mut self) -> Option> { self.index.take_cached_index_records() } + /// Returns a reference to the cached [`IndexRecord`] instances. pub fn cached_index_records(&self) -> Option<&Vec> { self.index.cached_index_records() } } +/// Backing storage for an [`Index`] and [`Store`] within a [`Segment`]. pub struct SegmentStorage { pub store: S, pub index: S, } +/// Provides backing storage for [`Segment`] instances. +/// +/// Used to abstract the mechanism of acquiring storage handles from the underlying persistent +/// media. #[async_trait(?Send)] pub trait SegmentStorageProvider where S: Storage, { + /// Returns the base indices of all the [`Segment`] instances persisted in this storage media. async fn obtain_base_indices_of_stored_segments(&mut self) -> Result, S::Error>; + /// Obtains a [`SegmentStorage`] instance for a [`Segment`] with the given `idx` as their base + /// index. + /// + /// Implementations are required to allocate/arrange new storage handles if a [`Segment`] with + /// the given base index is not already persisted on the underlying storage media. async fn obtain(&mut self, idx: &Idx) -> Result, S::Error>; } From c16dbe13fec5151ed3189623dbec39545d9c3ca9 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Fri, 24 May 2024 20:45:46 +0530 Subject: [PATCH 29/30] chore: update crate version --- Cargo.lock | 365 ++++++++++-- Cargo.toml | 11 +- README.md | 2 +- .../README.md | 72 +++ .../laminarmq-tokio-commit-log-server/main.rs | 556 ++++++++++++++++++ 5 files changed, 958 insertions(+), 48 deletions(-) create mode 100644 examples/laminarmq-tokio-commit-log-server/README.md create mode 100644 examples/laminarmq-tokio-commit-log-server/main.rs diff --git a/Cargo.lock b/Cargo.lock index 08c8f6c1a..4c5f1276a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,7 +94,7 @@ dependencies = [ "polling", "rustix 0.37.23", "slab", - "socket2", + "socket2 0.4.9", "waker-fn", ] @@ -146,6 +146,55 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.68" @@ -475,7 +524,7 @@ checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" dependencies = [ "errno-dragonfly", "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -534,6 +583,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "futures" version = "0.3.28" @@ -705,7 +763,7 @@ dependencies = [ "signal-hook", "sketches-ddsketch", "smallvec", - "socket2", + "socket2 0.4.9", "tracing", "typenum", ] @@ -769,6 +827,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" + [[package]] name = "httparse" version = "1.8.0" @@ -798,6 +862,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", + "socket2 0.4.9", "tokio", "tower-service", "tracing", @@ -858,7 +923,7 @@ checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" dependencies = [ "hermit-abi", "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -869,7 +934,7 @@ checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", "rustix 0.38.4", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -898,11 +963,12 @@ dependencies = [ [[package]] name = "laminarmq" -version = "0.0.5-rc2" +version = "0.0.5" dependencies = [ "async-io", "async-stream", "async-trait", + "axum", "bincode", "byteorder", "bytes", @@ -923,6 +989,8 @@ dependencies = [ "route-recognizer", "serde", "tokio", + "tower", + "tower-http", "tower-service", "tracing", "tracing-subscriber", @@ -936,9 +1004,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "linux-raw-sys" @@ -977,6 +1045,21 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.5.0" @@ -1019,6 +1102,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -1030,13 +1119,13 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1238,9 +1327,15 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets", + "windows-targets 0.48.1", ] +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + [[package]] name = "pin-project" version = "1.1.2" @@ -1263,9 +1358,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" [[package]] name = "pin-utils" @@ -1314,7 +1409,7 @@ dependencies = [ "libc", "log", "pin-project-lite", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1341,9 +1436,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.64" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78803b62cbf1f46fde80d7c0e803111524b9877184cfe7c3033659490ac7a7da" +checksum = "0b33eb56c327dec362a9e55b3ad14f9d2f0904fb5a5b03b513ab5465399e9f43" dependencies = [ "unicode-ident", ] @@ -1359,9 +1454,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.29" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] @@ -1405,8 +1500,17 @@ checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.3.3", + "regex-syntax 0.7.4", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -1417,9 +1521,15 @@ checksum = "39354c10dd07468c2e73926b23bb9c2caca74c5501e38a35da70406f1d923310" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.7.4", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.7.4" @@ -1476,7 +1586,7 @@ dependencies = [ "io-lifetimes", "libc", "linux-raw-sys 0.3.8", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1489,9 +1599,15 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.3", - "windows-sys", + "windows-sys 0.48.0", ] +[[package]] +name = "rustversion" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" + [[package]] name = "ryu" version = "1.0.14" @@ -1521,18 +1637,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "serde" -version = "1.0.171" +version = "1.0.202" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9" +checksum = "226b61a0d411b2ba5ff6d7f73a476ac4f8bb900373459cd00fab8512828ba395" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.171" +version = "1.0.202" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" +checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838" dependencies = [ "proc-macro2", "quote", @@ -1550,6 +1666,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -1609,6 +1747,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "spin" version = "0.9.8" @@ -1661,15 +1809,21 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.25" +version = "2.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15e3fc8c0c74267e2df136e5e5fb656a464158aa57624053375eb9c8c6e25ae2" +checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "tempfile" version = "3.6.0" @@ -1681,7 +1835,7 @@ dependencies = [ "fastrand", "redox_syscall", "rustix 0.37.23", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1726,27 +1880,27 @@ dependencies = [ [[package]] name = "tokio" -version = "1.29.1" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ - "autocfg", "backtrace", "bytes", "libc", "mio", "num_cpus", "pin-project-lite", - "socket2", + "signal-hook-registry", + "socket2 0.5.7", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", @@ -1767,6 +1921,47 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +dependencies = [ + "bitflags 2.3.3", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -1780,6 +1975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -1823,10 +2019,14 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] @@ -1999,7 +2199,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.1", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.5", ] [[package]] @@ -2008,13 +2217,29 @@ version = "0.48.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", +] + +[[package]] +name = "windows-targets" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" +dependencies = [ + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", ] [[package]] @@ -2023,38 +2248,86 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" + [[package]] name = "windows_aarch64_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" + [[package]] name = "windows_i686_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" +[[package]] +name = "windows_i686_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" + [[package]] name = "windows_i686_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" +[[package]] +name = "windows_i686_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" + [[package]] name = "windows_x86_64_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" + [[package]] name = "windows_x86_64_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" diff --git a/Cargo.toml b/Cargo.toml index bd02dd5dc..12ab2b89c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ license = "MIT" categories = ["web-programming"] keywords = ["message-queue", "distributed-systems", "segmented-log", "io-uring"] exclude = [".github/", "assets/"] -version = "0.0.5-rc2" +version = "0.0.5" edition = "2021" rust-version = "1.62" @@ -55,6 +55,15 @@ bench = false rlimit = "0.10.1" criterion = { version = "0.5", features = ["html_reports", "async_futures", "async_tokio"] } pprof = { version = "0.12", features = ["flamegraph", "criterion"] } +axum = "0.6.20" +crc32fast = "1.3.2" +hyper = "0.14.27" +serde = { version = "1.0.188", features = ["derive"] } +tokio = { version = "1.32.0", features = ["rt", "rt-multi-thread", "sync", "net", "fs", "signal"] } +tower = { version = "0.4.13", features = ["util", "timeout"] } +tower-http = { version = "0.4.4", features = ["add-extension", "trace"] } +tracing = "0.1.37" +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } [[bench]] name = "commit_log_append" diff --git a/README.md b/README.md index 1738b351c..3b92abe22 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ to use `laminarmq` as a library, add the following to your `Cargo.toml`: ```toml [dependencies] -laminarmq = "0.0.5-rc2" +laminarmq = "0.0.5" ``` Refer to latest git [API Documentation](https://arindas.github.io/laminarmq/docs/laminarmq/) or diff --git a/examples/laminarmq-tokio-commit-log-server/README.md b/examples/laminarmq-tokio-commit-log-server/README.md new file mode 100644 index 000000000..b03eb3511 --- /dev/null +++ b/examples/laminarmq-tokio-commit-log-server/README.md @@ -0,0 +1,72 @@ +# laminarmq-tokio-commit-log-server + +A simple persistent commit log server using the tokio runtime. + +## Endpoints + +This server exposes the following HTTP endpoints: + +```rust +.route("/index_bounds", get(index_bounds)) // obtain the index bounds +.route("/records/:index", get(read)) // obtain the record at given index +.route("/records", post(append)) // append a new record at the end of the commit log + +.route("/rpc/truncate", post(truncate)) // truncate the commit log + // expects JSON: { "truncate_index": } + // records starting from truncate_index are removed +``` + +## Usage + +Run the server as follows: + +```sh +cargo run --example laminarmq-tokio-commit-log-server --release +``` + +The server optionally expects an environment variable: `STORAGE_DIRECTORY`. + +The default value is: + +```rust +const DEFAULT_STORAGE_DIRECTORY: &str = "./.storage/laminarmq_tokio_commit_log_server/commit_log"; +``` + +You may specify it as follows: + +```sh +STORAGE_DIRECTORY="" cargo run --release +``` + +Once the server is running you may make requests as follows: + +```sh +curl -w "\n" "http://127.0.0.1:3000/index_bounds" + +curl -w "\n" --request POST --data "Hello World" "http://127.0.0.1:3000/records" +curl -w "\n" --request POST --data "Moshi moshi" "http://127.0.0.1:3000/records" +curl -w "\n" --request POST --data "Bonjour <3" "http://127.0.0.1:3000/records" + +curl -w "\n" "http://127.0.0.1:3000/index_bounds" + +curl -w "\n" "http://127.0.0.1:3000/records/1" + +curl -w "\n" --header "Content-Type: application/json" --request POST \ + --data "{\"truncate_index\": 1}" \ + "http://127.0.0.1:3000/rpc/truncate" + +curl -w "\n" "http://127.0.0.1:3000/index_bounds" +``` + +Here's what's happening above: + +- First request find the index_bounds, (highest_index) is exclusive +- We append three records with the given data +- We lookup the current index_bounds after appending to the commit_log +- We read the record at index 1 +- We truncate the commit_log at index 1. All records starting from index 1 are + removed. After this operation the bounds are [0, 1) +- We lookup the current index_bounds after truncating the commit_log + +> Note: The `-w "\n"` flag is for appending a "\n" to the output of curl. This way +> the output is more readable. diff --git a/examples/laminarmq-tokio-commit-log-server/main.rs b/examples/laminarmq-tokio-commit-log-server/main.rs new file mode 100644 index 000000000..4775db5e9 --- /dev/null +++ b/examples/laminarmq-tokio-commit-log-server/main.rs @@ -0,0 +1,556 @@ +use axum::{ + error_handling::HandleErrorLayer, + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::{get, post}, + Json, Router, +}; +use hyper::{Body, Request}; + +extern crate laminarmq; + +use laminarmq::{ + common::{cache::NoOpCache, serde_compat::bincode}, + storage::{ + commit_log::{ + segmented_log::{segment::Config as SegmentConfig, Config, MetaWithIdx, SegmentedLog}, + CommitLog, Record, + }, + impls::{ + common::DiskBackedSegmentStorageProvider, + in_mem::{segment::InMemSegmentStorageProvider, storage::InMemStorage}, + tokio::storage::std_seek_read::{ + StdSeekReadFileStorage, StdSeekReadFileStorageProvider, + }, + }, + }, +}; +use serde::{Deserialize, Serialize}; +use std::{ + env, + fmt::Debug, + future::Future, + io, + net::SocketAddr, + rc::Rc, + thread::{self, JoinHandle}, + time::Duration, +}; +use tokio::{ + signal, + sync::{mpsc, oneshot, AcquireError, RwLock, Semaphore}, + task, +}; +use tower::{BoxError, ServiceBuilder}; +use tower_http::trace::TraceLayer; +use tracing::{error, error_span, info, info_span, Instrument}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +pub type InMemSegLog = SegmentedLog< + InMemStorage, + (), + crc32fast::Hasher, + u32, + usize, + bincode::BinCode, + InMemSegmentStorageProvider, + NoOpCache, +>; + +#[allow(unused)] +#[derive(Clone)] +struct AppState { + message_tx: mpsc::Sender, +} + +#[derive(Debug)] +pub enum ChannelError { + SendError, + RecvError, +} + +impl AppState { + pub async fn enqueue_request( + &self, + request: AppRequest, + ) -> Result, ChannelError> { + let (resp_tx, resp_rx) = oneshot::channel(); + + let message = Message::Connection { resp_tx, request }; + + self.message_tx + .send(message) + .await + .map_err(|_| ChannelError::SendError)?; + + Ok(resp_rx) + } +} + +pub struct CommitLogServerConfig { + message_buffer_size: usize, + max_connections: usize, +} + +#[allow(unused)] +const IN_MEMORY_SEGMENTED_LOG_CONFIG: Config = Config { + segment_config: SegmentConfig { + max_store_size: 1048576, // = 1MiB + max_store_overflow: 524288, + max_index_size: 1048576, + }, + initial_index: 0, + num_index_cached_read_segments: None, +}; + +const PERSISTENT_SEGMENTED_LOG_CONFIG: Config = Config { + segment_config: SegmentConfig { + max_store_size: 10000000, // ~ 10MB + max_store_overflow: 10000000 / 2, + max_index_size: 10000000, + }, + initial_index: 0, + num_index_cached_read_segments: None, +}; + +const DEFAULT_STORAGE_DIRECTORY: &str = "./.storage/laminarmq_tokio_commit_log_server/commit_log"; + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { + "laminarmq_tokio_commit_log_server=debug,tower_http=debug".into() + }), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let storage_directory = + env::var("STORAGE_DIRECTORY").unwrap_or(DEFAULT_STORAGE_DIRECTORY.into()); + + let (join_handle, message_tx) = CommitLogServer::orchestrate( + CommitLogServerConfig { + message_buffer_size: 1024, + max_connections: 512, + }, + || async { + let disk_backed_storage_provider = + DiskBackedSegmentStorageProvider::<_, _, u32>::with_storage_directory_path_and_provider( + storage_directory, + StdSeekReadFileStorageProvider, + ) + .unwrap(); + + SegmentedLog::< + StdSeekReadFileStorage, + (), + crc32fast::Hasher, + u32, + u64, + bincode::BinCode, + _, + NoOpCache, + >::new( + PERSISTENT_SEGMENTED_LOG_CONFIG, + disk_backed_storage_provider, + ) + .await + .unwrap() + }, + ); + + // Compose the routes + let app = Router::new() + .route("/index_bounds", get(index_bounds)) + .route("/records/:index", get(read)) + .route("/records", post(append)) + .route("/rpc/truncate", post(truncate)) + // Add middleware to all routes + .layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(|error: BoxError| async move { + if error.is::() { + Ok(StatusCode::REQUEST_TIMEOUT) + } else { + Err(( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Unhandled internal error: {}", error), + )) + } + })) + .timeout(Duration::from_secs(10)) + .layer(TraceLayer::new_for_http()) + .into_inner(), + ) + .with_state(AppState { + message_tx: message_tx.clone(), + }); + + let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + + tracing::debug!("listening on {}", addr); + + hyper::Server::bind(&addr) + .serve(app.into_make_service()) + .with_graceful_shutdown(shutdown_signal()) + .await + .unwrap(); + + message_tx.send(Message::Terminate).await.unwrap(); + + tokio::task::spawn_blocking(|| join_handle.join()) + .await + .unwrap() + .unwrap() + .unwrap(); + + info!("Exiting application."); +} + +async fn shutdown_signal() { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + } + + tracing::info!("signal received, starting graceful shutdown"); +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct IndexBoundsResponse { + highest_index: u32, + lowest_index: u32, +} + +pub struct StringError(String); + +impl From for StringError { + fn from(value: String) -> Self { + Self(value) + } +} + +impl IntoResponse for StringError { + fn into_response(self) -> Response { + (StatusCode::INTERNAL_SERVER_ERROR, self.0).into_response() + } +} + +async fn index_bounds( + State(state): State, +) -> Result, StringError> { + let resp_rx = state + .enqueue_request(AppRequest::IndexBounds) + .await + .map_err(|err| format!("error sending request to commit_log_server: {:?}", err))?; + + let response = resp_rx + .await + .map_err(|err| format!("error receiving response: {:?}", err))??; + + if let AppResponse::IndexBounds(index_bounds_response) = response { + Ok(Json(index_bounds_response)) + } else { + Err(StringError("invalid response type".into())) + } +} + +async fn read( + Path(index): Path, + State(state): State, +) -> Result, StringError> { + let resp_rx = state + .enqueue_request(AppRequest::Read { index }) + .await + .map_err(|err| format!("error sending request to commit_log_server: {:?}", err))?; + + let response = resp_rx + .await + .map_err(|err| format!("error receiving response: {:?}", err))??; + + if let AppResponse::Read { record_value } = response { + Ok(record_value) + } else { + Err(StringError("invalid response type".into())) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct AppendResponse { + write_index: u32, +} + +async fn append( + State(state): State, + request: Request, +) -> Result, StringError> { + let resp_rx = state + .enqueue_request(AppRequest::Append { + record_value: request.into_body(), + }) + .await + .map_err(|err| format!("error sending request to commit_log_server: {:?}", err))?; + + let response = resp_rx + .await + .map_err(|err| format!("error receiving reponse: {:?}", err))??; + + if let AppResponse::Append(append_reponse) = response { + Ok(Json(append_reponse)) + } else { + Err(StringError("invalid response type".into())) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct TruncateRequest { + truncate_index: u32, +} + +async fn truncate( + State(state): State, + Json(truncate_request): Json, +) -> Result<(), StringError> { + let resp_rx = state + .enqueue_request(AppRequest::Truncate(truncate_request)) + .await + .map_err(|err| format!("error sending request to commit_log_server: {:?}", err))?; + + let response = resp_rx + .await + .map_err(|err| format!("error receiving response: {:?}", err))??; + + if let AppResponse::Truncate = response { + Ok(()) + } else { + Err(StringError("invalid response type".into())) + } +} + +#[derive(Debug)] +pub enum AppResponse { + IndexBounds(IndexBoundsResponse), + Read { record_value: Vec }, + Append(AppendResponse), + Truncate, +} + +#[derive(Debug)] +pub enum AppRequest { + IndexBounds, + Read { index: u32 }, + Append { record_value: Body }, + Truncate(TruncateRequest), +} + +type ResponseResult = Result; + +pub enum Message { + Connection { + resp_tx: oneshot::Sender, + request: AppRequest, + }, + + Terminate, +} + +#[allow(unused)] +pub struct CommitLogServer { + message_rx: mpsc::Receiver, + commit_log: CL, + max_connections: usize, +} + +impl CommitLogServer { + pub fn new( + message_rx: mpsc::Receiver, + commit_log: CL, + max_connections: usize, + ) -> Self { + Self { + message_rx, + commit_log, + max_connections, + } + } +} + +#[derive(Debug)] +pub enum CommitLogServerError { + ConnPermitAcquireError(AcquireError), + CommitLogError(CLE), + IoError(io::Error), + ResponseSendError, +} + +pub type CommitLogServerResult = Result>; + +impl CommitLogServer +where + CL: CommitLog, Vec, Idx = u32> + 'static, +{ + pub async fn handle_request( + commit_log: Rc>, + request: AppRequest, + ) -> Result> { + match request { + AppRequest::IndexBounds => { + let commit_log = commit_log.read().await; + + Ok(AppResponse::IndexBounds(IndexBoundsResponse { + highest_index: commit_log.highest_index(), + lowest_index: commit_log.lowest_index(), + })) + } + + AppRequest::Read { index: idx } => commit_log + .read() + .await + .read(&idx) + .await + .map(|Record { metadata: _, value }| AppResponse::Read { + record_value: value, + }) + .map_err(CommitLogServerError::CommitLogError), + + AppRequest::Append { record_value } => commit_log + .write() + .await + .append(Record { + metadata: MetaWithIdx { + metadata: (), + index: None, + }, + value: record_value, + }) + .await + .map(|write_index| AppResponse::Append(AppendResponse { write_index })) + .map_err(CommitLogServerError::CommitLogError), + + AppRequest::Truncate(TruncateRequest { + truncate_index: idx, + }) => commit_log + .write() + .await + .truncate(&idx) + .await + .map(|_| AppResponse::Truncate) + .map_err(CommitLogServerError::CommitLogError), + } + } + + pub async fn serve(self) { + let (mut message_rx, commit_log, max_connections) = + (self.message_rx, self.commit_log, self.max_connections); + + let conn_semaphore = Rc::new(Semaphore::new(max_connections)); + let commit_log = Rc::new(RwLock::new(commit_log)); + + let commit_log_copy = commit_log.clone(); + + let local = task::LocalSet::new(); + + local + .run_until(async move { + while let Some(Message::Connection { resp_tx, request }) = message_rx.recv().await { + let (conn_semaphore, commit_log_copy) = + (conn_semaphore.clone(), commit_log_copy.clone()); + + task::spawn_local( + async move { + let response = async move { + let _semaphore_permit = conn_semaphore + .acquire() + .await + .map_err(CommitLogServerError::ConnPermitAcquireError)?; + + let commit_log = commit_log_copy; + + let response = Self::handle_request(commit_log, request).await?; + + Ok::<_, CommitLogServerError>(response) + } + .await + .map_err(|err| format!("{:?}", err)); + + if let Err(err) = resp_tx.send(response) { + error!("error sending response: {:?}", err) + } + } + .instrument(error_span!("commit_log_server_handler_task")), + ); + } + }) + .await; + + match Rc::into_inner(commit_log) { + Some(commit_log) => match commit_log.into_inner().close().await { + Ok(_) => {} + Err(err) => error!("error closing commit_log: {:?}", err), + }, + None => error!("unable to unrwap commit_log Rc"), + }; + + info!("Closed commit_log."); + } + + pub fn orchestrate( + server_config: CommitLogServerConfig, + commit_log_provider: CLP, + ) -> (JoinHandle>, mpsc::Sender) + where + CLP: FnOnce() -> CLF + Send + 'static, + CLF: Future, + CL::Error: Send + 'static, + { + let CommitLogServerConfig { + message_buffer_size, + max_connections, + } = server_config; + + let (message_tx, message_rx) = mpsc::channel::(message_buffer_size); + + ( + thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread().build()?; + + rt.block_on( + async move { + let commit_log_server = CommitLogServer::new( + message_rx, + commit_log_provider().await, + max_connections, + ); + + commit_log_server.serve().await; + + info!("Done serving requests."); + } + .instrument(info_span!("commit_log_server")), + ); + + Ok(()) + }), + message_tx, + ) + } +} From d363e6971e0ec552c4ee63577d3112495264175c Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Fri, 24 May 2024 20:47:09 +0530 Subject: [PATCH 30/30] chore: update examples path in README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3b92abe22..013e89a49 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ order they need. ## Examples Find examples demonstrating different capabilities of `laminarmq` in the -[examples branch](https://github.com/arindas/laminarmq/tree/examples). +[examples](./examples) directory. ## Media