Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d8ae35d
feat: add BlocksByRootCodec
MegaRedHand Jan 27, 2026
12e00ea
refactor: unify both Codecs
MegaRedHand Jan 27, 2026
ca4dea6
refactor: restructure code in gossipsub and req_resp modules
MegaRedHand Jan 27, 2026
2087ac3
refactor: remove the messages module
MegaRedHand Jan 27, 2026
20877f4
refactor: handle response result explicitly
MegaRedHand Jan 27, 2026
0e76955
refactor: make Response fields public
MegaRedHand Jan 27, 2026
ab3285f
refactor: move req-resp handlers to new module
MegaRedHand Jan 27, 2026
e83b37c
fix: make response include only a single block
MegaRedHand Jan 27, 2026
6807c0b
refactor: move handle_outgoing_gossip to gossipsub module
MegaRedHand Jan 27, 2026
f2da96e
refactor: add new FetchBlock P2PMessage
MegaRedHand Jan 27, 2026
0b41d04
refactor: rename to handle_p2p_message
MegaRedHand Jan 27, 2026
4009104
feat: request missing blocks
MegaRedHand Jan 27, 2026
2558c52
refactor: move block fetching helper to req_resp::handlers
MegaRedHand Jan 27, 2026
f0b30d5
refactor: store event loop state in struct
MegaRedHand Jan 27, 2026
be407df
feat: log req-resp failures
MegaRedHand Jan 27, 2026
27263e5
fix: treat encoded-length as uncompressed size
MegaRedHand Jan 27, 2026
b4ffabd
fix: split req-resp behaviour
MegaRedHand Jan 27, 2026
b845f4d
refactor: use imports
MegaRedHand Jan 27, 2026
debbaf1
Revert "fix: split req-resp behaviour"
MegaRedHand Jan 27, 2026
a2e0128
fix: implement protocol selection in libp2p fork
MegaRedHand Jan 27, 2026
5853fe5
feat: add block fetch request retrying
MegaRedHand Jan 28, 2026
c0eba9b
refactor: move bookkeeping to fetch_block_from_peer
MegaRedHand Jan 28, 2026
d182bca
refactor: remove BlockchainServer.in_flight_requests
MegaRedHand Jan 28, 2026
dd87b14
chore: tweak retries, initial backoff, and multiplier
MegaRedHand Jan 28, 2026
5a5001a
fix: avoid calling store::on_block if parent is missing
MegaRedHand Jan 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 62 additions & 115 deletions Cargo.lock

Large diffs are not rendered by default.

77 changes: 68 additions & 9 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::time::{Duration, SystemTime};

use ethlambda_state_transition::is_proposer;
use ethlambda_storage::Store;
use ethlambda_types::primitives::H256;
use ethlambda_types::{
attestation::{Attestation, AttestationData, SignedAttestation},
block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation},
Expand All @@ -14,21 +15,23 @@ use spawned_concurrency::tasks::{
CallResponse, CastResponse, GenServer, GenServerHandle, send_after,
};
use tokio::sync::mpsc;
use tracing::{error, info, warn};
use tracing::{error, info, trace, warn};

use crate::store::StoreError;

pub mod key_manager;
pub mod metrics;
pub mod store;

/// Messages sent from the blockchain to the P2P layer for publishing.
/// Messages sent from the blockchain to the P2P layer.
#[derive(Clone, Debug)]
pub enum OutboundGossip {
pub enum P2PMessage {
/// Publish an attestation to the gossip network.
PublishAttestation(SignedAttestation),
/// Publish a block to the gossip network.
PublishBlock(SignedBlockWithAttestation),
/// Fetch a block by its root hash.
FetchBlock(H256),
}

pub struct BlockChain {
Expand All @@ -41,7 +44,7 @@ pub const SECONDS_PER_SLOT: u64 = 4;
impl BlockChain {
pub fn spawn(
store: Store,
p2p_tx: mpsc::UnboundedSender<OutboundGossip>,
p2p_tx: mpsc::UnboundedSender<P2PMessage>,
validator_keys: HashMap<u64, ValidatorSecretKey>,
) -> BlockChain {
let genesis_time = store.config().genesis_time;
Expand All @@ -50,6 +53,7 @@ impl BlockChain {
store,
p2p_tx,
key_manager,
pending_blocks: HashMap::new(),
}
.start();
let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time))
Expand Down Expand Up @@ -84,8 +88,11 @@ impl BlockChain {

struct BlockChainServer {
store: Store,
p2p_tx: mpsc::UnboundedSender<OutboundGossip>,
p2p_tx: mpsc::UnboundedSender<P2PMessage>,
key_manager: key_manager::KeyManager,

// Pending blocks waiting for their parent
pending_blocks: HashMap<H256, Vec<SignedBlockWithAttestation>>,
}

impl BlockChainServer {
Expand Down Expand Up @@ -173,7 +180,7 @@ impl BlockChainServer {
// Publish to gossip network
let Ok(_) = self
.p2p_tx
.send(OutboundGossip::PublishAttestation(signed_attestation))
.send(P2PMessage::PublishAttestation(signed_attestation))
.inspect_err(
|err| error!(%slot, %validator_id, %err, "Failed to publish attestation"),
)
Expand Down Expand Up @@ -244,7 +251,7 @@ impl BlockChainServer {
// Publish to gossip network
let Ok(()) = self
.p2p_tx
.send(OutboundGossip::PublishBlock(signed_block))
.send(P2PMessage::PublishBlock(signed_block))
.inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to publish block"))
else {
return;
Expand All @@ -268,8 +275,60 @@ impl BlockChainServer {

fn on_block(&mut self, signed_block: SignedBlockWithAttestation) {
let slot = signed_block.message.block.slot;
if let Err(err) = self.process_block(signed_block) {
warn!(%slot, %err, "Failed to process block");
let block_root = signed_block.message.block.tree_hash_root();
let parent_root = signed_block.message.block.parent_root;

// Check if parent block exists before attempting to process
if !self.store.contains_block(&parent_root) {
info!(%slot, %parent_root, %block_root, "Block parent missing, storing as pending");

// Store block for later processing
self.pending_blocks
.entry(parent_root)
.or_default()
.push(signed_block);

// Request missing parent from network
self.request_missing_block(parent_root);
return;
}

// Parent exists, proceed with processing
match self.process_block(signed_block) {
Ok(_) => {
info!(%slot, "Block processed successfully");

// Check if any pending blocks can now be processed
self.process_pending_children(block_root);
}
Err(err) => {
warn!(%slot, %err, "Failed to process block");
}
}
}

fn request_missing_block(&mut self, block_root: H256) {
// Send request to P2P layer (deduplication handled by P2P module)
if let Err(err) = self.p2p_tx.send(P2PMessage::FetchBlock(block_root)) {
error!(%block_root, %err, "Failed to send FetchBlock message to P2P");
} else {
info!(%block_root, "Requested missing block from network");
}
}

fn process_pending_children(&mut self, parent_root: H256) {
// Remove and process all blocks that were waiting for this parent
if let Some(children) = self.pending_blocks.remove(&parent_root) {
info!(%parent_root, num_children=%children.len(),
"Processing pending blocks after parent arrival");

for child_block in children {
let slot = child_block.message.block.slot;
trace!(%parent_root, %slot, "Processing pending child block");

// Process recursively - might unblock more descendants
self.on_block(child_block);
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,9 @@ pub fn on_block(
return Ok(());
}

// Verify parent chain is available
// TODO: sync parent chain if parent is missing
// Verify parent state is available
// Note: Parent block existence is checked by the caller before calling this function.
// This check ensures the state has been computed for the parent block.
let parent_state =
store
.get_state(&block.parent_root)
Expand Down
7 changes: 6 additions & 1 deletion crates/net/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ ethlambda-types.workspace = true

async-trait = "0.1"

libp2p = { version = "0.56", features = ["full"] }
# Fork with request-response feature for outbound protocol selection
libp2p = { git = "https://github.com/lambdaclass/rust-libp2p.git", rev = "cd6cc3b1e5db2c5e23e133c2201c23b063fc4895", features = [
"full",
] }

snap = "1.1"

tokio.workspace = true
tracing.workspace = true

rand = "0.8"

# Required for NodeEnr parsing
ethrex-p2p = { git = "https://github.com/lambdaclass/ethrex", rev = "1af63a4de7c93eb7413b9b003df1be82e1484c69" }
ethrex-rlp = { git = "https://github.com/lambdaclass/ethrex", rev = "1af63a4de7c93eb7413b9b003df1be82e1484c69" }
Expand Down
91 changes: 0 additions & 91 deletions crates/net/p2p/src/gossipsub.rs

This file was deleted.

32 changes: 32 additions & 0 deletions crates/net/p2p/src/gossipsub/encoding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/// Decompress data using raw snappy format (for gossipsub messages).
pub fn decompress_message(data: &[u8]) -> snap::Result<Vec<u8>> {
let uncompressed_size = snap::raw::decompress_len(data)?;
let mut uncompressed_data = vec![0u8; uncompressed_size];
snap::raw::Decoder::new().decompress(data, &mut uncompressed_data)?;
Ok(uncompressed_data)
}

/// Compress data using raw snappy format (for gossipsub messages).
pub fn compress_message(data: &[u8]) -> Vec<u8> {
let max_compressed_len = snap::raw::max_compress_len(data.len());
let mut compressed = vec![0u8; max_compressed_len];
let compressed_len = snap::raw::Encoder::new()
.compress(data, &mut compressed)
.expect("snappy compression should not fail");
compressed.truncate(compressed_len);
compressed
}

#[cfg(test)]
mod tests {
use ethlambda_types::block::SignedBlockWithAttestation;
use ssz::Decode;

#[test]
#[ignore = "Test data uses old BlockSignatures field order (proposer_signature, attestation_signatures). Needs regeneration with correct order (attestation_signatures, proposer_signature)."]
fn test_decode_block() {
// Sample uncompressed block sent by Zeam (commit b153373806aa49f65aadc47c41b68ead4fab7d6e)
let block_bytes = include_bytes!("../../test_data/signed_block_with_attestation.ssz");
let _block = SignedBlockWithAttestation::from_ssz_bytes(block_bytes).unwrap();
}
}
Loading