From bcfe4b6f3305c4b812df6102c9ba542af69b2459 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 12 Jan 2026 19:49:11 +0800 Subject: [PATCH 1/4] feat: penalize peer on duplicate block --- crates/network/src/manager.rs | 39 +++++++++++++++++++++---------- crates/scroll-wire/src/manager.rs | 20 +++++++--------- 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/crates/network/src/manager.rs b/crates/network/src/manager.rs index f1353ac5..50772dcb 100644 --- a/crates/network/src/manager.rs +++ b/crates/network/src/manager.rs @@ -187,7 +187,7 @@ impl< // Filter the peers that have not seen this block hash. let peers: Vec> = self .scroll_wire - .state() + .block_received_state() .iter() .filter_map(|(peer_id, blocks)| (!blocks.contains(&hash)).then_some(*peer_id)) .collect(); @@ -240,15 +240,23 @@ impl< ScrollWireEvent::NewBlock { peer_id, block, signature } => { let block_hash = block.hash_slow(); trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, signature = ?signature, "Received new block"); + + // Update the state of the peer cache i.e. peer has seen this block. + self.scroll_wire + .block_received_state_mut() + .entry(peer_id) + .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) + .insert(block_hash); + if self.blocks_seen.contains(&(block_hash, signature)) { + // Check if this peer has already sent this block to us, if so penalize it. + if self.scroll_wire.block_received_state().get(&peer_id).map_or(false, |cache| cache.contains(&block_hash)) { + trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block, penalizing"); + self.inner_network_handle + .reputation_change(peer_id, reth_network_api::ReputationChangeKind::BadBlock); + } None } else { - // Update the state of the peer cache i.e. peer has seen this block. - self.scroll_wire - .state_mut() - .entry(peer_id) - .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) - .insert(block_hash); // Update the state of the block cache i.e. we have seen this block. self.blocks_seen.insert((block.hash_slow(), signature)); @@ -339,18 +347,25 @@ impl< .and_then(|i| Signature::from_raw(&extra_data[i..]).ok()) { let block_hash = block.hash_slow(); - if self.blocks_seen.contains(&(block_hash, signature)) { - return None; - } - trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, signature = %signature.to_string(), extra_data = %extra_data.to_string(), "Received new block from eth-wire protocol"); // Update the state of the peer cache i.e. peer has seen this block. self.scroll_wire - .state_mut() + .block_received_state_mut() .entry(peer_id) .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) .insert(block_hash); + if self.blocks_seen.contains(&(block_hash, signature)) { + // Check if this peer has already sent this block to us, if so penalize it. + if self.scroll_wire.block_received_state().get(&peer_id).map_or(false, |cache| cache.contains(&block_hash)) { + trace!(target: "scroll::bridge::import", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block, penalizing"); + self.inner_network_handle + .reputation_change(peer_id, reth_network_api::ReputationChangeKind::BadBlock); + } + return None; + } + trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, signature = %signature.to_string(), extra_data = %extra_data.to_string(), "Received new block from eth-wire protocol"); + // Update the state of the block cache i.e. we have seen this block. self.blocks_seen.insert((block_hash, signature)); Some(ScrollNetworkManagerEvent::NewBlock(NewBlockWithPeer { diff --git a/crates/scroll-wire/src/manager.rs b/crates/scroll-wire/src/manager.rs index 10a6b1dd..a10843e4 100644 --- a/crates/scroll-wire/src/manager.rs +++ b/crates/scroll-wire/src/manager.rs @@ -25,14 +25,14 @@ pub struct ScrollWireManager { connections: HashMap>, /// A map of the state of the scroll wire protocol. Currently the state for each peer /// is just a cache of the last 100 blocks seen by each peer. - state: HashMap>, + block_received_state: HashMap>, } impl ScrollWireManager { /// Creates a new [`ScrollWireManager`] instance. pub fn new(events: UnboundedReceiver) -> Self { trace!(target: "scroll::wire::manager", "Creating new ScrollWireManager instance"); - Self { events: events.into(), connections: HashMap::new(), state: HashMap::new() } + Self { events: events.into(), connections: HashMap::new(), block_received_state: HashMap::new() } } /// Announces a new block to the specified peer. @@ -42,27 +42,23 @@ impl ScrollWireManager { // connections map and delete its state as the connection is no longer valid. if to_connection.get().send(ScrollMessage::new_block(block.clone())).is_err() { trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Failed to send block to peer - dropping peer."); - self.state.remove(&peer_id); + self.block_received_state.remove(&peer_id); to_connection.remove(); } else { // Upon successful sending of the block we update the state of the peer. trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Announced block to peer"); - self.state - .entry(peer_id) - .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) - .insert(hash); } } } /// Returns the state of the `ScrollWire` protocol. - pub const fn state(&self) -> &HashMap> { - &self.state + pub const fn block_received_state(&self) -> &HashMap> { + &self.block_received_state } /// Returns a mutable reference to the state of the `ScrollWire` protocol. - pub const fn state_mut(&mut self) -> &mut HashMap> { - &mut self.state + pub const fn block_received_state_mut(&mut self) -> &mut HashMap> { + &mut self.block_received_state } } @@ -94,7 +90,7 @@ impl Future for ScrollWireManager { direction ); this.connections.insert(peer_id, to_connection); - this.state.insert(peer_id, LruCache::new(100)); + this.block_received_state.insert(peer_id, LruCache::new(100)); } None => break, } From 4a98c535ba61995eaba04ac44ed72a2b86e81fd8 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 12 Jan 2026 20:07:32 +0800 Subject: [PATCH 2/4] update makefile --- .github/workflows/lint.yaml | 6 ++++++ .github/workflows/test.yaml | 4 ++-- Makefile | 12 +++++++----- crates/network/src/manager.rs | 28 +++++++++++++++++++++------- crates/scroll-wire/src/manager.rs | 10 +++++++--- 5 files changed, 43 insertions(+), 17 deletions(-) diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index d4d06330..3f80e053 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -21,6 +21,7 @@ jobs: - uses: rui314/setup-mold@v1 - uses: dtolnay/rust-toolchain@clippy with: + toolchain: 'nightly-2026-01-05' components: clippy - uses: Swatinem/rust-cache@v2 with: @@ -38,6 +39,7 @@ jobs: - uses: rui314/setup-mold@v1 - uses: dtolnay/rust-toolchain@nightly with: + toolchain: 'nightly-2026-01-05' components: rustfmt - name: Run fmt run: cargo fmt --all --check @@ -59,6 +61,8 @@ jobs: - uses: actions/checkout@v6 - uses: rui314/setup-mold@v1 - uses: dtolnay/rust-toolchain@nightly + with: + toolchain: 'nightly-2026-01-05' - uses: Swatinem/rust-cache@v2 with: cache-on-failure: true @@ -88,6 +92,8 @@ jobs: - uses: actions/checkout@v6 - uses: rui314/setup-mold@v1 - uses: dtolnay/rust-toolchain@nightly + with: + toolchain: 'nightly-2026-01-05' - uses: Swatinem/rust-cache@v2 with: cache-on-failure: true diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 800b624b..92b9fd3c 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -24,7 +24,7 @@ jobs: - uses: rui314/setup-mold@v1 - uses: dtolnay/rust-toolchain@nightly with: - toolchain: 'nightly' + toolchain: 'nightly-2026-01-05' - uses: Swatinem/rust-cache@v2 with: cache-on-failure: true @@ -43,7 +43,7 @@ jobs: - uses: rui314/setup-mold@v1 - uses: dtolnay/rust-toolchain@nightly with: - toolchain: 'nightly' + toolchain: 'nightly-2026-01-05' - uses: Swatinem/rust-cache@v2 with: cache-on-failure: true diff --git a/Makefile b/Makefile index f1554d91..7da521fe 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,8 @@ +NIGHTLY_TOOLCHAIN := nightly-2026-01-05 + .PHONY: fmt fmt: - cargo +nightly fmt + cargo +$(NIGHTLY_TOOLCHAIN) fmt .PHONY: lint-toml lint-toml: ensure-dprint @@ -14,7 +16,7 @@ ensure-dprint: .PHONY: clippy clippy: - cargo +nightly clippy \ + cargo +$(NIGHTLY_TOOLCHAIN) clippy \ --workspace \ --lib \ --examples \ @@ -25,7 +27,7 @@ clippy: .PHONY: clippy-fix clippy-fix: - cargo +nightly clippy \ + cargo +$(NIGHTLY_TOOLCHAIN) clippy \ --workspace \ --lib \ --examples \ @@ -37,7 +39,7 @@ clippy-fix: .PHONY: udeps udeps: - cargo +nightly udeps --workspace --lib --examples --tests --benches --all-features --locked + cargo +$(NIGHTLY_TOOLCHAIN) udeps --workspace --lib --examples --tests --benches --all-features --locked .PHONY: codespell codespell: ensure-codespell @@ -64,7 +66,7 @@ lint: fmt lint-toml clippy udeps codespell zepter .PHONY: test test: - cargo nextest run \ + cargo +$(NIGHTLY_TOOLCHAIN) nextest run \ --workspace \ --locked \ --all-features \ diff --git a/crates/network/src/manager.rs b/crates/network/src/manager.rs index 50772dcb..613691bd 100644 --- a/crates/network/src/manager.rs +++ b/crates/network/src/manager.rs @@ -227,7 +227,7 @@ impl< // Announce block to the filtered set of peers for peer_id in peers { trace!(target: "scroll::network::manager", peer_id = %peer_id, block_number = %block.block.header.number, block_hash = %hash, "Announcing new block to peer"); - self.scroll_wire.announce_block(peer_id, &block, hash); + self.scroll_wire.announce_block(peer_id, &block); } } @@ -250,10 +250,17 @@ impl< if self.blocks_seen.contains(&(block_hash, signature)) { // Check if this peer has already sent this block to us, if so penalize it. - if self.scroll_wire.block_received_state().get(&peer_id).map_or(false, |cache| cache.contains(&block_hash)) { + if self + .scroll_wire + .block_received_state() + .get(&peer_id) + .is_some_and(|cache| cache.contains(&block_hash)) + { trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block, penalizing"); - self.inner_network_handle - .reputation_change(peer_id, reth_network_api::ReputationChangeKind::BadBlock); + self.inner_network_handle.reputation_change( + peer_id, + reth_network_api::ReputationChangeKind::BadBlock, + ); } None } else { @@ -357,10 +364,17 @@ impl< if self.blocks_seen.contains(&(block_hash, signature)) { // Check if this peer has already sent this block to us, if so penalize it. - if self.scroll_wire.block_received_state().get(&peer_id).map_or(false, |cache| cache.contains(&block_hash)) { + if self + .scroll_wire + .block_received_state() + .get(&peer_id) + .is_some_and(|cache| cache.contains(&block_hash)) + { trace!(target: "scroll::bridge::import", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block, penalizing"); - self.inner_network_handle - .reputation_change(peer_id, reth_network_api::ReputationChangeKind::BadBlock); + self.inner_network_handle.reputation_change( + peer_id, + reth_network_api::ReputationChangeKind::BadBlock, + ); } return None; } diff --git a/crates/scroll-wire/src/manager.rs b/crates/scroll-wire/src/manager.rs index a10843e4..44ce413c 100644 --- a/crates/scroll-wire/src/manager.rs +++ b/crates/scroll-wire/src/manager.rs @@ -24,7 +24,7 @@ pub struct ScrollWireManager { /// A map of connections to peers. connections: HashMap>, /// A map of the state of the scroll wire protocol. Currently the state for each peer - /// is just a cache of the last 100 blocks seen by each peer. + /// is just a cache of the last 100 blocks received from each peer. block_received_state: HashMap>, } @@ -32,11 +32,15 @@ impl ScrollWireManager { /// Creates a new [`ScrollWireManager`] instance. pub fn new(events: UnboundedReceiver) -> Self { trace!(target: "scroll::wire::manager", "Creating new ScrollWireManager instance"); - Self { events: events.into(), connections: HashMap::new(), block_received_state: HashMap::new() } + Self { + events: events.into(), + connections: HashMap::new(), + block_received_state: HashMap::new(), + } } /// Announces a new block to the specified peer. - pub fn announce_block(&mut self, peer_id: PeerId, block: &NewBlock, hash: B256) { + pub fn announce_block(&mut self, peer_id: PeerId, block: &NewBlock) { if let Entry::Occupied(to_connection) = self.connections.entry(peer_id) { // We send the block to the peer. If we receive an error we remove the peer from the // connections map and delete its state as the connection is no longer valid. From 6205407e5b5808d5c51b4095bfb876f88415264b Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 19 Jan 2026 16:45:41 +0800 Subject: [PATCH 3/4] feat: handle DoS node by sending block --- Makefile | 2 +- crates/chain-orchestrator/src/event.rs | 2 + crates/chain-orchestrator/src/lib.rs | 11 +++ crates/network/src/import.rs | 7 ++ crates/network/src/manager.rs | 112 +++++++++++++++---------- crates/scroll-wire/src/lib.rs | 2 +- crates/scroll-wire/src/manager.rs | 102 ++++++++++++++++++---- 7 files changed, 180 insertions(+), 58 deletions(-) diff --git a/Makefile b/Makefile index 7da521fe..f6ac364c 100644 --- a/Makefile +++ b/Makefile @@ -99,7 +99,7 @@ export-sample-test-data: .PHONY: docs docs: - cargo docs --document-private-items --exclude rollup-node-chain-orchestrator + cargo +$(NIGHTLY_TOOLCHAIN) docs --document-private-items --exclude rollup-node-chain-orchestrator .PHONY: pr pr: lint test docs diff --git a/crates/chain-orchestrator/src/event.rs b/crates/chain-orchestrator/src/event.rs index 027dc118..2c061f20 100644 --- a/crates/chain-orchestrator/src/event.rs +++ b/crates/chain-orchestrator/src/event.rs @@ -14,6 +14,8 @@ use scroll_network::NewBlockWithPeer; pub enum ChainOrchestratorEvent { /// A received block failed the consensus checks. BlockFailedConsensusChecks(B256, PeerId), + /// A finalized block was received from a peer. + L2FinalizedBlockReceived(B256, PeerId), /// A new block has been received from the network but we have insufficient data to process it /// due to being in optimistic mode. InsufficientDataForReceivedBlock(B256), diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 64d5d657..8778d515 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -937,6 +937,17 @@ impl< ) -> Result, ChainOrchestratorError> { tracing::debug!(target: "scroll::chain_orchestrator", block_hash = ?block_with_peer.block.header.hash_slow(), block_number = ?block_with_peer.block.number, peer_id = ?block_with_peer.peer_id, "Received new block from peer"); + // Check we are not handling a finalized block. + if block_with_peer.block.header.number <= self.engine.fcs().finalized_block_info().number { + self.network + .handle() + .block_import_outcome(BlockImportOutcome::finalized_block(block_with_peer.peer_id)); + return Ok(Some(ChainOrchestratorEvent::L2FinalizedBlockReceived( + block_with_peer.block.header.hash_slow(), + block_with_peer.peer_id, + ))); + } + if let Err(err) = self.consensus.validate_new_block(&block_with_peer.block, &block_with_peer.signature) { diff --git a/crates/network/src/import.rs b/crates/network/src/import.rs index 281998b7..df723eb4 100644 --- a/crates/network/src/import.rs +++ b/crates/network/src/import.rs @@ -17,6 +17,11 @@ pub struct BlockImportOutcome { } impl BlockImportOutcome { + /// Creates a new `BlockImportOutcome` instance for a finalized block with the given peer ID. + pub fn finalized_block(peer: PeerId) -> Self { + Self { peer, result: Err(BlockImportError::L2FinalizedBlockReceived(peer)) } + } + /// Creates a new `BlockImportOutcome` instance for an invalid block with the given peer ID. pub fn invalid_block(peer: PeerId) -> Self { Self { peer, result: Err(BlockImportError::Validation(BlockValidationError::InvalidBlock)) } @@ -56,6 +61,8 @@ pub enum BlockImportError { Consensus(ConsensusError), /// An error occurred during block validation. Validation(BlockValidationError), + /// A finalized block was received from a peer. + L2FinalizedBlockReceived(PeerId), } /// A consensus related error that can occur during block import. diff --git a/crates/network/src/manager.rs b/crates/network/src/manager.rs index 613691bd..fc35d463 100644 --- a/crates/network/src/manager.rs +++ b/crates/network/src/manager.rs @@ -20,8 +20,8 @@ use reth_tokio_util::{EventSender, EventStream}; use rollup_node_primitives::{sig_encode_hash, BlockInfo}; use scroll_alloy_hardforks::ScrollHardforks; use scroll_wire::{ - NewBlock, ScrollWireConfig, ScrollWireEvent, ScrollWireManager, ScrollWireProtocolHandler, - LRU_CACHE_SIZE, + NewBlock, PeerBlockState, ScrollWireConfig, ScrollWireEvent, ScrollWireManager, + ScrollWireProtocolHandler, LRU_CACHE_SIZE, }; use std::{ pin::Pin, @@ -184,12 +184,26 @@ impl< // Compute the block hash. let hash = block.block.hash_slow(); - // Filter the peers that have not seen this block hash. + // Filter the peers that have not seen this block hash via either protocol. + // We iterate over all connected scroll-wire peers. let peers: Vec> = self .scroll_wire - .block_received_state() - .iter() - .filter_map(|(peer_id, blocks)| (!blocks.contains(&hash)).then_some(*peer_id)) + .connected_peers() + .filter_map(|peer_id| { + // Check if peer has seen this block via any protocol + let has_seen = self + .scroll_wire + .peer_block_state() + .get(peer_id) + .is_some_and(|state| state.has_seen(&hash)); + + // Only announce if peer hasn't seen this block + if !has_seen { + Some(*peer_id) + } else { + None + } + }) .collect(); // TODO: remove this once we deprecate l2geth. @@ -241,29 +255,34 @@ impl< let block_hash = block.hash_slow(); trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, signature = ?signature, "Received new block"); - // Update the state of the peer cache i.e. peer has seen this block. - self.scroll_wire - .block_received_state_mut() + // Check if this peer has already received this block via scroll-wire, if so + // penalize it. + let state = self + .scroll_wire + .peer_block_state_mut() .entry(peer_id) - .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) - .insert(block_hash); + .or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE)); + if state.has_seen_via_scroll_wire(&block_hash) { + tracing::warn!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block via scroll-wire, penalizing"); + self.inner_network_handle.reputation_change( + peer_id, + reth_network_api::ReputationChangeKind::BadBlock, + ); + return None; + } else { + // Update the state: peer has seen this block via scroll-wire + state.insert_scroll_wire(block_hash); + } if self.blocks_seen.contains(&(block_hash, signature)) { - // Check if this peer has already sent this block to us, if so penalize it. - if self - .scroll_wire - .block_received_state() - .get(&peer_id) - .is_some_and(|cache| cache.contains(&block_hash)) - { - trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block, penalizing"); - self.inner_network_handle.reputation_change( - peer_id, - reth_network_api::ReputationChangeKind::BadBlock, - ); - } None } else { + // Update the state: peer has seen this block via scroll-wire + self.scroll_wire + .peer_block_state_mut() + .entry(peer_id) + .or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE)) + .insert_scroll_wire(block_hash); // Update the state of the block cache i.e. we have seen this block. self.blocks_seen.insert((block.hash_slow(), signature)); @@ -325,6 +344,11 @@ impl< self.inner_network_handle .reputation_change(peer, reth_network_api::ReputationChangeKind::BadBlock); } + Err(BlockImportError::L2FinalizedBlockReceived(peer)) => { + trace!(target: "scroll::network::manager", peer_id = ?peer, "Block import failed - finalized block received - penalizing peer"); + self.inner_network_handle + .reputation_change(peer, reth_network_api::ReputationChangeKind::BadBlock); + } } } @@ -355,31 +379,35 @@ impl< { let block_hash = block.hash_slow(); - // Update the state of the peer cache i.e. peer has seen this block. - self.scroll_wire - .block_received_state_mut() + // Check if this peer has already sent this block to us via eth-wire, if so penalize it. + let state = self + .scroll_wire + .peer_block_state_mut() .entry(peer_id) - .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) - .insert(block_hash); + .or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE)); + + if state.has_seen_via_eth_wire(&block_hash) { + tracing::warn!(target: "scroll::bridge::import", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block via eth-wire, penalizing"); + self.inner_network_handle + .reputation_change(peer_id, reth_network_api::ReputationChangeKind::BadBlock); + return None; + } else { + // Update the state: peer has seen this block via eth-wire + state.insert_eth_wire(block_hash); + } if self.blocks_seen.contains(&(block_hash, signature)) { - // Check if this peer has already sent this block to us, if so penalize it. - if self - .scroll_wire - .block_received_state() - .get(&peer_id) - .is_some_and(|cache| cache.contains(&block_hash)) - { - trace!(target: "scroll::bridge::import", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block, penalizing"); - self.inner_network_handle.reputation_change( - peer_id, - reth_network_api::ReputationChangeKind::BadBlock, - ); - } return None; } trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, signature = %signature.to_string(), extra_data = %extra_data.to_string(), "Received new block from eth-wire protocol"); + // Update the state: peer has seen this block via eth-wire + self.scroll_wire + .peer_block_state_mut() + .entry(peer_id) + .or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE)) + .insert_eth_wire(block_hash); + // Update the state of the block cache i.e. we have seen this block. self.blocks_seen.insert((block_hash, signature)); Some(ScrollNetworkManagerEvent::NewBlock(NewBlockWithPeer { diff --git a/crates/scroll-wire/src/lib.rs b/crates/scroll-wire/src/lib.rs index 886b06ba..25f996f5 100644 --- a/crates/scroll-wire/src/lib.rs +++ b/crates/scroll-wire/src/lib.rs @@ -5,7 +5,7 @@ pub use config::ScrollWireConfig; mod connection; mod manager; -pub use manager::{ScrollWireManager, LRU_CACHE_SIZE}; +pub use manager::{PeerBlockState, ScrollWireManager, LRU_CACHE_SIZE}; mod protocol; pub use protocol::{NewBlock, ScrollWireEvent, ScrollWireProtocolHandler}; diff --git a/crates/scroll-wire/src/manager.rs b/crates/scroll-wire/src/manager.rs index 44ce413c..44750558 100644 --- a/crates/scroll-wire/src/manager.rs +++ b/crates/scroll-wire/src/manager.rs @@ -16,6 +16,71 @@ use tracing::trace; /// The size of the LRU cache used to track blocks that have been seen by peers. pub const LRU_CACHE_SIZE: u32 = 100; +/// Tracks which blocks a peer has seen via different protocols. +/// +/// This prevents duplicate block penalties when the same peer sends the same block +/// through both scroll-wire and eth-wire protocols (which is expected behavior). +/// Each protocol maintains its own LRU cache to detect true duplicates (same block +/// sent multiple times via the same protocol). +#[derive(Debug)] +pub struct PeerBlockState { + /// the blocks announced to the peer + announced: LruCache, + /// blocks received via scroll-wire protocol, this is used to penalize peers that send + /// duplicate blocks via scroll-wire. + scroll_wire_received: LruCache, + /// blocks received via eth-wire protocol, this is used to penalize peers that send duplicate + /// blocks via eth-wire. TODO: Remove this field once eth-wire protocol is deprecated and + /// l2geth is no longer supported. These entries are NOT automatically removed when + /// eth-wire connections close (since we can't detect that), but their memory is bounded by + /// `LRU_CACHE_SIZE` per peer (~6.4 KB per peer). + eth_wire_received: LruCache, +} + +impl PeerBlockState { + /// Creates a new `PeerBlockState` with the specified LRU cache capacity. + pub fn new(capacity: u32) -> Self { + Self { + announced: LruCache::new(capacity), + scroll_wire_received: LruCache::new(capacity), + eth_wire_received: LruCache::new(capacity), + } + } + + /// Check if peer knows about this block (either received or announced). + pub fn has_seen(&self, hash: &B256) -> bool { + self.announced.contains(hash) || + self.scroll_wire_received.contains(hash) || + self.eth_wire_received.contains(hash) + } + + /// Check if peer has received this block via scroll-wire specifically (for duplicate + /// detection). + pub fn has_seen_via_scroll_wire(&self, hash: &B256) -> bool { + self.scroll_wire_received.contains(hash) + } + + /// Check if peer has received this block via eth-wire specifically (for duplicate detection). + pub fn has_seen_via_eth_wire(&self, hash: &B256) -> bool { + self.eth_wire_received.contains(hash) + } + + /// Record that this peer has received a block via scroll-wire. + pub fn insert_scroll_wire(&mut self, hash: B256) { + self.scroll_wire_received.insert(hash); // Track for duplicate detection + } + + /// Record that this peer has received a block via eth-wire. + pub fn insert_eth_wire(&mut self, hash: B256) { + self.eth_wire_received.insert(hash); // Track for duplicate detection + } + + /// Record that we have announced a block to this peer. + pub fn insert_announced(&mut self, hash: B256) { + self.announced.insert(hash); // Only update unified announced, not protocol-specific + } +} + /// A manager for the `ScrollWire` protocol. #[derive(Debug)] pub struct ScrollWireManager { @@ -23,9 +88,8 @@ pub struct ScrollWireManager { events: UnboundedReceiverStream, /// A map of connections to peers. connections: HashMap>, - /// A map of the state of the scroll wire protocol. Currently the state for each peer - /// is just a cache of the last 100 blocks received from each peer. - block_received_state: HashMap>, + /// Unified state tracking block state and blocks received from each peer via both protocols. + peer_block_state: HashMap, } impl ScrollWireManager { @@ -35,7 +99,7 @@ impl ScrollWireManager { Self { events: events.into(), connections: HashMap::new(), - block_received_state: HashMap::new(), + peer_block_state: HashMap::new(), } } @@ -43,26 +107,37 @@ impl ScrollWireManager { pub fn announce_block(&mut self, peer_id: PeerId, block: &NewBlock) { if let Entry::Occupied(to_connection) = self.connections.entry(peer_id) { // We send the block to the peer. If we receive an error we remove the peer from the - // connections map and delete its state as the connection is no longer valid. + // connections map as the connection is no longer valid. if to_connection.get().send(ScrollMessage::new_block(block.clone())).is_err() { trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Failed to send block to peer - dropping peer."); - self.block_received_state.remove(&peer_id); + // Clean up both connection and state when scroll-wire peer disconnects + self.peer_block_state.remove(&peer_id); to_connection.remove(); } else { - // Upon successful sending of the block we update the state of the peer. trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Announced block to peer"); + // Record that we announced this block to the peer + let block_hash = block.block.hash_slow(); + self.peer_block_state + .entry(peer_id) + .or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE)) + .insert_announced(block_hash); } } } - /// Returns the state of the `ScrollWire` protocol. - pub const fn block_received_state(&self) -> &HashMap> { - &self.block_received_state + /// Returns an iterator over the connected peer IDs. + pub fn connected_peers(&self) -> impl Iterator { + self.connections.keys() + } + + /// Returns a reference to the peer block state map. + pub const fn peer_block_state(&self) -> &HashMap { + &self.peer_block_state } - /// Returns a mutable reference to the state of the `ScrollWire` protocol. - pub const fn block_received_state_mut(&mut self) -> &mut HashMap> { - &mut self.block_received_state + /// Returns a mutable reference to the peer block state map. + pub const fn peer_block_state_mut(&mut self) -> &mut HashMap { + &mut self.peer_block_state } } @@ -94,7 +169,6 @@ impl Future for ScrollWireManager { direction ); this.connections.insert(peer_id, to_connection); - this.block_received_state.insert(peer_id, LruCache::new(100)); } None => break, } From 9ccf17a876ea29a1212c91e9868aa0d6d042aa5a Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 19 Jan 2026 17:15:38 +0800 Subject: [PATCH 4/4] improve --- crates/network/src/manager.rs | 2 +- crates/scroll-wire/src/manager.rs | 22 ++++++---------------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/crates/network/src/manager.rs b/crates/network/src/manager.rs index fc35d463..52d986df 100644 --- a/crates/network/src/manager.rs +++ b/crates/network/src/manager.rs @@ -241,7 +241,7 @@ impl< // Announce block to the filtered set of peers for peer_id in peers { trace!(target: "scroll::network::manager", peer_id = %peer_id, block_number = %block.block.header.number, block_hash = %hash, "Announcing new block to peer"); - self.scroll_wire.announce_block(peer_id, &block); + self.scroll_wire.announce_block(peer_id, &block, hash); } } diff --git a/crates/scroll-wire/src/manager.rs b/crates/scroll-wire/src/manager.rs index 44750558..6fc3541d 100644 --- a/crates/scroll-wire/src/manager.rs +++ b/crates/scroll-wire/src/manager.rs @@ -16,24 +16,16 @@ use tracing::trace; /// The size of the LRU cache used to track blocks that have been seen by peers. pub const LRU_CACHE_SIZE: u32 = 100; -/// Tracks which blocks a peer has seen via different protocols. -/// -/// This prevents duplicate block penalties when the same peer sends the same block -/// through both scroll-wire and eth-wire protocols (which is expected behavior). -/// Each protocol maintains its own LRU cache to detect true duplicates (same block -/// sent multiple times via the same protocol). +/// Tracks block announced and received state for a peer. #[derive(Debug)] pub struct PeerBlockState { - /// the blocks announced to the peer + /// blocks announced to the peer announced: LruCache, /// blocks received via scroll-wire protocol, this is used to penalize peers that send /// duplicate blocks via scroll-wire. scroll_wire_received: LruCache, /// blocks received via eth-wire protocol, this is used to penalize peers that send duplicate - /// blocks via eth-wire. TODO: Remove this field once eth-wire protocol is deprecated and - /// l2geth is no longer supported. These entries are NOT automatically removed when - /// eth-wire connections close (since we can't detect that), but their memory is bounded by - /// `LRU_CACHE_SIZE` per peer (~6.4 KB per peer). + /// blocks via eth-wire. eth_wire_received: LruCache, } @@ -104,23 +96,21 @@ impl ScrollWireManager { } /// Announces a new block to the specified peer. - pub fn announce_block(&mut self, peer_id: PeerId, block: &NewBlock) { + pub fn announce_block(&mut self, peer_id: PeerId, block: &NewBlock, hash: B256) { if let Entry::Occupied(to_connection) = self.connections.entry(peer_id) { // We send the block to the peer. If we receive an error we remove the peer from the - // connections map as the connection is no longer valid. + // connections map and peer_block_state as the connection is no longer valid. if to_connection.get().send(ScrollMessage::new_block(block.clone())).is_err() { trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Failed to send block to peer - dropping peer."); - // Clean up both connection and state when scroll-wire peer disconnects self.peer_block_state.remove(&peer_id); to_connection.remove(); } else { trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Announced block to peer"); // Record that we announced this block to the peer - let block_hash = block.block.hash_slow(); self.peer_block_state .entry(peer_id) .or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE)) - .insert_announced(block_hash); + .insert_announced(hash); } } }