From 6920f6d3f30d6bc9c9c7eefeb5c7c1bcc87fa0c8 Mon Sep 17 00:00:00 2001 From: Vadim Date: Fri, 17 May 2024 13:09:59 +0200 Subject: [PATCH 01/11] feat: update solana crates --- Cargo.lock | 415 ++++++++++++++++++++----------- plerkle/Cargo.toml | 8 +- plerkle_serialization/Cargo.toml | 4 +- 3 files changed, 270 insertions(+), 157 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2bbda0..33d3f24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,7 +81,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ "cfg-if", - "getrandom 0.2.10", "once_cell", "version_check", ] @@ -309,7 +308,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -361,9 +360,9 @@ checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" [[package]] name = "base64" -version = "0.21.4" +version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" [[package]] name = "bincode" @@ -382,9 +381,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" dependencies = [ "serde", ] @@ -457,6 +456,16 @@ dependencies = [ "hashbrown 0.13.2", ] +[[package]] +name = "borsh" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbe5b10e214954177fb1dc9fbd20a1a2608fe99e6c832033bdc7cea287a20d77" +dependencies = [ + "borsh-derive 1.5.0", + "cfg_aliases", +] + [[package]] name = "borsh-derive" version = "0.9.3" @@ -483,6 +492,20 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "borsh-derive" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a8646f94ab393e43e8b35a2558b1624bed28b97ee09c5d15456e3c9463f46d" +dependencies = [ + "once_cell", + "proc-macro-crate 3.1.0", + "proc-macro2", + "quote", + "syn 2.0.64", + "syn_derive", +] + [[package]] name = "borsh-derive-internal" version = "0.9.3" @@ -572,9 +595,9 @@ dependencies = [ [[package]] name = "bytemuck" -version = "1.14.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "374d28ec25809ee0e23827c2ab573d729e293f281dfe393500e7ad618baa61c6" +checksum = "78834c15cb5d5efe3452d58b1e8ba890dd62d21907f867f383358198e56ebca5" dependencies = [ "bytemuck_derive", ] @@ -587,7 +610,7 @@ checksum = "965ab7eb5f8f97d2a083c799f3a1b994fc397b2fe2da5d1da1626ce15a39f2b1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -636,6 +659,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "chrono" version = "0.4.31" @@ -749,11 +778,10 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.8" +version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" dependencies = [ - "cfg-if", "crossbeam-utils", ] @@ -783,12 +811,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" -dependencies = [ - "cfg-if", -] +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" [[package]] name = "crunchy" @@ -860,7 +885,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -871,7 +896,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -982,22 +1007,22 @@ dependencies = [ [[package]] name = "enum-iterator" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7add3873b5dd076766ee79c8e406ad1a472c385476b9e38849f8eec24f1be689" +checksum = "9fd242f399be1da0a5354aa462d57b4ab2b4ee0683cc552f7c007d2d12d36e94" dependencies = [ "enum-iterator-derive", ] [[package]] name = "enum-iterator-derive" -version = "1.2.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eecf8589574ce9b895052fa12d69af7a233f99e6107f5cb8dd1044f2a17bfdcb" +checksum = "a1ab991c1362ac86c61ab6f556cff143daa22e5a15e4e189df818b2fd19fe65b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -1168,7 +1193,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -1565,9 +1590,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.64" +version = "0.3.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" dependencies = [ "wasm-bindgen", ] @@ -1589,9 +1614,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.149" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libsecp256k1" @@ -1643,12 +1668,13 @@ dependencies = [ [[package]] name = "light-poseidon" -version = "0.1.2" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5b439809cdfc0d86ecc7317f1724df13dfa665df48991b79e90e689411451f7" +checksum = "3c9a85a9752c549ceb7578064b4ed891179d20acd85f27318573b64d2d7ee7ee" dependencies = [ "ark-bn254", "ark-ff", + "num-bigint 0.4.4", "thiserror", ] @@ -1838,7 +1864,7 @@ checksum = "cfb77679af88f8b125209d354a202862602672222e7f2313fdd6dc349bad4712" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -1904,11 +1930,11 @@ dependencies = [ [[package]] name = "num_enum" -version = "0.7.0" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70bf6736f74634d299d00086f02986875b3c2d924781a6a2cb6c201e73da0ceb" +checksum = "02339744ee7253741199f897151b38e72257d13802d4ee837285cc2990a90845" dependencies = [ - "num_enum_derive 0.7.0", + "num_enum_derive 0.7.2", ] [[package]] @@ -1920,19 +1946,19 @@ dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] name = "num_enum_derive" -version = "0.7.0" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56ea360eafe1022f7cc56cd7b869ed57330fb2453d0c7831d99b74c65d2f5597" +checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -1962,7 +1988,7 @@ version = "0.10.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.5.0", "cfg-if", "foreign-types", "libc", @@ -1979,7 +2005,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -2073,7 +2099,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -2120,7 +2146,7 @@ name = "plerkle" version = "1.6.0" dependencies = [ "async-trait", - "base64 0.21.4", + "base64 0.21.7", "bs58", "bytemuck", "cadence", @@ -2213,14 +2239,46 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" dependencies = [ "once_cell", - "toml_edit", + "toml_edit 0.19.15", +] + +[[package]] +name = "proc-macro-crate" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" +dependencies = [ + "toml_edit 0.21.1", +] + +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", ] [[package]] name = "proc-macro2" -version = "1.0.69" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" +checksum = "8ad3d49ab951a01fbaafe34f2ec74122942fe18a3f9814c3268f1bb72042131b" dependencies = [ "unicode-ident", ] @@ -2233,7 +2291,7 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", "version_check", "yansi", ] @@ -2255,14 +2313,14 @@ checksum = "9e2e25ee72f5b24d773cae88422baddefff7714f97aab68d96fe2b6fc4a28fb2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] name = "quote" -version = "1.0.33" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] @@ -2446,12 +2504,12 @@ checksum = "56d84fdd47036b038fc80dd333d10b6aab10d5d31f4a366e20014def75328d33" [[package]] name = "reqwest" -version = "0.11.22" +version = "0.11.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" dependencies = [ "async-compression", - "base64 0.21.4", + "base64 0.21.7", "bytes", "encoding_rs", "futures-core", @@ -2473,6 +2531,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "sync_wrapper", "system-configuration", "tokio", "tokio-rustls", @@ -2528,7 +2587,7 @@ version = "0.38.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.5.0", "errno", "libc", "linux-raw-sys", @@ -2553,7 +2612,7 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" dependencies = [ - "base64 0.21.4", + "base64 0.21.7", ] [[package]] @@ -2610,7 +2669,7 @@ checksum = "1db149f81d46d2deba7cd3c50772474707729550221e69588478ebf9ada425ae" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -2654,38 +2713,38 @@ checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" [[package]] name = "serde" -version = "1.0.189" +version = "1.0.202" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" +checksum = "226b61a0d411b2ba5ff6d7f73a476ac4f8bb900373459cd00fab8512828ba395" dependencies = [ "serde_derive", ] [[package]] name = "serde_bytes" -version = "0.11.12" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab33ec92f677585af6d88c65593ae2375adde54efdbf16d597f2cbc7a6d368ff" +checksum = "8b8497c313fd43ab992087548117643f6fcd935cbf36f176ffda0aacf9591734" dependencies = [ "serde", ] [[package]] name = "serde_derive" -version = "1.0.189" +version = "1.0.202" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" +checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] name = "serde_json" -version = "1.0.107" +version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" +checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" dependencies = [ "itoa", "ryu", @@ -2723,7 +2782,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -2802,6 +2861,12 @@ version = "1.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + [[package]] name = "sized-chunks" version = "0.6.5" @@ -2849,12 +2914,12 @@ dependencies = [ [[package]] name = "solana-account-decoder" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7592d5226a565440a367de95128b34e031752483ca0552e18d5601661f9e52f8" +checksum = "0f31c3dc9c7ebfaff452f063b406bbf64d326d71120996f4d3fdeee7ae7f1b6e" dependencies = [ "Inflector", - "base64 0.21.4", + "base64 0.21.7", "bincode", "bs58", "bv", @@ -2866,6 +2931,7 @@ dependencies = [ "solana-sdk", "spl-token", "spl-token-2022", + "spl-token-group-interface", "spl-token-metadata-interface", "thiserror", "zstd", @@ -2873,9 +2939,9 @@ dependencies = [ [[package]] name = "solana-config-program" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eab6e41a3149615cc208fcbbb09f23a4fb721c867b93695ce9affe2427d64263" +checksum = "d12f4c7ca44f55afb012dfadd21a352cb818a225f4e6d7fe3db5c3fcb1e28ca1" dependencies = [ "bincode", "chrono", @@ -2887,17 +2953,13 @@ dependencies = [ [[package]] name = "solana-frozen-abi" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "358b2e42869ebb34c6cafc89ca7cda381817b5bd781fa33302d335c6feaa04b6" +checksum = "9843fe4a4e4d541bd056465257704d8d53b50ed59328dcb5f37821ae0f843676" dependencies = [ - "ahash 0.8.3", - "blake3", "block-buffer 0.10.4", "bs58", "bv", - "byteorder", - "cc", "either", "generic-array", "im", @@ -2908,7 +2970,6 @@ dependencies = [ "serde", "serde_bytes", "serde_derive", - "serde_json", "sha2 0.10.8", "solana-frozen-abi-macro", "subtle", @@ -2917,21 +2978,21 @@ dependencies = [ [[package]] name = "solana-frozen-abi-macro" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea690497947bc9cc8bcf7985b3c4c539b199e56c28f250746026797b681ea291" +checksum = "3f24edb8172842544ace0ccb9547353cc55fe4a6d3b2786e209939d3a8bf271d" dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] name = "solana-geyser-plugin-interface" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e15a6eb2b41056a4ed98b3de12d37af553e8ddb22cb3726a7801ca15f2a5c0a" +checksum = "bac9c1d761318b992ea6514d2e32853e285af07e6158879dc299500f7fee9033" dependencies = [ "log", "solana-sdk", @@ -2941,9 +3002,9 @@ dependencies = [ [[package]] name = "solana-logger" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e679b49d07fa6a140e194cb453322b265fb48993e4ae6225a9524a7a6e28f27b" +checksum = "0a9c97300d5fd98fd490819186debfda9d47b1a5c82b5ffdb76e2ea6bad055c4" dependencies = [ "env_logger", "lazy_static", @@ -2952,9 +3013,9 @@ dependencies = [ [[package]] name = "solana-measure" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8d4e3d96aa286a70626b3e6ec1fd1373b4ffe687eaaa8db2f2105538dd771ad" +checksum = "e9bf69dbc3d69406b67d3d263c8a5aa0d8501051d75aa842f47502652060596d" dependencies = [ "log", "solana-sdk", @@ -2962,9 +3023,9 @@ dependencies = [ [[package]] name = "solana-metrics" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "096528fd079d6be74488629477d06b8ce0575ffe60bd9616538924e5a6846e8c" +checksum = "35a2112662341adaf1b8fbd4a8d819bc24ae5d1d59655e0561161c5c816894b9" dependencies = [ "crossbeam-channel", "gethostname", @@ -2977,20 +3038,21 @@ dependencies = [ [[package]] name = "solana-program" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "926ca88c8b0d0815ce4336092379eb0b61aebaab8ba984784c029a559200527f" +checksum = "9de9a1634b9d30ca0e5c2d53806c030a5d9c07dfcc4505ebeb218206514d17b8" dependencies = [ "ark-bn254", "ark-ec", "ark-ff", "ark-serialize", - "base64 0.21.4", + "base64 0.21.7", "bincode", - "bitflags 2.4.0", + "bitflags 2.5.0", "blake3", "borsh 0.10.3", "borsh 0.9.3", + "borsh 1.5.0", "bs58", "bv", "bytemuck", @@ -3008,7 +3070,7 @@ dependencies = [ "log", "memoffset", "num-bigint 0.4.4", - "num-derive 0.3.3", + "num-derive 0.4.1", "num-traits", "parking_lot", "rand 0.8.5", @@ -3031,18 +3093,18 @@ dependencies = [ [[package]] name = "solana-program-runtime" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14a0118d5aa0cd97530b2225e7ebb4d89f6cc589b86298a1052a01b7c58d4f1e" +checksum = "078fbc30339aff91d84ef5fc49ad75818419fedc543da22617d2f36a93d56bff" dependencies = [ - "base64 0.21.4", + "base64 0.21.7", "bincode", "eager", "enum-iterator", "itertools", "libc", "log", - "num-derive 0.3.3", + "num-derive 0.4.1", "num-traits", "percentage", "rand 0.8.5", @@ -3059,15 +3121,15 @@ dependencies = [ [[package]] name = "solana-sdk" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d55719c5a4fdcf7651120ba6650cf4eba5f2b1eb30b06d2d53aec7237146d1f9" +checksum = "323d21f0cb307e28ccfbcb3a24a5ae230abc8176bfb82492df6773deb79b62de" dependencies = [ "assert_matches", - "base64 0.21.4", + "base64 0.21.7", "bincode", - "bitflags 2.4.0", - "borsh 0.10.3", + "bitflags 2.5.0", + "borsh 1.5.0", "bs58", "bytemuck", "byteorder", @@ -3084,9 +3146,9 @@ dependencies = [ "libsecp256k1", "log", "memmap2", - "num-derive 0.3.3", + "num-derive 0.4.1", "num-traits", - "num_enum 0.6.1", + "num_enum 0.7.2", "pbkdf2 0.11.0", "qstring", "qualifier_attr", @@ -3101,6 +3163,7 @@ dependencies = [ "serde_with", "sha2 0.10.8", "sha3 0.10.8", + "siphasher", "solana-frozen-abi", "solana-frozen-abi-macro", "solana-logger", @@ -3113,25 +3176,31 @@ dependencies = [ [[package]] name = "solana-sdk-macro" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7985176f781b66c625070b413f48740fab2ec2dd330f7afd04563799dffec44" +checksum = "ff6d088aff04f5ad17f6f4a1a84a7a6aef633d48e8ed6c12154fcbb5dfde07bd" dependencies = [ "bs58", "proc-macro2", "quote", "rustversion", - "syn 2.0.38", + "syn 2.0.64", ] +[[package]] +name = "solana-security-txt" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183" + [[package]] name = "solana-transaction-status" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c9124cce810f23e04d51bc45c7b6b8ba4cd1abc13926a04c3d3e2893d9bb96" +checksum = "d08bc13fa4f5ddf945253ac957b8b924c6181f9a80283e47e9922c07e73a845c" dependencies = [ "Inflector", - "base64 0.21.4", + "base64 0.21.7", "bincode", "borsh 0.10.3", "bs58", @@ -3151,12 +3220,12 @@ dependencies = [ [[package]] name = "solana-zk-token-sdk" -version = "1.17.5" +version = "1.18.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d5f912154350af6318043ff857198914fbaaa88338f57bfff5f0ba5cf0da62" +checksum = "c7ea6cfb74066a35ea9ad53b1108bb26f35752569bcfb3d9203f58a7bf57fac5" dependencies = [ "aes-gcm-siv", - "base64 0.21.4", + "base64 0.21.7", "bincode", "bytemuck", "byteorder", @@ -3165,7 +3234,7 @@ dependencies = [ "itertools", "lazy_static", "merlin", - "num-derive 0.3.3", + "num-derive 0.4.1", "num-traits", "rand 0.7.3", "serde", @@ -3205,9 +3274,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "spl-associated-token-account" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "385e31c29981488f2820b2022d8e731aae3b02e6e18e2fd854e4c9a94dc44fc3" +checksum = "992d9c64c2564cc8f63a4b508bf3ebcdf2254b0429b13cd1d31adb6162432a5f" dependencies = [ "assert_matches", "borsh 0.10.3", @@ -3238,7 +3307,7 @@ checksum = "fadbefec4f3c678215ca72bd71862697bb06b41fd77c0088902dd3203354387b" dependencies = [ "quote", "spl-discriminator-syn", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -3250,7 +3319,7 @@ dependencies = [ "proc-macro2", "quote", "sha2 0.10.8", - "syn 2.0.38", + "syn 2.0.64", "thiserror", ] @@ -3298,14 +3367,14 @@ dependencies = [ "proc-macro2", "quote", "sha2 0.10.8", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] name = "spl-tlv-account-resolution" -version = "0.4.0" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "062e148d3eab7b165582757453632ffeef490c02c86a48bfdb4988f63eefb3b9" +checksum = "56f335787add7fa711819f9e7c573f8145a5358a709446fe2d24bf2a88117c90" dependencies = [ "bytemuck", "solana-program", @@ -3332,26 +3401,41 @@ dependencies = [ [[package]] name = "spl-token-2022" -version = "0.9.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4abf34a65ba420584a0c35f3903f8d727d1f13ababbdc3f714c6b065a686e86" +checksum = "d697fac19fd74ff472dfcc13f0b442dd71403178ce1de7b5d16f83a33561c059" dependencies = [ "arrayref", "bytemuck", "num-derive 0.4.1", "num-traits", - "num_enum 0.7.0", + "num_enum 0.7.2", "solana-program", + "solana-security-txt", "solana-zk-token-sdk", "spl-memo", "spl-pod", "spl-token", + "spl-token-group-interface", "spl-token-metadata-interface", "spl-transfer-hook-interface", "spl-type-length-value", "thiserror", ] +[[package]] +name = "spl-token-group-interface" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b889509d49fa74a4a033ca5dae6c2307e9e918122d97e58562f5c4ffa795c75d" +dependencies = [ + "bytemuck", + "solana-program", + "spl-discriminator", + "spl-pod", + "spl-program-error", +] + [[package]] name = "spl-token-metadata-interface" version = "0.2.0" @@ -3368,9 +3452,9 @@ dependencies = [ [[package]] name = "spl-transfer-hook-interface" -version = "0.3.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "051d31803f873cabe71aec3c1b849f35248beae5d19a347d93a5c9cccc5d5a9b" +checksum = "7aabdb7c471566f6ddcee724beb8618449ea24b399e58d464d6b5bc7db550259" dependencies = [ "arrayref", "bytemuck", @@ -3420,15 +3504,33 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.38" +version = "2.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" +checksum = "7ad3dee41f36859875573074334c200d1add8e4a87bb37113ebd31d926b7b11f" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] +[[package]] +name = "syn_derive" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1329189c02ff984e9736652b1631330da25eaa6bc639089ed4915d25446cbe7b" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn 2.0.64", +] + +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "system-configuration" version = "0.5.1" @@ -3474,22 +3576,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.49" +version = "1.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" +checksum = "579e9083ca58dd9dcf91a9923bb9054071b9ebbd800b342194c9feb0ee89fc18" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.49" +version = "1.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" +checksum = "e2470041c06ec3ac1ab38d0356a6119054dedaea53e12fbefc0de730a1c08524" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -3563,7 +3665,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -3611,9 +3713,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.3" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" +checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf" [[package]] name = "toml_edit" @@ -3626,6 +3728,17 @@ dependencies = [ "winnow", ] +[[package]] +name = "toml_edit" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +dependencies = [ + "indexmap 2.0.2", + "toml_datetime", + "winnow", +] + [[package]] name = "tower-service" version = "0.3.2" @@ -3652,7 +3765,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] @@ -3842,9 +3955,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.87" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -3852,16 +3965,16 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.87" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", "wasm-bindgen-shared", ] @@ -3879,9 +3992,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.87" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3889,22 +4002,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.87" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.87" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" [[package]] name = "web-sys" @@ -4070,7 +4183,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.64", ] [[package]] diff --git a/plerkle/Cargo.toml b/plerkle/Cargo.toml index 30fb211..f76265b 100644 --- a/plerkle/Cargo.toml +++ b/plerkle/Cargo.toml @@ -14,10 +14,10 @@ crate-type = ["cdylib", "rlib"] [dependencies] log = "0.4.11" async-trait = "0.1.53" -solana-sdk = { version ="~1.17" } -solana-transaction-status = { version = "~1.17" } -solana-geyser-plugin-interface = { version = "~1.17" } -solana-logger = { version = "~1.17" } +solana-sdk = { version ="~1.18.11" } +solana-transaction-status = { version = "~1.18.11" } +solana-geyser-plugin-interface = { version = "~1.18.11" } +solana-logger = { version = "~1.18.11" } thiserror = "1.0.30" base64 = "0.21.0" lazy_static = "1.4.0" diff --git a/plerkle_serialization/Cargo.toml b/plerkle_serialization/Cargo.toml index dfdda40..80cafb0 100644 --- a/plerkle_serialization/Cargo.toml +++ b/plerkle_serialization/Cargo.toml @@ -12,8 +12,8 @@ readme = "Readme.md" flatbuffers = "23.1.21" chrono = "0.4.22" serde = { version = "1.0.152" } -solana-sdk = { version = "~1.17" } -solana-transaction-status = { version = "~1.17" } +solana-sdk = { version = "~1.18.11" } +solana-transaction-status = { version = "~1.18.11" } bs58 = "0.4.0" thiserror = "1.0.38" [package.metadata.docs.rs] From b0314d8c948a1e27282d945f98788e11c703227e Mon Sep 17 00:00:00 2001 From: rwwwx Date: Thu, 8 Aug 2024 11:49:32 +0300 Subject: [PATCH 02/11] `RUST_VERSION` and `SOLANA_VERSION` update on ci --- .github/workflows/release.yml | 4 ++-- .github/workflows/test.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index d34dbd0..c53cae5 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -6,8 +6,8 @@ on: env: CARGO_TERM_COLOR: always IMAGE_NAME: plerkle-test-validator - RUST_VERSION: 1.73.0 - SOLANA_VERSION_STABLE: v1.17.20 + RUST_VERSION: 1.75.0 + SOLANA_VERSION_STABLE: v1.18.20 jobs: release-stable: runs-on: ubuntu-20-04-8-cores diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e6e564d..194396b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,8 +12,8 @@ on: env: CARGO_TERM_COLOR: always IMAGE_NAME: plerkle-test-validator - RUST_VERSION: 1.73.0 - SOLANA_VERSION_STABLE: v1.17.20 + RUST_VERSION: 1.75.0 + SOLANA_VERSION_STABLE: v1.18.20 jobs: test-stable: From 0bb9affb1199ffb22dc7ac83daaec49129800aac Mon Sep 17 00:00:00 2001 From: rwwwx Date: Thu, 8 Aug 2024 12:57:49 +0300 Subject: [PATCH 03/11] added '_is_reload' flag to match `GeyserPlugin` trait definition of `1.18.*` version + fmt and warnings fix --- plerkle/src/geyser_plugin_nft.rs | 98 ++++++++++++++++++-------------- 1 file changed, 55 insertions(+), 43 deletions(-) diff --git a/plerkle/src/geyser_plugin_nft.rs b/plerkle/src/geyser_plugin_nft.rs index 1a3d933..5defdc4 100644 --- a/plerkle/src/geyser_plugin_nft.rs +++ b/plerkle/src/geyser_plugin_nft.rs @@ -11,9 +11,9 @@ use plerkle_messenger::{ select_messenger, MessengerConfig, ACCOUNT_STREAM, BLOCK_STREAM, SLOT_STREAM, TRANSACTION_STREAM, }; -use plerkle_serialization::{serializer::{ +use plerkle_serialization::serializer::{ serialize_account, serialize_block, serialize_transaction, -}}; +}; use serde::Deserialize; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; @@ -28,8 +28,7 @@ use std::{ fs::File, io::Read, net::UdpSocket, - ops::Bound::Included, - ops::RangeBounds, + ops::{Bound::Included, RangeBounds}, sync::{Arc, Mutex}, }; use tokio::{ @@ -103,31 +102,31 @@ pub(crate) struct Plerkle<'a> { } trait PlerklePrivateMethods { - fn get_plerkle_block_info<'b>(&self, blockinfo: ReplicaBlockInfoVersions<'b>) -> plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2<'b>; + fn get_plerkle_block_info<'b>( + &self, + blockinfo: ReplicaBlockInfoVersions<'b>, + ) -> plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2<'b>; } impl<'a> PlerklePrivateMethods for Plerkle<'a> { - fn get_plerkle_block_info<'b>(&self, blockinfo: ReplicaBlockInfoVersions<'b>) -> plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2<'b> { + fn get_plerkle_block_info<'b>( + &self, + blockinfo: ReplicaBlockInfoVersions<'b>, + ) -> plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2<'b> { match blockinfo { - ReplicaBlockInfoVersions::V0_0_1(block_info) => plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2 { - parent_slot: 0, - parent_blockhash: "", - slot: block_info.slot, - blockhash: block_info.blockhash, - block_time: block_info.block_time, - block_height: block_info.block_height, - executed_transaction_count: 0, - }, - ReplicaBlockInfoVersions::V0_0_2(block_info) => plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2 { - parent_slot: 0, - parent_blockhash: "", - slot: block_info.slot, - blockhash: block_info.blockhash, - block_time: block_info.block_time, - block_height: block_info.block_height, - executed_transaction_count: 0, - }, - ReplicaBlockInfoVersions::V0_0_3(block_info) => plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2 { + ReplicaBlockInfoVersions::V0_0_1(block_info) => { + plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2 { + parent_slot: 0, + parent_blockhash: "", + slot: block_info.slot, + blockhash: block_info.blockhash, + block_time: block_info.block_time, + block_height: block_info.block_height, + executed_transaction_count: 0, + } + } + ReplicaBlockInfoVersions::V0_0_2(block_info) => { + plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2 { parent_slot: 0, parent_blockhash: "", slot: block_info.slot, @@ -136,6 +135,18 @@ impl<'a> PlerklePrivateMethods for Plerkle<'a> { block_height: block_info.block_height, executed_transaction_count: 0, } + } + ReplicaBlockInfoVersions::V0_0_3(block_info) => { + plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2 { + parent_slot: 0, + parent_blockhash: "", + slot: block_info.slot, + blockhash: block_info.blockhash, + block_time: block_info.block_time, + block_height: block_info.block_height, + executed_transaction_count: 0, + } + } } } } @@ -294,11 +305,10 @@ impl<'a> Plerkle<'a> { // Currently not used but may want later. pub fn _txn_contains_program<'b>(keys: AccountKeys, program: &Pubkey) -> bool { keys.iter() - .find(|p| { + .any(|p| { let d = *p; d.eq(program) }) - .is_some() } } @@ -313,14 +323,15 @@ impl GeyserPlugin for Plerkle<'static> { "Plerkle" } - fn on_load(&mut self, config_file: &str) -> Result<()> { + fn on_load(&mut self, config_file: &str, _is_reload: bool) -> Result<()> { solana_logger::setup_with_default("info"); // Read in config file. info!( - "Loading plugin {:?} from config_file {:?}", + "Loading plugin {:?} from config_file {:?} with '_is_reload' flag: {:?}", self.name(), - config_file + config_file, + _is_reload, ); let mut file = File::open(config_file)?; let mut contents = String::new(); @@ -384,10 +395,10 @@ impl GeyserPlugin for Plerkle<'static> { .await .unwrap(); // We want to fail if the messenger is not configured correctly. - msg.add_stream(ACCOUNT_STREAM).await; - msg.add_stream(SLOT_STREAM).await; - msg.add_stream(TRANSACTION_STREAM).await; - msg.add_stream(BLOCK_STREAM).await; + let _ = msg.add_stream(ACCOUNT_STREAM).await; + let _ = msg.add_stream(SLOT_STREAM).await; + let _ = msg.add_stream(TRANSACTION_STREAM).await; + let _ = msg.add_stream(BLOCK_STREAM).await; msg.set_buffer_size(ACCOUNT_STREAM, config.account_stream_size.unwrap_or(100_000_000)).await; msg.set_buffer_size(SLOT_STREAM, config.slot_stream_size.unwrap_or(100_000)).await; msg.set_buffer_size(TRANSACTION_STREAM, config.transaction_stream_size.unwrap_or(10_000_000)).await; @@ -423,7 +434,7 @@ impl GeyserPlugin for Plerkle<'static> { })); } - tasks.push(tokio::spawn(async move { + tasks.push(tokio::spawn(async move { let mut last_idx = 0; while let Some(data) = main_receiver.recv().await { let seen = data.seen_at.elapsed().as_millis() as u64; @@ -448,7 +459,7 @@ impl GeyserPlugin for Plerkle<'static> { } last_idx = (last_idx + 1) % worker_senders.len(); - } + } })); }); @@ -589,7 +600,9 @@ impl GeyserPlugin for Plerkle<'static> { ) -> solana_geyser_plugin_interface::geyser_plugin_interface::Result<()> { info!("Slot status update: {:?} {:?}", slot, status); if status == SlotStatus::Processed && parent.is_some() { - let mut seen = self.slots_seen.lock() + let mut seen = self + .slots_seen + .lock() .map_err(|e| PlerkleError::SlotsSeenLockError { msg: e.to_string() })?; seen.insert(parent.unwrap()) } @@ -617,7 +630,9 @@ impl GeyserPlugin for Plerkle<'static> { } } - let mut seen: std::sync::MutexGuard<'_, SlotStore> = self.slots_seen.lock() + let mut seen: std::sync::MutexGuard<'_, SlotStore> = self + .slots_seen + .lock() .map_err(|e| PlerkleError::SlotsSeenLockError { msg: e.to_string() })?; let slots_to_purge = seen.needs_purge(slot); if let Some(purgable) = slots_to_purge { @@ -724,10 +739,7 @@ impl GeyserPlugin for Plerkle<'static> { Ok(()) } - fn notify_block_metadata( - &self, - blockinfo: ReplicaBlockInfoVersions, - ) -> Result<()> { + fn notify_block_metadata(&self, blockinfo: ReplicaBlockInfoVersions) -> Result<()> { let seen = Instant::now(); let plerkle_blockinfo = self.get_plerkle_block_info(blockinfo); @@ -744,7 +756,7 @@ impl GeyserPlugin for Plerkle<'static> { let data = SerializedData { stream: BLOCK_STREAM, builder, - seen_at: seen.clone(), + seen_at: seen, }; let _ = sender.send(data); }); From 529a9a8865715641626cbbaad376e906973101a9 Mon Sep 17 00:00:00 2001 From: rwwwx Date: Thu, 8 Aug 2024 12:59:43 +0300 Subject: [PATCH 04/11] fixed warnings --- plerkle_messenger/src/redis_messenger.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plerkle_messenger/src/redis_messenger.rs b/plerkle_messenger/src/redis_messenger.rs index 9eaec6e..856aae4 100644 --- a/plerkle_messenger/src/redis_messenger.rs +++ b/plerkle_messenger/src/redis_messenger.rs @@ -41,7 +41,7 @@ pub struct RedisMessenger { retries: usize, batch_size: usize, idle_timeout: usize, - message_wait_timeout: usize, + _message_wait_timeout: usize, consumer_group_name: String, pipeline_size: usize, pipeline_max_time: u64, @@ -61,7 +61,7 @@ impl RedisMessenger { &mut self, stream_key: &'static str, ) -> Result, MessengerError> { - let mut id = "0-0".to_owned(); + let id = "0-0".to_owned(); let mut xauto = cmd("XAUTOCLAIM"); xauto .arg(stream_key) @@ -227,7 +227,7 @@ impl Messenger for RedisMessenger { retries, batch_size, idle_timeout, - message_wait_timeout, + _message_wait_timeout: message_wait_timeout, consumer_group_name, pipeline_size, pipeline_max_time, From 696f6ff95dad4989ff36adbef565643b669ce09d Mon Sep 17 00:00:00 2001 From: rwwwx Date: Thu, 8 Aug 2024 15:03:26 +0300 Subject: [PATCH 05/11] bump up crates versions --- plerkle/Cargo.toml | 2 +- plerkle_messenger/Cargo.toml | 2 +- plerkle_serialization/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plerkle/Cargo.toml b/plerkle/Cargo.toml index f76265b..ee0c17a 100644 --- a/plerkle/Cargo.toml +++ b/plerkle/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "plerkle" description = "Geyser plugin with dynamic config reloading, message bus agnostic abstractions and a whole lot of fun." -version = "1.6.0" +version = "1.7.0" authors = ["Metaplex Developers "] repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin" license = "AGPL-3.0" diff --git a/plerkle_messenger/Cargo.toml b/plerkle_messenger/Cargo.toml index a6775a5..324a1f4 100644 --- a/plerkle_messenger/Cargo.toml +++ b/plerkle_messenger/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "plerkle_messenger" description = "Metaplex Messenger trait for Geyser plugin producer/consumer patterns." -version = "1.8.0" +version = "1.8.1" authors = ["Metaplex Developers "] repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin" license = "AGPL-3.0" diff --git a/plerkle_serialization/Cargo.toml b/plerkle_serialization/Cargo.toml index 80cafb0..60e3753 100644 --- a/plerkle_serialization/Cargo.toml +++ b/plerkle_serialization/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "plerkle_serialization" description = "Metaplex Flatbuffers Plerkle Serialization for Geyser plugin producer/consumer patterns." -version = "1.8.0" +version = "1.9.0" authors = ["Metaplex Developers "] repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin" license = "AGPL-3.0" From 61de71c8f64f2b743c0a339b9e9ccac5aabe66c4 Mon Sep 17 00:00:00 2001 From: rwwwx Date: Thu, 8 Aug 2024 17:12:16 +0300 Subject: [PATCH 06/11] renamed `_is_reload` to `is_reload` --- plerkle/src/geyser_plugin_nft.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plerkle/src/geyser_plugin_nft.rs b/plerkle/src/geyser_plugin_nft.rs index 5defdc4..84e35be 100644 --- a/plerkle/src/geyser_plugin_nft.rs +++ b/plerkle/src/geyser_plugin_nft.rs @@ -323,15 +323,15 @@ impl GeyserPlugin for Plerkle<'static> { "Plerkle" } - fn on_load(&mut self, config_file: &str, _is_reload: bool) -> Result<()> { + fn on_load(&mut self, config_file: &str, is_reload: bool) -> Result<()> { solana_logger::setup_with_default("info"); // Read in config file. info!( - "Loading plugin {:?} from config_file {:?} with '_is_reload' flag: {:?}", + "Loading plugin {:?} from config_file {:?} with 'is_reload' flag: {:?}", self.name(), config_file, - _is_reload, + is_reload, ); let mut file = File::open(config_file)?; let mut contents = String::new(); From bc1b8ef521574c86230deb471a0940e83cb972e5 Mon Sep 17 00:00:00 2001 From: rwwwx Date: Thu, 8 Aug 2024 20:06:00 +0300 Subject: [PATCH 07/11] moved `plerkle_messenger` to `1.9.0` --- plerkle_messenger/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plerkle_messenger/Cargo.toml b/plerkle_messenger/Cargo.toml index 324a1f4..759b057 100644 --- a/plerkle_messenger/Cargo.toml +++ b/plerkle_messenger/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "plerkle_messenger" description = "Metaplex Messenger trait for Geyser plugin producer/consumer patterns." -version = "1.8.1" +version = "1.9.0" authors = ["Metaplex Developers "] repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin" license = "AGPL-3.0" From 975a5974bb8c3fe02afd05c5ee127b2dce6544dd Mon Sep 17 00:00:00 2001 From: rwwwx Date: Thu, 8 Aug 2024 20:20:00 +0300 Subject: [PATCH 08/11] use `1.18.11` of `SOLANA_VERSION_STABLE` --- .github/workflows/release.yml | 2 +- .github/workflows/test.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index c53cae5..4709f71 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -7,7 +7,7 @@ env: CARGO_TERM_COLOR: always IMAGE_NAME: plerkle-test-validator RUST_VERSION: 1.75.0 - SOLANA_VERSION_STABLE: v1.18.20 + SOLANA_VERSION_STABLE: v1.18.11 jobs: release-stable: runs-on: ubuntu-20-04-8-cores diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 194396b..8bf6504 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,7 +13,7 @@ env: CARGO_TERM_COLOR: always IMAGE_NAME: plerkle-test-validator RUST_VERSION: 1.75.0 - SOLANA_VERSION_STABLE: v1.18.20 + SOLANA_VERSION_STABLE: v1.18.11 jobs: test-stable: From 3810963b1c6ebab979275914cc819c750f8cf28c Mon Sep 17 00:00:00 2001 From: n00m4d Date: Mon, 7 Oct 2024 13:54:05 +0200 Subject: [PATCH 09/11] feat: add redis pool messenger --- plerkle/Cargo.toml | 2 +- plerkle/src/geyser_plugin_nft.rs | 5 +- plerkle_messenger/Cargo.toml | 3 +- plerkle_messenger/Readme.md | 31 ++ plerkle_messenger/src/lib.rs | 8 +- plerkle_messenger/src/plerkle_messenger.rs | 51 ++- plerkle_messenger/src/redis/mod.rs | 15 + .../src/{ => redis}/redis_messenger.rs | 340 +++++++++--------- .../src/redis/redis_pool_messenger.rs | 207 +++++++++++ rust-toolchain.toml | 2 +- 10 files changed, 467 insertions(+), 197 deletions(-) create mode 100644 plerkle_messenger/src/redis/mod.rs rename plerkle_messenger/src/{ => redis}/redis_messenger.rs (92%) create mode 100644 plerkle_messenger/src/redis/redis_pool_messenger.rs diff --git a/plerkle/Cargo.toml b/plerkle/Cargo.toml index ee0c17a..70cbcff 100644 --- a/plerkle/Cargo.toml +++ b/plerkle/Cargo.toml @@ -36,7 +36,7 @@ tracing-subscriber = { version = "0.3.16", features = [ "ansi", ] } hex = "0.4.3" -plerkle_messenger = { path = "../plerkle_messenger", version = "1.6.0", features = ["redis"] } +plerkle_messenger = { path = "../plerkle_messenger", version = "1.6.0" } flatbuffers = "23.1.21" plerkle_serialization = { path = "../plerkle_serialization", version = "1.6.0" } tokio = { version = "1.23.0", features = ["full"] } diff --git a/plerkle/src/geyser_plugin_nft.rs b/plerkle/src/geyser_plugin_nft.rs index 84e35be..a8796af 100644 --- a/plerkle/src/geyser_plugin_nft.rs +++ b/plerkle/src/geyser_plugin_nft.rs @@ -8,8 +8,7 @@ use dashmap::DashMap; use figment::{providers::Env, Figment}; use flatbuffers::FlatBufferBuilder; use plerkle_messenger::{ - select_messenger, MessengerConfig, ACCOUNT_STREAM, BLOCK_STREAM, SLOT_STREAM, - TRANSACTION_STREAM, + select_messenger_stream, MessengerConfig, ACCOUNT_STREAM, BLOCK_STREAM, SLOT_STREAM, TRANSACTION_STREAM }; use plerkle_serialization::serializer::{ serialize_account, serialize_block, serialize_transaction, @@ -391,7 +390,7 @@ impl GeyserPlugin for Plerkle<'static> { let mut worker_senders = Vec::with_capacity(workers_num); for _ in 0..workers_num { let (send, recv) = unbounded_channel::(); - let mut msg = select_messenger(config.messenger_config.clone()) + let mut msg = select_messenger_stream(config.messenger_config.clone()) .await .unwrap(); // We want to fail if the messenger is not configured correctly. diff --git a/plerkle_messenger/Cargo.toml b/plerkle_messenger/Cargo.toml index 759b057..5076703 100644 --- a/plerkle_messenger/Cargo.toml +++ b/plerkle_messenger/Cargo.toml @@ -9,7 +9,8 @@ edition = "2021" readme = "Readme.md" [dependencies] -redis = { version = "0.22.3", features = ["aio", "tokio-comp", "streams", "tokio-native-tls-comp", "connection-manager"], optional = true} +redis = { version = "0.27.2", features = ["aio", "tokio-comp", "streams", "tokio-native-tls-comp", "connection-manager"]} +tokio = "1.40.0" log = "0.4.11" thiserror = "1.0.30" async-trait = "0.1.53" diff --git a/plerkle_messenger/Readme.md b/plerkle_messenger/Readme.md index 6fda7da..8fb083d 100644 --- a/plerkle_messenger/Readme.md +++ b/plerkle_messenger/Readme.md @@ -6,3 +6,34 @@ A message bus agnostic Messaging Library that sends Transaction, Account, Block The plerkle serialization API changes at 1.0.0 which is a breaking change. This method removes confusion around the Recv data lifetime being tied back to the messenger interface. Now the data is owned. + +# Env example + +The Messenger can operate in two modes: a single Redis instance or multiple Redis instances. + +Just to clarify, the multiple Redis instances setup doesn't create a clustered connection. It's designed to work with separate, independent instances. + +You can configure the Redis client type via environment variables. + +Example environment configuration for a single Redis instance: + +``` +export PLUGIN_MESSENGER_CONFIG='{ + messenger_type="Redis", + redis_connection_str="redis://:pass@redis.app:6379" +}' +``` + +Example environment configuration for multiple Redis instances: + +``` +export PLUGIN_MESSENGER_CONFIG='{ + messenger_type="RedisPool", + redis_connection_str=[ + "redis://:pass@redis1.app:6379", + "redis://:pass@redis2.app:6379" + ] +}' +``` + +To switch between modes, you'll need to update both the `messenger_type` and `redis_connection_str` values. \ No newline at end of file diff --git a/plerkle_messenger/src/lib.rs b/plerkle_messenger/src/lib.rs index e8fb5fb..6272671 100644 --- a/plerkle_messenger/src/lib.rs +++ b/plerkle_messenger/src/lib.rs @@ -1,8 +1,8 @@ -#[cfg(feature = "redis")] -pub mod redis_messenger; - mod error; mod metrics; mod plerkle_messenger; -pub use crate::{error::*, plerkle_messenger::*}; +pub mod redis; +pub use redis::*; + +pub use {crate::error::*, plerkle_messenger::*}; diff --git a/plerkle_messenger/src/plerkle_messenger.rs b/plerkle_messenger/src/plerkle_messenger.rs index 08a67c9..cec4b74 100644 --- a/plerkle_messenger/src/plerkle_messenger.rs +++ b/plerkle_messenger/src/plerkle_messenger.rs @@ -1,13 +1,10 @@ -use crate::error::MessengerError; +use crate::{error::MessengerError, redis_pool_messenger::RedisPoolMessenger}; use async_trait::async_trait; use blake3::OUT_LEN; use figment::value::{Dict, Value}; use serde::Deserialize; use std::collections::BTreeMap; -#[cfg(feature = "pulsar")] -use crate::pulsar_messenger::PulsarMessenger; -#[cfg(feature = "redis")] use crate::redis_messenger::RedisMessenger; /// Some constants that can be used as stream key values. @@ -51,14 +48,12 @@ pub enum ConsumptionType { #[async_trait] pub trait Messenger: Sync + Send { - async fn new(config: MessengerConfig) -> Result - where - Self: Sized; fn messenger_type(&self) -> MessengerType; - async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError>; - async fn set_buffer_size(&mut self, stream_key: &'static str, max_buffer_size: usize); - async fn send(&mut self, stream_key: &'static str, bytes: &[u8]) -> Result<(), MessengerError>; - async fn recv(&mut self, stream_key: &'static str, consumption_type: ConsumptionType) -> Result, MessengerError>; + async fn recv( + &mut self, + stream_key: &'static str, + consumption_type: ConsumptionType, + ) -> Result, MessengerError>; async fn stream_size(&mut self, stream_key: &'static str) -> Result; // Ack-ing messages is made a bit awkward by the current interface layout because @@ -81,24 +76,50 @@ pub trait Messenger: Sync + Send { ) -> Result<(), MessengerError>; } -pub async fn select_messenger( +#[async_trait] +pub trait MessageStreamer: Sync + Send { + fn messenger_type(&self) -> MessengerType; + async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError>; + async fn set_buffer_size(&mut self, stream_key: &'static str, max_buffer_size: usize); + async fn send(&mut self, stream_key: &'static str, bytes: &[u8]) -> Result<(), MessengerError>; +} + +pub async fn select_messenger_read( config: MessengerConfig, ) -> Result, MessengerError> { match config.messenger_type { - #[cfg(feature = "redis")] MessengerType::Redis => { RedisMessenger::new(config).await.map(|a| Box::new(a) as Box) } _ => Err(MessengerError::ConfigurationError { - msg: "This Messenger type is not valid, unimplemented or you dont have the right crate features on.".to_string() + msg: "This Messenger type is not valid, unimplemented or you don't have the right crate features on.".to_string() + }) + } +} + +pub async fn select_messenger_stream( + config: MessengerConfig, +) -> Result, MessengerError> { + match config.messenger_type { + MessengerType::Redis => { + RedisMessenger::new(config).await.map(|a| Box::new(a) as Box) + } + MessengerType::RedisPool => { + RedisPoolMessenger::new(config).await.map(|a| Box::new(a) as Box) + } + _ => Err(MessengerError::ConfigurationError { + msg: "This Messenger type is not valid, unimplemented or you don't have the right crate features on.".to_string() }) } } #[derive(Deserialize, Debug, PartialEq, Eq, Clone)] pub enum MessengerType { + // Connect to one Redis instance Redis, - Pulsar, + // Connect to few different Redis instances + // Not a cluster + RedisPool, Invalid, } diff --git a/plerkle_messenger/src/redis/mod.rs b/plerkle_messenger/src/redis/mod.rs new file mode 100644 index 0000000..660855e --- /dev/null +++ b/plerkle_messenger/src/redis/mod.rs @@ -0,0 +1,15 @@ +pub mod redis_messenger; +pub mod redis_pool_messenger; + +// Redis stream values. +pub const GROUP_NAME: &str = "plerkle"; +pub const DATA_KEY: &str = "data"; +pub const DEFAULT_RETRIES: usize = 3; +pub const DEFAULT_MSG_BATCH_SIZE: usize = 10; +pub const MESSAGE_WAIT_TIMEOUT: usize = 10; +pub const IDLE_TIMEOUT: usize = 5000; +pub const REDIS_MAX_BYTES_COMMAND: usize = 536870912; +pub const PIPELINE_SIZE_BYTES: usize = REDIS_MAX_BYTES_COMMAND / 100; +pub const PIPELINE_MAX_TIME: u64 = 10; + +pub(crate) const REDIS_CON_STR: &str = "redis_connection_str"; diff --git a/plerkle_messenger/src/redis_messenger.rs b/plerkle_messenger/src/redis/redis_messenger.rs similarity index 92% rename from plerkle_messenger/src/redis_messenger.rs rename to plerkle_messenger/src/redis/redis_messenger.rs index 856aae4..2f89aad 100644 --- a/plerkle_messenger/src/redis_messenger.rs +++ b/plerkle_messenger/src/redis/redis_messenger.rs @@ -1,6 +1,6 @@ use crate::{ - error::MessengerError, metric, ConsumptionType, Messenger, MessengerConfig, MessengerType, - RecvData, + error::MessengerError, metric, ConsumptionType, MessageStreamer, Messenger, MessengerConfig, + MessengerType, RecvData, }; use async_trait::async_trait; @@ -23,16 +23,10 @@ use std::{ time::{Duration, Instant}, }; -// Redis stream values. -pub const GROUP_NAME: &str = "plerkle"; -pub const DATA_KEY: &str = "data"; -pub const DEFAULT_RETRIES: usize = 3; -pub const DEFAULT_MSG_BATCH_SIZE: usize = 10; -pub const MESSAGE_WAIT_TIMEOUT: usize = 10; -pub const IDLE_TIMEOUT: usize = 5000; -pub const REDIS_MAX_BYTES_COMMAND: usize = 536870912; -pub const PIPELINE_SIZE_BYTES: usize = REDIS_MAX_BYTES_COMMAND / 100; -pub const PIPELINE_MAX_TIME: u64 = 10; +use super::{ + DATA_KEY, DEFAULT_MSG_BATCH_SIZE, DEFAULT_RETRIES, GROUP_NAME, IDLE_TIMEOUT, + MESSAGE_WAIT_TIMEOUT, PIPELINE_MAX_TIME, PIPELINE_SIZE_BYTES, REDIS_CON_STR, +}; pub struct RedisMessenger { connection: ConnectionManager, @@ -48,15 +42,84 @@ pub struct RedisMessenger { } pub struct RedisMessengerStream { - max_len: Option, - local_buffer: LinkedList>, - local_buffer_total: usize, - local_buffer_last_flush: Instant, + pub max_len: Option, + pub local_buffer: LinkedList>, + pub local_buffer_total: usize, + pub local_buffer_last_flush: Instant, } -const REDIS_CON_STR: &str = "redis_connection_str"; - impl RedisMessenger { + pub async fn new(config: MessengerConfig) -> Result { + let uri = config + .get(REDIS_CON_STR) + .and_then(|u| u.clone().into_string()) + .ok_or(MessengerError::ConfigurationError { + msg: format!("Connection String Missing: {}", REDIS_CON_STR), + })?; + // Setup Redis client. + let client = redis::Client::open(uri).unwrap(); + + // Get connection. + let connection = client.get_connection_manager().await.map_err(|e| { + error!("{}", e.to_string()); + MessengerError::ConnectionError { msg: e.to_string() } + })?; + + let consumer_id = config + .get("consumer_id") + .and_then(|id| id.clone().into_string()) + // Using the previous default name when the configuration does not + // specify any particular consumer_id. + .unwrap_or_else(|| String::from("ingester")); + + let retries = config + .get("retries") + .and_then(|r| r.clone().to_u128().map(|n| n as usize)) + .unwrap_or(DEFAULT_RETRIES); + + let batch_size = config + .get("batch_size") + .and_then(|r| r.clone().to_u128().map(|n| n as usize)) + .unwrap_or(DEFAULT_MSG_BATCH_SIZE); + + let idle_timeout = config + .get("idle_timeout") + .and_then(|r| r.clone().to_u128().map(|n| n as usize)) + .unwrap_or(IDLE_TIMEOUT); + let message_wait_timeout = config + .get("message_wait_timeout") + .and_then(|r| r.clone().to_u128().map(|n| n as usize)) + .unwrap_or(MESSAGE_WAIT_TIMEOUT); + + let consumer_group_name = config + .get("consumer_group_name") + .and_then(|r| r.clone().into_string()) + .unwrap_or_else(|| GROUP_NAME.to_string()); + + let pipeline_size = config + .get("pipeline_size_bytes") + .and_then(|r| r.clone().to_u128().map(|n| n as usize)) + .unwrap_or(PIPELINE_SIZE_BYTES); + + let pipeline_max_time = config + .get("local_buffer_max_window") + .and_then(|r| r.clone().to_u128().map(|n| n as u64)) + .unwrap_or(PIPELINE_MAX_TIME); + + Ok(Self { + connection, + streams: HashMap::<&'static str, RedisMessengerStream>::default(), + consumer_id, + retries, + batch_size, + idle_timeout, + _message_wait_timeout: message_wait_timeout, + consumer_group_name, + pipeline_size, + pipeline_max_time, + }) + } + async fn xautoclaim( &mut self, stream_key: &'static str, @@ -128,7 +191,7 @@ impl RedisMessenger { // Get data from map. let bytes = match data { - Value::Data(bytes) => bytes, + Value::BulkString(bytes) => bytes, _ => { error!("Redis data for ID {id} in wrong format"); continue; @@ -158,94 +221,105 @@ impl RedisMessenger { #[async_trait] impl Messenger for RedisMessenger { - async fn new(config: MessengerConfig) -> Result { - let uri = config - .get(REDIS_CON_STR) - .and_then(|u| u.clone().into_string()) - .ok_or(MessengerError::ConfigurationError { - msg: format!("Connection String Missing: {}", REDIS_CON_STR), - })?; - // Setup Redis client. - let client = redis::Client::open(uri).unwrap(); - - // Get connection. - let connection = client.get_tokio_connection_manager().await.map_err(|e| { - error!("{}", e.to_string()); - MessengerError::ConnectionError { msg: e.to_string() } - })?; - - let _cluster_mode = config - .get("cluster_mode") - .and_then(|r| r.clone().to_bool()) - .unwrap_or(false); - - let consumer_id = config - .get("consumer_id") - .and_then(|id| id.clone().into_string()) - // Using the previous default name when the configuration does not - // specify any particular consumer_id. - .unwrap_or_else(|| String::from("ingester")); + fn messenger_type(&self) -> MessengerType { + MessengerType::Redis + } - let retries = config - .get("retries") - .and_then(|r| r.clone().to_u128().map(|n| n as usize)) - .unwrap_or(DEFAULT_RETRIES); + async fn stream_size(&mut self, stream_key: &'static str) -> Result { + let result: RedisResult = self.connection.xlen(stream_key).await; + match result { + Ok(reply) => Ok(reply), + Err(e) => Err(MessengerError::ConnectionError { msg: e.to_string() }), + } + } - let batch_size = config - .get("batch_size") - .and_then(|r| r.clone().to_u128().map(|n| n as usize)) - .unwrap_or(DEFAULT_MSG_BATCH_SIZE); + // is used only on client side + // Geyser does not call this method + async fn recv( + &mut self, + stream_key: &'static str, + consumption_type: ConsumptionType, + ) -> Result, MessengerError> { + let mut data_vec = Vec::with_capacity(self.batch_size * 2); + if consumption_type == ConsumptionType::New || consumption_type == ConsumptionType::All { + let opts = StreamReadOptions::default() + //.block(self.message_wait_timeout) + .count(self.batch_size) + .group(self.consumer_group_name.as_str(), self.consumer_id.as_str()); - let idle_timeout = config - .get("idle_timeout") - .and_then(|r| r.clone().to_u128().map(|n| n as usize)) - .unwrap_or(IDLE_TIMEOUT); - let message_wait_timeout = config - .get("message_wait_timeout") - .and_then(|r| r.clone().to_u128().map(|n| n as usize)) - .unwrap_or(MESSAGE_WAIT_TIMEOUT); + // Read on stream key and save the reply. Log but do not return errors. + let reply: StreamReadReply = self + .connection + .xread_options(&[stream_key], &[">"], &opts) + .await + .map_err(|e| { + error!("Redis receive error: {e}"); + MessengerError::ReceiveError { msg: e.to_string() } + })?; + // Parse data in stream read reply and store in Vec to return to caller. + for StreamKey { key: _, ids } in reply.keys.into_iter() { + for StreamId { id, map } in ids { + // Get data from map. + let data = if let Some(data) = map.get(DATA_KEY) { + data + } else { + error!("No Data was stored in Redis for ID {id}"); + continue; + }; + let bytes = match data { + Value::BulkString(bytes) => bytes, + _ => { + error!("Redis data for ID {id} in wrong format"); + continue; + } + }; - let consumer_group_name = config - .get("consumer_group_name") - .and_then(|r| r.clone().into_string()) - .unwrap_or_else(|| GROUP_NAME.to_string()); + data_vec.push(RecvData::new(id, bytes.to_vec())); + } + } + } + if consumption_type == ConsumptionType::Redeliver + || consumption_type == ConsumptionType::All + { + let xauto_reply = self.xautoclaim(stream_key).await; + match xauto_reply { + Ok(reply) => { + let mut pending_messages = reply; + data_vec.append(&mut pending_messages); + } + Err(e) => { + error!("XPENDING ERROR {e}"); + } + } + } - let pipeline_size = config - .get("pipeline_size_bytes") - .and_then(|r| r.clone().to_u128().map(|n| n as usize)) - .unwrap_or(PIPELINE_SIZE_BYTES); + Ok(data_vec) + } - let pipeline_max_time = config - .get("local_buffer_max_window") - .and_then(|r| r.clone().to_u128().map(|n| n as u64)) - .unwrap_or(PIPELINE_MAX_TIME); + async fn ack_msg( + &mut self, + stream_key: &'static str, + ids: &[String], + ) -> Result<(), MessengerError> { + if ids.is_empty() { + return Ok(()); + } + let mut pipe = redis::pipe(); + pipe.xack(stream_key, self.consumer_group_name.as_str(), ids); + pipe.xdel(stream_key, ids); - Ok(Self { - connection, - streams: HashMap::<&'static str, RedisMessengerStream>::default(), - consumer_id, - retries, - batch_size, - idle_timeout, - _message_wait_timeout: message_wait_timeout, - consumer_group_name, - pipeline_size, - pipeline_max_time, - }) + pipe.query_async(&mut self.connection) + .await + .map_err(|e| MessengerError::AckError { msg: e.to_string() }) } +} +#[async_trait] +impl MessageStreamer for RedisMessenger { fn messenger_type(&self) -> MessengerType { MessengerType::Redis } - async fn stream_size(&mut self, stream_key: &'static str) -> Result { - let result: RedisResult = self.connection.xlen(stream_key).await; - match result { - Ok(reply) => Ok(reply), - Err(e) => Err(MessengerError::ConnectionError { msg: e.to_string() }), - } - } - async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError> { // Add to streams hashmap. let _result = self.streams.insert( @@ -329,84 +403,6 @@ impl Messenger for RedisMessenger { } Ok(()) } - - async fn recv( - &mut self, - stream_key: &'static str, - consumption_type: ConsumptionType, - ) -> Result, MessengerError> { - let mut data_vec = Vec::with_capacity(self.batch_size * 2); - if consumption_type == ConsumptionType::New || consumption_type == ConsumptionType::All { - let opts = StreamReadOptions::default() - //.block(self.message_wait_timeout) - .count(self.batch_size) - .group(self.consumer_group_name.as_str(), self.consumer_id.as_str()); - - // Read on stream key and save the reply. Log but do not return errors. - let reply: StreamReadReply = self - .connection - .xread_options(&[stream_key], &[">"], &opts) - .await - .map_err(|e| { - error!("Redis receive error: {e}"); - MessengerError::ReceiveError { msg: e.to_string() } - })?; - // Parse data in stream read reply and store in Vec to return to caller. - for StreamKey { key: _, ids } in reply.keys.into_iter() { - for StreamId { id, map } in ids { - // Get data from map. - let data = if let Some(data) = map.get(DATA_KEY) { - data - } else { - error!("No Data was stored in Redis for ID {id}"); - continue; - }; - let bytes = match data { - Value::Data(bytes) => bytes, - _ => { - error!("Redis data for ID {id} in wrong format"); - continue; - } - }; - - data_vec.push(RecvData::new(id, bytes.to_vec())); - } - } - } - if consumption_type == ConsumptionType::Redeliver - || consumption_type == ConsumptionType::All - { - let xauto_reply = self.xautoclaim(stream_key).await; - match xauto_reply { - Ok(reply) => { - let mut pending_messages = reply; - data_vec.append(&mut pending_messages); - } - Err(e) => { - error!("XPENDING ERROR {e}"); - } - } - } - - Ok(data_vec) - } - - async fn ack_msg( - &mut self, - stream_key: &'static str, - ids: &[String], - ) -> Result<(), MessengerError> { - if ids.is_empty() { - return Ok(()); - } - let mut pipe = redis::pipe(); - pipe.xack(stream_key, self.consumer_group_name.as_str(), ids); - pipe.xdel(stream_key, ids); - - pipe.query_async(&mut self.connection) - .await - .map_err(|e| MessengerError::AckError { msg: e.to_string() }) - } } impl Debug for RedisMessenger { diff --git a/plerkle_messenger/src/redis/redis_pool_messenger.rs b/plerkle_messenger/src/redis/redis_pool_messenger.rs new file mode 100644 index 0000000..531c9c4 --- /dev/null +++ b/plerkle_messenger/src/redis/redis_pool_messenger.rs @@ -0,0 +1,207 @@ +use crate::{ + error::MessengerError, redis_messenger::RedisMessengerStream, MessageStreamer, MessengerConfig, + MessengerType, DATA_KEY, +}; +use async_trait::async_trait; + +use log::*; +use redis::{aio::ConnectionManager, streams::StreamMaxlen, AsyncCommands, RedisResult}; + +use std::{ + collections::{HashMap, LinkedList}, + time::{Duration, Instant}, +}; +use tokio::task::JoinSet; + +use super::{ + GROUP_NAME, MESSAGE_WAIT_TIMEOUT, PIPELINE_MAX_TIME, PIPELINE_SIZE_BYTES, REDIS_CON_STR, +}; + +/// A Redis Messenger capable of streaming data to multiple separate Redis instances. +pub struct RedisPoolMessenger { + connections_pool: Vec, + streams: HashMap<&'static str, RedisMessengerStream>, + _message_wait_timeout: usize, + consumer_group_name: String, + pipeline_size: usize, + pipeline_max_time: u64, +} + +impl RedisPoolMessenger { + pub async fn new(config: MessengerConfig) -> Result { + let uris = config + .get(REDIS_CON_STR) + .and_then(|u| u.clone().into_array()) + .ok_or(MessengerError::ConfigurationError { + msg: format!("Connection String Missing: {}", REDIS_CON_STR), + })?; + + let mut connections_pool = vec![]; + + for uri in uris { + // Setup Redis client. + let client = redis::Client::open(uri.into_string().ok_or( + MessengerError::ConfigurationError { + msg: format!("Connection String Missing: {}", REDIS_CON_STR), + }, + )?) + .unwrap(); + + // Get connection. + connections_pool.push(client.get_connection_manager().await.map_err(|e| { + error!("{}", e.to_string()); + MessengerError::ConnectionError { msg: e.to_string() } + })?); + } + + let message_wait_timeout = config + .get("message_wait_timeout") + .and_then(|r| r.clone().to_u128().map(|n| n as usize)) + .unwrap_or(MESSAGE_WAIT_TIMEOUT); + + let consumer_group_name = config + .get("consumer_group_name") + .and_then(|r| r.clone().into_string()) + .unwrap_or_else(|| GROUP_NAME.to_string()); + + let pipeline_size = config + .get("pipeline_size_bytes") + .and_then(|r| r.clone().to_u128().map(|n| n as usize)) + .unwrap_or(PIPELINE_SIZE_BYTES); + + let pipeline_max_time = config + .get("local_buffer_max_window") + .and_then(|r| r.clone().to_u128().map(|n| n as u64)) + .unwrap_or(PIPELINE_MAX_TIME); + + Ok(Self { + connections_pool, + streams: HashMap::<&'static str, RedisMessengerStream>::default(), + _message_wait_timeout: message_wait_timeout, + consumer_group_name, + pipeline_size, + pipeline_max_time, + }) + } +} + +#[async_trait] +impl MessageStreamer for RedisPoolMessenger { + fn messenger_type(&self) -> MessengerType { + MessengerType::RedisPool + } + + async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError> { + // Add to streams hashmap. + let _result = self.streams.insert( + stream_key, + RedisMessengerStream { + max_len: None, + local_buffer: LinkedList::new(), + local_buffer_total: 0, + local_buffer_last_flush: Instant::now(), + }, + ); + + for connection in &mut self.connections_pool { + // Add stream to Redis. + let result: RedisResult<()> = connection + .xgroup_create_mkstream(stream_key, self.consumer_group_name.as_str(), "$") + .await; + + if let Err(e) = result { + info!("Group already exists: {:?}", e) + } + } + + Ok(()) + } + + async fn set_buffer_size(&mut self, stream_key: &'static str, max_buffer_size: usize) { + // Set max length for the stream. + if let Some(stream) = self.streams.get_mut(stream_key) { + stream.max_len = Some(StreamMaxlen::Approx(max_buffer_size)); + } else { + error!("Stream key {stream_key} not configured"); + } + } + + async fn send(&mut self, stream_key: &'static str, bytes: &[u8]) -> Result<(), MessengerError> { + // Check if stream is configured. + let stream = if let Some(stream) = self.streams.get_mut(stream_key) { + stream + } else { + error!("Cannot send data for stream key {stream_key}, it is not configured"); + return Ok(()); + }; + + // Get max length for the stream. + let maxlen = if let Some(maxlen) = stream.max_len { + maxlen + } else { + error!("Cannot send data for stream key {stream_key}, buffer size not set."); + return Ok(()); + }; + stream.local_buffer.push_back(bytes.to_vec()); + stream.local_buffer_total += bytes.len(); + // Put serialized data into Redis. + if stream.local_buffer_total < self.pipeline_size + && stream.local_buffer_last_flush.elapsed() + <= Duration::from_millis(self.pipeline_max_time as u64) + { + debug!( + "Redis local buffer bytes {} and message pipeline size {} elapsed time {}ms", + stream.local_buffer_total, + stream.local_buffer.len(), + stream.local_buffer_last_flush.elapsed().as_millis() + ); + return Ok(()); + } else { + let mut pipe = redis::pipe(); + pipe.atomic(); + for bytes in stream.local_buffer.iter() { + pipe.xadd_maxlen(stream_key, maxlen, "*", &[(DATA_KEY, &bytes)]); + } + + let mut tasks = JoinSet::new(); + + for connection in &self.connections_pool { + let mut connection = connection.clone(); + let pipe = pipe.clone(); + tasks.spawn(async move { + let result: Result, redis::RedisError> = + pipe.query_async(&mut connection).await; + if let Err(e) = result { + error!("Redis send error: {e}"); + return Err(MessengerError::SendError { msg: e.to_string() }); + } + + Ok(()) + }); + } + + while let Some(task) = tasks.join_next().await { + match task { + Ok(_) => { + debug!("One of the message send tasks was finished") + } + Err(err) if err.is_panic() => { + let msg = err.to_string(); + error!("Task panic during sending message to Redis: {:?}", err); + return Err(MessengerError::SendError { msg }); + } + Err(err) => { + let msg = err.to_string(); + return Err(MessengerError::SendError { msg }); + } + } + } + + debug!("Data Sent to {}", stream_key); + stream.local_buffer.clear(); + stream.local_buffer_total = 0; + stream.local_buffer_last_flush = Instant::now(); + } + Ok(()) + } +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index f4ec842..4dd8e5c 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.73.0" \ No newline at end of file +channel = "1.75.0" \ No newline at end of file From 81dc0301f74d53712ba26cba6a0ba82d944932fe Mon Sep 17 00:00:00 2001 From: n00m4d Date: Mon, 14 Oct 2024 12:06:23 +0200 Subject: [PATCH 10/11] chore: change error message --- plerkle_messenger/src/plerkle_messenger.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plerkle_messenger/src/plerkle_messenger.rs b/plerkle_messenger/src/plerkle_messenger.rs index cec4b74..2a26969 100644 --- a/plerkle_messenger/src/plerkle_messenger.rs +++ b/plerkle_messenger/src/plerkle_messenger.rs @@ -92,7 +92,7 @@ pub async fn select_messenger_read( RedisMessenger::new(config).await.map(|a| Box::new(a) as Box) } _ => Err(MessengerError::ConfigurationError { - msg: "This Messenger type is not valid, unimplemented or you don't have the right crate features on.".to_string() + msg: "This Messenger type is not valid or not unimplemented.".to_string() }) } } From 4e41a9b6c3ce0a387fc935c886d00a1d458eda04 Mon Sep 17 00:00:00 2001 From: n00m4d Date: Thu, 17 Oct 2024 11:56:26 +0200 Subject: [PATCH 11/11] chore: drop return --- plerkle_messenger/src/redis/redis_pool_messenger.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/plerkle_messenger/src/redis/redis_pool_messenger.rs b/plerkle_messenger/src/redis/redis_pool_messenger.rs index 531c9c4..68c0c54 100644 --- a/plerkle_messenger/src/redis/redis_pool_messenger.rs +++ b/plerkle_messenger/src/redis/redis_pool_messenger.rs @@ -155,7 +155,6 @@ impl MessageStreamer for RedisPoolMessenger { stream.local_buffer.len(), stream.local_buffer_last_flush.elapsed().as_millis() ); - return Ok(()); } else { let mut pipe = redis::pipe(); pipe.atomic();