diff --git a/Cargo.lock b/Cargo.lock
index 0190e7bfc..4c5f1276a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -94,7 +94,7 @@ dependencies = [
"polling",
"rustix 0.37.23",
"slab",
- "socket2",
+ "socket2 0.4.9",
"waker-fn",
]
@@ -146,6 +146,55 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
+[[package]]
+name = "axum"
+version = "0.6.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
+dependencies = [
+ "async-trait",
+ "axum-core",
+ "bitflags 1.3.2",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "hyper",
+ "itoa",
+ "matchit",
+ "memchr",
+ "mime",
+ "percent-encoding",
+ "pin-project-lite",
+ "rustversion",
+ "serde",
+ "serde_json",
+ "serde_path_to_error",
+ "serde_urlencoded",
+ "sync_wrapper",
+ "tokio",
+ "tower",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "axum-core"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "mime",
+ "rustversion",
+ "tower-layer",
+ "tower-service",
+]
+
[[package]]
name = "backtrace"
version = "0.3.68"
@@ -475,7 +524,7 @@ checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
dependencies = [
"errno-dragonfly",
"libc",
- "windows-sys",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -534,6 +583,15 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
+[[package]]
+name = "form_urlencoded"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
+dependencies = [
+ "percent-encoding",
+]
+
[[package]]
name = "futures"
version = "0.3.28"
@@ -652,9 +710,9 @@ dependencies = [
[[package]]
name = "generational-cache"
-version = "0.2.0"
+version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "60c96758dc31f4a38cf0b64f2fab050f56fa450fdefe923b6d6de147430733e4"
+checksum = "8f81129066835be752a1470e4b4c182f92881c11635b292c6b18594539984843"
[[package]]
name = "getrandom"
@@ -705,7 +763,7 @@ dependencies = [
"signal-hook",
"sketches-ddsketch",
"smallvec",
- "socket2",
+ "socket2 0.4.9",
"tracing",
"typenum",
]
@@ -769,6 +827,12 @@ dependencies = [
"pin-project-lite",
]
+[[package]]
+name = "http-range-header"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f"
+
[[package]]
name = "httparse"
version = "1.8.0"
@@ -798,6 +862,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
+ "socket2 0.4.9",
"tokio",
"tower-service",
"tracing",
@@ -858,7 +923,7 @@ checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi",
"libc",
- "windows-sys",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -869,7 +934,7 @@ checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
dependencies = [
"hermit-abi",
"rustix 0.38.4",
- "windows-sys",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -898,11 +963,12 @@ dependencies = [
[[package]]
name = "laminarmq"
-version = "0.0.5-rc2"
+version = "0.0.5"
dependencies = [
"async-io",
"async-stream",
"async-trait",
+ "axum",
"bincode",
"byteorder",
"bytes",
@@ -923,6 +989,8 @@ dependencies = [
"route-recognizer",
"serde",
"tokio",
+ "tower",
+ "tower-http",
"tower-service",
"tracing",
"tracing-subscriber",
@@ -936,9 +1004,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
-version = "0.2.147"
+version = "0.2.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
+checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
[[package]]
name = "linux-raw-sys"
@@ -977,6 +1045,21 @@ version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
+[[package]]
+name = "matchers"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
+dependencies = [
+ "regex-automata 0.1.10",
+]
+
+[[package]]
+name = "matchit"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
+
[[package]]
name = "memchr"
version = "2.5.0"
@@ -1019,6 +1102,12 @@ dependencies = [
"autocfg",
]
+[[package]]
+name = "mime"
+version = "0.3.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
+
[[package]]
name = "miniz_oxide"
version = "0.7.1"
@@ -1030,13 +1119,13 @@ dependencies = [
[[package]]
name = "mio"
-version = "0.8.8"
+version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
+checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [
"libc",
"wasi",
- "windows-sys",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -1238,9 +1327,15 @@ dependencies = [
"libc",
"redox_syscall",
"smallvec",
- "windows-targets",
+ "windows-targets 0.48.1",
]
+[[package]]
+name = "percent-encoding"
+version = "2.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
+
[[package]]
name = "pin-project"
version = "1.1.2"
@@ -1263,9 +1358,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
-version = "0.2.10"
+version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57"
+checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]]
name = "pin-utils"
@@ -1314,7 +1409,7 @@ dependencies = [
"libc",
"log",
"pin-project-lite",
- "windows-sys",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -1341,9 +1436,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
-version = "1.0.64"
+version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "78803b62cbf1f46fde80d7c0e803111524b9877184cfe7c3033659490ac7a7da"
+checksum = "0b33eb56c327dec362a9e55b3ad14f9d2f0904fb5a5b03b513ab5465399e9f43"
dependencies = [
"unicode-ident",
]
@@ -1359,9 +1454,9 @@ dependencies = [
[[package]]
name = "quote"
-version = "1.0.29"
+version = "1.0.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105"
+checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7"
dependencies = [
"proc-macro2",
]
@@ -1405,8 +1500,17 @@ checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575"
dependencies = [
"aho-corasick",
"memchr",
- "regex-automata",
- "regex-syntax",
+ "regex-automata 0.3.3",
+ "regex-syntax 0.7.4",
+]
+
+[[package]]
+name = "regex-automata"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
+dependencies = [
+ "regex-syntax 0.6.29",
]
[[package]]
@@ -1417,9 +1521,15 @@ checksum = "39354c10dd07468c2e73926b23bb9c2caca74c5501e38a35da70406f1d923310"
dependencies = [
"aho-corasick",
"memchr",
- "regex-syntax",
+ "regex-syntax 0.7.4",
]
+[[package]]
+name = "regex-syntax"
+version = "0.6.29"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
+
[[package]]
name = "regex-syntax"
version = "0.7.4"
@@ -1476,7 +1586,7 @@ dependencies = [
"io-lifetimes",
"libc",
"linux-raw-sys 0.3.8",
- "windows-sys",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -1489,9 +1599,15 @@ dependencies = [
"errno",
"libc",
"linux-raw-sys 0.4.3",
- "windows-sys",
+ "windows-sys 0.48.0",
]
+[[package]]
+name = "rustversion"
+version = "1.0.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6"
+
[[package]]
name = "ryu"
version = "1.0.14"
@@ -1521,18 +1637,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
-version = "1.0.171"
+version = "1.0.202"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9"
+checksum = "226b61a0d411b2ba5ff6d7f73a476ac4f8bb900373459cd00fab8512828ba395"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
-version = "1.0.171"
+version = "1.0.202"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682"
+checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838"
dependencies = [
"proc-macro2",
"quote",
@@ -1550,6 +1666,28 @@ dependencies = [
"serde",
]
+[[package]]
+name = "serde_path_to_error"
+version = "0.1.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6"
+dependencies = [
+ "itoa",
+ "serde",
+]
+
+[[package]]
+name = "serde_urlencoded"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
+dependencies = [
+ "form_urlencoded",
+ "itoa",
+ "ryu",
+ "serde",
+]
+
[[package]]
name = "sharded-slab"
version = "0.1.4"
@@ -1609,6 +1747,16 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "socket2"
+version = "0.5.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
+dependencies = [
+ "libc",
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "spin"
version = "0.9.8"
@@ -1661,15 +1809,21 @@ dependencies = [
[[package]]
name = "syn"
-version = "2.0.25"
+version = "2.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "15e3fc8c0c74267e2df136e5e5fb656a464158aa57624053375eb9c8c6e25ae2"
+checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
+[[package]]
+name = "sync_wrapper"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
+
[[package]]
name = "tempfile"
version = "3.6.0"
@@ -1681,7 +1835,7 @@ dependencies = [
"fastrand",
"redox_syscall",
"rustix 0.37.23",
- "windows-sys",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -1726,27 +1880,27 @@ dependencies = [
[[package]]
name = "tokio"
-version = "1.29.1"
+version = "1.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da"
+checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
dependencies = [
- "autocfg",
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"pin-project-lite",
- "socket2",
+ "signal-hook-registry",
+ "socket2 0.5.7",
"tokio-macros",
- "windows-sys",
+ "windows-sys 0.48.0",
]
[[package]]
name = "tokio-macros"
-version = "2.1.0"
+version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
+checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
@@ -1767,6 +1921,47 @@ dependencies = [
"tracing",
]
+[[package]]
+name = "tower"
+version = "0.4.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
+dependencies = [
+ "futures-core",
+ "futures-util",
+ "pin-project",
+ "pin-project-lite",
+ "tokio",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
+[[package]]
+name = "tower-http"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140"
+dependencies = [
+ "bitflags 2.3.3",
+ "bytes",
+ "futures-core",
+ "futures-util",
+ "http",
+ "http-body",
+ "http-range-header",
+ "pin-project-lite",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
+[[package]]
+name = "tower-layer"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
+
[[package]]
name = "tower-service"
version = "0.3.2"
@@ -1780,6 +1975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [
"cfg-if",
+ "log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -1823,10 +2019,14 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
+ "matchers",
"nu-ansi-term",
+ "once_cell",
+ "regex",
"sharded-slab",
"smallvec",
"thread_local",
+ "tracing",
"tracing-core",
"tracing-log",
]
@@ -1999,7 +2199,16 @@ version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
- "windows-targets",
+ "windows-targets 0.48.1",
+]
+
+[[package]]
+name = "windows-sys"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
+dependencies = [
+ "windows-targets 0.52.5",
]
[[package]]
@@ -2008,13 +2217,29 @@ version = "0.48.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f"
dependencies = [
- "windows_aarch64_gnullvm",
- "windows_aarch64_msvc",
- "windows_i686_gnu",
- "windows_i686_msvc",
- "windows_x86_64_gnu",
- "windows_x86_64_gnullvm",
- "windows_x86_64_msvc",
+ "windows_aarch64_gnullvm 0.48.0",
+ "windows_aarch64_msvc 0.48.0",
+ "windows_i686_gnu 0.48.0",
+ "windows_i686_msvc 0.48.0",
+ "windows_x86_64_gnu 0.48.0",
+ "windows_x86_64_gnullvm 0.48.0",
+ "windows_x86_64_msvc 0.48.0",
+]
+
+[[package]]
+name = "windows-targets"
+version = "0.52.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb"
+dependencies = [
+ "windows_aarch64_gnullvm 0.52.5",
+ "windows_aarch64_msvc 0.52.5",
+ "windows_i686_gnu 0.52.5",
+ "windows_i686_gnullvm",
+ "windows_i686_msvc 0.52.5",
+ "windows_x86_64_gnu 0.52.5",
+ "windows_x86_64_gnullvm 0.52.5",
+ "windows_x86_64_msvc 0.52.5",
]
[[package]]
@@ -2023,38 +2248,86 @@ version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
+[[package]]
+name = "windows_aarch64_gnullvm"
+version = "0.52.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263"
+
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
+[[package]]
+name = "windows_aarch64_msvc"
+version = "0.52.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6"
+
[[package]]
name = "windows_i686_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
+[[package]]
+name = "windows_i686_gnu"
+version = "0.52.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670"
+
+[[package]]
+name = "windows_i686_gnullvm"
+version = "0.52.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9"
+
[[package]]
name = "windows_i686_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
+[[package]]
+name = "windows_i686_msvc"
+version = "0.52.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf"
+
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
+[[package]]
+name = "windows_x86_64_gnu"
+version = "0.52.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9"
+
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
+[[package]]
+name = "windows_x86_64_gnullvm"
+version = "0.52.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596"
+
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
+
+[[package]]
+name = "windows_x86_64_msvc"
+version = "0.52.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0"
diff --git a/Cargo.toml b/Cargo.toml
index da850a7fd..12ab2b89c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,7 +6,7 @@ license = "MIT"
categories = ["web-programming"]
keywords = ["message-queue", "distributed-systems", "segmented-log", "io-uring"]
exclude = [".github/", "assets/"]
-version = "0.0.5-rc2"
+version = "0.0.5"
edition = "2021"
rust-version = "1.62"
@@ -35,7 +35,7 @@ tower-service = "0.3.2"
num = "0.4.0"
futures-time = "3.0.0"
async-io = "1.13.0"
-generational-cache = "0.2.0"
+generational-cache = "0.2.2"
[lib]
name = "laminarmq"
@@ -55,6 +55,15 @@ bench = false
rlimit = "0.10.1"
criterion = { version = "0.5", features = ["html_reports", "async_futures", "async_tokio"] }
pprof = { version = "0.12", features = ["flamegraph", "criterion"] }
+axum = "0.6.20"
+crc32fast = "1.3.2"
+hyper = "0.14.27"
+serde = { version = "1.0.188", features = ["derive"] }
+tokio = { version = "1.32.0", features = ["rt", "rt-multi-thread", "sync", "net", "fs", "signal"] }
+tower = { version = "0.4.13", features = ["util", "timeout"] }
+tower-http = { version = "0.4.4", features = ["add-extension", "trace"] }
+tracing = "0.1.37"
+tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
[[bench]]
name = "commit_log_append"
diff --git a/README.md b/README.md
index da9c89ef7..013e89a49 100644
--- a/README.md
+++ b/README.md
@@ -29,7 +29,7 @@ to use `laminarmq` as a library, add the following to your `Cargo.toml`:
```toml
[dependencies]
-laminarmq = "0.0.5-rc2"
+laminarmq = "0.0.5"
```
Refer to latest git [API Documentation](https://arindas.github.io/laminarmq/docs/laminarmq/) or
@@ -47,13 +47,19 @@ order they need.
- [x] Locally persistent queue of records
- [ ] Single node, multi threaded, eBPF based request to thread routed message queue
- [ ] Service discovery with
- [SWIM](https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf).
+ [SWIM](https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf).
- [ ] Replication and consensus of replicated records with [Raft](https://raft.github.io/raft.pdf).
## Examples
Find examples demonstrating different capabilities of `laminarmq` in the
-[examples branch](https://github.com/arindas/laminarmq/tree/examples).
+[examples](./examples) directory.
+
+## Media
+
+Media associated with the `laminarmq` project.
+
+- `[BLOG]` [Building Segmented Logs in Rust: From Theory to Production!](https://arindas.github.io/blog/segmented-log-rust/)
## Design
@@ -76,8 +82,9 @@ partition_id_1 = (topic_id_0, partition_idx_1)
partition_id_2 = (topic_id_1, partition_idx_0)
```
->The exact numerical ids don't have any pattern with respect to partition_id and topic_id; there can
->be multiple topics, each of which can have multiple partitions (identified by partition_idx).
+
+> The exact numerical ids don't have any pattern with respect to partition_id and topic_id; there can
+> be multiple topics, each of which can have multiple partitions (identified by partition_idx).
… alternatively:
@@ -98,6 +105,7 @@ partition_id_2 = (topic_id_1, partition_idx_0)
└── ...other nodes
```
+
```text
[L] := leader; [F] := follower
```
@@ -119,6 +127,7 @@ have chosen to maintain a flat representation of topic partitions. We present an
commit-log API at the partition level.
Users may hence do the following:
+
- Directly interact with our message queue at the partition level
- Use client side load balancing between topic partitions
@@ -169,20 +178,21 @@ using Rendezvous hashing.
From the Wikipedia [article](https://en.wikipedia.org/wiki/Rendezvous_hashing):
->Rendezvous or highest random weight (HRW) hashing is an algorithm that allows clients to achieve
->distributed agreement on a set of _k_ options out of a possible set of _n_ options. A typical
->application is when clients need to agree on which sites (or proxies) objects are assigned to.
+> Rendezvous or highest random weight (HRW) hashing is an algorithm that allows clients to achieve
+> distributed agreement on a set of _k_ options out of a possible set of _n_ options. A typical
+> application is when clients need to agree on which sites (or proxies) objects are assigned to.
In our case, we use rendezvous hashing to determine the subset of nodes to use for placing the
replicas of a partition.
For some hashing function `H`, some weight function `f(w, hash)` and partition id `P_x`, we proceed
as follows:
+
- For every node `N_i` in the cluster with a weight `w_i`, we compute `R_i = f(w_i, H(concat(P_x,
N_i)))`
- We rank all nodes `N_i` belonging to the set of nodes `N` with respect to their rank value `R_i`.
- For some replication factor `k`, we select the top `k` nodes to place the `k` replicas of the
-partition with id `P_x`
+ partition with id `P_x`
(We assume `k <= |N|`; where `|N|` is the number of nodes and `k` is the number of replicas)
@@ -198,14 +208,14 @@ current leader of the replica set.
### Supported execution models
`laminarmq` supports two execution models:
+
- General async execution model used by various async runtimes in the Rust ecosystem (e.g `tokio`)
- Thread per core execution model
In the thread-per-core execution model individual processor cores are limited to single threads.
This model encourages design that minimizes inter-thread contention and locks, thereby improving
tail latencies in software services. Read: [The Impact of Thread per Core Architecture on
-Application Tail Latency.](
-https://helda.helsinki.fi//bitstream/handle/10138/313642/tpc_ancs19.pdf?sequence=1)
+Application Tail Latency.](https://helda.helsinki.fi//bitstream/handle/10138/313642/tpc_ancs19.pdf?sequence=1)
In the thread per core execution model, we have to leverage application level partitioning such that
each individual thread is responsible for a subset of requests and/or responsibilities. We also have
@@ -231,9 +241,10 @@ model.
In our cluster, we have two kinds of requests:
-- __membership requests__: used by the gossip style service discovery system for maintaining cluster
-membership.
-- __partition requests__: used to interact with `laminarmq` topic partitions.
+
+- **membership requests**: used by the gossip style service discovery system for maintaining cluster
+ membership.
+- **partition requests**: used to interact with `laminarmq` topic partitions.
We use an [eBPF](https://ebpf.io/what-is-ebpf/) XDP filter to classify request packets at the socket
layer into membership request packets and partition request packets. Next we use eBPF to route
@@ -243,24 +254,26 @@ subsystem in that node. The partition request packets are left to flow as is.
Next we have an "HTTP server", which parses the incoming partition request packets from the original
socket into valid `partition::*` requests. For every `partition::*` request, the HTTP server spawns
a future to handle it. This request handler future does the following:
+
- Create a new channel `(tx, rx)` for the request.
- Send the parsed partition request along with send end of the channel `(partition::*, tx)` to the
-"Request Router" over the request router's receiving channel.
+ "Request Router" over the request router's receiving channel.
- Await on the recv. end of the channel created by this future for the response. `res = rx.await`
- When we receive the response from this future's channel, we serialize it and respond back to the
-socket we had received the packets from.
+ socket we had received the packets from.
Next we have a "Request Router / Partition manager" responsible for routing various requests to the
partition serving futures. The request router unit receives both `membership::*` requests from the
membership subsystem and `partition::*` requests received from the "HTTP server" request handler
futures (also called request poller futures from here on since they poll for the response from the
channel recv. `rx` end). The request router unit routes requests as follows:
+
- `membership::*` requests are broadcast to all the partition serving futures
- `(partition::*_request(partition_id_x, …), tx)` tuples are routed to their destination partitions
-using the `partition_id`.
+ using the `partition_id`.
- `(partition::create(partition_id_x, …), tx)` tuples are handled by the request router/ partition
-manager itself. For this, the request router / partition manager creates a new partition serving
-future, allocates the required storage units or it and sends and appropriate response on `tx`.
+ manager itself. For this, the request router / partition manager creates a new partition serving
+ future, allocates the required storage units or it and sends and appropriate response on `tx`.
Finally, the individual partition server futures receive both `membership::*` and `(partition::*,
tx)` requests as they come to our node and routed. They handle the requests as necessary and send a
@@ -338,7 +351,7 @@ The partition request handler handles the different requests as follows:
replicas, we initial the leadership election process with each replica as a candidate.
- `membership::leave(j)`: remove {node #j} from priority queue and Raft group if present. If `{node
- #j}` was not present in the Raft group no further action is necessary. If it was present in the
+#j}` was not present in the Raft group no further action is necessary. If it was present in the
Raft group, `pop()` another member from the priority queue, add it to the Raft group and proceed
similarly as in the case of `membership::join(j)`
@@ -352,6 +365,7 @@ The partition request handler handles the different requests as follows:
When a node goes down the appropriate `membership::leave(i)` message (where `i` is the node that
went down) is sent to all the nodes in the cluster. The partition replica controllers in each node
handle the membership request accordingly. In effect:
+
- For every leader partition in that node:
- if there are no other follower replicas in other nodes in it's Raft group, that partition goes
down.
@@ -369,6 +383,7 @@ In our system, we use different Raft groups for different data buckets (replica
different Raft groups for different data buckets on the same node as MultiRaft.
Read more here:
+
-
-
@@ -377,6 +392,7 @@ Every partition controller is backed by a `segmented_log` for persisting records
### Persistence mechanism
#### `segmented_log`: Persistent data structure for storing records in a partition
+
The segmented-log data structure for storing was originally described in the [Apache
Kafka](https://www.microsoft.com/en-us/research/wp-content/uploads/2017/09/Kafka.pdf) paper.
@@ -392,6 +408,7 @@ A segmented log is a collection of read segments and a single write segment. Eac
backed by a storage file on disk called "store".
The log is:
+
- "immutable", since only "append", "read" and "truncate" operations are allowed. It is not possible
to update or delete records from the middle of the log.
- "segmented", since it is composed of segments, where each segment services records from a
@@ -407,6 +424,7 @@ When reading from a particular offset, we linearly check which segment contains
segment. If a segment capable of servicing a read from the given offset is found, we read from that
segment. If no such segment is found among the read segments, we default to the write segment. The
following scenarios may occur when reading from the write segment in this case:
+
- The write segment has synced the messages including the message at the given offset. In this case
the record is read successfully and returned.
- The write segment hasn't synced the data at the given offset. In this case the read fails with a
@@ -414,9 +432,11 @@ following scenarios may occur when reading from the write segment in this case:
- If the offset is out of bounds of even the write segment, we return an "out of bounds" error.
#### `laminarmq` specific enhancements to the `segmented_log` data structure
+
While the conventional `segmented_log` data structure is quite performant for a `commit_log`
implementation, it still requires the following properties to hold true for the record being
appended:
+
- We have the entire record in memory
- We know the record bytes' length and record bytes' checksum before the record is appended
@@ -443,6 +463,7 @@ records[i+1].position = records[i].position + records[i].record_header.length
// segment index invariants in segmented_log
segments[i+1].base_index = segments[i].highest_index = segments[i].index[index.len-1].index + 1
```
+
Fig: Data organisation for persisting the segmented_log
data structure on a
*nix
file system.
@@ -459,6 +480,7 @@ record bytes to the store, we write it's corresponding `record_header` (containi
length), position and index as an `index_record` in the segment index.
This provides two quality of life enhancements:
+
- Allow asynchronous streaming writes, without having to concatenate intermediate byte buffers
- Records are accessed much more easily with easy to use indices
@@ -486,15 +508,16 @@ This execution model is based on the executor, reactor, waker abstractions used
runtimes. We don't have to specifically care about how and where a particular future is executed.
The data flow in this execution model is as follows:
+
- A HTTP server future parses HTTP requests from the request socket
- For every HTTP request it creates a new future to handle it
- The HTTP handler future sends the request and a response channel tx to the request router via a channel.
-It also awaits on the response rx end.
+ It also awaits on the response rx end.
- The request router future maintains a map of partition_id to designated request channel tx for each
-partition controller future.
+ partition controller future.
- For every partition request received it forwards the request on the appropriate partition request
-channel tx. If a `partition::create(...)` request is received it creates a new partition controller
-future.
+ channel tx. If a `partition::create(...)` request is received it creates a new partition controller
+ future.
- The partition controller future send back the response to the provided response channel tx.
- The response poller future received it and responds back with a serialized response to the socket.
@@ -523,6 +546,7 @@ than the one that runs tasks for persisting data to the disk.
We re-use the same constructs that we use in the general async runtime execution model. The only
difference being, we explicitly care about in which task queue a class of future's tasks are
executed. In our case, we have the following 4 task queues:
+
- Request router task queue
- HTTP server request parser task queue
- Partition replica controller task queue
@@ -573,6 +597,7 @@ ulimit -l
```
If the `memlock` resource limit (rlimit) is lesser than 512 KiB, you can increase it as follows:
+
```sh
sudo vi /etc/security/limits.conf
* hard memlock 512
@@ -582,15 +607,18 @@ sudo vi /etc/security/limits.conf
To make the new limits effective, you need to log in to the machine again. Verify whether the limits
have been reflected with `ulimit` as described above.
->(On old WSL versions, you might need to spawn a login shell every time for the limits to be
->reflected:
->```sh
->su ${USER} -l
->```
->The limits persist once inside the login shell. This is not necessary on the latest WSL2 version as
->of 22.12.2022)
+> (On old WSL versions, you might need to spawn a login shell every time for the limits to be
+> reflected:
+>
+> ```sh
+> su ${USER} -l
+> ```
+>
+> The limits persist once inside the login shell. This is not necessary on the latest WSL2 version as
+> of 22.12.2022)
Finally, clone the repository and run the tests:
+
```sh
git clone https://github.com/arindas/laminarmq.git
cd laminarmq/
@@ -611,52 +639,54 @@ cargo bench
The complete latest benchmark reports are available at .
All benchmarks in the reports have been run on a machine (HP Pavilion x360 Convertible 14-ba0xx) with:
+
- 4 core CPU (Intel(R) Core(TM) i5-7200U CPU @ 2.50GHz)
- 8GB RAM (SK Hynix HMA81GS6AFR8N-UH DDR4 2133 MT/s)
- 128GB SSD storage (SanDisk SD8SN8U-128G-1006)
### Selected Benchmark Reports
- __Note__: We use the following names for different record sizes:
-
-
- size_name |
- size |
- comments |
-
-
- tiny |
- 12 bytes |
- none |
-
-
- tweet |
- 140 bytes |
- none |
-
-
- half_k |
- 560 bytes |
- ≈ 512 bytes |
-
-
- k |
- 1120 bytes |
- ≈ 1024 bytes (1 KiB) |
-
-
- linked_in_post |
- 2940 bytes |
- ≤ 3000 bytes (3 KB) |
-
-
- blog |
- 11760 bytes (11.76 KB) |
- 4x linked_in_post |
-
-
-
-This section presents some selected benchmark reports:
+This section presents some selected benchmark reports.
+
+> **Note**: We use the following names for different record sizes:
+>
+>
+>
+> size_name |
+> size |
+> comments |
+>
+>
+> tiny |
+> 12 bytes |
+> none |
+>
+>
+> tweet |
+> 140 bytes |
+> none |
+>
+>
+> half_k |
+> 560 bytes |
+> ≈ 512 bytes |
+>
+>
+> k |
+> 1120 bytes |
+> ≈ 1024 bytes (1 KiB) |
+>
+>
+> linked_in_post |
+> 2940 bytes |
+> ≤ 3000 bytes (3 KB) |
+>
+>
+> blog |
+> 11760 bytes (11.76 KB) |
+> 4X linked_in_post |
+>
+>
#### `commit_log` write benchmark with 1KB messages
diff --git a/examples/laminarmq-tokio-commit-log-server/README.md b/examples/laminarmq-tokio-commit-log-server/README.md
new file mode 100644
index 000000000..b03eb3511
--- /dev/null
+++ b/examples/laminarmq-tokio-commit-log-server/README.md
@@ -0,0 +1,72 @@
+# laminarmq-tokio-commit-log-server
+
+A simple persistent commit log server using the tokio runtime.
+
+## Endpoints
+
+This server exposes the following HTTP endpoints:
+
+```rust
+.route("/index_bounds", get(index_bounds)) // obtain the index bounds
+.route("/records/:index", get(read)) // obtain the record at given index
+.route("/records", post(append)) // append a new record at the end of the commit log
+
+.route("/rpc/truncate", post(truncate)) // truncate the commit log
+ // expects JSON: { "truncate_index": }
+ // records starting from truncate_index are removed
+```
+
+## Usage
+
+Run the server as follows:
+
+```sh
+cargo run --example laminarmq-tokio-commit-log-server --release
+```
+
+The server optionally expects an environment variable: `STORAGE_DIRECTORY`.
+
+The default value is:
+
+```rust
+const DEFAULT_STORAGE_DIRECTORY: &str = "./.storage/laminarmq_tokio_commit_log_server/commit_log";
+```
+
+You may specify it as follows:
+
+```sh
+STORAGE_DIRECTORY="" cargo run --release
+```
+
+Once the server is running you may make requests as follows:
+
+```sh
+curl -w "\n" "http://127.0.0.1:3000/index_bounds"
+
+curl -w "\n" --request POST --data "Hello World" "http://127.0.0.1:3000/records"
+curl -w "\n" --request POST --data "Moshi moshi" "http://127.0.0.1:3000/records"
+curl -w "\n" --request POST --data "Bonjour <3" "http://127.0.0.1:3000/records"
+
+curl -w "\n" "http://127.0.0.1:3000/index_bounds"
+
+curl -w "\n" "http://127.0.0.1:3000/records/1"
+
+curl -w "\n" --header "Content-Type: application/json" --request POST \
+ --data "{\"truncate_index\": 1}" \
+ "http://127.0.0.1:3000/rpc/truncate"
+
+curl -w "\n" "http://127.0.0.1:3000/index_bounds"
+```
+
+Here's what's happening above:
+
+- First request find the index_bounds, (highest_index) is exclusive
+- We append three records with the given data
+- We lookup the current index_bounds after appending to the commit_log
+- We read the record at index 1
+- We truncate the commit_log at index 1. All records starting from index 1 are
+ removed. After this operation the bounds are [0, 1)
+- We lookup the current index_bounds after truncating the commit_log
+
+> Note: The `-w "\n"` flag is for appending a "\n" to the output of curl. This way
+> the output is more readable.
diff --git a/examples/laminarmq-tokio-commit-log-server/main.rs b/examples/laminarmq-tokio-commit-log-server/main.rs
new file mode 100644
index 000000000..4775db5e9
--- /dev/null
+++ b/examples/laminarmq-tokio-commit-log-server/main.rs
@@ -0,0 +1,556 @@
+use axum::{
+ error_handling::HandleErrorLayer,
+ extract::{Path, State},
+ http::StatusCode,
+ response::{IntoResponse, Response},
+ routing::{get, post},
+ Json, Router,
+};
+use hyper::{Body, Request};
+
+extern crate laminarmq;
+
+use laminarmq::{
+ common::{cache::NoOpCache, serde_compat::bincode},
+ storage::{
+ commit_log::{
+ segmented_log::{segment::Config as SegmentConfig, Config, MetaWithIdx, SegmentedLog},
+ CommitLog, Record,
+ },
+ impls::{
+ common::DiskBackedSegmentStorageProvider,
+ in_mem::{segment::InMemSegmentStorageProvider, storage::InMemStorage},
+ tokio::storage::std_seek_read::{
+ StdSeekReadFileStorage, StdSeekReadFileStorageProvider,
+ },
+ },
+ },
+};
+use serde::{Deserialize, Serialize};
+use std::{
+ env,
+ fmt::Debug,
+ future::Future,
+ io,
+ net::SocketAddr,
+ rc::Rc,
+ thread::{self, JoinHandle},
+ time::Duration,
+};
+use tokio::{
+ signal,
+ sync::{mpsc, oneshot, AcquireError, RwLock, Semaphore},
+ task,
+};
+use tower::{BoxError, ServiceBuilder};
+use tower_http::trace::TraceLayer;
+use tracing::{error, error_span, info, info_span, Instrument};
+use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+
+pub type InMemSegLog = SegmentedLog<
+ InMemStorage,
+ (),
+ crc32fast::Hasher,
+ u32,
+ usize,
+ bincode::BinCode,
+ InMemSegmentStorageProvider,
+ NoOpCache,
+>;
+
+#[allow(unused)]
+#[derive(Clone)]
+struct AppState {
+ message_tx: mpsc::Sender,
+}
+
+#[derive(Debug)]
+pub enum ChannelError {
+ SendError,
+ RecvError,
+}
+
+impl AppState {
+ pub async fn enqueue_request(
+ &self,
+ request: AppRequest,
+ ) -> Result, ChannelError> {
+ let (resp_tx, resp_rx) = oneshot::channel();
+
+ let message = Message::Connection { resp_tx, request };
+
+ self.message_tx
+ .send(message)
+ .await
+ .map_err(|_| ChannelError::SendError)?;
+
+ Ok(resp_rx)
+ }
+}
+
+pub struct CommitLogServerConfig {
+ message_buffer_size: usize,
+ max_connections: usize,
+}
+
+#[allow(unused)]
+const IN_MEMORY_SEGMENTED_LOG_CONFIG: Config = Config {
+ segment_config: SegmentConfig {
+ max_store_size: 1048576, // = 1MiB
+ max_store_overflow: 524288,
+ max_index_size: 1048576,
+ },
+ initial_index: 0,
+ num_index_cached_read_segments: None,
+};
+
+const PERSISTENT_SEGMENTED_LOG_CONFIG: Config = Config {
+ segment_config: SegmentConfig {
+ max_store_size: 10000000, // ~ 10MB
+ max_store_overflow: 10000000 / 2,
+ max_index_size: 10000000,
+ },
+ initial_index: 0,
+ num_index_cached_read_segments: None,
+};
+
+const DEFAULT_STORAGE_DIRECTORY: &str = "./.storage/laminarmq_tokio_commit_log_server/commit_log";
+
+#[tokio::main]
+async fn main() {
+ tracing_subscriber::registry()
+ .with(
+ tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
+ "laminarmq_tokio_commit_log_server=debug,tower_http=debug".into()
+ }),
+ )
+ .with(tracing_subscriber::fmt::layer())
+ .init();
+
+ let storage_directory =
+ env::var("STORAGE_DIRECTORY").unwrap_or(DEFAULT_STORAGE_DIRECTORY.into());
+
+ let (join_handle, message_tx) = CommitLogServer::orchestrate(
+ CommitLogServerConfig {
+ message_buffer_size: 1024,
+ max_connections: 512,
+ },
+ || async {
+ let disk_backed_storage_provider =
+ DiskBackedSegmentStorageProvider::<_, _, u32>::with_storage_directory_path_and_provider(
+ storage_directory,
+ StdSeekReadFileStorageProvider,
+ )
+ .unwrap();
+
+ SegmentedLog::<
+ StdSeekReadFileStorage,
+ (),
+ crc32fast::Hasher,
+ u32,
+ u64,
+ bincode::BinCode,
+ _,
+ NoOpCache,
+ >::new(
+ PERSISTENT_SEGMENTED_LOG_CONFIG,
+ disk_backed_storage_provider,
+ )
+ .await
+ .unwrap()
+ },
+ );
+
+ // Compose the routes
+ let app = Router::new()
+ .route("/index_bounds", get(index_bounds))
+ .route("/records/:index", get(read))
+ .route("/records", post(append))
+ .route("/rpc/truncate", post(truncate))
+ // Add middleware to all routes
+ .layer(
+ ServiceBuilder::new()
+ .layer(HandleErrorLayer::new(|error: BoxError| async move {
+ if error.is::() {
+ Ok(StatusCode::REQUEST_TIMEOUT)
+ } else {
+ Err((
+ StatusCode::INTERNAL_SERVER_ERROR,
+ format!("Unhandled internal error: {}", error),
+ ))
+ }
+ }))
+ .timeout(Duration::from_secs(10))
+ .layer(TraceLayer::new_for_http())
+ .into_inner(),
+ )
+ .with_state(AppState {
+ message_tx: message_tx.clone(),
+ });
+
+ let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
+
+ tracing::debug!("listening on {}", addr);
+
+ hyper::Server::bind(&addr)
+ .serve(app.into_make_service())
+ .with_graceful_shutdown(shutdown_signal())
+ .await
+ .unwrap();
+
+ message_tx.send(Message::Terminate).await.unwrap();
+
+ tokio::task::spawn_blocking(|| join_handle.join())
+ .await
+ .unwrap()
+ .unwrap()
+ .unwrap();
+
+ info!("Exiting application.");
+}
+
+async fn shutdown_signal() {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {},
+ _ = terminate => {},
+ }
+
+ tracing::info!("signal received, starting graceful shutdown");
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct IndexBoundsResponse {
+ highest_index: u32,
+ lowest_index: u32,
+}
+
+pub struct StringError(String);
+
+impl From for StringError {
+ fn from(value: String) -> Self {
+ Self(value)
+ }
+}
+
+impl IntoResponse for StringError {
+ fn into_response(self) -> Response {
+ (StatusCode::INTERNAL_SERVER_ERROR, self.0).into_response()
+ }
+}
+
+async fn index_bounds(
+ State(state): State,
+) -> Result, StringError> {
+ let resp_rx = state
+ .enqueue_request(AppRequest::IndexBounds)
+ .await
+ .map_err(|err| format!("error sending request to commit_log_server: {:?}", err))?;
+
+ let response = resp_rx
+ .await
+ .map_err(|err| format!("error receiving response: {:?}", err))??;
+
+ if let AppResponse::IndexBounds(index_bounds_response) = response {
+ Ok(Json(index_bounds_response))
+ } else {
+ Err(StringError("invalid response type".into()))
+ }
+}
+
+async fn read(
+ Path(index): Path,
+ State(state): State,
+) -> Result, StringError> {
+ let resp_rx = state
+ .enqueue_request(AppRequest::Read { index })
+ .await
+ .map_err(|err| format!("error sending request to commit_log_server: {:?}", err))?;
+
+ let response = resp_rx
+ .await
+ .map_err(|err| format!("error receiving response: {:?}", err))??;
+
+ if let AppResponse::Read { record_value } = response {
+ Ok(record_value)
+ } else {
+ Err(StringError("invalid response type".into()))
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct AppendResponse {
+ write_index: u32,
+}
+
+async fn append(
+ State(state): State,
+ request: Request,
+) -> Result, StringError> {
+ let resp_rx = state
+ .enqueue_request(AppRequest::Append {
+ record_value: request.into_body(),
+ })
+ .await
+ .map_err(|err| format!("error sending request to commit_log_server: {:?}", err))?;
+
+ let response = resp_rx
+ .await
+ .map_err(|err| format!("error receiving reponse: {:?}", err))??;
+
+ if let AppResponse::Append(append_reponse) = response {
+ Ok(Json(append_reponse))
+ } else {
+ Err(StringError("invalid response type".into()))
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct TruncateRequest {
+ truncate_index: u32,
+}
+
+async fn truncate(
+ State(state): State,
+ Json(truncate_request): Json,
+) -> Result<(), StringError> {
+ let resp_rx = state
+ .enqueue_request(AppRequest::Truncate(truncate_request))
+ .await
+ .map_err(|err| format!("error sending request to commit_log_server: {:?}", err))?;
+
+ let response = resp_rx
+ .await
+ .map_err(|err| format!("error receiving response: {:?}", err))??;
+
+ if let AppResponse::Truncate = response {
+ Ok(())
+ } else {
+ Err(StringError("invalid response type".into()))
+ }
+}
+
+#[derive(Debug)]
+pub enum AppResponse {
+ IndexBounds(IndexBoundsResponse),
+ Read { record_value: Vec },
+ Append(AppendResponse),
+ Truncate,
+}
+
+#[derive(Debug)]
+pub enum AppRequest {
+ IndexBounds,
+ Read { index: u32 },
+ Append { record_value: Body },
+ Truncate(TruncateRequest),
+}
+
+type ResponseResult = Result;
+
+pub enum Message {
+ Connection {
+ resp_tx: oneshot::Sender,
+ request: AppRequest,
+ },
+
+ Terminate,
+}
+
+#[allow(unused)]
+pub struct CommitLogServer {
+ message_rx: mpsc::Receiver,
+ commit_log: CL,
+ max_connections: usize,
+}
+
+impl CommitLogServer {
+ pub fn new(
+ message_rx: mpsc::Receiver,
+ commit_log: CL,
+ max_connections: usize,
+ ) -> Self {
+ Self {
+ message_rx,
+ commit_log,
+ max_connections,
+ }
+ }
+}
+
+#[derive(Debug)]
+pub enum CommitLogServerError {
+ ConnPermitAcquireError(AcquireError),
+ CommitLogError(CLE),
+ IoError(io::Error),
+ ResponseSendError,
+}
+
+pub type CommitLogServerResult = Result>;
+
+impl CommitLogServer
+where
+ CL: CommitLog, Vec, Idx = u32> + 'static,
+{
+ pub async fn handle_request(
+ commit_log: Rc>,
+ request: AppRequest,
+ ) -> Result> {
+ match request {
+ AppRequest::IndexBounds => {
+ let commit_log = commit_log.read().await;
+
+ Ok(AppResponse::IndexBounds(IndexBoundsResponse {
+ highest_index: commit_log.highest_index(),
+ lowest_index: commit_log.lowest_index(),
+ }))
+ }
+
+ AppRequest::Read { index: idx } => commit_log
+ .read()
+ .await
+ .read(&idx)
+ .await
+ .map(|Record { metadata: _, value }| AppResponse::Read {
+ record_value: value,
+ })
+ .map_err(CommitLogServerError::CommitLogError),
+
+ AppRequest::Append { record_value } => commit_log
+ .write()
+ .await
+ .append(Record {
+ metadata: MetaWithIdx {
+ metadata: (),
+ index: None,
+ },
+ value: record_value,
+ })
+ .await
+ .map(|write_index| AppResponse::Append(AppendResponse { write_index }))
+ .map_err(CommitLogServerError::CommitLogError),
+
+ AppRequest::Truncate(TruncateRequest {
+ truncate_index: idx,
+ }) => commit_log
+ .write()
+ .await
+ .truncate(&idx)
+ .await
+ .map(|_| AppResponse::Truncate)
+ .map_err(CommitLogServerError::CommitLogError),
+ }
+ }
+
+ pub async fn serve(self) {
+ let (mut message_rx, commit_log, max_connections) =
+ (self.message_rx, self.commit_log, self.max_connections);
+
+ let conn_semaphore = Rc::new(Semaphore::new(max_connections));
+ let commit_log = Rc::new(RwLock::new(commit_log));
+
+ let commit_log_copy = commit_log.clone();
+
+ let local = task::LocalSet::new();
+
+ local
+ .run_until(async move {
+ while let Some(Message::Connection { resp_tx, request }) = message_rx.recv().await {
+ let (conn_semaphore, commit_log_copy) =
+ (conn_semaphore.clone(), commit_log_copy.clone());
+
+ task::spawn_local(
+ async move {
+ let response = async move {
+ let _semaphore_permit = conn_semaphore
+ .acquire()
+ .await
+ .map_err(CommitLogServerError::ConnPermitAcquireError)?;
+
+ let commit_log = commit_log_copy;
+
+ let response = Self::handle_request(commit_log, request).await?;
+
+ Ok::<_, CommitLogServerError>(response)
+ }
+ .await
+ .map_err(|err| format!("{:?}", err));
+
+ if let Err(err) = resp_tx.send(response) {
+ error!("error sending response: {:?}", err)
+ }
+ }
+ .instrument(error_span!("commit_log_server_handler_task")),
+ );
+ }
+ })
+ .await;
+
+ match Rc::into_inner(commit_log) {
+ Some(commit_log) => match commit_log.into_inner().close().await {
+ Ok(_) => {}
+ Err(err) => error!("error closing commit_log: {:?}", err),
+ },
+ None => error!("unable to unrwap commit_log Rc"),
+ };
+
+ info!("Closed commit_log.");
+ }
+
+ pub fn orchestrate(
+ server_config: CommitLogServerConfig,
+ commit_log_provider: CLP,
+ ) -> (JoinHandle>, mpsc::Sender)
+ where
+ CLP: FnOnce() -> CLF + Send + 'static,
+ CLF: Future