From 00d398771d6b2ecdfd42464b7a5232f48a1d0e7c Mon Sep 17 00:00:00 2001 From: Rob Date: Tue, 13 Feb 2024 16:20:15 -0500 Subject: [PATCH] add local db, feature flag, clean up trait bounds --- .github/workflows/test.yml | 7 +- Cargo.lock | 701 +++++++++++++++++- broker/Cargo.toml | 7 +- broker/src/handlers/broker.rs | 30 +- broker/src/handlers/user.rs | 70 +- broker/src/lib.rs | 73 +- broker/src/main.rs | 11 +- broker/src/state.rs | 2 +- broker/src/tasks/broker_listener.rs | 15 +- broker/src/tasks/heartbeat.rs | 28 +- broker/src/tasks/update.rs | 15 +- broker/src/tasks/user_listener.rs | 14 +- client/Cargo.toml | 7 +- client/src/main.rs | 2 +- client/src/retry.rs | 8 +- local_db/db.sqlite | Bin 0 -> 24576 bytes .../migrations/20240213022557_migrate.sql | 12 + marshal/Cargo.toml | 10 +- marshal/src/handlers.rs | 18 +- marshal/src/lib.rs | 32 +- marshal/src/main.rs | 25 +- process-compose.yaml | 14 +- proto/Cargo.toml | 18 +- proto/src/connection/auth/broker.rs | 34 +- proto/src/connection/auth/marshal.rs | 31 +- proto/src/connection/protocols/mod.rs | 2 +- proto/src/connection/protocols/quic.rs | 10 +- proto/src/connection/protocols/tcp.rs | 2 +- proto/src/crypto.rs | 10 +- proto/src/discovery/embedded.rs | 258 +++++++ proto/src/discovery/mod.rs | 107 +++ proto/src/{ => discovery}/redis.rs | 71 +- proto/src/lib.rs | 18 +- proto/src/message.rs | 7 +- 34 files changed, 1399 insertions(+), 270 deletions(-) create mode 100644 local_db/db.sqlite create mode 100644 local_db/migrations/20240213022557_migrate.sql create mode 100644 proto/src/discovery/embedded.rs create mode 100644 proto/src/discovery/mod.rs rename proto/src/{ => discovery}/redis.rs (81%) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index cb974bc..2242e22 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,5 +14,10 @@ jobs: - uses: actions/checkout@v4 - run: sudo apt-get install capnproto - run: cargo fmt + + # run clippy for each feature flag configuration + - run: cargo clippy + - run: cargo clippy --no-default-features + - run: cargo test - - run: cargo clippy \ No newline at end of file + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index d23d55a..6c20d33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,6 +34,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" dependencies = [ "cfg-if", + "getrandom", "once_cell", "version_check", "zerocopy", @@ -360,6 +361,25 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + +[[package]] +name = "atomic-write-file" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edcdbedc2236483ab103a53415653d6b4442ea6141baf1ffa85df29635e88436" +dependencies = [ + "nix", + "rand", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -393,12 +413,27 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64ct" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" + [[package]] name = "bitflags" version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" +dependencies = [ + "serde", +] + [[package]] name = "blake2" version = "0.10.6" @@ -605,6 +640,12 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "core-foundation" version = "0.9.4" @@ -630,12 +671,27 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" +dependencies = [ + "crc-catalog", +] + [[package]] name = "crc-any" version = "2.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c01a5e1f881f6fb6099a7bdf949e946719fd4f1fefa56264890574febf0eb6d0" +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -655,6 +711,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -722,6 +787,17 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "der" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" +dependencies = [ + "const-oid", + "pem-rfc7468", + "zeroize", +] + [[package]] name = "deranged" version = "0.3.11" @@ -749,6 +825,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid", "crypto-common", "subtle", ] @@ -770,6 +847,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast" version = "0.11.0" @@ -793,6 +876,9 @@ name = "either" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +dependencies = [ + "serde", +] [[package]] name = "embedded-io" @@ -800,17 +886,73 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + +[[package]] +name = "errno" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "espresso-systems-common" version = "0.4.0" source = "git+https://github.com/espressosystems/espresso-systems-common?tag=0.4.0#5abd890f79014a86db31286e1f3a529f161e69de" +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "fastrand" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" + [[package]] name = "fiat-crypto" version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27573eac26f4dd11e2b1916c3fe1baa56407c83c71a773a8ba17ec0bca03b6b7" +[[package]] +name = "finl_unicode" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" + +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "spin 0.9.8", +] + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -868,6 +1010,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot", +] + [[package]] name = "futures-io" version = "0.3.30" @@ -968,11 +1121,23 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashlink" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +dependencies = [ + "unicode-segmentation", +] [[package]] name = "hermit-abi" @@ -980,6 +1145,39 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + +[[package]] +name = "hkdf" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" +dependencies = [ + "hmac", +] + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "idna" version = "0.5.0" @@ -990,6 +1188,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177" +dependencies = [ + "equivalent", + "hashbrown 0.14.3", +] + [[package]] name = "inout" version = "0.1.3" @@ -1133,6 +1341,9 @@ name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +dependencies = [ + "spin 0.5.2", +] [[package]] name = "libc" @@ -1140,6 +1351,29 @@ version = "0.2.152" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" +[[package]] +name = "libm" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" + +[[package]] +name = "libsqlite3-sys" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "linux-raw-sys" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" + [[package]] name = "local-ip-address" version = "0.5.7" @@ -1173,6 +1407,7 @@ name = "marshal" version = "0.1.0" dependencies = [ "ark-serialize", + "clap", "jf-primitives", "proto", "tokio", @@ -1180,6 +1415,16 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.1" @@ -1198,6 +1443,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -1270,6 +1521,27 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags 2.4.2", + "cfg-if", + "libc", +] + +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1291,6 +1563,23 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-bigint-dig" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" +dependencies = [ + "byteorder", + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand", + "smallvec", + "zeroize", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -1307,6 +1596,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d869c01cc0c455284163fd0092f1f93835385ccab5a98a0dcc497b2f8bf055a9" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.17" @@ -1314,6 +1614,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -1398,6 +1699,15 @@ dependencies = [ "serde", ] +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1436,6 +1746,33 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs1" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" +dependencies = [ + "der", + "pkcs8", + "spki", +] + +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + +[[package]] +name = "pkg-config" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" + [[package]] name = "platforms" version = "3.3.0" @@ -1516,6 +1853,7 @@ dependencies = [ "rcgen", "redis", "rustls", + "sqlx", "thiserror", "tokio", "tracing", @@ -1671,7 +2009,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -1703,6 +2041,26 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "rsa" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc" +dependencies = [ + "const-oid", + "digest", + "num-bigint-dig", + "num-integer", + "num-traits", + "pkcs1", + "pkcs8", + "rand_core", + "signature", + "spki", + "subtle", + "zeroize", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -1724,6 +2082,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.38.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" +dependencies = [ + "bitflags 2.4.2", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "rustls" version = "0.21.10" @@ -1804,7 +2175,7 @@ version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -1868,6 +2239,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha1_smol" version = "1.0.0" @@ -1913,6 +2295,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "digest", + "rand_core", +] + [[package]] name = "slab" version = "0.4.9" @@ -1991,6 +2383,242 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + +[[package]] +name = "sqlformat" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce81b7bd7c4493975347ef60d8c7e8b742d4694f4c49f93e0a12ea263938176c" +dependencies = [ + "itertools 0.12.1", + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dba03c279da73694ef99763320dea58b51095dfe87d001b1d4b5fe78ba8763cf" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", +] + +[[package]] +name = "sqlx-core" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d84b0a3c3739e220d94b3239fd69fb1f74bc36e16643423bd99de3b43c21bfbd" +dependencies = [ + "ahash", + "atoi", + "byteorder", + "bytes", + "crc", + "crossbeam-queue", + "dotenvy", + "either", + "event-listener", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashlink", + "hex", + "indexmap", + "log", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlformat", + "thiserror", + "time", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89961c00dc4d7dffb7aee214964b065072bff69e36ddb9e2c107541f75e4f2a5" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn 1.0.109", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0bd4519486723648186a08785143599760f7cc81c52334a55d6a83ea1e20841" +dependencies = [ + "atomic-write-file", + "dotenvy", + "either", + "heck", + "hex", + "once_cell", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2", + "sqlx-core", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "syn 1.0.109", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-mysql" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e37195395df71fd068f6e2082247891bc11e3289624bbc776a0cdfa1ca7f1ea4" +dependencies = [ + "atoi", + "base64", + "bitflags 2.4.2", + "byteorder", + "bytes", + "crc", + "digest", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "percent-encoding", + "rand", + "rsa", + "serde", + "sha1", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "time", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-postgres" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6ac0ac3b7ccd10cc96c7ab29791a7dd236bd94021f31eec7ba3d46a74aa1c24" +dependencies = [ + "atoi", + "base64", + "bitflags 2.4.2", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand", + "serde", + "serde_json", + "sha1", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "time", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-sqlite" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "210976b7d948c7ba9fced8ca835b11cbb2d677c59c79de41ac0d397e14547490" +dependencies = [ + "atoi", + "flume", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "sqlx-core", + "time", + "tracing", + "url", + "urlencoding", +] + +[[package]] +name = "stringprep" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb41d74e231a107a1b4ee36bd1214b11285b77768d2e3824aedafa988fd36ee6" +dependencies = [ + "finl_unicode", + "unicode-bidi", + "unicode-normalization", +] [[package]] name = "strsim" @@ -2050,6 +2678,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "tempfile" +version = "3.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" +dependencies = [ + "cfg-if", + "fastrand", + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "termtree" version = "0.4.1" @@ -2088,15 +2728,17 @@ dependencies = [ [[package]] name = "time" -version = "0.3.32" +version = "0.3.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe80ced77cbfb4cb91a94bf72b378b4b6791a0d9b7f09d0be747d1bdff4e68bd" +checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" dependencies = [ "deranged", + "itoa", "num-conv", "powerfmt", "serde", "time-core", + "time-macros", ] [[package]] @@ -2105,6 +2747,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" +[[package]] +name = "time-macros" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -2161,6 +2813,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -2260,6 +2923,18 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" + +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "universal-hash" version = "0.5.1" @@ -2293,6 +2968,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8parse" version = "0.2.1" @@ -2305,6 +2986,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" @@ -2383,6 +3070,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "whoami" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" + [[package]] name = "winapi" version = "0.3.9" diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 733854e..0d65142 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -8,9 +8,14 @@ description = "Defines the broker server, which is responsible for routing messa [profile.release] debug = true +[features] +local_discovery = ["proto/local_discovery"] +insecure = ["proto/insecure"] +default = ["local_discovery", "insecure"] + [dependencies] jf-primitives.workspace = true -proto.path = "../proto" +proto = { path = "../proto", default-features = false } tokio.workspace = true tracing.workspace = true ark-serialize.workspace = true diff --git a/broker/src/handlers/broker.rs b/broker/src/handlers/broker.rs index 0fd52aa..7730b53 100644 --- a/broker/src/handlers/broker.rs +++ b/broker/src/handlers/broker.rs @@ -12,7 +12,7 @@ use proto::{ crypto::{Scheme, Serializable}, error::{Error, Result}, message::Message, - verify_broker, + verify_broker, BrokerProtocol, }; use tracing::{error, info}; @@ -20,12 +20,8 @@ use crate::{ get_lock, send_broadcast, send_direct, send_or_remove_many, state::ConnectionId, Inner, }; -impl< - BrokerSignatureScheme: Scheme, - BrokerProtocolType: Protocol, - UserSignatureScheme: Scheme, - UserProtocolType: Protocol, - > Inner +impl + Inner where BrokerSignatureScheme::VerificationKey: Serializable, BrokerSignatureScheme::Signature: Serializable, @@ -35,7 +31,10 @@ where /// This function is the callback for handling a broker (private) connection. pub async fn handle_broker_connection( self: Arc, - mut connection: (BrokerProtocolType::Sender, BrokerProtocolType::Receiver), + mut connection: ( + ::Sender, + ::Receiver, + ), is_outbound: bool, ) { // Depending on which way the direction came in, we will want to authenticate with a different @@ -54,14 +53,23 @@ where // Create new batch sender let (sender, receiver) = connection; // TODO: parameterize max interval and max size - let sender = Arc::from(BatchedSender::::from( + let sender = Arc::from(BatchedSender::::from( sender, Duration::from_millis(50), 1500, )); // Add to our connected broker identities so we don't try to reconnect - get_lock!(self.connected_broker_identities, write).insert(broker_address.clone()); + let mut connected_broker_guard = get_lock!(self.connected_broker_identities, write); + if connected_broker_guard.contains(&broker_address) { + // If the address is already there (we're already connected), drop this one + return; + } + + // If we aren't already connected, add it + connected_broker_guard.insert(broker_address.clone()); + + drop(connected_broker_guard); // Freeze the sender before adding it to our connections so we don't receive messages out of order. // This is to enforce message ordering @@ -107,7 +115,7 @@ where pub async fn broker_receive_loop( &self, connection_id: ConnectionId, - mut receiver: BrokerProtocolType::Receiver, + mut receiver: ::Receiver, ) -> Result<()> { while let Ok(message) = receiver.recv_message().await { match message { diff --git a/broker/src/handlers/user.rs b/broker/src/handlers/user.rs index 283a865..14325d8 100644 --- a/broker/src/handlers/user.rs +++ b/broker/src/handlers/user.rs @@ -11,20 +11,20 @@ use proto::{ }, crypto::{Scheme, Serializable}, message::Message, + UserProtocol, }; use slotmap::Key; use tracing::info; +#[cfg(feature = "local_discovery")] +use proto::discovery::DiscoveryClient; + use crate::{ get_lock, send_broadcast, send_direct, send_or_remove_many, state::ConnectionId, Inner, }; -impl< - BrokerSignatureScheme: Scheme, - BrokerProtocolType: Protocol, - UserSignatureScheme: Scheme, - UserProtocolType: Protocol, - > Inner +impl + Inner where BrokerSignatureScheme::VerificationKey: Serializable, BrokerSignatureScheme::Signature: Serializable, @@ -34,7 +34,10 @@ where /// This function handles a user (public) connection. pub async fn handle_user_connection( self: Arc, - mut connection: (UserProtocolType::Sender, UserProtocolType::Receiver), + mut connection: ( + ::Sender, + ::Receiver, + ), ) where BrokerSignatureScheme::VerificationKey: Serializable, BrokerSignatureScheme::Signature: Serializable, @@ -42,20 +45,19 @@ where UserSignatureScheme::Signature: Serializable, { // Verify (authenticate) the connection - let Ok((verification_key, topics)) = - BrokerAuth::::verify_user( - &mut connection, - &self.identity, - &mut self.redis_client.clone(), - ) - .await + let Ok((verification_key, topics)) = BrokerAuth::::verify_user( + &mut connection, + &self.identity, + &mut self.discovery_client.clone(), + ) + .await else { return; }; // Create new batch sender let (sender, receiver) = connection; - let sender = Arc::new(BatchedSender::::from( + let sender = Arc::new(BatchedSender::::from( sender, Duration::from_millis(50), 1500, @@ -74,23 +76,35 @@ where info!("received connection from user {:?}", connection_id.data()); - // If we have a small amount of users, send the updates immediately - if get_lock!(self.user_connection_lookup, read).get_connection_count() < 50 { - // TODO NEXT: Move this into just asking the task nicely to do it - let _ = self - .send_updates_to_brokers( - vec![], - get_lock!(self.broker_connection_lookup, read) - .get_all_connections() - .clone(), - ) - .await; - } + // If we are in local mode, send updates to brokers immediately. This makes + // it more strongly consistent with the tradeoff of being a bit more intensive. + #[cfg(feature = "local_discovery")] + let _ = self + .send_updates_to_brokers( + vec![], + get_lock!(self.broker_connection_lookup, read) + .get_all_connections() + .clone(), + ) + .await; + + // We want to perform a heartbeat for every user connection so that the number + // of users connected to brokers is always evenly distributed. + #[cfg(feature = "local_discovery")] + let _ = self + .discovery_client + .clone() + .perform_heartbeat( + get_lock!(self.user_connection_lookup, read).get_connection_count() as u64, + Duration::from_secs(60), + ) + .await; // This runs the main loop for receiving information from the user let () = self.user_receive_loop(connection_id, receiver).await; info!("user {:?} disconnected", connection_id.data()); + // Once the main loop ends, we remove the connection self.user_connection_lookup .write() @@ -103,7 +117,7 @@ where pub async fn user_receive_loop( &self, connection_id: ConnectionId, - mut receiver: UserProtocolType::Receiver, + mut receiver: ::Receiver, ) { while let Ok(message) = receiver.recv_message().await { match message { diff --git a/broker/src/lib.rs b/broker/src/lib.rs index 75d5bdc..0dddb6f 100644 --- a/broker/src/lib.rs +++ b/broker/src/lib.rs @@ -16,9 +16,9 @@ use proto::{ bail, connection::protocols::Protocol, crypto::{KeyPair, Scheme, Serializable}, + discovery::{BrokerIdentifier, DiscoveryClient}, error::{Error, Result}, - parse_socket_address, - redis::{self, BrokerIdentifier}, + parse_socket_address, BrokerProtocol, DiscoveryClientType, UserProtocol, }; use state::ConnectionLookup; use tokio::{select, spawn, sync::RwLock}; @@ -37,8 +37,8 @@ pub struct Config { /// The broker (private) bind address: the private-facing address we bind to. pub broker_bind_address: String, - /// The redis endpoint. We use this to maintain consistency between brokers and marshals. - pub redis_endpoint: String, + /// The discovery endpoint. We use this to maintain consistency between brokers and marshals. + pub discovery_endpoint: String, pub keypair: KeyPair, @@ -52,64 +52,51 @@ pub struct Config { struct Inner< // TODO: clean these up with some sort of generic trick or something BrokerSignatureScheme: Scheme, - BrokerProtocolType: Protocol, UserSignatureScheme: Scheme, - UserProtocolType: Protocol, > { /// A broker identifier that we can use to establish uniqueness among brokers. identity: BrokerIdentifier, - /// The (clonable) `Redis` client that we will use to maintain consistency between brokers and marshals - redis_client: redis::Client, + /// The (clonable) `Discovery` client that we will use to maintain consistency between brokers and marshals + discovery_client: DiscoveryClientType, /// The underlying (public) verification key, used to authenticate with the server. Checked /// against the stake table. keypair: KeyPair, - /// The set of all broker identities we see. Mapped against the brokers we see in `Redis` + /// The set of all broker identities we see. Mapped against the brokers we see in `Discovery` /// so that we don't connect multiple times. connected_broker_identities: RwLock>, /// A map of interests to their possible broker connections. We use this to facilitate /// where messages go. They need to be separate because of possibly different protocol /// types. - broker_connection_lookup: RwLock>, + broker_connection_lookup: RwLock>, /// A map of interests to their possible user connections. We use this to facilitate /// where messages go. They need to be separate because of possibly different protocol /// types. - user_connection_lookup: RwLock>, + user_connection_lookup: RwLock>, // connected_keys: LoggedSet, /// The `PhantomData` that we need to be generic over protocol types. - pd: PhantomData<(UserProtocolType, BrokerProtocolType, UserSignatureScheme)>, + pd: PhantomData, } /// The main `Broker` struct. We instantiate this when we want to run a broker. -pub struct Broker< - BrokerSignatureScheme: Scheme, - BrokerProtocolType: Protocol, - UserSignatureScheme: Scheme, - UserProtocolType: Protocol, -> { +pub struct Broker { /// The broker's `Inner`. We clone this and pass it around when needed. - inner: Arc< - Inner, - >, + inner: Arc>, /// The public (user -> broker) listener - user_listener: UserProtocolType::Listener, + user_listener: ::Listener, /// The private (broker <-> broker) listener - broker_listener: BrokerProtocolType::Listener, + broker_listener: ::Listener, } -impl< - BrokerSignatureScheme: Scheme, - BrokerProtocolType: Protocol, - UserSignatureScheme: Scheme, - UserProtocolType: Protocol, - > Broker +impl + Broker where BrokerSignatureScheme::VerificationKey: Serializable, BrokerSignatureScheme::Signature: Serializable, @@ -119,7 +106,7 @@ where /// Create a new `Broker` from a `Config` /// /// # Errors - /// - If we fail to create the `Redis` client + /// - If we fail to create the `Discovery` client /// - If we fail to bind to our public endpoint /// - If we fail to bind to our private endpoint pub async fn new(config: Config) -> Result { @@ -133,7 +120,7 @@ where keypair, - redis_endpoint, + discovery_endpoint, maybe_tls_cert_path, maybe_tls_key_path, } = config; @@ -144,17 +131,17 @@ where broker_advertise_address, }; - // Create the `Redis` client we will use to maintain consistency - let redis_client = bail!( - redis::Client::new(redis_endpoint, Some(identity.clone()),).await, + // Create the `Discovery` client we will use to maintain consistency + let discovery_client = bail!( + DiscoveryClientType::new(discovery_endpoint, Some(identity.clone()),).await, Parse, - "failed to create Redis client" + "failed to create discovery client" ); // Create the user (public) listener let user_bind_address = parse_socket_address!(user_bind_address); let user_listener = bail!( - UserProtocolType::bind( + ::bind( user_bind_address, maybe_tls_cert_path.clone(), maybe_tls_key_path.clone(), @@ -170,8 +157,12 @@ where // Create the broker (private) listener let broker_bind_address = parse_socket_address!(broker_bind_address); let broker_listener = bail!( - BrokerProtocolType::bind(broker_bind_address, maybe_tls_cert_path, maybe_tls_key_path,) - .await, + ::bind( + broker_bind_address, + maybe_tls_cert_path, + maybe_tls_key_path, + ) + .await, Connection, format!( "failed to bind to public (user) bind address {}", @@ -182,7 +173,7 @@ where // Create and return `Self` as wrapping an `Inner` (with things that we need to share) Ok(Self { inner: Arc::from(Inner { - redis_client, + discovery_client, identity, keypair, connected_broker_identities: RwLock::default(), @@ -200,11 +191,11 @@ where /// /// # Errors /// If any of the following tasks exit: - /// - The heartbeat (Redis) task + /// - The heartbeat (Discovery) task /// - The user connection handler /// - The broker connection handler pub async fn start(self) -> Result<()> { - // Spawn the heartbeat task, which we use to register with `Redis` every so often. + // Spawn the heartbeat task, which we use to register with `Discovery` every so often. // We also use it to check for new brokers who may have joined. // let heartbeat_task = ; let heartbeat_task = spawn(self.inner.clone().run_heartbeat_task()); diff --git a/broker/src/main.rs b/broker/src/main.rs index 6439693..4034d44 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -5,7 +5,6 @@ use broker::{Broker, Config}; use clap::Parser; use jf_primitives::signatures::bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS; use proto::{ - connection::protocols::{quic::Quic, tcp::Tcp}, crypto::{generate_random_keypair, DeterministicRng}, error::Result, }; @@ -14,9 +13,9 @@ use proto::{ #[command(author, version, about, long_about = None)] /// The main component of the push CDN. struct Args { - /// The redis endpoint (including password and scheme) to connect to - #[arg(short, long, default_value = "redis://:changeme!@127.0.0.1:6379")] - redis_endpoint: String, + /// The discovery client endpoint (including scheme) to connect to + #[arg(short, long)] + discovery_endpoint: String, /// The port to bind to for connections from users #[arg(short, long, default_value_t = 1738)] @@ -51,7 +50,7 @@ async fn main() -> Result<()> { broker_advertise_address: format!("127.0.0.1:{}", args.broker_bind_port), broker_bind_address: format!("127.0.0.1:{}", args.broker_bind_port), - redis_endpoint: args.redis_endpoint, + discovery_endpoint: args.discovery_endpoint, keypair: proto::crypto::KeyPair { verification_key, @@ -65,7 +64,7 @@ async fn main() -> Result<()> { // Create new `Broker` // Uses TCP from broker connections and Quic for user connections. - let broker = Broker::::new(broker_config).await?; + let broker = Broker::::new(broker_config).await?; // Start the main loop, consuming it broker.start().await?; diff --git a/broker/src/state.rs b/broker/src/state.rs index 5f4960c..4ac46da 100644 --- a/broker/src/state.rs +++ b/broker/src/state.rs @@ -105,7 +105,7 @@ impl ConnectionLookup { } /// Get the number of connections currently in the map. We use this to - /// report to `Redis`, so the marshal knows who has the least connections. + /// report to our discovery client, so the marshal knows who has the least connections. pub fn get_connection_count(&self) -> usize { self.connections.len() } diff --git a/broker/src/tasks/broker_listener.rs b/broker/src/tasks/broker_listener.rs index 5f3ee4c..9059cd5 100644 --- a/broker/src/tasks/broker_listener.rs +++ b/broker/src/tasks/broker_listener.rs @@ -5,18 +5,16 @@ use std::sync::Arc; use proto::{ connection::protocols::{Listener, Protocol}, crypto::{Scheme, Serializable}, + BrokerProtocol, }; use tokio::spawn; use tracing::warn; +// TODO: change connection to be named struct instead of tuple for readability purposes use crate::Inner; -impl< - BrokerSignatureScheme: Scheme, - BrokerProtocolType: Protocol, - UserSignatureScheme: Scheme, - UserProtocolType: Protocol, - > Inner +impl + Inner where BrokerSignatureScheme::VerificationKey: Serializable, BrokerSignatureScheme::Signature: Serializable, @@ -24,7 +22,10 @@ where UserSignatureScheme::Signature: Serializable, { /// Runs the broker listener task in a loop. - pub async fn run_broker_listener_task(self: Arc, listener: BrokerProtocolType::Listener) { + pub async fn run_broker_listener_task( + self: Arc, + listener: ::Listener, + ) { loop { // Accept a connection. If we fail, print the error and keep going. // diff --git a/broker/src/tasks/heartbeat.rs b/broker/src/tasks/heartbeat.rs index 80e1ef1..970edfd 100644 --- a/broker/src/tasks/heartbeat.rs +++ b/broker/src/tasks/heartbeat.rs @@ -1,38 +1,36 @@ -//! The heartbeat task periodically posts our state to Redis. +//! The heartbeat task periodically posts our state to either Redis or an embeddable file DB. use std::{sync::Arc, time::Duration}; use proto::{ connection::protocols::Protocol, crypto::{Scheme, Serializable}, + discovery::DiscoveryClient, + BrokerProtocol, }; use tokio::{spawn, time::sleep}; use tracing::error; use crate::{get_lock, Inner}; -impl< - BrokerSignatureScheme: Scheme, - BrokerProtocolType: Protocol, - UserSignatureScheme: Scheme, - UserProtocolType: Protocol, - > Inner +impl + Inner where BrokerSignatureScheme::VerificationKey: Serializable, BrokerSignatureScheme::Signature: Serializable, UserSignatureScheme::VerificationKey: Serializable, UserSignatureScheme::Signature: Serializable, { - /// This task deals with setting the number of our connected users in `Redis`. It allows + /// This task deals with setting the number of our connected users in Redis or the embedded db. It allows /// the marshal to correctly choose the broker with the least amount of connections. pub async fn run_heartbeat_task(self: Arc) { - // Clone the `Redis` client, which needs to be mutable - let mut redis_client = self.redis_client.clone(); + // Clone the `discovery` client, which needs to be mutable + let mut discovery_client = self.discovery_client.clone(); // Run this forever, unless we run into a panic (e.g. the "as" conversion.) loop { - // Register with `Redis` every n seconds, updating our number of connected users - if let Err(err) = redis_client + // Register with the discovery service every n seconds, updating our number of connected users + if let Err(err) = discovery_client .perform_heartbeat( get_lock!(self.user_connection_lookup, read).get_connection_count() as u64, Duration::from_secs(60), @@ -44,7 +42,7 @@ where } // Check for new brokers, spawning tasks to connect to them if necessary - match redis_client.get_other_brokers().await { + match discovery_client.get_other_brokers().await { Ok(brokers) => { // Calculate the difference, spawn tasks to connect to them for broker in brokers @@ -61,7 +59,9 @@ where spawn(async move { // Connect to the broker let connection = - match BrokerProtocolType::connect(to_connect_address).await { + match ::connect(&to_connect_address) + .await + { Ok(connection) => connection, Err(err) => { error!("failed to connect to broker: {err}"); diff --git a/broker/src/tasks/update.rs b/broker/src/tasks/update.rs index b1adb59..22ddff2 100644 --- a/broker/src/tasks/update.rs +++ b/broker/src/tasks/update.rs @@ -10,10 +10,11 @@ use crate::{ use crate::{new_serialized_message, send_or_remove_many}; use proto::{ bail, - connection::{batch::Position, protocols::Protocol}, + connection::batch::Position, crypto::{Scheme, Serializable}, error::{Error, Result}, message::Message, + BrokerProtocol, }; use tokio::time::sleep; use tracing::error; @@ -40,12 +41,8 @@ macro_rules! send_update_to_brokers { }}; } -impl< - BrokerSignatureScheme: Scheme, - BrokerProtocolType: Protocol, - UserSignatureScheme: Scheme, - UserProtocolType: Protocol, - > Inner +impl + Inner where BrokerSignatureScheme::VerificationKey: Serializable, BrokerSignatureScheme::Signature: Serializable, @@ -79,8 +76,8 @@ where /// on every user join if the number of connected users is sufficiently small. pub async fn send_updates_to_brokers( self: &Arc, - full: Vec<(ConnectionId, Sender)>, - partial: Vec<(ConnectionId, Sender)>, + full: Vec<(ConnectionId, Sender)>, + partial: Vec<(ConnectionId, Sender)>, ) -> Result<()> { // When a broker connects, we have to send: // 1. Our snapshot to the new broker (of what topics/users we're subscribed for) diff --git a/broker/src/tasks/user_listener.rs b/broker/src/tasks/user_listener.rs index 5713be8..998bcca 100644 --- a/broker/src/tasks/user_listener.rs +++ b/broker/src/tasks/user_listener.rs @@ -5,18 +5,15 @@ use std::sync::Arc; use proto::{ connection::protocols::{Listener, Protocol}, crypto::{Scheme, Serializable}, + UserProtocol, }; use tokio::spawn; use tracing::warn; use crate::Inner; -impl< - BrokerSignatureScheme: Scheme, - BrokerProtocolType: Protocol, - UserSignatureScheme: Scheme, - UserProtocolType: Protocol, - > Inner +impl + Inner where BrokerSignatureScheme::VerificationKey: Serializable, BrokerSignatureScheme::Signature: Serializable, @@ -24,7 +21,10 @@ where UserSignatureScheme::Signature: Serializable, { // We run the user listener task in a loop, accepting and handling new connections as needed. - pub async fn run_user_listener_task(self: Arc, listener: UserProtocolType::Listener) { + pub async fn run_user_listener_task( + self: Arc, + listener: ::Listener, + ) { loop { // Accept a connection. If we fail, print the error and keep going. // diff --git a/client/Cargo.toml b/client/Cargo.toml index f463fd7..04973e8 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -5,9 +5,14 @@ edition = "2021" description = "Defines client interactions for both marshals and brokers" +[features] +local_discovery = ["proto/local_discovery"] +insecure = ["proto/insecure"] +default = ["local_discovery", "insecure"] + [dependencies] jf-primitives.workspace = true -proto.path = "../proto" +proto = { path = "../proto", default-features = false } ark-serialize.workspace = true tokio.workspace = true tracing-subscriber.workspace = true diff --git a/client/src/main.rs b/client/src/main.rs index 3636973..7f877dc 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -55,7 +55,7 @@ async fn main() -> Result<()> { loop { // Create a big 512MB message - let m = vec![0u8; 256000000]; + let m = vec![0u8; 256_000_000]; if let Err(err) = client.send_direct_message(&other_verification_key, m) { tracing::error!("failed to send message: {}", err); diff --git a/client/src/retry.rs b/client/src/retry.rs index b51e87e..aebf11f 100644 --- a/client/src/retry.rs +++ b/client/src/retry.rs @@ -21,7 +21,7 @@ use tokio::{ sync::{Mutex, RwLock, Semaphore}, time::sleep, }; -use tracing::error; +use tracing::{error, info}; use crate::bail; @@ -256,7 +256,7 @@ where { // Make the connection to the marshal let mut connection = bail!( - ProtocolType::connect(marshal_endpoint.to_owned()).await, + ProtocolType::connect(marshal_endpoint).await, Connection, "failed to connect to endpoint" ); @@ -274,7 +274,7 @@ where // Make the connection to the broker let mut connection = bail!( - ProtocolType::connect(broker_address).await, + ProtocolType::connect(&broker_address).await, Connection, "failed to connect to broker" ); @@ -291,5 +291,7 @@ where "failed to authenticate to broker" ); + info!("connected to broker {}", broker_address); + Ok(connection) } diff --git a/local_db/db.sqlite b/local_db/db.sqlite new file mode 100644 index 0000000000000000000000000000000000000000..65f798462c42b2dc068d71910789a5ce9ce2d277 GIT binary patch literal 24576 zcmeI(PiWIn90%|u{pZTI4pDmO<>ASKm9_hG*hSMWj$+nsT?U&tLzlj}ur{svBOQpi zAs&?71ur5VM0TCSgBSJU!K)s;34$lVi+J>B10TE{3x+M z^m_O@>!Lvb0uX=z1Rwwb2tWV=5cuZ;Ta%GkGL_=D3wCw2PFvJ$=(gprqJ@eiDiTq| zyetuaCpjM{tm!ps*!r4IO`=Fk3MrM@vM9?JI#F*dDV7vzMydosGFK_iij`$@Ra*82 zIu(8kNMW&3kxGgh4Do0zN7JZfk$kx Marshal +impl Marshal where SignatureScheme::VerificationKey: Serializable, SignatureScheme::Signature: Serializable, { /// Handles a user's connection, including authentication. pub async fn handle_connection( - mut connection: (ProtocolType::Sender, ProtocolType::Receiver), - mut redis_client: redis::Client, + mut connection: ( + ::Sender, + ::Receiver, + ), + mut discovery_client: DiscoveryClientType, ) { // Verify (authenticate) the connection - let _ = MarshalAuth::::verify_user( - &mut connection, - &mut redis_client, - ) - .await; + let _ = MarshalAuth::::verify_user(&mut connection, &mut discovery_client) + .await; // We don't care about this, just drop the connection immediately. let _ = connection.0.finish().await; diff --git a/marshal/src/lib.rs b/marshal/src/lib.rs index 40d8ab5..291000e 100644 --- a/marshal/src/lib.rs +++ b/marshal/src/lib.rs @@ -11,8 +11,9 @@ use proto::{ bail, connection::protocols::{Listener, Protocol}, crypto::{Scheme, Serializable}, + discovery::DiscoveryClient, error::{Error, Result}, - redis, + DiscoveryClientType, UserProtocol, }; use tokio::spawn; use tracing::warn; @@ -20,19 +21,19 @@ use tracing::warn; /// A connection `Marshal`. The user authenticates with it, receiving a permit /// to connect to an actual broker. Think of it like a load balancer for /// the brokers. -pub struct Marshal { +pub struct Marshal { /// The underlying connection listener. Used to accept new connections. - listener: Arc, + listener: Arc<::Listener>, - /// The redis client we use to issue permits and check for brokers that are up - redis_client: redis::Client, + /// The client we use to issue permits and check for brokers that are up + discovery_client: DiscoveryClientType, /// We need this `PhantomData` to allow us to specify the signature scheme, /// protocol type, and authentication flow. pd: PhantomData, } -impl Marshal +impl Marshal where SignatureScheme::VerificationKey: Serializable, SignatureScheme::Signature: Serializable, @@ -44,7 +45,7 @@ where /// - If we fail to bind to the local address pub async fn new( bind_address: String, - redis_endpoint: String, + discovery_endpoint: String, maybe_tls_cert_path: Option, maybe_tls_key_path: Option, ) -> Result { @@ -53,22 +54,23 @@ where // Create the `Listener` from the bind address let listener = bail!( - ProtocolType::bind(bind_address, maybe_tls_cert_path, maybe_tls_key_path).await, + ::bind(bind_address, maybe_tls_cert_path, maybe_tls_key_path) + .await, Connection, format!("failed to listen to address {}", bind_address) ); - // Create the Redis client - let redis_client = bail!( - redis::Client::new(redis_endpoint.clone(), None).await, + // Create the discovery client + let discovery_client = bail!( + DiscoveryClientType::new(discovery_endpoint.clone(), None).await, Connection, - "failed to create Redis client" + "failed to create discovery client" ); // Create `Self` from the `Listener` Ok(Self { listener: Arc::from(listener), - redis_client, + discovery_client, pd: PhantomData, }) } @@ -94,8 +96,8 @@ where }; // Create a task to handle the connection - let redis_client = self.redis_client.clone(); - spawn(Self::handle_connection(connection, redis_client)); + let discovery_client = self.discovery_client.clone(); + spawn(Self::handle_connection(connection, discovery_client)); } } } diff --git a/marshal/src/main.rs b/marshal/src/main.rs index 375d6eb..a994e70 100644 --- a/marshal/src/main.rs +++ b/marshal/src/main.rs @@ -1,22 +1,39 @@ //! The following is the main `Marshal` binary, which just instantiates and runs //! a `Marshal` object. +use clap::Parser; use jf_primitives::signatures::bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS; use marshal::Marshal; -use proto::{connection::protocols::quic::Quic, error::Result}; +use proto::error::Result; //TODO: for both client and marshal, clean up and comment `main.rs` // TODO: forall, add logging where we need it +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +/// The main component of the push CDN. +struct Args { + /// The discovery client endpoint (including scheme) to connect to + #[arg(short, long)] + discovery_endpoint: String, + + /// The port to bind to for connections (from users) + #[arg(short, long, default_value_t = 8082)] + bind_port: u16, +} + #[tokio::main] async fn main() -> Result<()> { + // Parse command-line arguments + let args = Args::parse(); + // Initialize tracing tracing_subscriber::fmt::init(); // Create new `Marshal` - let marshal = Marshal::::new( - "0.0.0.0:8082".to_string(), - "redis://:changeme!@127.0.0.1:6379".to_string(), + let marshal = Marshal::::new( + format!("0.0.0.0:{}", args.bind_port), + args.discovery_endpoint, None, None, ) diff --git a/process-compose.yaml b/process-compose.yaml index 49b4d4c..7629cd2 100644 --- a/process-compose.yaml +++ b/process-compose.yaml @@ -1,39 +1,35 @@ version: "0.5" -environment: - - BROKER_REDIS_URL=redis://127.0.0.1:6379 - - BROKER_REDIS_PASSWORD=changeme! - processes: redis: command: echo 'requirepass changeme!' | keydb-server - --save "" --appendonly no marshal_0: - command: cargo run --package marshal + command: cargo run --package marshal --features insecure --no-default-features -- -d "redis://:changeme!@127.0.0.1:6379" depends_on: redis: condition: process_started broker_0: - command: cargo run --release --package broker -- -b 8082 -u 8083 + command: cargo run --package broker --features insecure --no-default-features -- -b 8082 -u 8083 -d "redis://:changeme!@127.0.0.1:6379" depends_on: redis: condition: process_started broker_1: - command: cargo run --release --package broker -- -b 8084 -u 8085 + command: cargo run --package broker --features insecure --no-default-features -- -b 8084 -u 8085 -d "redis://:changeme!@127.0.0.1:6379" depends_on: redis: condition: process_started client_0: - command: cargo run --release --bin client -- --id 0 + command: cargo run --bin client --features insecure --no-default-features -- --id 0 depends_on: marshal_0: condition: process_started client_1: - command: cargo run --release --bin client -- --id 1 + command: cargo run --bin client --features insecure --no-default-features -- --id 1 depends_on: marshal_0: condition: process_started diff --git a/proto/Cargo.toml b/proto/Cargo.toml index 11564fb..0e05204 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -8,8 +8,9 @@ description = "Contains the common protocol definition and common code for the b capnpc = "0.19.0" [features] -local-testing = ["rustls/dangerous_configuration"] -default = ["local-testing"] +local_discovery = ["dep:sqlx"] +insecure = ["rustls/dangerous_configuration"] +default = ["local_discovery", "insecure"] [dependencies] capnp = "0.19.1" @@ -24,6 +25,15 @@ tracing.workspace = true rustls = "0.21.10" rcgen = "0.12.0" pem = "3.0.3" -redis = { version = "0.24.0", features = ["tokio-comp", "connection-manager"] } mockall = "0.12.1" -async-trait.workspace = true \ No newline at end of file +async-trait.workspace = true + +# TODO: make this optional if local not specified +redis = { version = "0.24.0", features = ["tokio-comp", "connection-manager"] } + +sqlx = { version = "0.7.3", features = [ + "runtime-tokio", + "sqlite", + "migrate", + "time", +], optional = true } diff --git a/proto/src/connection/auth/broker.rs b/proto/src/connection/auth/broker.rs index 8601503..7dd7339 100644 --- a/proto/src/connection/auth/broker.rs +++ b/proto/src/connection/auth/broker.rs @@ -11,17 +11,18 @@ use crate::{ bail, connection::protocols::{Protocol, Receiver, Sender}, crypto::{self, DeterministicRng, KeyPair, Scheme, Serializable}, + discovery::{BrokerIdentifier, DiscoveryClient}, error::{Error, Result}, fail_verification_with_message, message::{AuthenticateResponse, AuthenticateWithKey, Message, Topic}, - redis::{self, BrokerIdentifier}, + BrokerProtocol, DiscoveryClientType, UserProtocol, }; /// This is the `BrokerAuth` struct that we define methods to for authentication purposes. -pub struct BrokerAuth { +pub struct BrokerAuth { /// We use `PhantomData` here so we can be generic over a signature scheme /// and protocol type - pub pd: PhantomData<(SignatureScheme, ProtocolType)>, + pub pd: PhantomData, } /// We use this macro upstream to conditionally order broker authentication flows @@ -30,7 +31,7 @@ pub struct BrokerAuth { macro_rules! authenticate_with_broker { ($connection: expr, $inner: expr) => { // Authenticate with the other broker, returning their reconnect address - match BrokerAuth::::authenticate_with_broker( + match BrokerAuth::::authenticate_with_broker( &mut $connection, &$inner.keypair, ) @@ -50,7 +51,7 @@ macro_rules! authenticate_with_broker { macro_rules! verify_broker { ($connection: expr, $inner: expr) => { // Verify the other broker's authentication - if let Err(err) = BrokerAuth::::verify_broker( + if let Err(err) = BrokerAuth::::verify_broker( &mut $connection, &$inner.identity, &$inner.keypair.verification_key, @@ -63,7 +64,7 @@ macro_rules! verify_broker { }; } -impl BrokerAuth +impl BrokerAuth where SignatureScheme::VerificationKey: Serializable, SignatureScheme::Signature: Serializable, @@ -77,9 +78,12 @@ where /// - If authentication fails /// - If our connection fails pub async fn verify_user( - connection: &mut (ProtocolType::Sender, ProtocolType::Receiver), + connection: &mut ( + ::Sender, + ::Receiver, + ), broker_identifier: &BrokerIdentifier, - redis_client: &mut redis::Client, + discovery_client: &mut DiscoveryClientType, ) -> Result<(Vec, Vec)> { // Receive the permit let auth_message = bail!( @@ -93,8 +97,8 @@ where fail_verification_with_message!(connection, "wrong message type"); }; - // Check the permit with `Redis` - let serialized_verification_key = match redis_client + // Check the permit + let serialized_verification_key = match discovery_client .validate_permit(broker_identifier, auth_message.permit) .await { @@ -154,7 +158,10 @@ where /// - If we fail to authenticate /// - If we have a connection failure pub async fn authenticate_with_broker( - connection: &mut (ProtocolType::Sender, ProtocolType::Receiver), + connection: &mut ( + ::Sender, + ::Receiver, + ), keypair: &KeyPair, ) -> Result { // Get the current timestamp, which we sign to avoid replay attacks @@ -245,7 +252,10 @@ where /// # Errors /// - If verification has failed pub async fn verify_broker( - connection: &mut (ProtocolType::Sender, ProtocolType::Receiver), + connection: &mut ( + ::Sender, + ::Receiver, + ), our_identifier: &BrokerIdentifier, our_verification_key: &SignatureScheme::VerificationKey, ) -> Result<()> { diff --git a/proto/src/connection/auth/marshal.rs b/proto/src/connection/auth/marshal.rs index ed74c01..4c69072 100644 --- a/proto/src/connection/auth/marshal.rs +++ b/proto/src/connection/auth/marshal.rs @@ -11,25 +11,24 @@ use crate::{ bail, connection::protocols::{Protocol, Receiver, Sender}, crypto::{self, Scheme, Serializable}, + discovery::DiscoveryClient, error::{Error, Result}, fail_verification_with_message, message::{AuthenticateResponse, Message}, - redis, + DiscoveryClientType, UserProtocol, }; /// This is the `BrokerAuth` struct that we define methods to for authentication purposes. -pub struct MarshalAuth -{ +pub struct MarshalAuth { /// We use `PhantomData` here so we can be generic over a signature scheme - /// and protocol type - pub pd: PhantomData<(SignatureScheme, ProtocolType)>, + pub pd: PhantomData, } -impl MarshalAuth +impl MarshalAuth where SignatureScheme::VerificationKey: Serializable, SignatureScheme::Signature: Serializable, - { +{ /// The authentication implementation for a marshal to a user. We take the following steps: /// 1. Receive a signed message from the user /// 2. Validate the message @@ -40,8 +39,11 @@ where /// - If authentication fails /// - If our connection fails pub async fn verify_user( - connection: &mut (ProtocolType::Sender, ProtocolType::Receiver), - redis_client: &mut redis::Client, + connection: &mut ( + ::Sender, + ::Receiver, + ), + discovery_client: &mut DiscoveryClientType, ) -> Result<()> { // Receive the signed message from the user let auth_message = bail!( @@ -90,10 +92,13 @@ where // Get the broker with the least amount of connections // TODO: do a macro for this - let broker_with_least_connections = match redis_client.get_with_least_connections().await { + let broker_with_least_connections = match discovery_client + .get_with_least_connections() + .await + { Ok(broker) => broker, Err(err) => { - error!("failed to get the broker with the least connections from Redis: {err}"); + error!("failed to get the broker with the least connections from discovery client: {err}"); fail_verification_with_message!(connection, "internal server error"); } }; @@ -101,7 +106,7 @@ where // Generate and issue a permit for said broker // TODO: add bounds check for verification key. There's the possibility it could be too big, if // verify does not check that. - let permit = match redis_client + let permit = match discovery_client .issue_permit( &broker_with_least_connections, Duration::from_secs(5), @@ -111,7 +116,7 @@ where { Ok(broker) => broker, Err(err) => { - error!("failed to issue a permit to Redis: {err}"); + error!("failed to issue a permit: {err}"); fail_verification_with_message!(connection, "internal server error"); } }; diff --git a/proto/src/connection/protocols/mod.rs b/proto/src/connection/protocols/mod.rs index 82652dd..6ace8ec 100644 --- a/proto/src/connection/protocols/mod.rs +++ b/proto/src/connection/protocols/mod.rs @@ -28,7 +28,7 @@ pub trait Protocol: Send + Sync + 'static { /// /// # Errors /// Errors if we fail to connect or if we fail to bind to the interface we want. - async fn connect(remote_endpoint: String) -> Result<(Self::Sender, Self::Receiver)>; + async fn connect(remote_endpoint: &str) -> Result<(Self::Sender, Self::Receiver)>; /// Bind to the local address, returning an instance of `Listener`. /// diff --git a/proto/src/connection/protocols/quic.rs b/proto/src/connection/protocols/quic.rs index f5f3409..6296b8d 100644 --- a/proto/src/connection/protocols/quic.rs +++ b/proto/src/connection/protocols/quic.rs @@ -6,9 +6,11 @@ use async_trait::async_trait; use quinn::{ClientConfig, Endpoint, ServerConfig}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +#[cfg(feature = "insecure")] +use crate::crypto::SkipServerVerification; use crate::{ bail, bail_option, - crypto::{self, SkipServerVerification}, + crypto::{self}, error::{Error, Result}, message::Message, read_length_delimited, write_length_delimited, MAX_MESSAGE_SIZE, @@ -28,7 +30,7 @@ impl Protocol for Quic { type Receiver = QuicReceiver; type Listener = QuicListener; - async fn connect(remote_endpoint: String) -> Result<(QuicSender, QuicReceiver)> { + async fn connect(remote_endpoint: &str) -> Result<(QuicSender, QuicReceiver)> { // Parse the socket address let remote_address = bail_option!( bail!( @@ -61,12 +63,12 @@ impl Protocol for Quic { ); // Set up TLS configuration - #[cfg(not(feature = "local-testing"))] + #[cfg(not(feature = "insecure"))] // Production mode: native certs let config = ClientConfig::with_native_roots(); // Local testing mode: skip server verification, insecure - #[cfg(feature = "local-testing")] + #[cfg(feature = "insecure")] let config = ClientConfig::new(SkipServerVerification::new_config()); // Set default client config diff --git a/proto/src/connection/protocols/tcp.rs b/proto/src/connection/protocols/tcp.rs index 2cdf6b5..c7b59c4 100644 --- a/proto/src/connection/protocols/tcp.rs +++ b/proto/src/connection/protocols/tcp.rs @@ -37,7 +37,7 @@ impl Protocol for Tcp { /// /// # Errors /// Errors if we fail to connect or if we fail to bind to the interface we want. - async fn connect(remote_endpoint: String) -> Result<(Self::Sender, Self::Receiver)> + async fn connect(remote_endpoint: &str) -> Result<(Self::Sender, Self::Receiver)> where Self: Sized, { diff --git a/proto/src/crypto.rs b/proto/src/crypto.rs index 2ba21b9..58c8f88 100644 --- a/proto/src/crypto.rs +++ b/proto/src/crypto.rs @@ -12,8 +12,13 @@ use jf_primitives::signatures::{ }; use rand::{CryptoRng, RngCore}; use rcgen::generate_simple_self_signed; +use std::hash::Hash; + +// TODO: have `SkipServerVerify` as a separate module +#[cfg(feature = "insecure")] use rustls::ClientConfig; -use std::{hash::Hash, sync::Arc}; +#[cfg(feature = "insecure")] +use std::sync::Arc; /// We encapsulate keys here to help readability. pub struct KeyPair { @@ -123,10 +128,12 @@ pub fn generate_random_keypair Arc { Arc::new( @@ -140,6 +147,7 @@ impl SkipServerVerification { /// This is the implementation for `ServerCertVerifier` that `rustls` requires us /// to implement for server cert verification purposes. +#[cfg(feature = "insecure")] impl rustls::client::ServerCertVerifier for SkipServerVerification { fn verify_server_cert( &self, diff --git a/proto/src/discovery/embedded.rs b/proto/src/discovery/embedded.rs new file mode 100644 index 0000000..d865a65 --- /dev/null +++ b/proto/src/discovery/embedded.rs @@ -0,0 +1,258 @@ +use std::{collections::HashSet, ops::Add, time::Duration}; + +use async_trait::async_trait; +use rand::{rngs::StdRng, RngCore, SeedableRng}; +use sqlx::{query, query_as, types::time::OffsetDateTime, Row, SqlitePool}; + +use crate::{ + bail, + error::{Error, Result}, +}; + +use super::{BrokerIdentifier, DiscoveryClient}; + +#[derive(Clone)] +pub struct Embedded { + pool: SqlitePool, + identifier: BrokerIdentifier, +} + +#[derive(sqlx::FromRow)] +struct BrokerRow { + identifier: String, + num_connections: i64, + #[allow(unused)] + expiry: OffsetDateTime, +} + +#[async_trait] +impl DiscoveryClient for Embedded { + /// Create a new `Client` from the `SQLite` path and optional identifier. This is clonable, and + /// we don't have to worry about reconnections anywhere. + /// + /// # Errors + /// - If we failed to connect to the `SqlitePool` + async fn new(path: String, identity: Option) -> Result { + // Use the supplied identifier or a blank one, if we don't need/want one. + // We only "need" the identifier if we want to register + let identifier = identity.map_or_else( + || BrokerIdentifier { + user_advertise_address: String::new(), + broker_advertise_address: String::new(), + }, + |identifier| identifier, + ); + + // Open a test connection to the DB + let pool = bail!( + SqlitePool::connect(&path).await, + File, + "failed to open sqlite db" + ); + + // Return the thinly wrapped `Self`. + Ok(Self { pool, identifier }) + } + + /// (as a broker) perform the heartbeat operation. The heartbeat operation + /// consists of the following: + /// 1. Add to the list of brokers + /// 2. Set the expiry for the broker set member + /// 3. Set the number of connections + /// + /// # Errors + /// - If the `SQLite` connection fails + async fn perform_heartbeat( + &mut self, + num_connections: u64, + heartbeat_expiry: Duration, + ) -> Result<()> { + // Get the current time, add the expiry to it + let expiry = OffsetDateTime::now_utc().add(heartbeat_expiry); + + // Do some type conversions + let identifier = self.identifier.to_string(); + let num_connections = bail!( + u32::try_from(num_connections), + Parse, + "failed to parse number of connections" + ); + + // Store us as a broker with the number of connections + bail!( + query( + "INSERT or REPLACE INTO brokers (identifier, num_connections, expiry) VALUES (?, ?, ?)", + ).bind(identifier).bind(num_connections).bind(expiry).execute(&self.pool).await, + File, + "failed to insert self into brokers table" + ); + + Ok(()) + } + + /// Get the broker with the least number of connections (and permits). + /// We use this to figure out which broker gets our permit issued + /// + /// # Errors + /// - If the `SQLite` connection fails + async fn get_with_least_connections(&mut self) -> Result { + // Get all brokers + let brokers: Vec = bail!( + query_as("SELECT * from brokers") + .fetch_all(&self.pool) + .await, + File, + "failed to fetch broker list" + ); + + // Our tracker for the "least connected" broker + let (mut least_connections, mut broker_with_least_connections) = + (u64::MAX, "meowtown".to_string()); + + // Iterate over every broker + for broker in brokers { + // Delete old permits + bail!( + query("DELETE FROM permits WHERE expiry < datetime()") + .execute(&self.pool) + .await, + File, + "failed to delete old permits" + ); + + // Get the number of permits + let num_permits: u64 = u64::from( + bail!( + query("SELECT COUNT(permit) as count FROM permits WHERE identifier = ?;") + .bind(&broker.identifier) + .fetch_one(&self.pool) + .await, + File, + "failed to get permit table" + ) + .get::(0), + ); + + let total_broker_connections = num_permits + broker.num_connections as u64; + if total_broker_connections < least_connections { + least_connections = total_broker_connections; + broker_with_least_connections = broker.identifier; + } + } + + broker_with_least_connections.try_into() + } + + /// Get all other brokers, not including our own identifier (if applicable). This is so we + /// can connect to them if not already. + /// + /// # Errors + /// - If the `SQLite` connection fails + async fn get_other_brokers(&mut self) -> Result> { + // Delete old brokers + bail!( + query("DELETE FROM brokers WHERE expiry < datetime()") + .execute(&self.pool) + .await, + File, + "failed to delete old brokers" + ); + + // Get all other brokers + let brokers: Vec = bail!( + query_as::<_, BrokerRow>("SELECT * from brokers") + .fetch_all(&self.pool) + .await, + File, + "failed to get other brokers" + ); + + // Convert to broker identifiers + let mut brokers_parsed = HashSet::new(); + for broker in brokers { + brokers_parsed.insert(broker.identifier.try_into().unwrap()); + } + + // Remove ourselves + brokers_parsed.remove(&self.identifier); + + // Return all brokers (excluding ourselves) + Ok(brokers_parsed) + } + + + /// Issue a permit for a particular broker. This is separate from `get_with_least_connections` + /// because it allows for more modularity; and it isn't atomic anyway. + /// + /// # Errors + /// - If the `SQLite` connection fails + async fn issue_permit( + &mut self, + for_broker: &BrokerIdentifier, + expiry: Duration, + verification_key: Vec, + ) -> Result { + // Create random permit number + // TODO: figure out if it makes sense to initialize this somewhere else + let permit = StdRng::from_entropy().next_u32(); + + let broker = for_broker.to_string(); + + // Calculate record expiry + let expiry = OffsetDateTime::now_utc().add(expiry); + + // Insert into permits + bail!( + query( + "INSERT INTO permits (identifier, permit, user_pubkey, expiry) VALUES (?1, ?2, ?3, ?4)", + ).bind(&broker).bind(permit).bind(verification_key).bind(expiry) + .execute(&self.pool) + .await, + File, + "failed to issue permit" + ); + + Ok(permit as u64) + } + + /// Validate and remove a permit belonging to a particular broker. + /// Returns `Some(validation_key)` if successful, and `None` if not. + /// + /// # Errors + /// - If the `SQLite` connection fails + async fn validate_permit( + &mut self, + broker: &BrokerIdentifier, + permit: u64, + ) -> Result>> { + // Delete old permits + bail!( + query("DELETE FROM permits WHERE expiry < datetime()") + .execute(&self.pool) + .await, + File, + "failed to get old permits" + ); + + // Do some type conversions + let permit = bail!( + u32::try_from(permit), + Parse, + "failed to parse permit as u32" + ); + let broker = broker.to_string(); + + // Get possible permit + let res = bail!( + query("DELETE FROM permits WHERE permit=(?1) AND identifier=(?2) RETURNING *;",) + .bind(permit) + .bind(broker) + .fetch_optional(&self.pool) + .await, + File, + "failed to get permits" + ); + + Ok(res.map(|row| row.get("user_pubkey"))) + } +} diff --git a/proto/src/discovery/mod.rs b/proto/src/discovery/mod.rs new file mode 100644 index 0000000..2d79a9d --- /dev/null +++ b/proto/src/discovery/mod.rs @@ -0,0 +1,107 @@ +//! In this module we describe the `DiscoveryClient` trait. It defines a client that allows +//! us to derive a source of truth for the number of brokers, permits issued, and the +//! number of users connected per broker. + +use std::{collections::HashSet, time::Duration}; + +use async_trait::async_trait; +use std::result::Result as StdResult; + +use crate::error::{Error, Result}; + +// If local discovery is enabled, use the embedded db +#[cfg(feature = "local_discovery")] +pub mod embedded; + +// If local discovery is disabled, use Redis +#[cfg(not(feature = "local_discovery"))] +pub mod redis; + +#[async_trait] +// Defines a client that allows us to derive a source of truth for +// the number of brokers, permits issued, and the number of users connected per broker. +pub trait DiscoveryClient: Sized + Clone + Sync + Send + 'static { + /// Create a new `DiscoveryClient` from the path to it (file or otherwise) and an optional + /// identity that we store alongside our data, if appropriate. + async fn new(path: String, identity: Option) -> Result; + + /// (As a broker) perform a heartbeat. Publish our number of connections to the source of truth, + /// which expires after `heartbeat_expiry`. + async fn perform_heartbeat( + &mut self, + num_connections: u64, + heartbeat_expiry: Duration, + ) -> Result<()>; + + /// (As a marshal) get the broker with the least number of connections, for which we can issue a permit + /// for. + async fn get_with_least_connections(&mut self) -> Result; + + /// (As a broker) get other registered brokers so we can connect to new ones. + async fn get_other_brokers(&mut self) -> Result>; + + /// (As a marshal) issue a permit for a user to connect to a particular broker. + async fn issue_permit( + &mut self, + for_broker: &BrokerIdentifier, + expiry: Duration, + verification_key: Vec, + ) -> Result; + + /// (As a broker) validate a permit as existing for a broker and remove it, returning + /// the user's public key. + async fn validate_permit( + &mut self, + broker: &BrokerIdentifier, + permit: u64, + ) -> Result>>; +} + +/// Used as a unique identifier for a broker. Defines both public and private addresses. +#[derive(Eq, PartialEq, Hash, Clone, Debug)] +pub struct BrokerIdentifier { + /// The address that a broker advertises to publicly (to users) + pub user_advertise_address: String, + /// The address that a broker advertises to privately (to other brokers) + pub broker_advertise_address: String, +} + +/// We need this to convert in the opposite direction: to create a `String` +/// from a `BrokerIdentifier`. +impl std::fmt::Display for BrokerIdentifier { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "{}/{}", + self.user_advertise_address, self.broker_advertise_address + ) + } +} + +/// We need this to be able to convert a `String` to a broker identifier. +/// Allows us to be consistent about what we store. +impl TryFrom for BrokerIdentifier { + type Error = Error; + fn try_from(value: String) -> StdResult { + // Split the string + let mut split = value.split('/'); + + // Create a new `Self` from the split string + Ok(Self { + user_advertise_address: split + .next() + .ok_or_else(|| { + Error::Parse("failed to parse public advertise address from string".to_string()) + })? + .to_string(), + broker_advertise_address: split + .next() + .ok_or_else(|| { + Error::Parse( + "failed to parse private advertise address from string".to_string(), + ) + })? + .to_string(), + }) + } +} diff --git a/proto/src/redis.rs b/proto/src/discovery/redis.rs similarity index 81% rename from proto/src/redis.rs rename to proto/src/discovery/redis.rs index a66aec9..cdd00f8 100644 --- a/proto/src/redis.rs +++ b/proto/src/discovery/redis.rs @@ -7,35 +7,38 @@ use std::{collections::HashSet, time::Duration}; +use async_trait::async_trait; use rand::{rngs::StdRng, RngCore, SeedableRng}; use redis::aio::ConnectionManager; -use std::result::Result as StdResult; use crate::{ bail, error::{Error, Result}, }; +use super::{BrokerIdentifier, DiscoveryClient}; + /// This struct is a light wrapper around a managed `Redis` connection which encpasulates /// an operator identifier for common operations #[derive(Clone)] -pub struct Client { +pub struct Redis { /// The underlying `Redis` connection. Is managed, so we don't have to worry about reconnections underlying_connection: ConnectionManager, /// Our operator identifier (in practice, will be something like a concat of advertise addresses) identifier: BrokerIdentifier, } -impl Client { +#[async_trait] +impl DiscoveryClient for Redis { /// Create a new `Client` from the `Redis` endpoint and optional identifier. This is clonable, and /// we don't have to worry about reconnections anywhere. /// /// # Errors /// - If we couldn't parse the `Redis` endpoint - pub async fn new(endpoint: String, identity: Option) -> Result { + async fn new(path: String, identity: Option) -> Result { // Parse the `Redis` URL, creating a `redis-rs` client from it. let client = bail!( - redis::Client::open(endpoint), + redis::Client::open(path), Connection, "failed to parse `Redis` URL" ); @@ -69,7 +72,7 @@ impl Client { /// /// # Errors /// - If the `Redis` connection fails - pub async fn perform_heartbeat( + async fn perform_heartbeat( &mut self, num_connections: u64, heartbeat_expiry: Duration, @@ -144,7 +147,7 @@ impl Client { /// /// # Errors /// - If the `Redis` connection fails - pub async fn get_with_least_connections(&mut self) -> Result { + async fn get_with_least_connections(&mut self) -> Result { // Get all registered brokers let brokers: HashSet = bail!( redis::cmd("SMEMBERS") @@ -201,7 +204,7 @@ impl Client { /// /// # Errors /// - If the `Redis` connection fails - pub async fn get_other_brokers(&mut self) -> Result> { + async fn get_other_brokers(&mut self) -> Result> { // Get all registered brokers let mut brokers: HashSet = bail!( redis::cmd("SMEMBERS") @@ -230,7 +233,7 @@ impl Client { /// /// # Errors /// - If the `Redis` connection fails - pub async fn issue_permit( + async fn issue_permit( &mut self, for_broker: &BrokerIdentifier, expiry: Duration, @@ -261,7 +264,7 @@ impl Client { /// /// # Errors /// - If the `Redis` connection fails - pub async fn validate_permit( + async fn validate_permit( &mut self, broker: &BrokerIdentifier, permit: u64, @@ -277,51 +280,3 @@ impl Client { )) } } - -#[derive(Eq, PartialEq, Hash, Clone, Debug)] -pub struct BrokerIdentifier { - /// The address that a broker advertises to publicly (to users) - pub user_advertise_address: String, - /// The address that a broker advertises to privately (to other brokers) - pub broker_advertise_address: String, -} - -/// We need this to be able to convert a `String` to a broker identifier. -/// Allows us to be consistent about what we store in Redis. -impl TryFrom for BrokerIdentifier { - type Error = Error; - fn try_from(value: String) -> StdResult { - // Split the string - let mut split = value.split('/'); - - // Create a new `Self` from the split string - Ok(Self { - user_advertise_address: split - .next() - .ok_or_else(|| { - Error::Parse("failed to parse public advertise address from string".to_string()) - })? - .to_string(), - broker_advertise_address: split - .next() - .ok_or_else(|| { - Error::Parse( - "failed to parse private advertise address from string".to_string(), - ) - })? - .to_string(), - }) - } -} - -/// We need this to convert in the opposite direction: to create a `String` -/// from a `BrokerIdentifier` for `Redis` purposes. -impl std::fmt::Display for BrokerIdentifier { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - f, - "{}/{}", - self.user_advertise_address, self.broker_advertise_address - ) - } -} diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 910b5b0..8968762 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -1,11 +1,27 @@ //! This crate defines the common code structures and constants used by both the //! broker client and server. +use connection::protocols::{quic::Quic, tcp::Tcp}; + pub mod connection; pub mod crypto; +pub mod discovery; pub mod error; pub mod message; -pub mod redis; + +// If local discovery mode is set, we want to use an embedded DB instead of Redis +// for brokers to discover other brokers. +#[cfg(feature = "local_discovery")] +pub type DiscoveryClientType = discovery::embedded::Embedded; + +// If local discovery mode is not set, we want to use Redis as opposed to the embedded +// DB.p +#[cfg(not(feature = "local_discovery"))] +pub type DiscoveryClientType = discovery::redis::Redis; + +// Defines the protocol types for each protocol actor. +pub type BrokerProtocol = Tcp; +pub type UserProtocol = Quic; /// Common constants used in both the client and server /// diff --git a/proto/src/message.rs b/proto/src/message.rs index 086800c..aa3c0b7 100644 --- a/proto/src/message.rs +++ b/proto/src/message.rs @@ -239,7 +239,12 @@ impl Message { let reader = bail!( serialize::read_message( bytes, - *ReaderOptions::new().traversal_limit_in_words(Some(MAX_MESSAGE_SIZE as usize)) + // TODO IMP: move this reader + *ReaderOptions::new().traversal_limit_in_words(Some(bail!( + usize::try_from(MAX_MESSAGE_SIZE), + Parse, + "maximum message size too high" + ))) ), Deserialize, "failed to create reader"