From 5b070ea7b1dc2c4e5f7a9a4717499116c4a237fa Mon Sep 17 00:00:00 2001 From: neonphog Date: Mon, 20 Jan 2025 15:57:32 -0700 Subject: [PATCH 01/14] initial tx5 integration --- Cargo.lock | 858 ++++++++++++++++++++++++++++++-- Cargo.toml | 3 + crates/transport_tx5/Cargo.toml | 18 + crates/transport_tx5/src/lib.rs | 168 +++++++ 4 files changed, 1018 insertions(+), 29 deletions(-) create mode 100644 crates/transport_tx5/Cargo.toml create mode 100644 crates/transport_tx5/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 3c1b2b9..d04835c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,24 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -26,6 +44,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "anstream" version = "0.6.18" @@ -81,6 +105,15 @@ version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7" +[[package]] +name = "arbitrary" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dde20b3d026af13f561bdd0f15edf01fc734f0dafcedbaf42bba506a9517f223" +dependencies = [ + "derive_arbitrary", +] + [[package]] name = "async-channel" version = "2.3.1" @@ -101,7 +134,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", ] [[package]] @@ -204,6 +237,35 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bindgen" +version = "0.69.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "itertools 0.12.1", + "lazy_static", + "lazycell", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.90", + "which", +] + +[[package]] +name = "bit_field" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc827186963e592360843fb5ba4b973e145841266c1357f7180c43526f2e5b61" + [[package]] name = "bitflags" version = "2.6.0" @@ -249,6 +311,15 @@ dependencies = [ "shlex", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -261,6 +332,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.5.23" @@ -293,7 +375,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.90", ] [[package]] @@ -302,6 +384,15 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" +[[package]] +name = "cmake" +version = "0.1.52" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c682c223677e0e5b6b7f63a64b9351844c3f1b1678a68b7ee617e30fb082620e" +dependencies = [ + "cc", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -323,6 +414,57 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + +[[package]] +name = "cpp_build" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27f8638c97fbd79cc6fc80b616e0e74b49bac21014faed590bbc89b7e2676c90" +dependencies = [ + "cc", + "cpp_common", + "lazy_static", + "proc-macro2", + "regex", + "syn 2.0.90", + "unicode-xid", +] + +[[package]] +name = "cpp_common" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25fcfea2ee05889597d35e986c2ad0169694320ae5cc8f6d2640a4bb8a884560" +dependencies = [ + "lazy_static", + "proc-macro2", + "syn 2.0.90", +] + [[package]] name = "cpufeatures" version = "0.2.16" @@ -391,7 +533,46 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", +] + +[[package]] +name = "dary_heap" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04d2cd9c18b9f454ed67da600630b021a8a80bf33f8c95896ab33aaf1c26b728" + +[[package]] +name = "data-encoding" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e60eed09d8c01d3cee5b7d30acb059b76614c918fa0f992e0dd6eeb10daad6f" + +[[package]] +name = "datachannel" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "065c0a2f521db946447fcea36f1e6a2634633568e4f32dc1384941e455763471" +dependencies = [ + "datachannel-sys", + "derivative", + "parking_lot", + "serde", + "tracing", + "webrtc-sdp", +] + +[[package]] +name = "datachannel-sys" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a1b2f94958fd5ba1ba19d88db9757906ba7849c34cc6a1e535e91c3c8cad831" +dependencies = [ + "bindgen", + "cmake", + "cpp_build", + "once_cell", + "openssl-src", ] [[package]] @@ -404,6 +585,28 @@ dependencies = [ "zeroize", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "derive_arbitrary" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30542c1ad912e0e3d22a1935c290e12e8a29d704a420177a31faad4a601a0800" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "digest" version = "0.10.7" @@ -422,7 +625,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", ] [[package]] @@ -511,6 +714,18 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" +[[package]] +name = "filetime" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" +dependencies = [ + "cfg-if", + "libc", + "libredox", + "windows-sys 0.59.0", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -604,7 +819,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", ] [[package]] @@ -664,6 +879,12 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "glob" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" + [[package]] name = "gloo-timers" version = "0.3.0" @@ -688,13 +909,29 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap", + "indexmap 2.7.0", "slab", "tokio", "tokio-util", "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] + [[package]] name = "hashbrown" version = "0.15.2" @@ -713,6 +950,15 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "home" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "http" version = "1.2.0" @@ -910,7 +1156,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", ] [[package]] @@ -934,6 +1180,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.7.0" @@ -941,7 +1197,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", +] + +[[package]] +name = "influxive-otel-atomic-obs" +version = "0.0.2-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac0ec101d28862a46c15d6140cec376b02725160dfcf57282952898a94cf35e" +dependencies = [ + "opentelemetry_api", ] [[package]] @@ -950,6 +1215,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -988,7 +1262,7 @@ dependencies = [ "prost", "serde", "serde_json", - "thiserror", + "thiserror 2.0.8", "tokio", "tracing", "url", @@ -1077,18 +1351,97 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "kitsune2_transport_tx5" +version = "0.0.1-alpha" +dependencies = [ + "bytes", + "kitsune2_api", + "tokio", + "tracing", + "tx5", +] + [[package]] name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" +[[package]] +name = "libflate" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45d9dfdc14ea4ef0900c1cddbc8dcd553fbaacd8a4a282cf4018ae9dd04fb21e" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e0d73b369f386f1c44abd9c570d5318f55ccde816ff4b562fa452e5182863d" +dependencies = [ + "core2", + "hashbrown 0.14.5", + "rle-decode-fast", +] + +[[package]] +name = "libloading" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" +dependencies = [ + "cfg-if", + "windows-targets", +] + +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags", + "libc", + "redox_syscall", +] + +[[package]] +name = "libsodium-sys-stable" +version = "1.22.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7717550bb3ec725f7b312848902d1534f332379b1d575d2347ec265c8814566" +dependencies = [ + "cc", + "libc", + "libflate", + "minisign-verify", + "pkg-config", + "tar", + "ureq", + "vcpkg", + "zip", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -1111,6 +1464,12 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lockfree-object-pool" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9374ef4228402d4b7e403e5838cb880d9ee663314b0a900d5a6aabf0c213552e" + [[package]] name = "log" version = "0.4.22" @@ -1144,6 +1503,18 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + +[[package]] +name = "minisign-verify" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6367d84fb54d4242af283086402907277715b8fe46976963af5ebf173f8efba3" + [[package]] name = "miniz_oxide" version = "0.8.2" @@ -1187,7 +1558,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn", + "syn 2.0.90", ] [[package]] @@ -1208,6 +1579,16 @@ dependencies = [ "libc", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1243,6 +1624,37 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-src" +version = "300.4.1+3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faa4eac4138c62414b5622d1b31c5c304f34b406b013c079c2bbc652fdd6678c" +dependencies = [ + "cc", +] + +[[package]] +name = "opentelemetry_api" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a81f725323db1b1206ca3da8bb19874bbd3f57c3bcd59471bfb04525b265b9b" +dependencies = [ + "futures-channel", + "futures-util", + "indexmap 1.9.3", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror 1.0.69", + "urlencoding", +] + [[package]] name = "overload" version = "0.1.1" @@ -1291,7 +1703,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap", + "indexmap 2.7.0", ] [[package]] @@ -1316,6 +1728,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -1358,7 +1776,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" dependencies = [ "proc-macro2", - "syn", + "syn 2.0.90", ] [[package]] @@ -1387,7 +1805,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0f3e5beed80eb580c68e2c600937ac2c4eedabdfd5ef1e5b7ea4f3fba84497b" dependencies = [ "heck", - "itertools", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -1396,7 +1814,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn", + "syn 2.0.90", "tempfile", ] @@ -1407,10 +1825,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "157c5a9d7ea5c2ed2d9fb8f495b64759f7816c7eaea54ba3978f0d63000162e3" dependencies = [ "anyhow", - "itertools", + "itertools 0.13.0", "proc-macro2", "quote", - "syn", + "syn 2.0.90", ] [[package]] @@ -1529,12 +1947,24 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "rustc-demangle" version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.4.1" @@ -1557,6 +1987,20 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + [[package]] name = "rustls" version = "0.23.20" @@ -1572,6 +2016,28 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.10.1" @@ -1601,12 +2067,76 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "sbd-client" +version = "0.0.9-alpha2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bc6dac7ad1a4e0001865290c3bcf430bbbc8e238e759f9fdc51deffdb1cf918" +dependencies = [ + "base64", + "ed25519-dalek", + "futures", + "rand", + "rustls 0.22.4", + "rustls-native-certs", + "tokio", + "tokio-rustls", + "tokio-tungstenite", + "tracing", + "webpki-roots", +] + +[[package]] +name = "sbd-e2e-crypto-client" +version = "0.0.9-alpha2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50b3f2616dad53cbe25ba970243bdad92aadb37fec2caeb91462c40900f75448" +dependencies = [ + "bytes", + "sbd-client", + "sodoken", + "tokio", + "tracing", +] + +[[package]] +name = "schannel" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.24" @@ -1630,7 +2160,7 @@ checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", ] [[package]] @@ -1639,12 +2169,24 @@ version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ + "indexmap 2.7.0", "itoa", "memchr", "ryu", "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.8" @@ -1689,6 +2231,12 @@ dependencies = [ "rand_core", ] +[[package]] +name = "simd-adler32" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" + [[package]] name = "slab" version = "0.4.9" @@ -1714,6 +2262,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "sodoken" +version = "0.0.901-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "888b6eb6ff4b987cd894f90d396562c9f332dfa3ab27a00c9cbc798d2f402037" +dependencies = [ + "libc", + "libsodium-sys-stable", +] + [[package]] name = "spin" version = "0.9.8" @@ -1748,6 +2306,17 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.90" @@ -1773,7 +2342,18 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", +] + +[[package]] +name = "tar" +version = "0.4.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c65998313f8e17d0d553d28f91a0df93e4dbbbf770279c7bc21ca0f09ea1a1f6" +dependencies = [ + "filetime", + "libc", + "xattr", ] [[package]] @@ -1805,13 +2385,33 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08f5383f3e0071702bf93ab5ee99b52d26936be9dedd9413067cbdcddcb6141a" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.8", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", ] [[package]] @@ -1822,7 +2422,7 @@ checksum = "f2f357fcec90b3caef6623a099691be676d033b40a058ac95d2a6ade6fa0c943" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", ] [[package]] @@ -1871,7 +2471,33 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", +] + +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", + "tokio", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "rustls 0.22.4", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tungstenite", ] [[package]] @@ -1940,7 +2566,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", ] [[package]] @@ -1995,6 +2621,91 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "rustls 0.22.4", + "rustls-pki-types", + "sha1", + "thiserror 1.0.69", + "url", + "utf-8", +] + +[[package]] +name = "tx5" +version = "0.2.0-beta" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4810b1b8750ab391afdf66ab1daa86dc6837ce59fae152613553fa4fc76ba775" +dependencies = [ + "base64", + "futures", + "influxive-otel-atomic-obs", + "serde", + "slab", + "tokio", + "tracing", + "tx5-connection", + "tx5-core", + "url", +] + +[[package]] +name = "tx5-connection" +version = "0.2.0-beta" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb09810cd3bc416d08942a2946221b8a3d44aec46e3e6244666e2acccb69f9d" +dependencies = [ + "bit_field", + "datachannel", + "futures", + "serde", + "serde_json", + "tokio", + "tracing", + "tx5-core", + "tx5-signal", +] + +[[package]] +name = "tx5-core" +version = "0.2.0-beta" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "590bb5cc3a81ae6ba72a4f8a7b70f1b5fcd90aa35e7b06b6dace02c2655391e4" +dependencies = [ + "base64", + "once_cell", + "rand", + "serde", + "serde_json", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "tx5-signal" +version = "0.2.0-beta" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f69f51ad82e18bc465eb64508ae6c125263b5bf8b6661e6c11fae037adc6466" +dependencies = [ + "rand", + "sbd-e2e-crypto-client", + "tokio", + "tracing", + "tx5-core", +] + [[package]] name = "typenum" version = "1.17.0" @@ -2007,6 +2718,12 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "untrusted" version = "0.9.0" @@ -2023,7 +2740,7 @@ dependencies = [ "flate2", "log", "once_cell", - "rustls", + "rustls 0.23.20", "rustls-pki-types", "url", "webpki-roots", @@ -2038,8 +2755,21 @@ dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf16_iter" version = "1.0.5" @@ -2064,6 +2794,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -2097,7 +2833,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn", + "syn 2.0.90", "wasm-bindgen-shared", ] @@ -2119,7 +2855,7 @@ checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2139,6 +2875,28 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "webrtc-sdp" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a87d58624aae43577604ea137de9dcaf92793eccc4d816efad482001c2e055ca" +dependencies = [ + "log", + "url", +] + +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "winapi" version = "0.3.9" @@ -2255,6 +3013,17 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +[[package]] +name = "xattr" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e105d177a3871454f754b33bb0ee637ecaaac997446375fd3e5d43a2ed00c909" +dependencies = [ + "libc", + "linux-raw-sys", + "rustix", +] + [[package]] name = "yoke" version = "0.7.5" @@ -2275,7 +3044,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", "synstructure", ] @@ -2297,7 +3066,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", ] [[package]] @@ -2317,7 +3086,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", "synstructure", ] @@ -2346,5 +3115,36 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.90", +] + +[[package]] +name = "zip" +version = "2.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae9c1ea7b3a5e1f4b922ff856a129881167511563dc219869afe3787fc0c1a45" +dependencies = [ + "arbitrary", + "crc32fast", + "crossbeam-utils", + "displaydoc", + "flate2", + "indexmap 2.7.0", + "memchr", + "thiserror 2.0.8", + "zopfli", +] + +[[package]] +name = "zopfli" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5019f391bac5cf252e93bbcc53d039ffd62c7bfb7c150414d61369afe57e946" +dependencies = [ + "bumpalo", + "crc32fast", + "lockfree-object-pool", + "log", + "once_cell", + "simd-adler32", ] diff --git a/Cargo.toml b/Cargo.toml index 50f18f4..d1e4ef8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/gossip", "crates/test_utils", "crates/tool_proto_build", + "crates/transport_tx5", ] resolver = "2" @@ -67,6 +68,8 @@ url = "2.5.4" # kitsune2 uses tracing to log events. A consumer can choose any subscriber # to receive these logs. tracing = "0.1" +# for the transport_tx5 crate +tx5 = { version = "0.2.0-beta", default-features = false } # --- tool-dependencies --- # The following workspace dependencies are thus-far only used in unpublished # tools and so are not needed in any true dependency trees. diff --git a/crates/transport_tx5/Cargo.toml b/crates/transport_tx5/Cargo.toml new file mode 100644 index 0000000..1b00771 --- /dev/null +++ b/crates/transport_tx5/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "kitsune2_transport_tx5" +version = "0.0.1-alpha" +description = "The Tx5-based transport implementation for kitsune2" +license = "Apache-2.0" +homepage = "https://github.com/holochain/kitsune2" +documentation = "https://docs.rs/kitsune2_transport_tx5" +authors = ["Holochain Core Dev Team "] +keywords = ["holochain", "holo", "p2p", "dht", "networking"] +categories = ["network-programming"] +edition = "2021" + +[dependencies] +bytes = { workspace = true } +kitsune2_api = { workspace = true } +tokio = { workspace = true, features = ["rt", "sync", "time"] } +tracing = { workspace = true } +tx5 = { version = "0.2.0-beta", default-features = false, features = ["backend-libdatachannel"] } diff --git a/crates/transport_tx5/src/lib.rs b/crates/transport_tx5/src/lib.rs new file mode 100644 index 0000000..6a9b73e --- /dev/null +++ b/crates/transport_tx5/src/lib.rs @@ -0,0 +1,168 @@ +//! The core stub transport implementation provided by Kitsune2. + +use kitsune2_api::{config::*, transport::*, *}; +use std::sync::Arc; + +/// The core stub transport implementation provided by Kitsune2. +/// This is NOT a production module. It is for testing only. +/// It will only establish "connections" within the same process. +#[derive(Debug)] +pub struct Tx5TransportFactory {} + +impl Tx5TransportFactory { + /// Construct a new Tx5TransportFactory. + pub fn create() -> DynTransportFactory { + let out: DynTransportFactory = Arc::new(Tx5TransportFactory {}); + out + } +} + +impl TransportFactory for Tx5TransportFactory { + fn default_config(&self, _config: &mut Config) -> K2Result<()> { + Ok(()) + } + + fn create( + &self, + _builder: Arc, + handler: DynTxHandler, + ) -> BoxFut<'static, K2Result> { + Box::pin(async move { + let handler = TxImpHnd::new(handler); + let imp = Tx5Transport::create(handler.clone()).await?; + Ok(DefaultTransport::create(&handler, imp)) + }) + } +} + +#[derive(Debug)] +struct Tx5Transport { + ep: tx5::Endpoint, + evt_task: tokio::task::JoinHandle<()>, +} + +impl Drop for Tx5Transport { + fn drop(&mut self) { + self.evt_task.abort(); + } +} + +impl Tx5Transport { + pub async fn create(handler: Arc) -> K2Result { + let config = Arc::new(tx5::Config { + signal_allow_plain_text: true, + backend_module: tx5::backend::BackendModule::LibDataChannel, + backend_module_config: Some( + tx5::backend::BackendModule::LibDataChannel.default_config(), + ), + ..Default::default() + }); + + let (ep, ep_recv) = tx5::Endpoint::new(config); + + if let Some(local_url) = ep + .listen( + // TODO from config + tx5::SigUrl::parse("wss://sbd.holo.host").map_err(|e| { + K2Error::other_src("parsing tx5 sig url", e) + })?, + ) + .await + { + handler.new_listening_address(Url::from_str(local_url.as_ref())?); + } + + let evt_task = tokio::task::spawn(evt_task(handler, ep_recv)); + + let out: DynTxImp = Arc::new(Self { ep, evt_task }); + + Ok(out) + } +} + +impl TxImp for Tx5Transport { + fn url(&self) -> Option { + self.ep + .get_listening_addresses() + .get(0) + .map(|u| Url::from_str(u.as_ref()).ok()) + .flatten() + } + + fn disconnect( + &self, + peer: Url, + _payload: Option<(String, bytes::Bytes)>, + ) -> BoxFut<'_, ()> { + Box::pin(async move { + if let Ok(peer) = tx5::PeerUrl::parse(&peer) { + self.ep.close(&peer); + } + }) + } + + fn send(&self, peer: Url, data: bytes::Bytes) -> BoxFut<'_, K2Result<()>> { + Box::pin(async move { + let peer = tx5::PeerUrl::parse(&peer).map_err(|e| { + K2Error::other_src("parsing peer url in tx5_transport send", e) + })?; + // TODO use bytes internally in tx5 + self.ep + .send(peer, data.to_vec()) + .await + .map_err(|e| K2Error::other_src("tx5 send error", e)) + }) + } +} + +async fn evt_task(handler: Arc, mut ep_recv: tx5::EndpointRecv) { + use tx5::EndpointEvent::*; + while let Some(evt) = ep_recv.recv().await { + match evt { + ListeningAddressOpen { local_url } => { + let local_url = match Url::from_str(local_url.as_ref()) { + Ok(local_url) => local_url, + Err(err) => { + tracing::debug!(?err, "ignoring malformed local url"); + continue; + } + }; + handler.new_listening_address(local_url); + } + ListeningAddressClosed { local_url: _ } => { + // we should probably tombstone our bootstrap entry here + } + Connected { peer_url: _ } => { + // this is handled in our preflight hook, not here + } + Disconnected { peer_url } => { + let peer_url = match Url::from_str(peer_url.as_ref()) { + Ok(peer_url) => peer_url, + Err(err) => { + tracing::debug!(?err, "ignoring malformed peer url"); + continue; + } + }; + handler.peer_disconnect(peer_url, None); + } + Message { peer_url, message } => { + let peer_url = match Url::from_str(peer_url.as_ref()) { + Ok(peer_url) => peer_url, + Err(err) => { + // TODO - ban this connection + tracing::debug!(?err, "ignoring malformed peer url"); + continue; + } + }; + // TODO - retool tx5 to use bytes internally + let message = + bytes::BytesMut::from(message.as_slice()).freeze(); + if let Err(err) = handler.recv_data(peer_url, message) { + // TODO - ban this connection + tracing::debug!(?err, "error in recv data handler"); + continue; + } + } + } + } +} From e66a304e110382b4e56aa5189902f5bb61f81bc4 Mon Sep 17 00:00:00 2001 From: neonphog Date: Tue, 21 Jan 2025 13:41:05 -0700 Subject: [PATCH 02/14] lint --- crates/transport_tx5/Cargo.toml | 4 +++- crates/transport_tx5/src/lib.rs | 5 ++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/transport_tx5/Cargo.toml b/crates/transport_tx5/Cargo.toml index 1b00771..e5bfa9f 100644 --- a/crates/transport_tx5/Cargo.toml +++ b/crates/transport_tx5/Cargo.toml @@ -15,4 +15,6 @@ bytes = { workspace = true } kitsune2_api = { workspace = true } tokio = { workspace = true, features = ["rt", "sync", "time"] } tracing = { workspace = true } -tx5 = { version = "0.2.0-beta", default-features = false, features = ["backend-libdatachannel"] } +tx5 = { version = "0.2.0-beta", default-features = false, features = [ + "backend-libdatachannel", +] } diff --git a/crates/transport_tx5/src/lib.rs b/crates/transport_tx5/src/lib.rs index 6a9b73e..d0204f3 100644 --- a/crates/transport_tx5/src/lib.rs +++ b/crates/transport_tx5/src/lib.rs @@ -84,9 +84,8 @@ impl TxImp for Tx5Transport { fn url(&self) -> Option { self.ep .get_listening_addresses() - .get(0) - .map(|u| Url::from_str(u.as_ref()).ok()) - .flatten() + .first() + .and_then(|u| Url::from_str(u.as_ref()).ok()) } fn disconnect( From f52321e667a798da22264c9be9b4feae9993115b Mon Sep 17 00:00:00 2001 From: neonphog Date: Tue, 21 Jan 2025 14:46:18 -0700 Subject: [PATCH 03/14] cleanup --- Cargo.lock | 1 + crates/transport_tx5/Cargo.toml | 1 + crates/transport_tx5/src/lib.rs | 183 ++++++++++++++++++++++++++------ 3 files changed, 151 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d04835c..496d373 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1357,6 +1357,7 @@ version = "0.0.1-alpha" dependencies = [ "bytes", "kitsune2_api", + "serde", "tokio", "tracing", "tx5", diff --git a/crates/transport_tx5/Cargo.toml b/crates/transport_tx5/Cargo.toml index e5bfa9f..72f0c69 100644 --- a/crates/transport_tx5/Cargo.toml +++ b/crates/transport_tx5/Cargo.toml @@ -13,6 +13,7 @@ edition = "2021" [dependencies] bytes = { workspace = true } kitsune2_api = { workspace = true } +serde = { workspace = true } tokio = { workspace = true, features = ["rt", "sync", "time"] } tracing = { workspace = true } tx5 = { version = "0.2.0-beta", default-features = false, features = [ diff --git a/crates/transport_tx5/src/lib.rs b/crates/transport_tx5/src/lib.rs index d0204f3..f2d2b22 100644 --- a/crates/transport_tx5/src/lib.rs +++ b/crates/transport_tx5/src/lib.rs @@ -3,6 +3,57 @@ use kitsune2_api::{config::*, transport::*, *}; use std::sync::Arc; +trait PeerUrlExt { + fn to_k(&self) -> K2Result; +} + +impl PeerUrlExt for tx5::PeerUrl { + fn to_k(&self) -> K2Result { + Url::from_str(self.as_ref()) + } +} + +trait UrlExt { + fn to_peer_url(&self) -> K2Result; +} + +impl UrlExt for Url { + fn to_peer_url(&self) -> K2Result { + tx5::PeerUrl::parse(self).map_err(|e| { + K2Error::other_src("converting kitsune url to tx5 PeerUrl", e) + }) + } +} + +/// Tx5Transport configuration types. +pub mod config { + /// Configuration parameters for [Tx5TransportFactory](super::Tx5TransportFactory). + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct Tx5TransportConfig { + /// The url of the sbd signal server. E.g. `wss://sbd.kitsu.ne`. + pub server_url: String, + } + + impl Default for Tx5TransportConfig { + fn default() -> Self { + Self { + server_url: "".into(), + } + } + } + + /// Module-level configuration for Tx5Transport. + #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct Tx5TransportModConfig { + /// Tx5Transport configuration. + pub tx5_transport: Tx5TransportConfig, + } +} + +use config::*; + /// The core stub transport implementation provided by Kitsune2. /// This is NOT a production module. It is for testing only. /// It will only establish "connections" within the same process. @@ -18,18 +69,22 @@ impl Tx5TransportFactory { } impl TransportFactory for Tx5TransportFactory { - fn default_config(&self, _config: &mut Config) -> K2Result<()> { - Ok(()) + fn default_config(&self, config: &mut Config) -> K2Result<()> { + config.set_module_config(&Tx5TransportModConfig::default()) } fn create( &self, - _builder: Arc, + builder: Arc, handler: DynTxHandler, ) -> BoxFut<'static, K2Result> { Box::pin(async move { + let config: Tx5TransportModConfig = + builder.config.get_module_config()?; let handler = TxImpHnd::new(handler); - let imp = Tx5Transport::create(handler.clone()).await?; + let imp = + Tx5Transport::create(config.tx5_transport, handler.clone()) + .await?; Ok(DefaultTransport::create(&handler, imp)) }) } @@ -37,20 +92,59 @@ impl TransportFactory for Tx5TransportFactory { #[derive(Debug)] struct Tx5Transport { - ep: tx5::Endpoint, + ep: Arc, + pre_task: tokio::task::JoinHandle<()>, evt_task: tokio::task::JoinHandle<()>, } impl Drop for Tx5Transport { fn drop(&mut self) { + self.pre_task.abort(); self.evt_task.abort(); } } +type PreCheckResp = tokio::sync::oneshot::Sender>; +type PreCheck = (tx5::PeerUrl, Vec, PreCheckResp); +type PreCheckRecv = tokio::sync::mpsc::Receiver; + impl Tx5Transport { - pub async fn create(handler: Arc) -> K2Result { - let config = Arc::new(tx5::Config { + pub async fn create( + config: Tx5TransportConfig, + handler: Arc, + ) -> K2Result { + let (pre_send, pre_recv) = tokio::sync::mpsc::channel::(1024); + + let preflight_send_handler = handler.clone(); + let tx5_config = Arc::new(tx5::Config { signal_allow_plain_text: true, + preflight: Some(( + Arc::new(move |peer_url| { + let handler = preflight_send_handler.clone(); + let peer_url = peer_url.to_k(); + Box::pin(async move { + let peer_url = + peer_url.map_err(std::io::Error::other)?; + let data = handler + .peer_connect(peer_url) + .map_err(std::io::Error::other)?; + Ok(data.to_vec()) + }) + }), + Arc::new(move |peer_url, data| { + let peer_url = peer_url.clone(); + let pre_send = pre_send.clone(); + Box::pin(async move { + let (s, r) = tokio::sync::oneshot::channel(); + pre_send + .try_send((peer_url, data, s)) + .map_err(std::io::Error::other)?; + r.await.map_err(|_| { + std::io::Error::other("channel closed") + })? + }) + }), + )), backend_module: tx5::backend::BackendModule::LibDataChannel, backend_module_config: Some( tx5::backend::BackendModule::LibDataChannel.default_config(), @@ -58,12 +152,12 @@ impl Tx5Transport { ..Default::default() }); - let (ep, ep_recv) = tx5::Endpoint::new(config); + let (ep, ep_recv) = tx5::Endpoint::new(tx5_config); + let ep = Arc::new(ep); if let Some(local_url) = ep .listen( - // TODO from config - tx5::SigUrl::parse("wss://sbd.holo.host").map_err(|e| { + tx5::SigUrl::parse(&config.server_url).map_err(|e| { K2Error::other_src("parsing tx5 sig url", e) })?, ) @@ -72,9 +166,15 @@ impl Tx5Transport { handler.new_listening_address(Url::from_str(local_url.as_ref())?); } + let pre_task = tokio::task::spawn(pre_task(handler.clone(), pre_recv)); + let evt_task = tokio::task::spawn(evt_task(handler, ep_recv)); - let out: DynTxImp = Arc::new(Self { ep, evt_task }); + let out: DynTxImp = Arc::new(Self { + ep, + pre_task, + evt_task, + }); Ok(out) } @@ -102,10 +202,8 @@ impl TxImp for Tx5Transport { fn send(&self, peer: Url, data: bytes::Bytes) -> BoxFut<'_, K2Result<()>> { Box::pin(async move { - let peer = tx5::PeerUrl::parse(&peer).map_err(|e| { - K2Error::other_src("parsing peer url in tx5_transport send", e) - })?; - // TODO use bytes internally in tx5 + let peer = peer.to_peer_url()?; + // this would be more efficient if we retool tx5 to use bytes self.ep .send(peer, data.to_vec()) .await @@ -114,12 +212,40 @@ impl TxImp for Tx5Transport { } } +fn handle_msg( + handler: &TxImpHnd, + peer_url: tx5::PeerUrl, + message: Vec, +) -> K2Result<()> { + let peer_url = match peer_url.to_k() { + Ok(peer_url) => peer_url, + Err(err) => { + return Err(K2Error::other_src("malformed peer url", err)); + } + }; + // this would be more efficient if we retool tx5 to use bytes internally + let message = bytes::BytesMut::from(message.as_slice()).freeze(); + if let Err(err) = handler.recv_data(peer_url, message) { + return Err(K2Error::other_src("error in recv data handler", err)); + } + Ok(()) +} + +async fn pre_task(handler: Arc, mut pre_recv: PreCheckRecv) { + while let Some((peer_url, message, resp)) = pre_recv.recv().await { + let _ = resp.send( + handle_msg(&handler, peer_url, message) + .map_err(std::io::Error::other), + ); + } +} + async fn evt_task(handler: Arc, mut ep_recv: tx5::EndpointRecv) { use tx5::EndpointEvent::*; while let Some(evt) = ep_recv.recv().await { match evt { ListeningAddressOpen { local_url } => { - let local_url = match Url::from_str(local_url.as_ref()) { + let local_url = match local_url.to_k() { Ok(local_url) => local_url, Err(err) => { tracing::debug!(?err, "ignoring malformed local url"); @@ -129,13 +255,14 @@ async fn evt_task(handler: Arc, mut ep_recv: tx5::EndpointRecv) { handler.new_listening_address(local_url); } ListeningAddressClosed { local_url: _ } => { - // we should probably tombstone our bootstrap entry here + // MABYE trigger tombstone of our bootstrap entry here } Connected { peer_url: _ } => { - // this is handled in our preflight hook, not here + // This is handled in our preflight hook, + // we can safely ignore this event. } Disconnected { peer_url } => { - let peer_url = match Url::from_str(peer_url.as_ref()) { + let peer_url = match peer_url.to_k() { Ok(peer_url) => peer_url, Err(err) => { tracing::debug!(?err, "ignoring malformed peer url"); @@ -145,21 +272,9 @@ async fn evt_task(handler: Arc, mut ep_recv: tx5::EndpointRecv) { handler.peer_disconnect(peer_url, None); } Message { peer_url, message } => { - let peer_url = match Url::from_str(peer_url.as_ref()) { - Ok(peer_url) => peer_url, - Err(err) => { - // TODO - ban this connection - tracing::debug!(?err, "ignoring malformed peer url"); - continue; - } - }; - // TODO - retool tx5 to use bytes internally - let message = - bytes::BytesMut::from(message.as_slice()).freeze(); - if let Err(err) = handler.recv_data(peer_url, message) { - // TODO - ban this connection - tracing::debug!(?err, "error in recv data handler"); - continue; + if let Err(err) = handle_msg(&handler, peer_url, message) { + // TODO - ban the connection + tracing::debug!(?err); } } } From bf64ff772d98062a4c15ad91eca89109f8bb54f6 Mon Sep 17 00:00:00 2001 From: neonphog Date: Tue, 21 Jan 2025 15:41:17 -0700 Subject: [PATCH 04/14] test --- Cargo.lock | 23 ++++ Cargo.toml | 3 + crates/transport_tx5/Cargo.toml | 5 + crates/transport_tx5/src/lib.rs | 3 + crates/transport_tx5/src/test.rs | 208 +++++++++++++++++++++++++++++++ 5 files changed, 242 insertions(+) create mode 100644 crates/transport_tx5/src/test.rs diff --git a/Cargo.lock b/Cargo.lock index 496d373..f3bb4ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1357,6 +1357,8 @@ version = "0.0.1-alpha" dependencies = [ "bytes", "kitsune2_api", + "kitsune2_core", + "sbd-server", "serde", "tokio", "tracing", @@ -2100,6 +2102,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "sbd-server" +version = "0.0.9-alpha2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa2c5b7f97a15217e0c4378b6416a6f525202e321d1dc056f6a83d16b085547e" +dependencies = [ + "anstyle", + "base64", + "bytes", + "clap", + "ed25519-dalek", + "futures", + "rand", + "rustls 0.22.4", + "rustls-pemfile", + "slab", + "tokio", + "tokio-rustls", + "tokio-tungstenite", +] + [[package]] name = "schannel" version = "0.1.27" diff --git a/Cargo.toml b/Cargo.toml index d1e4ef8..df63ade 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,4 +88,7 @@ kitsune2_test_utils = { path = "crates/test_utils" } # used to hash op data sha2 = "0.10.8" +# used to test the tx5 integration +sbd-server = "0.0.9-alpha2" +# used in testing to output tracing tracing-subscriber = "0.3" diff --git a/crates/transport_tx5/Cargo.toml b/crates/transport_tx5/Cargo.toml index 72f0c69..a493be5 100644 --- a/crates/transport_tx5/Cargo.toml +++ b/crates/transport_tx5/Cargo.toml @@ -19,3 +19,8 @@ tracing = { workspace = true } tx5 = { version = "0.2.0-beta", default-features = false, features = [ "backend-libdatachannel", ] } + +[dev-dependencies] +kitsune2_core = { workspace = true } +sbd-server = { workspace = true } +tokio = { workspace = true, features = ["full"] } diff --git a/crates/transport_tx5/src/lib.rs b/crates/transport_tx5/src/lib.rs index f2d2b22..e878963 100644 --- a/crates/transport_tx5/src/lib.rs +++ b/crates/transport_tx5/src/lib.rs @@ -280,3 +280,6 @@ async fn evt_task(handler: Arc, mut ep_recv: tx5::EndpointRecv) { } } } + +#[cfg(test)] +mod test; diff --git a/crates/transport_tx5/src/test.rs b/crates/transport_tx5/src/test.rs new file mode 100644 index 0000000..de86ce2 --- /dev/null +++ b/crates/transport_tx5/src/test.rs @@ -0,0 +1,208 @@ +use super::*; +use std::sync::Mutex; + +// We don't need or want to test all of tx5 in here... that should be done +// in the tx5 repo. Here we should only test the kitsune2 integration of tx5. +// +// Specifically: +// +// - That new_listening_address is called if the sbd server is restarted +// - That peer connect / disconnect are invoked appropriately. +// - That messages can be send / received. +// - That preflight generation and checking work, which are a little weird +// because in kitsune2 the check logic is handled in the same recv_data +// callback, where tx5 handles it as a special callback. + +struct Test { + #[allow(dead_code)] + pub srv: sbd_server::SbdServer, + #[allow(dead_code)] + pub port: u16, + pub builder: Arc, +} + +impl Test { + pub async fn new() -> Self { + let srv = sbd_server::SbdServer::new(Arc::new(sbd_server::Config { + bind: vec!["127.0.0.1:0".into()], + limit_clients: 100, + disable_rate_limiting: true, + ..Default::default() + })) + .await + .unwrap(); + + let port = srv.bind_addrs().first().unwrap().port(); + + let builder = kitsune2_api::builder::Builder { + transport: Tx5TransportFactory::create(), + ..kitsune2_core::default_test_builder() + }; + + builder + .config + .set_module_config(&config::Tx5TransportModConfig { + tx5_transport: config::Tx5TransportConfig { + server_url: format!("ws://127.0.0.1:{port}"), + }, + }) + .unwrap(); + + Self { + srv, + port, + builder: Arc::new(builder), + } + } + + pub async fn build_transport(&self, handler: DynTxHandler) -> DynTransport { + self.builder + .transport + .create(self.builder.clone(), handler) + .await + .unwrap() + } +} + +struct CbHandler { + new_addr: Arc, + peer_con: Arc K2Result<()> + 'static + Send + Sync>, + peer_dis: Arc) + 'static + Send + Sync>, + pre_out: Arc K2Result + 'static + Send + Sync>, + pre_in: + Arc K2Result<()> + 'static + Send + Sync>, + space: Arc< + dyn Fn(Url, SpaceId, bytes::Bytes) -> K2Result<()> + + 'static + + Send + + Sync, + >, + module: Arc< + dyn Fn(Url, SpaceId, String, bytes::Bytes) -> K2Result<()> + + 'static + + Send + + Sync, + >, +} + +impl std::fmt::Debug for CbHandler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CbHandler").finish() + } +} + +impl Default for CbHandler { + fn default() -> Self { + Self { + new_addr: Arc::new(|_| {}), + peer_con: Arc::new(|_| Ok(())), + peer_dis: Arc::new(|_, _| {}), + pre_out: Arc::new(|_| Ok(bytes::Bytes::new())), + pre_in: Arc::new(|_, _| Ok(())), + space: Arc::new(|_, _, _| Ok(())), + module: Arc::new(|_, _, _, _| Ok(())), + } + } +} + +impl TxBaseHandler for CbHandler { + fn new_listening_address(&self, this_url: Url) { + (self.new_addr)(this_url); + } + + fn peer_connect(&self, peer: Url) -> K2Result<()> { + (self.peer_con)(peer) + } + + fn peer_disconnect(&self, peer: Url, reason: Option) { + (self.peer_dis)(peer, reason) + } +} + +impl TxHandler for CbHandler { + fn preflight_gather_outgoing( + &self, + peer_url: Url, + ) -> K2Result { + (self.pre_out)(peer_url) + } + + fn preflight_validate_incoming( + &self, + peer_url: Url, + data: bytes::Bytes, + ) -> K2Result<()> { + (self.pre_in)(peer_url, data) + } +} + +impl TxSpaceHandler for CbHandler { + fn recv_space_notify( + &self, + peer: Url, + space: SpaceId, + data: bytes::Bytes, + ) -> K2Result<()> { + (self.space)(peer, space, data) + } +} + +impl TxModuleHandler for CbHandler { + fn recv_module_msg( + &self, + peer: Url, + space: SpaceId, + module: String, + data: bytes::Bytes, + ) -> K2Result<()> { + (self.module)(peer, space, module, data) + } +} + +const S1: SpaceId = SpaceId(id::Id(bytes::Bytes::from_static(b"space1"))); + +#[tokio::test(flavor = "multi_thread")] +async fn message_send_recv() { + let test = Test::new().await; + + let h1 = Arc::new(CbHandler { + ..Default::default() + }); + let t1 = test.build_transport(h1.clone()).await; + t1.register_space_handler(S1, h1.clone()); + t1.register_module_handler(S1, "mod".into(), h1.clone()); + + let (s_send, mut s_recv) = tokio::sync::mpsc::unbounded_channel(); + let u2 = Arc::new(Mutex::new(Url::from_str("ws://bla.bla:38/1").unwrap())); + let u2_2 = u2.clone(); + let h2 = Arc::new(CbHandler { + new_addr: Arc::new(move |url| { + *u2_2.lock().unwrap() = url; + }), + space: Arc::new(move |url, space, data| { + let _ = s_send.send((url, space, data)); + Ok(()) + }), + ..Default::default() + }); + let t2 = test.build_transport(h2.clone()).await; + t2.register_space_handler(S1, h2.clone()); + t2.register_module_handler(S1, "mod".into(), h2.clone()); + + let u2: Url = u2.lock().unwrap().clone(); + println!("got u2: {}", u2); + + t1.send_space_notify(u2, S1, bytes::Bytes::from_static(b"hello")) + .await + .unwrap(); + + let r = tokio::time::timeout(std::time::Duration::from_secs(5), async { + s_recv.recv().await + }) + .await + .unwrap() + .unwrap(); + + println!("{r:?}"); + assert_eq!(b"hello", r.2.as_ref()) +} From fcab75445a927cbbd6d43bf8b1cc61d1820bc632 Mon Sep 17 00:00:00 2001 From: neonphog Date: Wed, 22 Jan 2025 10:37:08 -0700 Subject: [PATCH 05/14] tests --- crates/transport_tx5/src/lib.rs | 48 +++++-- crates/transport_tx5/src/test.rs | 232 ++++++++++++++++++++++++++++--- 2 files changed, 251 insertions(+), 29 deletions(-) diff --git a/crates/transport_tx5/src/lib.rs b/crates/transport_tx5/src/lib.rs index e878963..f1a9949 100644 --- a/crates/transport_tx5/src/lib.rs +++ b/crates/transport_tx5/src/lib.rs @@ -1,4 +1,5 @@ -//! The core stub transport implementation provided by Kitsune2. +#![deny(missing_docs)] +//! kitsune2 tx5 transport module. use kitsune2_api::{config::*, transport::*, *}; use std::sync::Arc; @@ -54,9 +55,7 @@ pub mod config { use config::*; -/// The core stub transport implementation provided by Kitsune2. -/// This is NOT a production module. It is for testing only. -/// It will only establish "connections" within the same process. +/// Provides a Kitsune2 transport module based on the Tx5 crate. #[derive(Debug)] pub struct Tx5TransportFactory {} @@ -120,6 +119,7 @@ impl Tx5Transport { signal_allow_plain_text: true, preflight: Some(( Arc::new(move |peer_url| { + // gather any preflight data, and send to remote let handler = preflight_send_handler.clone(); let peer_url = peer_url.to_k(); Box::pin(async move { @@ -132,13 +132,20 @@ impl Tx5Transport { }) }), Arc::new(move |peer_url, data| { + // process sent preflight data let peer_url = peer_url.clone(); let pre_send = pre_send.clone(); Box::pin(async move { let (s, r) = tokio::sync::oneshot::channel(); - pre_send - .try_send((peer_url, data, s)) - .map_err(std::io::Error::other)?; + // kitsune2 expects this to be sent in the normal + // "recv_data" handler, so we need another task + // to forward that. + // + // If the app is too slow processing incoming + // preflights, reject it to close the connection. + pre_send.try_send((peer_url, data, s)).map_err( + |_| std::io::Error::other("app overloaded"), + )?; r.await.map_err(|_| { std::io::Error::other("channel closed") })? @@ -168,7 +175,8 @@ impl Tx5Transport { let pre_task = tokio::task::spawn(pre_task(handler.clone(), pre_recv)); - let evt_task = tokio::task::spawn(evt_task(handler, ep_recv)); + let evt_task = + tokio::task::spawn(evt_task(handler, ep.clone(), ep_recv)); let out: DynTxImp = Arc::new(Self { ep, @@ -231,7 +239,16 @@ fn handle_msg( Ok(()) } +struct TaskDrop(&'static str); + +impl Drop for TaskDrop { + fn drop(&mut self) { + tracing::error!(task = %self.0, "Task Ended"); + } +} + async fn pre_task(handler: Arc, mut pre_recv: PreCheckRecv) { + let _drop = TaskDrop("pre_task"); while let Some((peer_url, message, resp)) = pre_recv.recv().await { let _ = resp.send( handle_msg(&handler, peer_url, message) @@ -240,7 +257,12 @@ async fn pre_task(handler: Arc, mut pre_recv: PreCheckRecv) { } } -async fn evt_task(handler: Arc, mut ep_recv: tx5::EndpointRecv) { +async fn evt_task( + handler: Arc, + ep: Arc, + mut ep_recv: tx5::EndpointRecv, +) { + let _drop = TaskDrop("evt_task"); use tx5::EndpointEvent::*; while let Some(evt) = ep_recv.recv().await { match evt { @@ -255,7 +277,7 @@ async fn evt_task(handler: Arc, mut ep_recv: tx5::EndpointRecv) { handler.new_listening_address(local_url); } ListeningAddressClosed { local_url: _ } => { - // MABYE trigger tombstone of our bootstrap entry here + // MAYBE trigger tombstone of our bootstrap entry here } Connected { peer_url: _ } => { // This is handled in our preflight hook, @@ -272,8 +294,10 @@ async fn evt_task(handler: Arc, mut ep_recv: tx5::EndpointRecv) { handler.peer_disconnect(peer_url, None); } Message { peer_url, message } => { - if let Err(err) = handle_msg(&handler, peer_url, message) { - // TODO - ban the connection + if let Err(err) = + handle_msg(&handler, peer_url.clone(), message) + { + ep.close(&peer_url); tracing::debug!(?err); } } diff --git a/crates/transport_tx5/src/test.rs b/crates/transport_tx5/src/test.rs index de86ce2..9141dda 100644 --- a/crates/transport_tx5/src/test.rs +++ b/crates/transport_tx5/src/test.rs @@ -14,25 +14,20 @@ use std::sync::Mutex; // callback, where tx5 handles it as a special callback. struct Test { - #[allow(dead_code)] - pub srv: sbd_server::SbdServer, - #[allow(dead_code)] + pub srv: Option, pub port: u16, pub builder: Arc, } impl Test { pub async fn new() -> Self { - let srv = sbd_server::SbdServer::new(Arc::new(sbd_server::Config { - bind: vec!["127.0.0.1:0".into()], - limit_clients: 100, - disable_rate_limiting: true, - ..Default::default() - })) - .await - .unwrap(); + let mut this = Self { + srv: None, + port: 0, + builder: Arc::new(kitsune2_core::default_test_builder()), + }; - let port = srv.bind_addrs().first().unwrap().port(); + this.restart().await; let builder = kitsune2_api::builder::Builder { transport: Tx5TransportFactory::create(), @@ -43,16 +38,47 @@ impl Test { .config .set_module_config(&config::Tx5TransportModConfig { tx5_transport: config::Tx5TransportConfig { - server_url: format!("ws://127.0.0.1:{port}"), + server_url: format!("ws://127.0.0.1:{}", this.port), }, }) .unwrap(); - Self { - srv, - port, - builder: Arc::new(builder), + this.builder = Arc::new(builder); + + this + } + + pub async fn restart(&mut self) { + std::mem::drop(self.srv.take()); + + let mut srv = None; + + let mut wait_ms = 250; + for _ in 0..5 { + srv = sbd_server::SbdServer::new(Arc::new(sbd_server::Config { + bind: vec![format!("127.0.0.1:{}", self.port)], + limit_clients: 100, + disable_rate_limiting: true, + ..Default::default() + })) + .await + .ok(); + + if srv.is_some() { + break; + } + + // allow time for the original port to be cleaned up + tokio::time::sleep(std::time::Duration::from_millis(wait_ms)).await; + wait_ms *= 2; } + + if srv.is_none() { + panic!("could not start sbd server on port {}", self.port); + } + + self.port = srv.as_ref().unwrap().bind_addrs().first().unwrap().port(); + self.srv = srv; } pub async fn build_transport(&self, handler: DynTxHandler) -> DynTransport { @@ -161,6 +187,117 @@ impl TxModuleHandler for CbHandler { const S1: SpaceId = SpaceId(id::Id(bytes::Bytes::from_static(b"space1"))); +#[tokio::test(flavor = "multi_thread")] +async fn restart_addr() { + let mut test = Test::new().await; + + let addr = Arc::new(Mutex::new(Vec::new())); + let addr2 = addr.clone(); + + let h = Arc::new(CbHandler { + new_addr: Arc::new(move |url| { + addr2.lock().unwrap().push(url); + }), + ..Default::default() + }); + let _t = test.build_transport(h).await; + + assert_eq!(1, addr.lock().unwrap().len()); + + test.restart().await; + + tokio::time::timeout(std::time::Duration::from_secs(5), async { + loop { + if addr.lock().unwrap().len() == 2 { + // End the test, we're happy! + return; + } + + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +#[ignore = "disconnects currently broken in tx5"] +async fn peer_connect_disconnect() { + let test = Test::new().await; + + let u1 = Arc::new(Mutex::new(Url::from_str("ws://bla.bla:38/1").unwrap())); + let u1_2 = u1.clone(); + let h1 = Arc::new(CbHandler { + new_addr: Arc::new(move |url| { + *u1_2.lock().unwrap() = url; + }), + ..Default::default() + }); + let t1 = test.build_transport(h1.clone()).await; + + let (s_send, mut s_recv) = tokio::sync::mpsc::unbounded_channel(); + let u2 = Arc::new(Mutex::new(Url::from_str("ws://bla.bla:38/1").unwrap())); + let u2_2 = u2.clone(); + let s_send_2 = s_send.clone(); + let h2 = Arc::new(CbHandler { + new_addr: Arc::new(move |url| { + *u2_2.lock().unwrap() = url; + }), + peer_con: Arc::new(move |_| { + let _ = s_send.send("con"); + Ok(()) + }), + peer_dis: Arc::new(move |_, _| { + let _ = s_send_2.send("dis"); + }), + ..Default::default() + }); + let t2 = test.build_transport(h2.clone()).await; + + let u1: Url = u1.lock().unwrap().clone(); + println!("got u1: {}", u1); + let u2: Url = u2.lock().unwrap().clone(); + println!("got u2: {}", u2); + + // trigger a connection establish + t1.send_space_notify(u2, S1, bytes::Bytes::from_static(b"hello")) + .await + .unwrap(); + + tokio::time::timeout(std::time::Duration::from_secs(5), async { + let con = s_recv.recv().await.unwrap(); + assert_eq!("con", con); + }) + .await + .unwrap(); + + std::mem::drop(t1); + + tokio::time::timeout(std::time::Duration::from_secs(5), async { + loop { + // trigger a connection establish + t2.send_space_notify( + u1.clone(), + S1, + bytes::Bytes::from_static(b"world"), + ) + .await + .unwrap(); + + if let Ok(dis) = s_recv.try_recv() { + assert_eq!("dis", dis); + // test pass + return; + } + + // haven't received yet, wait a bit and try again + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); +} + #[tokio::test(flavor = "multi_thread")] async fn message_send_recv() { let test = Test::new().await; @@ -192,10 +329,12 @@ async fn message_send_recv() { let u2: Url = u2.lock().unwrap().clone(); println!("got u2: {}", u2); + // checks that send works t1.send_space_notify(u2, S1, bytes::Bytes::from_static(b"hello")) .await .unwrap(); + // checks that recv works let r = tokio::time::timeout(std::time::Duration::from_secs(5), async { s_recv.recv().await }) @@ -206,3 +345,62 @@ async fn message_send_recv() { println!("{r:?}"); assert_eq!(b"hello", r.2.as_ref()) } + +#[tokio::test(flavor = "multi_thread")] +async fn preflight_send_recv() { + use std::sync::atomic::*; + let test = Test::new().await; + + let r1 = Arc::new(AtomicBool::new(false)); + let r1_2 = r1.clone(); + + let h1 = Arc::new(CbHandler { + pre_out: Arc::new(|_| Ok(bytes::Bytes::from_static(b"hello"))), + pre_in: Arc::new(move |_, data| { + assert_eq!(b"world", data.as_ref()); + r1_2.store(true, Ordering::SeqCst); + Ok(()) + }), + ..Default::default() + }); + let t1 = test.build_transport(h1.clone()).await; + + let r2 = Arc::new(AtomicBool::new(false)); + let r2_2 = r2.clone(); + + let u2 = Arc::new(Mutex::new(Url::from_str("ws://bla.bla:38/1").unwrap())); + let u2_2 = u2.clone(); + let h2 = Arc::new(CbHandler { + pre_out: Arc::new(|_| Ok(bytes::Bytes::from_static(b"world"))), + pre_in: Arc::new(move |_, data| { + assert_eq!(b"hello", data.as_ref()); + r2_2.store(true, Ordering::SeqCst); + Ok(()) + }), + new_addr: Arc::new(move |url| { + *u2_2.lock().unwrap() = url; + }), + ..Default::default() + }); + let _t2 = test.build_transport(h2.clone()).await; + + let u2: Url = u2.lock().unwrap().clone(); + println!("got u2: {}", u2); + + // establish a connection + t1.send_space_notify(u2, S1, bytes::Bytes::from_static(b"hello")) + .await + .unwrap(); + + tokio::time::timeout(std::time::Duration::from_secs(5), async { + loop { + if r1.load(Ordering::SeqCst) && r2.load(Ordering::SeqCst) { + // test pass + return; + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); +} From 7677080ce90d4dab8576ad60c05bafb56730fcb2 Mon Sep 17 00:00:00 2001 From: neonphog Date: Wed, 22 Jan 2025 10:53:29 -0700 Subject: [PATCH 06/14] merge fix --- crates/transport_tx5/src/lib.rs | 6 ++++-- crates/transport_tx5/src/test.rs | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/transport_tx5/src/lib.rs b/crates/transport_tx5/src/lib.rs index f1a9949..5de4133 100644 --- a/crates/transport_tx5/src/lib.rs +++ b/crates/transport_tx5/src/lib.rs @@ -170,7 +170,9 @@ impl Tx5Transport { ) .await { - handler.new_listening_address(Url::from_str(local_url.as_ref())?); + handler + .new_listening_address(Url::from_str(local_url.as_ref())?) + .await; } let pre_task = tokio::task::spawn(pre_task(handler.clone(), pre_recv)); @@ -274,7 +276,7 @@ async fn evt_task( continue; } }; - handler.new_listening_address(local_url); + handler.new_listening_address(local_url).await; } ListeningAddressClosed { local_url: _ } => { // MAYBE trigger tombstone of our bootstrap entry here diff --git a/crates/transport_tx5/src/test.rs b/crates/transport_tx5/src/test.rs index 9141dda..85586d6 100644 --- a/crates/transport_tx5/src/test.rs +++ b/crates/transport_tx5/src/test.rs @@ -132,8 +132,9 @@ impl Default for CbHandler { } impl TxBaseHandler for CbHandler { - fn new_listening_address(&self, this_url: Url) { + fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> { (self.new_addr)(this_url); + Box::pin(async {}) } fn peer_connect(&self, peer: Url) -> K2Result<()> { From bfb889b01223d278a05b30ee099cfebac42cf9f3 Mon Sep 17 00:00:00 2001 From: neonphog Date: Wed, 22 Jan 2025 11:03:41 -0700 Subject: [PATCH 07/14] ignore flaky windows test --- .../src/factories/core_fetch/test/outgoing_request_queue.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/core/src/factories/core_fetch/test/outgoing_request_queue.rs b/crates/core/src/factories/core_fetch/test/outgoing_request_queue.rs index fb11a6e..5977d32 100644 --- a/crates/core/src/factories/core_fetch/test/outgoing_request_queue.rs +++ b/crates/core/src/factories/core_fetch/test/outgoing_request_queue.rs @@ -109,6 +109,10 @@ fn make_mock_transport( } #[tokio::test(flavor = "multi_thread")] +#[cfg_attr( + windows, + ignore = "outgoing_request_queue.rs:154:6: called `Result::unwrap()` on an `Err` value: Elapsed(())" +)] async fn outgoing_request_queue() { let config = CoreFetchConfig { re_insert_outgoing_request_delay_ms: 10, From 68c756523a230e63a42f47223a489de159134acf Mon Sep 17 00:00:00 2001 From: David Braden Date: Wed, 22 Jan 2025 13:03:33 -0700 Subject: [PATCH 08/14] Update crates/transport_tx5/src/test.rs Co-authored-by: ThetaSinner --- crates/transport_tx5/src/test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/transport_tx5/src/test.rs b/crates/transport_tx5/src/test.rs index 85586d6..23be8fa 100644 --- a/crates/transport_tx5/src/test.rs +++ b/crates/transport_tx5/src/test.rs @@ -8,7 +8,7 @@ use std::sync::Mutex; // // - That new_listening_address is called if the sbd server is restarted // - That peer connect / disconnect are invoked appropriately. -// - That messages can be send / received. +// - That messages can be sent / received. // - That preflight generation and checking work, which are a little weird // because in kitsune2 the check logic is handled in the same recv_data // callback, where tx5 handles it as a special callback. From 0db5652107f1096b610db1b68c9ad72cd6b2c7fe Mon Sep 17 00:00:00 2001 From: neonphog Date: Wed, 22 Jan 2025 13:04:33 -0700 Subject: [PATCH 09/14] review comment --- crates/transport_tx5/src/lib.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/transport_tx5/src/lib.rs b/crates/transport_tx5/src/lib.rs index 5de4133..55e3cdd 100644 --- a/crates/transport_tx5/src/lib.rs +++ b/crates/transport_tx5/src/lib.rs @@ -5,11 +5,11 @@ use kitsune2_api::{config::*, transport::*, *}; use std::sync::Arc; trait PeerUrlExt { - fn to_k(&self) -> K2Result; + fn to_kitsune(&self) -> K2Result; } impl PeerUrlExt for tx5::PeerUrl { - fn to_k(&self) -> K2Result { + fn to_kitsune(&self) -> K2Result { Url::from_str(self.as_ref()) } } @@ -121,7 +121,7 @@ impl Tx5Transport { Arc::new(move |peer_url| { // gather any preflight data, and send to remote let handler = preflight_send_handler.clone(); - let peer_url = peer_url.to_k(); + let peer_url = peer_url.to_kitsune(); Box::pin(async move { let peer_url = peer_url.map_err(std::io::Error::other)?; @@ -227,7 +227,7 @@ fn handle_msg( peer_url: tx5::PeerUrl, message: Vec, ) -> K2Result<()> { - let peer_url = match peer_url.to_k() { + let peer_url = match peer_url.to_kitsune() { Ok(peer_url) => peer_url, Err(err) => { return Err(K2Error::other_src("malformed peer url", err)); @@ -269,7 +269,7 @@ async fn evt_task( while let Some(evt) = ep_recv.recv().await { match evt { ListeningAddressOpen { local_url } => { - let local_url = match local_url.to_k() { + let local_url = match local_url.to_kitsune() { Ok(local_url) => local_url, Err(err) => { tracing::debug!(?err, "ignoring malformed local url"); @@ -286,7 +286,7 @@ async fn evt_task( // we can safely ignore this event. } Disconnected { peer_url } => { - let peer_url = match peer_url.to_k() { + let peer_url = match peer_url.to_kitsune() { Ok(peer_url) => peer_url, Err(err) => { tracing::debug!(?err, "ignoring malformed peer url"); From 947666b6b597c8b964531fb9115a696add002006 Mon Sep 17 00:00:00 2001 From: neonphog Date: Wed, 22 Jan 2025 13:06:59 -0700 Subject: [PATCH 10/14] review comment --- crates/transport_tx5/src/lib.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/transport_tx5/src/lib.rs b/crates/transport_tx5/src/lib.rs index 55e3cdd..ec656ed 100644 --- a/crates/transport_tx5/src/lib.rs +++ b/crates/transport_tx5/src/lib.rs @@ -92,8 +92,8 @@ impl TransportFactory for Tx5TransportFactory { #[derive(Debug)] struct Tx5Transport { ep: Arc, - pre_task: tokio::task::JoinHandle<()>, - evt_task: tokio::task::JoinHandle<()>, + pre_task: tokio::task::AbortHandle, + evt_task: tokio::task::AbortHandle, } impl Drop for Tx5Transport { @@ -175,10 +175,12 @@ impl Tx5Transport { .await; } - let pre_task = tokio::task::spawn(pre_task(handler.clone(), pre_recv)); + let pre_task = tokio::task::spawn(pre_task(handler.clone(), pre_recv)) + .abort_handle(); let evt_task = - tokio::task::spawn(evt_task(handler, ep.clone(), ep_recv)); + tokio::task::spawn(evt_task(handler, ep.clone(), ep_recv)) + .abort_handle(); let out: DynTxImp = Arc::new(Self { ep, From e281c2bc6e603afff7f03884232bb495c94e6bb6 Mon Sep 17 00:00:00 2001 From: neonphog Date: Wed, 22 Jan 2025 13:09:40 -0700 Subject: [PATCH 11/14] review comment --- crates/transport_tx5/src/lib.rs | 9 ++++++++- crates/transport_tx5/src/test.rs | 1 + 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/crates/transport_tx5/src/lib.rs b/crates/transport_tx5/src/lib.rs index ec656ed..d0cb794 100644 --- a/crates/transport_tx5/src/lib.rs +++ b/crates/transport_tx5/src/lib.rs @@ -32,6 +32,12 @@ pub mod config { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct Tx5TransportConfig { + /// Allow connecting to plaintext (ws) signal server + /// instead of the default requiring TLS (wss). + /// + /// Default: false. + pub signal_allow_plain_text: bool, + /// The url of the sbd signal server. E.g. `wss://sbd.kitsu.ne`. pub server_url: String, } @@ -39,6 +45,7 @@ pub mod config { impl Default for Tx5TransportConfig { fn default() -> Self { Self { + signal_allow_plain_text: false, server_url: "".into(), } } @@ -116,7 +123,7 @@ impl Tx5Transport { let preflight_send_handler = handler.clone(); let tx5_config = Arc::new(tx5::Config { - signal_allow_plain_text: true, + signal_allow_plain_text: config.signal_allow_plain_text, preflight: Some(( Arc::new(move |peer_url| { // gather any preflight data, and send to remote diff --git a/crates/transport_tx5/src/test.rs b/crates/transport_tx5/src/test.rs index 23be8fa..3a13e2b 100644 --- a/crates/transport_tx5/src/test.rs +++ b/crates/transport_tx5/src/test.rs @@ -38,6 +38,7 @@ impl Test { .config .set_module_config(&config::Tx5TransportModConfig { tx5_transport: config::Tx5TransportConfig { + signal_allow_plain_text: true, server_url: format!("ws://127.0.0.1:{}", this.port), }, }) From 3fcf51a5a7a02739badd0e83a54021941b7bd412 Mon Sep 17 00:00:00 2001 From: neonphog Date: Wed, 22 Jan 2025 13:22:56 -0700 Subject: [PATCH 12/14] review comment --- crates/transport_tx5/src/test.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/transport_tx5/src/test.rs b/crates/transport_tx5/src/test.rs index 3a13e2b..4bbafa3 100644 --- a/crates/transport_tx5/src/test.rs +++ b/crates/transport_tx5/src/test.rs @@ -27,6 +27,8 @@ impl Test { builder: Arc::new(kitsune2_core::default_test_builder()), }; + // Note the `port: 0` above, so we get a free port the first time. + // This restart function will set the port to the actual value. this.restart().await; let builder = kitsune2_api::builder::Builder { @@ -49,6 +51,12 @@ impl Test { this } + /// Restart the sbd server, but re-use the port we first got in our + /// constructor so that already configured transports being tested + /// will be able to find the server automatically again. + /// + /// There is a small chance something else could grab the port + /// in the mean time, and this will error/flake. pub async fn restart(&mut self) { std::mem::drop(self.srv.take()); From a52181013a9e7eeab19fde3c297632256b24c4a0 Mon Sep 17 00:00:00 2001 From: neonphog Date: Wed, 22 Jan 2025 13:34:13 -0700 Subject: [PATCH 13/14] review comment --- Cargo.lock | 1 + crates/transport_tx5/Cargo.toml | 1 + crates/transport_tx5/src/test.rs | 43 ++++++++++++++++++++------------ 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa971ac..1a4f7f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1359,6 +1359,7 @@ dependencies = [ "bytes", "kitsune2_api", "kitsune2_core", + "kitsune2_test_utils", "sbd-server", "serde", "tokio", diff --git a/crates/transport_tx5/Cargo.toml b/crates/transport_tx5/Cargo.toml index a493be5..7468aeb 100644 --- a/crates/transport_tx5/Cargo.toml +++ b/crates/transport_tx5/Cargo.toml @@ -22,5 +22,6 @@ tx5 = { version = "0.2.0-beta", default-features = false, features = [ [dev-dependencies] kitsune2_core = { workspace = true } +kitsune2_test_utils = { workspace = true } sbd-server = { workspace = true } tokio = { workspace = true, features = ["full"] } diff --git a/crates/transport_tx5/src/test.rs b/crates/transport_tx5/src/test.rs index 4bbafa3..370f864 100644 --- a/crates/transport_tx5/src/test.rs +++ b/crates/transport_tx5/src/test.rs @@ -1,4 +1,5 @@ use super::*; +use kitsune2_test_utils::space::TEST_SPACE_ID; use std::sync::Mutex; // We don't need or want to test all of tx5 in here... that should be done @@ -195,8 +196,6 @@ impl TxModuleHandler for CbHandler { } } -const S1: SpaceId = SpaceId(id::Id(bytes::Bytes::from_static(b"space1"))); - #[tokio::test(flavor = "multi_thread")] async fn restart_addr() { let mut test = Test::new().await; @@ -270,9 +269,13 @@ async fn peer_connect_disconnect() { println!("got u2: {}", u2); // trigger a connection establish - t1.send_space_notify(u2, S1, bytes::Bytes::from_static(b"hello")) - .await - .unwrap(); + t1.send_space_notify( + u2, + TEST_SPACE_ID, + bytes::Bytes::from_static(b"hello"), + ) + .await + .unwrap(); tokio::time::timeout(std::time::Duration::from_secs(5), async { let con = s_recv.recv().await.unwrap(); @@ -288,7 +291,7 @@ async fn peer_connect_disconnect() { // trigger a connection establish t2.send_space_notify( u1.clone(), - S1, + TEST_SPACE_ID, bytes::Bytes::from_static(b"world"), ) .await @@ -316,8 +319,8 @@ async fn message_send_recv() { ..Default::default() }); let t1 = test.build_transport(h1.clone()).await; - t1.register_space_handler(S1, h1.clone()); - t1.register_module_handler(S1, "mod".into(), h1.clone()); + t1.register_space_handler(TEST_SPACE_ID, h1.clone()); + t1.register_module_handler(TEST_SPACE_ID, "mod".into(), h1.clone()); let (s_send, mut s_recv) = tokio::sync::mpsc::unbounded_channel(); let u2 = Arc::new(Mutex::new(Url::from_str("ws://bla.bla:38/1").unwrap())); @@ -333,16 +336,20 @@ async fn message_send_recv() { ..Default::default() }); let t2 = test.build_transport(h2.clone()).await; - t2.register_space_handler(S1, h2.clone()); - t2.register_module_handler(S1, "mod".into(), h2.clone()); + t2.register_space_handler(TEST_SPACE_ID, h2.clone()); + t2.register_module_handler(TEST_SPACE_ID, "mod".into(), h2.clone()); let u2: Url = u2.lock().unwrap().clone(); println!("got u2: {}", u2); // checks that send works - t1.send_space_notify(u2, S1, bytes::Bytes::from_static(b"hello")) - .await - .unwrap(); + t1.send_space_notify( + u2, + TEST_SPACE_ID, + bytes::Bytes::from_static(b"hello"), + ) + .await + .unwrap(); // checks that recv works let r = tokio::time::timeout(std::time::Duration::from_secs(5), async { @@ -398,9 +405,13 @@ async fn preflight_send_recv() { println!("got u2: {}", u2); // establish a connection - t1.send_space_notify(u2, S1, bytes::Bytes::from_static(b"hello")) - .await - .unwrap(); + t1.send_space_notify( + u2, + TEST_SPACE_ID, + bytes::Bytes::from_static(b"hello"), + ) + .await + .unwrap(); tokio::time::timeout(std::time::Duration::from_secs(5), async { loop { From 9391e796d90049ea075f2386f6e0d4076893d75d Mon Sep 17 00:00:00 2001 From: neonphog Date: Wed, 22 Jan 2025 13:59:29 -0700 Subject: [PATCH 14/14] windows oddity --- crates/transport_tx5/src/test.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/transport_tx5/src/test.rs b/crates/transport_tx5/src/test.rs index 370f864..0338499 100644 --- a/crates/transport_tx5/src/test.rs +++ b/crates/transport_tx5/src/test.rs @@ -211,13 +211,14 @@ async fn restart_addr() { }); let _t = test.build_transport(h).await; - assert_eq!(1, addr.lock().unwrap().len()); + let init_len = addr.lock().unwrap().len(); + assert!(init_len > 0); test.restart().await; tokio::time::timeout(std::time::Duration::from_secs(5), async { loop { - if addr.lock().unwrap().len() == 2 { + if addr.lock().unwrap().len() > init_len { // End the test, we're happy! return; }