diff --git a/Cargo.lock b/Cargo.lock index 3a25374..dd70cad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2024,6 +2024,7 @@ dependencies = [ "ethrex-rlp", "hex", "libp2p", + "rand 0.8.5", "sha2", "snap", "ssz_types", @@ -2749,9 +2750,6 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" -dependencies = [ - "ahash", -] [[package]] name = "hashbrown" @@ -2776,15 +2774,6 @@ dependencies = [ "serde_core", ] -[[package]] -name = "hashlink" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" -dependencies = [ - "hashbrown 0.14.5", -] - [[package]] name = "hashlink" version = "0.10.0" @@ -3517,9 +3506,8 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "libp2p" -version = "0.56.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce71348bf5838e46449ae240631117b487073d5f347c06d434caddcb91dceb5a" +version = "0.56.1" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "bytes", "either", @@ -3567,8 +3555,7 @@ dependencies = [ [[package]] name = "libp2p-allow-block-list" version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16ccf824ee859ca83df301e1c0205270206223fd4b1f2e512a693e1912a8f4a" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "libp2p-core", "libp2p-identity", @@ -3578,8 +3565,7 @@ dependencies = [ [[package]] name = "libp2p-autonat" version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fab5e25c49a7d48dac83d95d8f3bac0a290d8a5df717012f6e34ce9886396c0b" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "async-trait", "asynchronous-codec", @@ -3603,8 +3589,7 @@ dependencies = [ [[package]] name = "libp2p-connection-limits" version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a18b8b607cf3bfa2f8c57db9c7d8569a315d5cc0a282e6bfd5ebfc0a9840b2a0" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "libp2p-core", "libp2p-identity", @@ -3614,8 +3599,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.43.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "249128cd37a2199aff30a7675dffa51caf073b51aa612d2f544b19932b9aebca" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "either", "fnv", @@ -3632,22 +3616,21 @@ dependencies = [ "rw-stream-sink", "thiserror 2.0.17", "tracing", - "unsigned-varint 0.8.0", + "unsigned-varint", "web-time", ] [[package]] name = "libp2p-dcutr" version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b4107305e12158af3e66960b6181789c547394c9c9a8696f721521602bfc73a" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "asynchronous-codec", "either", "futures", "futures-bounded", "futures-timer", - "hashlink 0.10.0", + "hashlink", "libp2p-core", "libp2p-identity", "libp2p-swarm", @@ -3661,8 +3644,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.44.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b770c1c8476736ca98c578cba4b505104ff8e842c2876b528925f9766379f9a" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "async-trait", "futures", @@ -3677,8 +3659,7 @@ dependencies = [ [[package]] name = "libp2p-floodsub" version = "0.47.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0914997f56315c83bc64ffb721cd4e764ad819370582db287232c5791469697" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "asynchronous-codec", "bytes", @@ -3698,9 +3679,8 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" -version = "0.49.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7f58e37d8d6848e5c4c9e3c35c6f61133235bff2960c9c00a663b0849301221" +version = "0.50.0" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "async-channel", "asynchronous-codec", @@ -3712,11 +3692,12 @@ dependencies = [ "futures", "futures-timer", "getrandom 0.2.16", - "hashlink 0.9.1", + "hashlink", "hex_fmt", "libp2p-core", "libp2p-identity", "libp2p-swarm", + "prometheus-client", "quick-protobuf", "quick-protobuf-codec", "rand 0.8.5", @@ -3730,8 +3711,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.47.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ab792a8b68fdef443a62155b01970c81c3aadab5e659621b063ef252a8e65e8" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "asynchronous-codec", "either", @@ -3774,9 +3754,8 @@ dependencies = [ [[package]] name = "libp2p-kad" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13d3fd632a5872ec804d37e7413ceea20588f69d027a0fa3c46f82574f4dee60" +version = "0.49.0" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "asynchronous-codec", "bytes", @@ -3803,8 +3782,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c66872d0f1ffcded2788683f76931be1c52e27f343edb93bc6d0bcd8887be443" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "futures", "hickory-proto", @@ -3814,7 +3792,7 @@ dependencies = [ "libp2p-swarm", "rand 0.8.5", "smallvec", - "socket2 0.5.10", + "socket2 0.6.1", "tokio", "tracing", ] @@ -3822,8 +3800,7 @@ dependencies = [ [[package]] name = "libp2p-memory-connection-limits" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d052a767edd0235d5c29dacf46013955eabce1085781ce0d12a4fc66bf87cd" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "libp2p-core", "libp2p-identity", @@ -3835,9 +3812,8 @@ dependencies = [ [[package]] name = "libp2p-metrics" -version = "0.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "805a555148522cb3414493a5153451910cb1a146c53ffbf4385708349baf62b7" +version = "0.17.1" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "futures", "libp2p-core", @@ -3857,8 +3833,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.46.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc73eacbe6462a0eb92a6527cac6e63f02026e5407f8831bde8293f19217bfbf" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "asynchronous-codec", "bytes", @@ -3880,8 +3855,7 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.47.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74bb7fcdfd9fead4144a3859da0b49576f171a8c8c7c0bfc7c541921d25e60d3" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "futures", "futures-timer", @@ -3896,8 +3870,7 @@ dependencies = [ [[package]] name = "libp2p-plaintext" version = "0.43.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e659439578fc6d305da8303834beb9d62f155f40e7f5b9d81c9f2b2c69d1926" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "asynchronous-codec", "bytes", @@ -3912,8 +3885,7 @@ dependencies = [ [[package]] name = "libp2p-pnet" version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf240b834dfa3f8b48feb2c4b87bb2cf82751543001b6ee86077f48183b18d52" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "futures", "pin-project", @@ -3926,8 +3898,7 @@ dependencies = [ [[package]] name = "libp2p-quic" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8dc448b2de9f4745784e3751fe8bc6c473d01b8317edd5ababcb0dec803d843f" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "futures", "futures-timer", @@ -3939,7 +3910,7 @@ dependencies = [ "rand 0.8.5", "ring", "rustls", - "socket2 0.5.10", + "socket2 0.6.1", "thiserror 2.0.17", "tokio", "tracing", @@ -3948,8 +3919,7 @@ dependencies = [ [[package]] name = "libp2p-relay" version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8b9b0392ed623243ad298326b9f806d51191829ac7585cc825c54c6c67b04d9" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "asynchronous-codec", "bytes", @@ -3972,8 +3942,7 @@ dependencies = [ [[package]] name = "libp2p-rendezvous" version = "0.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15285d828c2b4a34cb660c2e74cd6938116daceab1f4357bae933d5b08cca933" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "async-trait", "asynchronous-codec", @@ -3995,8 +3964,7 @@ dependencies = [ [[package]] name = "libp2p-request-response" version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9f1cca83488b90102abac7b67d5c36fc65bc02ed47620228af7ed002e6a1478" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "async-trait", "cbor4ii", @@ -4015,15 +3983,14 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.47.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce88c6c4bf746c8482480345ea3edfd08301f49e026889d1cbccfa1808a9ed9e" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "either", "fnv", "futures", "futures-timer", "getrandom 0.2.16", - "hashlink 0.10.0", + "hashlink", "libp2p-core", "libp2p-identity", "libp2p-swarm-derive", @@ -4039,8 +4006,7 @@ dependencies = [ [[package]] name = "libp2p-swarm-derive" version = "0.35.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd297cf53f0cb3dee4d2620bb319ae47ef27c702684309f682bdb7e55a18ae9c" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "heck", "quote", @@ -4050,8 +4016,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.44.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb6585b9309699f58704ec9ab0bb102eca7a3777170fa91a8678d73ca9cafa93" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "futures", "futures-timer", @@ -4066,8 +4031,7 @@ dependencies = [ [[package]] name = "libp2p-tls" version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96ff65a82e35375cbc31ebb99cacbbf28cb6c4fefe26bf13756ddcf708d40080" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "futures", "futures-rustls", @@ -4084,9 +4048,8 @@ dependencies = [ [[package]] name = "libp2p-uds" -version = "0.43.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0413aa7a1cc51c409358186a46a198ad9195a782dae6b9a95ea3acf5db67569d" +version = "0.43.1" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "futures", "libp2p-core", @@ -4095,9 +4058,8 @@ dependencies = [ [[package]] name = "libp2p-upnp" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4757e65fe69399c1a243bbb90ec1ae5a2114b907467bf09f3575e899815bb8d3" +version = "0.6.0" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "futures", "futures-timer", @@ -4111,8 +4073,7 @@ dependencies = [ [[package]] name = "libp2p-webrtc-utils" version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "490abff5ee5f9a7a77f0145c79cc97c76941231a3626f4dee18ebf2abb95618f" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "asynchronous-codec", "bytes", @@ -4133,8 +4094,7 @@ dependencies = [ [[package]] name = "libp2p-webrtc-websys" version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3830f0bf6f0f16ded2c735599fe70baea43a8c1a2d76152216693329217301dd" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "bytes", "futures", @@ -4154,9 +4114,8 @@ dependencies = [ [[package]] name = "libp2p-websocket" -version = "0.45.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "520e29066a48674c007bc11defe5dce49908c24cafd8fad2f5e1a6a8726ced53" +version = "0.45.2" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "either", "futures", @@ -4176,8 +4135,7 @@ dependencies = [ [[package]] name = "libp2p-websocket-websys" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e73d85b4dc8c2044f58508461bd8bb12f541217c0038ade8cce0ddc1607b8f72" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "bytes", "futures", @@ -4193,8 +4151,7 @@ dependencies = [ [[package]] name = "libp2p-webtransport-websys" version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34bc528d7fa278e1324a88978114a610deaa9e75c8e2230cd868321c512b3f43" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "futures", "js-sys", @@ -4214,8 +4171,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.47.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f15df094914eb4af272acf9adaa9e287baa269943f32ea348ba29cfb9bfc60d8" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "either", "futures", @@ -4490,7 +4446,7 @@ dependencies = [ "percent-encoding", "serde", "static_assertions", - "unsigned-varint 0.8.0", + "unsigned-varint", "url", ] @@ -4514,7 +4470,7 @@ checksum = "6b430e7953c29dd6a09afc29ff0bb69c6e306329ee6794700aee27b76a1aea8d" dependencies = [ "core2", "serde", - "unsigned-varint 0.8.0", + "unsigned-varint", ] [[package]] @@ -4536,15 +4492,14 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea0df8e5eec2298a62b326ee4f0d7fe1a6b90a09dfcf9df37b38f947a8c42f19" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "bytes", "futures", - "log", "pin-project", "smallvec", - "unsigned-varint 0.7.2", + "tracing", + "unsigned-varint", ] [[package]] @@ -5594,9 +5549,9 @@ dependencies = [ [[package]] name = "prometheus-client" -version = "0.23.1" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf41c1a7c32ed72abe5082fb19505b969095c12da9f5732a4bc9878757fd087c" +checksum = "e4500adecd7af8e0e9f4dbce15cfee07ce913fbf6ad605cc468b83f2d531ee94" dependencies = [ "dtoa", "itoa", @@ -5606,9 +5561,9 @@ dependencies = [ [[package]] name = "prometheus-client-derive-encode" -version = "0.4.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" +checksum = "9adf1691c04c0a5ff46ff8f262b58beb07b0dbb61f96f9f54f6cbd82106ed87f" dependencies = [ "proc-macro2", "quote", @@ -5701,14 +5656,13 @@ dependencies = [ [[package]] name = "quick-protobuf-codec" version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15a0580ab32b169745d7a39db2ba969226ca16738931be152a3209b409de2474" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "asynchronous-codec", "bytes", "quick-protobuf", - "thiserror 1.0.69", - "unsigned-varint 0.8.0", + "thiserror 2.0.17", + "unsigned-varint", ] [[package]] @@ -6276,8 +6230,7 @@ dependencies = [ [[package]] name = "rw-stream-sink" version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8c9026ff5d2f23da5e45bbc283f156383001bfb09c4e44256d02c1a685fe9a1" +source = "git+https://github.com/lambdaclass/rust-libp2p.git?rev=cd6cc3b1e5db2c5e23e133c2201c23b063fc4895#cd6cc3b1e5db2c5e23e133c2201c23b063fc4895" dependencies = [ "futures", "pin-project", @@ -7380,12 +7333,6 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" -[[package]] -name = "unsigned-varint" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105" - [[package]] name = "unsigned-varint" version = "0.8.0" diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 9f35821..3fe6eb4 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -3,6 +3,7 @@ use std::time::{Duration, SystemTime}; use ethlambda_state_transition::is_proposer; use ethlambda_storage::Store; +use ethlambda_types::primitives::H256; use ethlambda_types::{ attestation::{Attestation, AttestationData, SignedAttestation}, block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, @@ -14,7 +15,7 @@ use spawned_concurrency::tasks::{ CallResponse, CastResponse, GenServer, GenServerHandle, send_after, }; use tokio::sync::mpsc; -use tracing::{error, info, warn}; +use tracing::{error, info, trace, warn}; use crate::store::StoreError; @@ -22,13 +23,15 @@ pub mod key_manager; pub mod metrics; pub mod store; -/// Messages sent from the blockchain to the P2P layer for publishing. +/// Messages sent from the blockchain to the P2P layer. #[derive(Clone, Debug)] -pub enum OutboundGossip { +pub enum P2PMessage { /// Publish an attestation to the gossip network. PublishAttestation(SignedAttestation), /// Publish a block to the gossip network. PublishBlock(SignedBlockWithAttestation), + /// Fetch a block by its root hash. + FetchBlock(H256), } pub struct BlockChain { @@ -41,7 +44,7 @@ pub const SECONDS_PER_SLOT: u64 = 4; impl BlockChain { pub fn spawn( store: Store, - p2p_tx: mpsc::UnboundedSender, + p2p_tx: mpsc::UnboundedSender, validator_keys: HashMap, ) -> BlockChain { let genesis_time = store.config().genesis_time; @@ -50,6 +53,7 @@ impl BlockChain { store, p2p_tx, key_manager, + pending_blocks: HashMap::new(), } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -84,8 +88,11 @@ impl BlockChain { struct BlockChainServer { store: Store, - p2p_tx: mpsc::UnboundedSender, + p2p_tx: mpsc::UnboundedSender, key_manager: key_manager::KeyManager, + + // Pending blocks waiting for their parent + pending_blocks: HashMap>, } impl BlockChainServer { @@ -173,7 +180,7 @@ impl BlockChainServer { // Publish to gossip network let Ok(_) = self .p2p_tx - .send(OutboundGossip::PublishAttestation(signed_attestation)) + .send(P2PMessage::PublishAttestation(signed_attestation)) .inspect_err( |err| error!(%slot, %validator_id, %err, "Failed to publish attestation"), ) @@ -244,7 +251,7 @@ impl BlockChainServer { // Publish to gossip network let Ok(()) = self .p2p_tx - .send(OutboundGossip::PublishBlock(signed_block)) + .send(P2PMessage::PublishBlock(signed_block)) .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to publish block")) else { return; @@ -268,8 +275,60 @@ impl BlockChainServer { fn on_block(&mut self, signed_block: SignedBlockWithAttestation) { let slot = signed_block.message.block.slot; - if let Err(err) = self.process_block(signed_block) { - warn!(%slot, %err, "Failed to process block"); + let block_root = signed_block.message.block.tree_hash_root(); + let parent_root = signed_block.message.block.parent_root; + + // Check if parent block exists before attempting to process + if !self.store.contains_block(&parent_root) { + info!(%slot, %parent_root, %block_root, "Block parent missing, storing as pending"); + + // Store block for later processing + self.pending_blocks + .entry(parent_root) + .or_default() + .push(signed_block); + + // Request missing parent from network + self.request_missing_block(parent_root); + return; + } + + // Parent exists, proceed with processing + match self.process_block(signed_block) { + Ok(_) => { + info!(%slot, "Block processed successfully"); + + // Check if any pending blocks can now be processed + self.process_pending_children(block_root); + } + Err(err) => { + warn!(%slot, %err, "Failed to process block"); + } + } + } + + fn request_missing_block(&mut self, block_root: H256) { + // Send request to P2P layer (deduplication handled by P2P module) + if let Err(err) = self.p2p_tx.send(P2PMessage::FetchBlock(block_root)) { + error!(%block_root, %err, "Failed to send FetchBlock message to P2P"); + } else { + info!(%block_root, "Requested missing block from network"); + } + } + + fn process_pending_children(&mut self, parent_root: H256) { + // Remove and process all blocks that were waiting for this parent + if let Some(children) = self.pending_blocks.remove(&parent_root) { + info!(%parent_root, num_children=%children.len(), + "Processing pending blocks after parent arrival"); + + for child_block in children { + let slot = child_block.message.block.slot; + trace!(%parent_root, %slot, "Processing pending child block"); + + // Process recursively - might unblock more descendants + self.on_block(child_block); + } } } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 71d280f..8b28e37 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -299,8 +299,9 @@ pub fn on_block( return Ok(()); } - // Verify parent chain is available - // TODO: sync parent chain if parent is missing + // Verify parent state is available + // Note: Parent block existence is checked by the caller before calling this function. + // This check ensures the state has been computed for the parent block. let parent_state = store .get_state(&block.parent_root) diff --git a/crates/net/p2p/Cargo.toml b/crates/net/p2p/Cargo.toml index 6ed7e3a..9c26e34 100644 --- a/crates/net/p2p/Cargo.toml +++ b/crates/net/p2p/Cargo.toml @@ -17,13 +17,18 @@ ethlambda-types.workspace = true async-trait = "0.1" -libp2p = { version = "0.56", features = ["full"] } +# Fork with request-response feature for outbound protocol selection +libp2p = { git = "https://github.com/lambdaclass/rust-libp2p.git", rev = "cd6cc3b1e5db2c5e23e133c2201c23b063fc4895", features = [ + "full", +] } snap = "1.1" tokio.workspace = true tracing.workspace = true +rand = "0.8" + # Required for NodeEnr parsing ethrex-p2p = { git = "https://github.com/lambdaclass/ethrex", rev = "1af63a4de7c93eb7413b9b003df1be82e1484c69" } ethrex-rlp = { git = "https://github.com/lambdaclass/ethrex", rev = "1af63a4de7c93eb7413b9b003df1be82e1484c69" } diff --git a/crates/net/p2p/src/gossipsub.rs b/crates/net/p2p/src/gossipsub.rs deleted file mode 100644 index aa2651e..0000000 --- a/crates/net/p2p/src/gossipsub.rs +++ /dev/null @@ -1,91 +0,0 @@ -use ethlambda_blockchain::BlockChain; -use ethlambda_types::{attestation::SignedAttestation, block::SignedBlockWithAttestation}; -use libp2p::gossipsub::Event; -use ssz::Decode; -use tracing::{error, info, trace}; - -/// Topic kind for block gossip -pub const BLOCK_TOPIC_KIND: &str = "block"; -/// Topic kind for attestation gossip -pub const ATTESTATION_TOPIC_KIND: &str = "attestation"; - -pub async fn handle_gossipsub_message(blockchain: &mut BlockChain, event: Event) { - let Event::Message { - propagation_source: _, - message_id: _, - message, - } = event - else { - unreachable!("we already matched on event_loop"); - }; - match message.topic.as_str().split("/").nth(3) { - Some(BLOCK_TOPIC_KIND) => { - let Ok(uncompressed_data) = decompress_message(&message.data) - .inspect_err(|err| error!(%err, "Failed to decompress gossipped block")) - else { - return; - }; - - let Ok(signed_block) = SignedBlockWithAttestation::from_ssz_bytes(&uncompressed_data) - .inspect_err(|err| error!(?err, "Failed to decode gossipped block")) - else { - return; - }; - let slot = signed_block.message.block.slot; - info!(%slot, "Received new block from gossipsub, sending for processing"); - blockchain.notify_new_block(signed_block).await; - } - Some(ATTESTATION_TOPIC_KIND) => { - let Ok(uncompressed_data) = decompress_message(&message.data) - .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation")) - else { - return; - }; - - let Ok(signed_attestation) = SignedAttestation::from_ssz_bytes(&uncompressed_data) - .inspect_err(|err| error!(?err, "Failed to decode gossipped attestation")) - else { - return; - }; - let slot = signed_attestation.message.slot; - let validator = signed_attestation.validator_id; - info!(%slot, %validator, "Received new attestation from gossipsub, sending for processing"); - blockchain.notify_new_attestation(signed_attestation).await; - } - _ => { - trace!("Received message on unknown topic: {}", message.topic); - } - } -} - -fn decompress_message(data: &[u8]) -> snap::Result> { - let uncompressed_size = snap::raw::decompress_len(data)?; - let mut uncompressed_data = vec![0u8; uncompressed_size]; - snap::raw::Decoder::new().decompress(data, &mut uncompressed_data)?; - Ok(uncompressed_data) -} - -/// Compress data using raw snappy format (for gossipsub messages). -pub fn compress_message(data: &[u8]) -> Vec { - let max_compressed_len = snap::raw::max_compress_len(data.len()); - let mut compressed = vec![0u8; max_compressed_len]; - let compressed_len = snap::raw::Encoder::new() - .compress(data, &mut compressed) - .expect("snappy compression should not fail"); - compressed.truncate(compressed_len); - compressed -} - -#[cfg(test)] -mod tests { - use ethlambda_types::block::SignedBlockWithAttestation; - use ssz::Decode; - - #[test] - #[ignore = "Test data uses old BlockSignatures field order (proposer_signature, attestation_signatures). Needs regeneration with correct order (attestation_signatures, proposer_signature)."] - fn test_decode_block() { - // Sample uncompressed block sent by Zeam (commit b153373806aa49f65aadc47c41b68ead4fab7d6e) - let block_bytes = include_bytes!("../test_data/signed_block_with_attestation.ssz"); - let _block = SignedBlockWithAttestation::from_ssz_bytes(block_bytes).unwrap(); - } -} diff --git a/crates/net/p2p/src/gossipsub/encoding.rs b/crates/net/p2p/src/gossipsub/encoding.rs new file mode 100644 index 0000000..07d20a4 --- /dev/null +++ b/crates/net/p2p/src/gossipsub/encoding.rs @@ -0,0 +1,32 @@ +/// Decompress data using raw snappy format (for gossipsub messages). +pub fn decompress_message(data: &[u8]) -> snap::Result> { + let uncompressed_size = snap::raw::decompress_len(data)?; + let mut uncompressed_data = vec![0u8; uncompressed_size]; + snap::raw::Decoder::new().decompress(data, &mut uncompressed_data)?; + Ok(uncompressed_data) +} + +/// Compress data using raw snappy format (for gossipsub messages). +pub fn compress_message(data: &[u8]) -> Vec { + let max_compressed_len = snap::raw::max_compress_len(data.len()); + let mut compressed = vec![0u8; max_compressed_len]; + let compressed_len = snap::raw::Encoder::new() + .compress(data, &mut compressed) + .expect("snappy compression should not fail"); + compressed.truncate(compressed_len); + compressed +} + +#[cfg(test)] +mod tests { + use ethlambda_types::block::SignedBlockWithAttestation; + use ssz::Decode; + + #[test] + #[ignore = "Test data uses old BlockSignatures field order (proposer_signature, attestation_signatures). Needs regeneration with correct order (attestation_signatures, proposer_signature)."] + fn test_decode_block() { + // Sample uncompressed block sent by Zeam (commit b153373806aa49f65aadc47c41b68ead4fab7d6e) + let block_bytes = include_bytes!("../../test_data/signed_block_with_attestation.ssz"); + let _block = SignedBlockWithAttestation::from_ssz_bytes(block_bytes).unwrap(); + } +} diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs new file mode 100644 index 0000000..f9caedf --- /dev/null +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -0,0 +1,106 @@ +use ethlambda_types::{attestation::SignedAttestation, block::SignedBlockWithAttestation}; +use libp2p::gossipsub::Event; +use ssz::{Decode, Encode}; +use tracing::{error, info, trace}; + +use super::{ + encoding::{compress_message, decompress_message}, + messages::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND}, +}; +use crate::P2PServer; + +pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { + let Event::Message { + propagation_source: _, + message_id: _, + message, + } = event + else { + unreachable!("we already matched on event_loop"); + }; + match message.topic.as_str().split("/").nth(3) { + Some(BLOCK_TOPIC_KIND) => { + let Ok(uncompressed_data) = decompress_message(&message.data) + .inspect_err(|err| error!(%err, "Failed to decompress gossipped block")) + else { + return; + }; + + let Ok(signed_block) = SignedBlockWithAttestation::from_ssz_bytes(&uncompressed_data) + .inspect_err(|err| error!(?err, "Failed to decode gossipped block")) + else { + return; + }; + let slot = signed_block.message.block.slot; + info!(%slot, "Received new block from gossipsub, sending for processing"); + server.blockchain.notify_new_block(signed_block).await; + } + Some(ATTESTATION_TOPIC_KIND) => { + let Ok(uncompressed_data) = decompress_message(&message.data) + .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation")) + else { + return; + }; + + let Ok(signed_attestation) = SignedAttestation::from_ssz_bytes(&uncompressed_data) + .inspect_err(|err| error!(?err, "Failed to decode gossipped attestation")) + else { + return; + }; + let slot = signed_attestation.message.slot; + let validator = signed_attestation.validator_id; + info!(%slot, %validator, "Received new attestation from gossipsub, sending for processing"); + server + .blockchain + .notify_new_attestation(signed_attestation) + .await; + } + _ => { + trace!("Received message on unknown topic: {}", message.topic); + } + } +} + +pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAttestation) { + let slot = attestation.message.slot; + let validator = attestation.validator_id; + + // Encode to SSZ + let ssz_bytes = attestation.as_ssz_bytes(); + + // Compress with raw snappy + let compressed = compress_message(&ssz_bytes); + + // Publish to gossipsub + let _ = server + .swarm + .behaviour_mut() + .gossipsub + .publish(server.attestation_topic.clone(), compressed) + .inspect(|_| trace!(%slot, %validator, "Published attestation to gossipsub")) + .inspect_err(|err| { + tracing::warn!(%slot, %validator, %err, "Failed to publish attestation to gossipsub") + }); +} + +pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlockWithAttestation) { + let slot = signed_block.message.block.slot; + let proposer = signed_block.message.block.proposer_index; + + // Encode to SSZ + let ssz_bytes = signed_block.as_ssz_bytes(); + + // Compress with raw snappy + let compressed = compress_message(&ssz_bytes); + + // Publish to gossipsub + let _ = server + .swarm + .behaviour_mut() + .gossipsub + .publish(server.block_topic.clone(), compressed) + .inspect(|_| info!(%slot, %proposer, "Published block to gossipsub")) + .inspect_err( + |err| tracing::warn!(%slot, %proposer, %err, "Failed to publish block to gossipsub"), + ); +} diff --git a/crates/net/p2p/src/gossipsub/messages.rs b/crates/net/p2p/src/gossipsub/messages.rs new file mode 100644 index 0000000..7df4db1 --- /dev/null +++ b/crates/net/p2p/src/gossipsub/messages.rs @@ -0,0 +1,4 @@ +/// Topic kind for block gossip +pub const BLOCK_TOPIC_KIND: &str = "block"; +/// Topic kind for attestation gossip +pub const ATTESTATION_TOPIC_KIND: &str = "attestation"; diff --git a/crates/net/p2p/src/gossipsub/mod.rs b/crates/net/p2p/src/gossipsub/mod.rs new file mode 100644 index 0000000..a66855e --- /dev/null +++ b/crates/net/p2p/src/gossipsub/mod.rs @@ -0,0 +1,6 @@ +mod encoding; +mod handler; +mod messages; + +pub use handler::{handle_gossipsub_message, publish_attestation, publish_block}; +pub use messages::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND}; diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index ecaab66..06f77ad 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -1,11 +1,12 @@ use std::{ + collections::{HashMap, HashSet}, net::{IpAddr, SocketAddr}, time::Duration, }; -use ethlambda_blockchain::{BlockChain, OutboundGossip}; +use ethlambda_blockchain::{BlockChain, P2PMessage}; use ethlambda_storage::Store; -use ethlambda_types::state::Checkpoint; +use ethlambda_types::primitives::H256; use ethrex_common::H264; use ethrex_p2p::types::NodeRecord; use ethrex_rlp::decode::RLPDecode; @@ -15,34 +16,43 @@ use libp2p::{ gossipsub::{MessageAuthenticity, ValidationMode}, identity::{PublicKey, secp256k1}, multiaddr::Protocol, - request_response, + request_response::{self, OutboundRequestId}, swarm::{NetworkBehaviour, SwarmEvent}, }; use sha2::Digest; -use ssz::Encode; use tokio::sync::mpsc; use tracing::{info, trace, warn}; use crate::{ - gossipsub::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND}, - messages::{ - MAX_COMPRESSED_PAYLOAD_SIZE, - status::{STATUS_PROTOCOL_V1, Status}, + gossipsub::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND, publish_attestation, publish_block}, + req_resp::{ + BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, Request, + STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer, }, }; mod gossipsub; -mod messages; pub mod metrics; +mod req_resp; pub use metrics::populate_name_registry; +// 10ms, 40ms, 160ms, 640ms, 2560ms +const MAX_FETCH_RETRIES: u32 = 5; +const INITIAL_BACKOFF_MS: u64 = 10; +const BACKOFF_MULTIPLIER: u64 = 4; + +pub(crate) struct PendingRequest { + pub(crate) attempts: u32, + pub(crate) last_peer: Option, +} + pub async fn start_p2p( node_key: Vec, bootnodes: Vec, listening_socket: SocketAddr, blockchain: BlockChain, - p2p_rx: mpsc::UnboundedReceiver, + p2p_rx: mpsc::UnboundedReceiver, store: Store, ) { let config = libp2p::gossipsub::ConfigBuilder::default() @@ -76,10 +86,16 @@ pub async fn start_p2p( .expect("failed to initiate behaviour"); let req_resp = request_response::Behaviour::new( - vec![( - StreamProtocol::new(STATUS_PROTOCOL_V1), - request_response::ProtocolSupport::Full, - )], + vec![ + ( + StreamProtocol::new(STATUS_PROTOCOL_V1), + request_response::ProtocolSupport::Full, + ), + ( + StreamProtocol::new(BLOCKS_BY_ROOT_PROTOCOL_V1), + request_response::ProtocolSupport::Full, + ), + ], Default::default(), ); @@ -145,204 +161,192 @@ pub async fn start_p2p( info!("P2P node started on {listening_socket}"); - event_loop( + let (retry_tx, retry_rx) = mpsc::unbounded_channel(); + + let server = P2PServer { swarm, blockchain, + store, p2p_rx, attestation_topic, block_topic, - store, - ) - .await; + connected_peers: HashSet::new(), + pending_requests: HashMap::new(), + request_id_map: HashMap::new(), + retry_tx, + retry_rx, + }; + + event_loop(server).await; } /// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining Gossipsub and Request-Response Behaviours #[derive(NetworkBehaviour)] -struct Behaviour { +pub(crate) struct Behaviour { gossipsub: libp2p::gossipsub::Behaviour, - req_resp: request_response::Behaviour, + req_resp: request_response::Behaviour, +} + +pub(crate) struct P2PServer { + pub(crate) swarm: libp2p::Swarm, + pub(crate) blockchain: BlockChain, + pub(crate) store: Store, + pub(crate) p2p_rx: mpsc::UnboundedReceiver, + pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic, + pub(crate) block_topic: libp2p::gossipsub::IdentTopic, + pub(crate) connected_peers: HashSet, + pub(crate) pending_requests: HashMap, + pub(crate) request_id_map: HashMap, + retry_tx: mpsc::UnboundedSender, + retry_rx: mpsc::UnboundedReceiver, } /// Event loop for the P2P crate. /// Processes swarm events, incoming requests, responses, gossip, and outgoing messages from blockchain. -async fn event_loop( - mut swarm: libp2p::Swarm, - mut blockchain: BlockChain, - mut p2p_rx: mpsc::UnboundedReceiver, - attestation_topic: libp2p::gossipsub::IdentTopic, - block_topic: libp2p::gossipsub::IdentTopic, - store: Store, -) { +async fn event_loop(mut server: P2PServer) { loop { tokio::select! { biased; - message = p2p_rx.recv() => { + message = server.p2p_rx.recv() => { let Some(message) = message else { break; }; - handle_outgoing_gossip(&mut swarm, message, &attestation_topic, &block_topic).await; + handle_p2p_message(&mut server, message).await; } - event = swarm.next() => { + event = server.swarm.next() => { let Some(event) = event else { break; }; - match event { - SwarmEvent::Behaviour(BehaviourEvent::ReqResp( - message @ request_response::Event::Message { .. }, - )) => { - handle_req_resp_message(&mut swarm, message, &store).await; - } - SwarmEvent::Behaviour(BehaviourEvent::Gossipsub( - message @ libp2p::gossipsub::Event::Message { .. }, - )) => { - gossipsub::handle_gossipsub_message(&mut blockchain, message).await; - } - SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, - .. - } => { - let direction = connection_direction(&endpoint); - if num_established.get() == 1 { - metrics::notify_peer_connected(&Some(peer_id), direction, "success"); - // Send status request on first connection to this peer - let our_status = build_status(&store); - info!(%peer_id, %direction, finalized_slot=%our_status.finalized.slot, head_slot=%our_status.head.slot, "Added connection to new peer, sending status request"); - swarm.behaviour_mut().req_resp.send_request(&peer_id, our_status); - } else { - info!(%peer_id, %direction, "Added peer connection"); - } - } - SwarmEvent::ConnectionClosed { - peer_id, - endpoint, - num_established, - cause, - .. - } => { - let direction = connection_direction(&endpoint); - let reason = match cause { - None => "remote_close", - Some(err) => { - // Categorize disconnection reasons - let err_str = err.to_string().to_lowercase(); - if err_str.contains("timeout") || err_str.contains("timedout") || err_str.contains("keepalive") { - "timeout" - } else if err_str.contains("reset") || err_str.contains("connectionreset") { - "remote_close" - } else { - "error" - } - } - }; - if num_established == 0 { - metrics::notify_peer_disconnected(&Some(peer_id), direction, reason); - } - info!(%peer_id, %direction, %reason, "Peer disconnected"); - } - SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { - let result = if error.to_string().to_lowercase().contains("timed out") { - "timeout" - } else { - "error" - }; - metrics::notify_peer_connected(&peer_id, "outbound", result); - warn!(?peer_id, %error, "Outgoing connection error"); - } - SwarmEvent::IncomingConnectionError { peer_id, error, .. } => { - metrics::notify_peer_connected(&peer_id, "inbound", "error"); - warn!(%error, "Incoming connection error"); - } - _ => { - trace!(?event, "Ignored swarm event"); - } - } + handle_swarm_event(&mut server, event).await; + } + Some(root) = server.retry_rx.recv() => { + handle_retry(&mut server, root).await; } } } } -async fn handle_outgoing_gossip( - swarm: &mut libp2p::Swarm, - message: OutboundGossip, - attestation_topic: &libp2p::gossipsub::IdentTopic, - block_topic: &libp2p::gossipsub::IdentTopic, -) { - match message { - OutboundGossip::PublishAttestation(attestation) => { - let slot = attestation.message.slot; - let validator = attestation.validator_id; - - // Encode to SSZ - let ssz_bytes = attestation.as_ssz_bytes(); - - // Compress with raw snappy - let compressed = gossipsub::compress_message(&ssz_bytes); - - // Publish to gossipsub - let _ = swarm - .behaviour_mut() - .gossipsub - .publish(attestation_topic.clone(), compressed) - .inspect(|_| trace!(%slot, %validator, "Published attestation to gossipsub")) - .inspect_err(|err| tracing::warn!(%slot, %validator, %err, "Failed to publish attestation to gossipsub")); +async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent) { + match event { + SwarmEvent::Behaviour(BehaviourEvent::ReqResp(req_resp_event)) => { + req_resp::handle_req_resp_message(server, req_resp_event).await; } - OutboundGossip::PublishBlock(signed_block) => { - let slot = signed_block.message.block.slot; - let proposer = signed_block.message.block.proposer_index; - - // Encode to SSZ - let ssz_bytes = signed_block.as_ssz_bytes(); - - // Compress with raw snappy - let compressed = gossipsub::compress_message(&ssz_bytes); - - // Publish to gossipsub - let _ = swarm - .behaviour_mut() - .gossipsub - .publish(block_topic.clone(), compressed) - .inspect(|_| info!(%slot, %proposer, "Published block to gossipsub")) - .inspect_err(|err| tracing::warn!(%slot, %proposer, %err, "Failed to publish block to gossipsub")); + SwarmEvent::Behaviour(BehaviourEvent::Gossipsub( + message @ libp2p::gossipsub::Event::Message { .. }, + )) => { + gossipsub::handle_gossipsub_message(server, message).await; + } + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + num_established, + .. + } => { + let direction = connection_direction(&endpoint); + if num_established.get() == 1 { + server.connected_peers.insert(peer_id); + metrics::notify_peer_connected(&Some(peer_id), direction, "success"); + // Send status request on first connection to this peer + let our_status = build_status(&server.store); + info!(%peer_id, %direction, finalized_slot=%our_status.finalized.slot, head_slot=%our_status.head.slot, "Added connection to new peer, sending status request"); + server + .swarm + .behaviour_mut() + .req_resp + .send_request_with_protocol( + &peer_id, + Request::Status(our_status), + libp2p::StreamProtocol::new(STATUS_PROTOCOL_V1), + ); + } else { + info!(%peer_id, %direction, "Added peer connection"); + } + } + SwarmEvent::ConnectionClosed { + peer_id, + endpoint, + num_established, + cause, + .. + } => { + let direction = connection_direction(&endpoint); + let reason = match cause { + None => "remote_close", + Some(err) => { + // Categorize disconnection reasons + let err_str = err.to_string().to_lowercase(); + if err_str.contains("timeout") + || err_str.contains("timedout") + || err_str.contains("keepalive") + { + "timeout" + } else if err_str.contains("reset") || err_str.contains("connectionreset") { + "remote_close" + } else { + "error" + } + } + }; + if num_established == 0 { + server.connected_peers.remove(&peer_id); + metrics::notify_peer_disconnected(&Some(peer_id), direction, reason); + } + info!(%peer_id, %direction, %reason, "Peer disconnected"); + } + SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { + let result = if error.to_string().to_lowercase().contains("timed out") { + "timeout" + } else { + "error" + }; + metrics::notify_peer_connected(&peer_id, "outbound", result); + warn!(?peer_id, %error, "Outgoing connection error"); + } + SwarmEvent::IncomingConnectionError { peer_id, error, .. } => { + metrics::notify_peer_connected(&peer_id, "inbound", "error"); + warn!(%error, "Incoming connection error"); + } + _ => { + trace!(?event, "Ignored swarm event"); } } } -async fn handle_req_resp_message( - swarm: &mut libp2p::Swarm, - event: request_response::Event, - store: &Store, -) { - let request_response::Event::Message { - peer, - connection_id: _, - message, - } = event - else { - unreachable!("we already matched on event_loop"); - }; +async fn handle_p2p_message(server: &mut P2PServer, message: P2PMessage) { match message { - request_response::Message::Request { - request_id: _, - request, - channel, - } => { - info!(finalized_slot=%request.finalized.slot, head_slot=%request.head.slot, "Received status request from peer {peer}"); - let our_status = build_status(store); - swarm - .behaviour_mut() - .req_resp - .send_response(channel, our_status) - .unwrap(); + P2PMessage::PublishAttestation(attestation) => { + publish_attestation(server, attestation).await; } - request_response::Message::Response { - request_id: _, - response, - } => { - info!(finalized_slot=%response.finalized.slot, head_slot=%response.head.slot, "Received status response from peer {peer}"); + P2PMessage::PublishBlock(signed_block) => { + publish_block(server, signed_block).await; } + P2PMessage::FetchBlock(root) => { + // Deduplicate - if already pending, ignore + if server.pending_requests.contains_key(&root) { + trace!(%root, "Block fetch already in progress, ignoring duplicate"); + return; + } + + // Send request and track it (tracking handled internally by fetch_block_from_peer) + fetch_block_from_peer(server, root).await; + } + } +} + +async fn handle_retry(server: &mut P2PServer, root: H256) { + // Check if still pending (might have succeeded during backoff) + if !server.pending_requests.contains_key(&root) { + trace!(%root, "Block fetch completed during backoff, skipping retry"); + return; + } + + info!(%root, "Retrying block fetch after backoff"); + + // Retry the fetch (tracking handled internally by fetch_block_from_peer) + if !fetch_block_from_peer(server, root).await { + tracing::error!(%root, "Failed to retry block fetch, giving up"); + server.pending_requests.remove(&root); } } @@ -394,20 +398,6 @@ fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str } } -/// Build a Status message from the current Store state. -fn build_status(store: &Store) -> Status { - let finalized = store.latest_finalized(); - let head_root = store.head(); - let head_slot = store.get_block(&head_root).expect("head block exists").slot; - Status { - finalized, - head: Checkpoint { - root: head_root, - slot: head_slot, - }, - } -} - fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId { const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0x00, 0x00, 0x00, 0x00]; const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [0x01, 0x00, 0x00, 0x00]; diff --git a/crates/net/p2p/src/messages/mod.rs b/crates/net/p2p/src/messages/mod.rs deleted file mode 100644 index 4591439..0000000 --- a/crates/net/p2p/src/messages/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub mod status; - -pub const MAX_PAYLOAD_SIZE: usize = 10 * 1024 * 1024; // 10 MB - -// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#max_message_size -pub const MAX_COMPRESSED_PAYLOAD_SIZE: usize = 32 + MAX_PAYLOAD_SIZE + MAX_PAYLOAD_SIZE / 6 + 1024; // ~12 MB diff --git a/crates/net/p2p/src/messages/status.rs b/crates/net/p2p/src/messages/status.rs deleted file mode 100644 index f30a249..0000000 --- a/crates/net/p2p/src/messages/status.rs +++ /dev/null @@ -1,208 +0,0 @@ -use std::io; - -use ethlambda_types::state::Checkpoint; -use libp2p::futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use snap::read::FrameEncoder; -use ssz::{Decode, Encode}; -use ssz_derive::{Decode, Encode}; -use tracing::trace; - -use crate::messages::MAX_COMPRESSED_PAYLOAD_SIZE; - -pub const STATUS_PROTOCOL_V1: &str = "/leanconsensus/req/status/1/ssz_snappy"; - -#[derive(Debug, Clone, Default)] -pub struct StatusCodec; - -#[async_trait::async_trait] -impl libp2p::request_response::Codec for StatusCodec { - type Protocol = libp2p::StreamProtocol; - type Request = Status; - type Response = Status; - - async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let payload = decode_payload(io).await?; - let status = deserialize_payload(payload)?; - Ok(status) - } - - async fn read_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut result = 0_u8; - io.read_exact(std::slice::from_mut(&mut result)).await?; - - // TODO: send errors to event loop? - if result != 0 { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "non-zero result in response", - )); - } - - let payload = decode_payload(io).await?; - let status = deserialize_payload(payload)?; - Ok(status) - } - - async fn write_request( - &mut self, - _: &Self::Protocol, - io: &mut T, - req: Self::Request, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - trace!(?req, "Writing status request"); - - let encoded = req.as_ssz_bytes(); - let mut compressor = FrameEncoder::new(&encoded[..]); - - let mut buf = Vec::new(); - io::Read::read_to_end(&mut compressor, &mut buf)?; - - let mut size_buf = [0; 5]; - let varint_buf = encode_varint(buf.len() as u32, &mut size_buf); - io.write_all(varint_buf).await?; - io.write_all(&buf).await?; - - Ok(()) - } - - async fn write_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - resp: Self::Response, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - // Send result byte - io.write_all(&[0]).await?; - - let encoded = resp.as_ssz_bytes(); - let mut compressor = FrameEncoder::new(&encoded[..]); - - let mut buf = Vec::new(); - io::Read::read_to_end(&mut compressor, &mut buf)?; - - let mut size_buf = [0; 5]; - let varint_buf = encode_varint(buf.len() as u32, &mut size_buf); - io.write_all(varint_buf).await?; - io.write_all(&buf).await?; - - Ok(()) - } -} - -async fn decode_payload(io: &mut T) -> io::Result> -where - T: AsyncRead + Unpin + Send, -{ - // TODO: limit bytes received - let mut varint_buf = [0; 5]; - - let read = io - .take(varint_buf.len() as u64) - .read(&mut varint_buf) - .await?; - let (size, rest) = decode_varint(&varint_buf[..read])?; - - if (size as usize) < rest.len() || size as usize > MAX_COMPRESSED_PAYLOAD_SIZE { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "invalid message size", - )); - } - - let mut message = vec![0; size as usize]; - if rest.is_empty() { - io.read_exact(&mut message).await?; - } else { - message[..rest.len()].copy_from_slice(rest); - io.read_exact(&mut message[rest.len()..]).await?; - } - - let mut decoder = snap::read::FrameDecoder::new(&message[..]); - let mut uncompressed = Vec::new(); - io::Read::read_to_end(&mut decoder, &mut uncompressed)?; - - Ok(uncompressed) -} - -fn deserialize_payload(payload: Vec) -> io::Result { - let status = Status::from_ssz_bytes(&payload) - // We turn to string since DecodeError does not implement std::error::Error - .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")))?; - Ok(status) -} - -/// Encodes a u32 as a varint into the provided buffer, returning a slice of the buffer -/// containing the encoded bytes. -fn encode_varint(mut value: u32, dst: &mut [u8; 5]) -> &[u8] { - for i in 0..5 { - let mut byte = (value & 0x7F) as u8; - value >>= 7; - if value != 0 { - byte |= 0x80; - } - dst[i] = byte; - if value == 0 { - return &dst[..=i]; - } - } - &dst[..] -} - -fn decode_varint(buf: &[u8]) -> io::Result<(u32, &[u8])> { - let mut result = 0_u32; - let mut read_size = 0; - - for (i, byte) in buf.iter().enumerate() { - let value = (byte & 0x7F) as u32; - result |= value << (7 * i); - if byte & 0x80 == 0 { - read_size = i + 1; - break; - } - } - if read_size == 0 { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "message size is bigger than 28 bits", - )); - } - Ok((result, &buf[read_size..])) -} - -#[derive(Debug, Clone, Encode, Decode)] -pub struct Status { - pub finalized: Checkpoint, - pub head: Checkpoint, -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_decode_varint() { - // Example from https://protobuf.dev/programming-guides/encoding/ - let buf = [0b10010110, 0b00000001]; - let (value, rest) = decode_varint(&buf).unwrap(); - assert_eq!(value, 150); - - let expected: &[u8] = &[]; - assert_eq!(rest, expected); - } -} diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs new file mode 100644 index 0000000..e50775f --- /dev/null +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -0,0 +1,154 @@ +use std::io; + +use libp2p::futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use ssz::{Decode, Encode}; +use tracing::trace; + +use super::{ + encoding::{decode_payload, write_payload}, + messages::{ + BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, STATUS_PROTOCOL_V1, + Status, + }, +}; + +use ethlambda_types::block::SignedBlockWithAttestation; + +#[derive(Debug, Clone, Default)] +pub struct Codec; + +#[async_trait::async_trait] +impl libp2p::request_response::Codec for Codec { + type Protocol = libp2p::StreamProtocol; + type Request = Request; + type Response = Response; + + async fn read_request( + &mut self, + protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let payload = decode_payload(io).await?; + + match protocol.as_ref() { + STATUS_PROTOCOL_V1 => { + let status = Status::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) + })?; + Ok(Request::Status(status)) + } + BLOCKS_BY_ROOT_PROTOCOL_V1 => { + let request = BlocksByRootRequest::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) + })?; + Ok(Request::BlocksByRoot(request)) + } + _ => Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("unknown protocol: {}", protocol.as_ref()), + )), + } + } + + async fn read_response( + &mut self, + protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut result = 0_u8; + io.read_exact(std::slice::from_mut(&mut result)).await?; + + // TODO: move matching to ResponseResult impl + let result_code = match result { + 0 => super::messages::ResponseResult::Success, + 1 => super::messages::ResponseResult::InvalidRequest, + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid result code: {}", result), + )); + } + }; + + // TODO: send errors to event loop when result != Success? + if result_code != super::messages::ResponseResult::Success { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "non-success result in response", + )); + } + + let payload = decode_payload(io).await?; + + match protocol.as_ref() { + STATUS_PROTOCOL_V1 => { + let status = Status::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) + })?; + Ok(Response::new( + result_code, + super::messages::ResponsePayload::Status(status), + )) + } + BLOCKS_BY_ROOT_PROTOCOL_V1 => { + let block = + SignedBlockWithAttestation::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) + })?; + Ok(Response::new( + result_code, + super::messages::ResponsePayload::BlocksByRoot(block), + )) + } + _ => Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("unknown protocol: {}", protocol.as_ref()), + )), + } + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + trace!(?req, "Writing request"); + + let encoded = match req { + Request::Status(status) => status.as_ssz_bytes(), + Request::BlocksByRoot(request) => request.as_ssz_bytes(), + }; + + write_payload(io, &encoded).await + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + resp: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + // Send result byte + io.write_all(&[resp.result as u8]).await?; + + let encoded = match &resp.payload { + super::messages::ResponsePayload::Status(status) => status.as_ssz_bytes(), + super::messages::ResponsePayload::BlocksByRoot(response) => response.as_ssz_bytes(), + }; + + write_payload(io, &encoded).await + } +} diff --git a/crates/net/p2p/src/req_resp/encoding.rs b/crates/net/p2p/src/req_resp/encoding.rs new file mode 100644 index 0000000..b45d447 --- /dev/null +++ b/crates/net/p2p/src/req_resp/encoding.rs @@ -0,0 +1,121 @@ +use std::io; + +use libp2p::futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use snap::read::FrameEncoder; + +pub const MAX_PAYLOAD_SIZE: usize = 10 * 1024 * 1024; // 10 MB + +// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#max_message_size +pub const MAX_COMPRESSED_PAYLOAD_SIZE: usize = 32 + MAX_PAYLOAD_SIZE + MAX_PAYLOAD_SIZE / 6 + 1024; // ~12 MB + +/// Decode a varint-prefixed, snappy-compressed SSZ payload from an async reader. +pub async fn decode_payload(io: &mut T) -> io::Result> +where + T: AsyncRead + Unpin + Send, +{ + let mut buf = vec![]; + let read = io + .take(MAX_COMPRESSED_PAYLOAD_SIZE as u64) + .read_to_end(&mut buf) + .await?; + + if read < 2 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "message too short", + )); + } + let (size, rest) = decode_varint(&buf)?; + + if size as usize > MAX_PAYLOAD_SIZE { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "message size exceeds maximum allowed", + )); + } + + let mut decoder = snap::read::FrameDecoder::new(rest); + let mut uncompressed = Vec::new(); + io::Read::read_to_end(&mut decoder, &mut uncompressed)?; + if uncompressed.len() != size as usize { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "uncompressed size does not match received size", + )); + } + + Ok(uncompressed) +} + +pub async fn write_payload(io: &mut T, encoded: &[u8]) -> io::Result<()> +where + T: AsyncWrite + Unpin, +{ + let uncompressed_size = encoded.len(); + let mut compressor = FrameEncoder::new(encoded); + + let mut buf = Vec::new(); + io::Read::read_to_end(&mut compressor, &mut buf)?; + + let mut size_buf = [0; 5]; + let varint_buf = encode_varint(uncompressed_size as u32, &mut size_buf); + io.write_all(varint_buf).await?; + io.write_all(&buf).await?; + + Ok(()) +} + +/// Encodes a u32 as a varint into the provided buffer, returning a slice of the buffer +/// containing the encoded bytes. +pub fn encode_varint(mut value: u32, dst: &mut [u8; 5]) -> &[u8] { + for i in 0..5 { + let mut byte = (value & 0x7F) as u8; + value >>= 7; + if value != 0 { + byte |= 0x80; + } + dst[i] = byte; + if value == 0 { + return &dst[..=i]; + } + } + &dst[..] +} + +/// Decode a varint from a byte buffer, returning the value and remaining bytes. +pub fn decode_varint(buf: &[u8]) -> io::Result<(u32, &[u8])> { + let mut result = 0_u32; + let mut read_size = 0; + + for (i, byte) in buf.iter().enumerate() { + let value = (byte & 0x7F) as u32; + result |= value << (7 * i); + if byte & 0x80 == 0 { + read_size = i + 1; + break; + } + } + if read_size == 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "message size is bigger than 28 bits", + )); + } + Ok((result, &buf[read_size..])) +} + +#[cfg(test)] +mod tests { + use super::decode_varint; + + #[test] + fn test_decode_varint() { + // Example from https://protobuf.dev/programming-guides/encoding/ + let buf = [0b10010110, 0b00000001]; + let (value, rest) = decode_varint(&buf).unwrap(); + assert_eq!(value, 150); + + let expected: &[u8] = &[]; + assert_eq!(rest, expected); + } +} diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs new file mode 100644 index 0000000..d8e670c --- /dev/null +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -0,0 +1,231 @@ +use ethlambda_storage::Store; +use libp2p::{PeerId, request_response}; +use rand::seq::SliceRandom; +use tokio::time::Duration; +use tracing::{debug, error, info, warn}; + +use ethlambda_types::block::SignedBlockWithAttestation; +use ethlambda_types::primitives::TreeHash; + +use super::{ + BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload, + ResponseResult, Status, +}; +use crate::{BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest}; + +pub async fn handle_req_resp_message( + server: &mut P2PServer, + event: request_response::Event, +) { + match event { + request_response::Event::Message { peer, message, .. } => match message { + request_response::Message::Request { + request, channel, .. + } => match request { + Request::Status(status) => { + handle_status_request(server, status, channel, peer).await; + } + Request::BlocksByRoot(request) => { + handle_blocks_by_root_request(server, request, channel, peer).await; + } + }, + request_response::Message::Response { response, .. } => match response.payload { + ResponsePayload::Status(status) => { + handle_status_response(status, peer).await; + } + ResponsePayload::BlocksByRoot(blocks) => { + handle_blocks_by_root_response(server, blocks, peer).await; + } + }, + }, + request_response::Event::OutboundFailure { + peer, + request_id, + error, + .. + } => { + warn!(%peer, ?request_id, %error, "Outbound request failed"); + + // Check if this was a block fetch request + if let Some(root) = server.request_id_map.remove(&request_id) { + handle_fetch_failure(server, root, peer, error).await; + } + } + request_response::Event::InboundFailure { + peer, + request_id, + error, + .. + } => { + warn!(%peer, ?request_id, %error, "Inbound request failed"); + } + request_response::Event::ResponseSent { + peer, request_id, .. + } => { + debug!(%peer, ?request_id, "Response sent successfully"); + } + } +} + +async fn handle_status_request( + server: &mut P2PServer, + request: Status, + channel: request_response::ResponseChannel, + peer: PeerId, +) { + info!(finalized_slot=%request.finalized.slot, head_slot=%request.head.slot, "Received status request from peer {peer}"); + let our_status = build_status(&server.store); + server + .swarm + .behaviour_mut() + .req_resp + .send_response( + channel, + Response::new(ResponseResult::Success, ResponsePayload::Status(our_status)), + ) + .unwrap(); +} + +async fn handle_status_response(status: Status, peer: PeerId) { + info!(finalized_slot=%status.finalized.slot, head_slot=%status.head.slot, "Received status response from peer {peer}"); +} + +async fn handle_blocks_by_root_request( + _server: &mut P2PServer, + request: BlocksByRootRequest, + _channel: request_response::ResponseChannel, + peer: PeerId, +) { + let num_roots = request.len(); + info!(%peer, num_roots, "Received BlocksByRoot request"); + + // TODO: Implement signed block storage and send response chunks + // For now, we don't send any response (drop the channel) + // In a full implementation, we would: + // 1. Look up each requested block root + // 2. Send a response chunk for each found block + // 3. Each chunk contains: result byte + encoded SignedBlockWithAttestation + warn!(%peer, num_roots, "BlocksByRoot request received but block storage not implemented"); +} + +async fn handle_blocks_by_root_response( + server: &mut P2PServer, + block: SignedBlockWithAttestation, + peer: PeerId, +) { + let slot = block.message.block.slot; + let root = block.message.block.tree_hash_root(); + + info!(%peer, %slot, %root, "Received BlocksByRoot response"); + + // Clean up tracking (success!) + if server.pending_requests.remove(&root).is_some() { + info!(%root, "Block fetch succeeded"); + server.request_id_map.retain(|_, r| *r != root); + } + + server.blockchain.notify_new_block(block).await; +} + +/// Build a Status message from the current Store state. +pub fn build_status(store: &Store) -> Status { + let finalized = store.latest_finalized(); + let head_root = store.head(); + let head_slot = store.get_block(&head_root).expect("head block exists").slot; + Status { + finalized, + head: ethlambda_types::state::Checkpoint { + root: head_root, + slot: head_slot, + }, + } +} + +/// Fetch a missing block from a random connected peer. +/// Handles tracking in both pending_requests and request_id_map. +pub async fn fetch_block_from_peer( + server: &mut P2PServer, + root: ethlambda_types::primitives::H256, +) -> bool { + if server.connected_peers.is_empty() { + warn!(%root, "Cannot fetch block: no connected peers"); + return false; + } + + // Select random peer + let peers: Vec<_> = server.connected_peers.iter().copied().collect(); + let peer = match peers.choose(&mut rand::thread_rng()) { + Some(&p) => p, + None => { + warn!(%root, "Failed to select random peer"); + return false; + } + }; + + // Create BlocksByRoot request with single root + let mut request = BlocksByRootRequest::empty(); + if let Err(err) = request.push(root) { + error!(%root, ?err, "Failed to create BlocksByRoot request"); + return false; + } + + info!(%peer, %root, "Sending BlocksByRoot request for missing block"); + let request_id = server + .swarm + .behaviour_mut() + .req_resp + .send_request_with_protocol( + &peer, + Request::BlocksByRoot(request), + libp2p::StreamProtocol::new(BLOCKS_BY_ROOT_PROTOCOL_V1), + ); + + // Track the request if not already tracked (new request) + let pending = server + .pending_requests + .entry(root) + .or_insert(PendingRequest { + attempts: 1, + last_peer: None, + }); + + // Update last_peer + pending.last_peer = Some(peer); + + // Map request_id to root for failure handling + server.request_id_map.insert(request_id, root); + + true +} + +async fn handle_fetch_failure( + server: &mut P2PServer, + root: ethlambda_types::primitives::H256, + peer: PeerId, + error: request_response::OutboundFailure, +) { + let Some(pending) = server.pending_requests.get_mut(&root) else { + return; + }; + + if pending.attempts >= MAX_FETCH_RETRIES { + error!(%root, %peer, attempts=%pending.attempts, %error, + "Block fetch failed after max retries, giving up"); + server.pending_requests.remove(&root); + return; + } + + let backoff_ms = INITIAL_BACKOFF_MS * BACKOFF_MULTIPLIER.pow(pending.attempts - 1); + let backoff = Duration::from_millis(backoff_ms); + + warn!(%root, %peer, attempts=%pending.attempts, ?backoff, %error, + "Block fetch failed, scheduling retry"); + + pending.attempts += 1; + + let retry_tx = server.retry_tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(backoff).await; + let _ = retry_tx.send(root); + }); +} diff --git a/crates/net/p2p/src/req_resp/messages.rs b/crates/net/p2p/src/req_resp/messages.rs new file mode 100644 index 0000000..91cc70b --- /dev/null +++ b/crates/net/p2p/src/req_resp/messages.rs @@ -0,0 +1,49 @@ +use ethlambda_types::{block::SignedBlockWithAttestation, primitives::H256, state::Checkpoint}; +use ssz_derive::{Decode, Encode}; +use ssz_types::typenum; + +pub const STATUS_PROTOCOL_V1: &str = "/leanconsensus/req/status/1/ssz_snappy"; +pub const BLOCKS_BY_ROOT_PROTOCOL_V1: &str = "/leanconsensus/req/blocks_by_root/1/ssz_snappy"; + +#[derive(Debug, Clone)] +pub enum Request { + Status(Status), + BlocksByRoot(BlocksByRootRequest), +} + +#[derive(Debug, Clone)] +pub struct Response { + pub result: ResponseResult, + pub payload: ResponsePayload, +} + +impl Response { + pub fn new(result: ResponseResult, payload: ResponsePayload) -> Self { + Self { result, payload } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ResponseResult { + Success = 0, + InvalidRequest = 1, +} + +#[derive(Debug, Clone)] +#[allow(clippy::large_enum_variant)] +pub enum ResponsePayload { + Status(Status), + + // TODO: here we assume there's a single block per request + BlocksByRoot(SignedBlockWithAttestation), +} + +#[derive(Debug, Clone, Encode, Decode)] +pub struct Status { + pub finalized: Checkpoint, + pub head: Checkpoint, +} + +type MaxRequestBlocks = typenum::U1024; + +pub type BlocksByRootRequest = ssz_types::VariableList; diff --git a/crates/net/p2p/src/req_resp/mod.rs b/crates/net/p2p/src/req_resp/mod.rs new file mode 100644 index 0000000..654cba2 --- /dev/null +++ b/crates/net/p2p/src/req_resp/mod.rs @@ -0,0 +1,12 @@ +mod codec; +mod encoding; +pub mod handlers; +mod messages; + +pub use codec::Codec; +pub use encoding::MAX_COMPRESSED_PAYLOAD_SIZE; +pub use handlers::{build_status, fetch_block_from_peer, handle_req_resp_message}; +pub use messages::{ + BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload, + ResponseResult, STATUS_PROTOCOL_V1, Status, +}; diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index 88562bf..9546211 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -77,7 +77,7 @@ mod tests { use ethlambda_storage::{Store, backend::InMemoryBackend}; use ethlambda_types::{ block::{BlockBody, BlockHeader}, - primitives::TreeHash, + primitives::{H256, TreeHash}, state::{ChainConfig, Checkpoint, JustificationValidators, JustifiedSlots, State}, }; use http_body_util::BodyExt; @@ -90,13 +90,13 @@ mod tests { let genesis_header = BlockHeader { slot: 0, proposer_index: 0, - parent_root: ethlambda_types::primitives::H256::ZERO, - state_root: ethlambda_types::primitives::H256::ZERO, + parent_root: H256::ZERO, + state_root: H256::ZERO, body_root: BlockBody::default().tree_hash_root(), }; let genesis_checkpoint = Checkpoint { - root: ethlambda_types::primitives::H256::ZERO, + root: H256::ZERO, slot: 0, };