go-ethereum/eth/handler_eth.go
Csaba Kiraly 9f2575efeb eth/txtracker: add minimal tracker as inclusion stats provider
Minimal txtracker that records which peer delivered each transaction
and credits peers when their transactions appear on chain. Provides
the PeerInclusionStats needed by the dropper's protection logic.

Design:
- NotifyReceived(peer, txs): records deliverer per tx hash (called
  from handler_eth.go when tx bodies arrive via P2P)
- Subscribes to ChainHeadEvent, fetches block txs, credits the
  delivering peer for each included tx
- Per-peer EMA of recent inclusions (alpha=0.05), updated every block
- LRU eviction at 262K entries to bound memory
- Mutex-based (not channel-based) for simplicity — the hot path
  (NotifyReceived) is a fast map insert

Wired into the dropper via an adapter callback in backend.go that
converts txtracker.PeerStats to the dropper's PeerInclusionStats.
2026-04-10 08:30:21 +02:00

123 lines
4.2 KiB
Go

// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/p2p/enode"
)
// ethHandler implements the eth.Backend interface to handle the various network
// packets that are sent as replies or broadcasts.
type ethHandler handler
func (h *ethHandler) Chain() *core.BlockChain { return h.chain }
func (h *ethHandler) TxPool() eth.TxPool { return h.txpool }
// RunPeer is invoked when a peer joins on the `eth` protocol.
func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error {
return (*handler)(h).runEthPeer(peer, hand)
}
// PeerInfo retrieves all known `eth` information about a peer.
func (h *ethHandler) PeerInfo(id enode.ID) interface{} {
if p := h.peers.peer(id.String()); p != nil {
return p.info()
}
return nil
}
// AcceptTxs retrieves whether transaction processing is enabled on the node
// or if inbound transactions should simply be dropped.
func (h *ethHandler) AcceptTxs() bool {
return h.synced.Load()
}
// Handle is invoked from a peer's message handler when it receives a new remote
// message that the handler couldn't consume and serve itself.
func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
// Consume any broadcasts and announces, forwarding the rest to the downloader
switch packet := packet.(type) {
case *eth.NewPooledTransactionHashesPacket:
return h.txFetcher.Notify(peer.ID(), packet.Types, packet.Sizes, packet.Hashes)
case *eth.TransactionsPacket:
txs, err := packet.Items()
if err != nil {
return fmt.Errorf("Transactions: %v", err)
}
h.txTracker.NotifyReceived(peer.ID(), txs)
if err := handleTransactions(peer, txs, true); err != nil {
return fmt.Errorf("Transactions: %v", err)
}
return h.txFetcher.Enqueue(peer.ID(), txs, false)
case *eth.PooledTransactionsPacket:
txs, err := packet.List.Items()
if err != nil {
return fmt.Errorf("PooledTransactions: %v", err)
}
h.txTracker.NotifyReceived(peer.ID(), txs)
if err := handleTransactions(peer, txs, false); err != nil {
return fmt.Errorf("PooledTransactions: %v", err)
}
return h.txFetcher.Enqueue(peer.ID(), txs, true)
default:
return fmt.Errorf("unexpected eth packet type: %T", packet)
}
}
// handleTransactions marks all given transactions as known to the peer
// and performs basic validations.
func handleTransactions(peer *eth.Peer, list []*types.Transaction, directBroadcast bool) error {
seen := make(map[common.Hash]struct{})
for _, tx := range list {
if tx.Type() == types.BlobTxType {
if directBroadcast {
return errors.New("disallowed broadcast blob transaction")
} else {
// If we receive any blob transactions missing sidecars, or with
// sidecars that don't correspond to the versioned hashes reported
// in the header, disconnect from the sending peer.
if tx.BlobTxSidecar() == nil {
return errors.New("received sidecar-less blob transaction")
}
if err := tx.BlobTxSidecar().ValidateBlobCommitmentHashes(tx.BlobHashes()); err != nil {
return err
}
}
}
// Check for duplicates.
hash := tx.Hash()
if _, exists := seen[hash]; exists {
return fmt.Errorf("multiple copies of the same hash %v", hash)
}
seen[hash] = struct{}{}
// Mark as known.
peer.MarkTransaction(hash)
}
return nil
}