add hive tests

This commit is contained in:
healthykim 2026-04-14 15:28:24 +02:00
parent f7e46cc505
commit 6bbcbe6a65
2 changed files with 302 additions and 2 deletions

View file

@ -175,9 +175,9 @@ func (c *Conn) ReadEth() (any, error) {
case eth.PooledTransactionsMsg:
msg = new(eth.PooledTransactionsPacket)
case eth.GetCellsMsg:
msg = new(eth.GetCellsRequest)
msg = new(eth.GetCellsRequestPacket)
case eth.CellsMsg:
msg = new(eth.CellsResponse)
msg = new(eth.CellsPacket)
default:
panic(fmt.Sprintf("unhandled eth msg code %d", code))
}

View file

@ -21,6 +21,7 @@ import (
"crypto/rand"
"errors"
"fmt"
"os"
"reflect"
"sync"
"time"
@ -91,6 +92,10 @@ func (s *Suite) EthTests() []utesting.Test {
{Name: "BlobViolations", Fn: s.TestBlobViolations},
{Name: "TestBlobTxWithoutSidecar", Fn: s.TestBlobTxWithoutSidecar},
{Name: "TestBlobTxWithMismatchedSidecar", Fn: s.TestBlobTxWithMismatchedSidecar},
// test eth/72 blob txs
{Name: "BlobTxAvailabilityFailure", Fn: s.TestBlobTxAvailabilityFailure},
{Name: "GetCells", Fn: s.TestGetCells},
{Name: "BlobTxWithInvalidCells", Fn: s.TestBlobTxWithInvalidCells},
}
}
@ -1210,3 +1215,298 @@ func (s *Suite) testBadBlobTx(t *utesting.T, tx *types.Transaction, badTx *types
t.Fatalf("%v", err)
}
}
func (s *Suite) TestBlobTxAvailabilityFailure(t *utesting.T) {
t.Log(`This test announces 4 blob txs from a single peer. With fetchProbability 0.15,
there will be at least one partial fetch (1-0.15^4). When only 1 peer announced availability,
partial fetch GetCells should never arrive. Any GetCells that does arrive must be a full fetch.`)
if err := s.engine.sendForkchoiceUpdated(); err != nil {
t.Fatalf("send fcu failed: %v", err)
}
txs := s.makeBlobTxs(4, 4, 0x30)
conn, err := s.dial()
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer conn.Close()
if err := conn.peer(s.chain, nil); err != nil {
t.Fatalf("peering failed: %v", err)
}
// Announce all 4 txs from a single peer.
hashes := make([]common.Hash, len(txs))
txTypes := make([]byte, len(txs))
sizes := make([]uint32, len(txs))
for i, tx := range txs {
hashes[i] = tx.Hash()
txTypes[i] = types.BlobTxType
sizes[i] = uint32(tx.WithoutBlob().Size())
}
ann := eth.NewPooledTransactionHashesPacket72{
Types: txTypes,
Sizes: sizes,
Hashes: hashes,
Mask: *types.CustodyBitmapAll,
}
if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
t.Fatalf("announce failed: %v", err)
}
// Read messages for a short period. Any GetCells that arrives must be
// a full fetch request (mask >= DataPerBlob), not a partial fetch.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return
default:
}
msg, err := conn.ReadEth()
if err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) {
return // timeout, test passed
}
t.Fatalf("unexpected error: %v", err)
}
switch req := msg.(type) {
case *eth.GetCellsRequestPacket:
if req.Mask.OneCount() < kzg4844.DataPerBlob {
t.Fatalf("received partial GetCells request with only %d cells from single peer announcement", req.Mask.OneCount())
}
case *eth.GetPooledTransactionsPacket:
var txsWithoutBlob []*types.Transaction
for _, h := range req.GetPooledTransactionsRequest {
for _, tx := range txs {
if tx.Hash() == h {
txsWithoutBlob = append(txsWithoutBlob, tx.WithoutBlob())
}
}
}
encTxs, _ := rlp.EncodeToRawList(txsWithoutBlob)
conn.Write(ethProto, eth.PooledTransactionsMsg, eth.PooledTransactionsPacket{
RequestId: req.RequestId,
List: encTxs,
})
}
}
}
// buildCells extracts cells at mask indices from the original tx's blobs
func buildCells(sidecar *types.BlobTxSidecar, mask types.CustodyBitmap) []kzg4844.Cell {
allCells, _ := kzg4844.ComputeCells(sidecar.Blobs)
indices := mask.Indices()
result := make([]kzg4844.Cell, 0, len(sidecar.Blobs)*len(indices))
for b := 0; b < len(sidecar.Blobs); b++ {
for _, idx := range indices {
result = append(result, allCells[b*kzg4844.CellsPerBlob+int(idx)])
}
}
return result
}
// readAnyFrom waits for a message of type T on any of the given conns
// and returns the packet and the conn it came from.
func readAnyFrom[T any](conns ...*Conn) (*T, *Conn, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
type result struct {
pkt *T
c *Conn
}
ch := make(chan result, len(conns))
errCh := make(chan error, len(conns))
for _, c := range conns {
go func(c *Conn) {
pkt, err := readUntil[T](ctx, c)
if err != nil {
if !errors.Is(err, context.Canceled) {
errCh <- err
}
return
}
ch <- result{pkt, c}
}(c)
}
select {
case r := <-ch:
return r.pkt, r.c, nil
case err := <-errCh:
return nil, nil, err
}
}
func (s *Suite) TestGetCells(t *utesting.T) {
t.Log(`This test checks that blob tx announcements trigger GetCells requests,
and that providing valid cells causes the tx to enter the pool.`)
if err := s.engine.sendForkchoiceUpdated(); err != nil {
t.Fatalf("send fcu failed: %v", err)
}
tx := s.makeBlobTxs(1, 1, 0x31)[0]
sidecar := tx.BlobTxSidecar()
tx = tx.WithoutBlob()
// Two peers ensure GetCells arrives regardless of full/partial fetch path.
conn1, err := s.dial()
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer conn1.Close()
if err := conn1.peer(s.chain, nil); err != nil {
t.Fatalf("peering failed: %v", err)
}
conn2, err := s.dial()
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer conn2.Close()
if err := conn2.peer(s.chain, nil); err != nil {
t.Fatalf("peering failed: %v", err)
}
ann := eth.NewPooledTransactionHashesPacket72{
Types: []byte{types.BlobTxType},
Sizes: []uint32{uint32(tx.Size())},
Hashes: []common.Hash{tx.Hash()},
Mask: *types.CustodyBitmapAll,
}
if err := conn1.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
t.Fatalf("conn1 announce failed: %v", err)
}
if err := conn2.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
t.Fatalf("conn2 announce failed: %v", err)
}
// Wait for GetPooledTransactions on either conn, respond with tx (without blobs).
pooledReq, pc, err := readAnyFrom[eth.GetPooledTransactionsPacket](conn1, conn2)
if err != nil {
t.Fatalf("failed to read GetPooledTransactions: %v", err)
}
encTxs, _ := rlp.EncodeToRawList([]*types.Transaction{tx})
resp := eth.PooledTransactionsPacket{RequestId: pooledReq.RequestId, List: encTxs}
if err := pc.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil {
t.Fatalf("writing pooled tx response failed: %v", err)
}
// Wait for GetCells request on either conn.
cellsReq, cc, err := readAnyFrom[eth.GetCellsRequestPacket](conn1, conn2)
if err != nil {
t.Fatalf("failed to read GetCells: %v", err)
}
if len(cellsReq.Hashes) == 0 || cellsReq.Hashes[0] != tx.Hash() {
t.Fatalf("GetCells for wrong hash: %v", cellsReq.Hashes)
}
// Respond with valid cells matching the requested mask.
cells := buildCells(sidecar, cellsReq.Mask)
cellsResp := eth.CellsPacket{
RequestId: cellsReq.RequestId,
CellsResponse: eth.CellsResponse{
Hashes: []common.Hash{tx.Hash()},
Cells: [][]kzg4844.Cell{cells},
Mask: cellsReq.Mask,
},
}
if err := cc.Write(ethProto, eth.CellsMsg, cellsResp); err != nil {
t.Fatalf("writing cells response failed: %v", err)
}
// Either peer should not be disconnected after providing valid data.
if readUntilDisconnect(cc) {
t.Fatalf("unexpected disconnect on cells-providing peer")
}
}
func (s *Suite) TestBlobTxWithInvalidCells(t *utesting.T) {
t.Log(`This test checks that a peer responding to GetCells with invalid cells is disconnected,
while the other peer is not.`)
if err := s.engine.sendForkchoiceUpdated(); err != nil {
t.Fatalf("send fcu failed: %v", err)
}
tx := s.makeBlobTxs(1, 1, 0x32)[0].WithoutBlob()
conn1, err := s.dial()
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer conn1.Close()
if err := conn1.peer(s.chain, nil); err != nil {
t.Fatalf("peering failed: %v", err)
}
conn2, err := s.dial()
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer conn2.Close()
if err := conn2.peer(s.chain, nil); err != nil {
t.Fatalf("peering failed: %v", err)
}
ann := eth.NewPooledTransactionHashesPacket72{
Types: []byte{types.BlobTxType},
Sizes: []uint32{uint32(tx.Size())},
Hashes: []common.Hash{tx.Hash()},
Mask: *types.CustodyBitmapAll,
}
if err := conn1.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
t.Fatalf("conn1 announce failed: %v", err)
}
if err := conn2.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
t.Fatalf("conn2 announce failed: %v", err)
}
pooledReq, pc, err := readAnyFrom[eth.GetPooledTransactionsPacket](conn1, conn2)
if err != nil {
t.Fatalf("failed to read GetPooledTransactions: %v", err)
}
encTxs, _ := rlp.EncodeToRawList([]*types.Transaction{tx})
if err := pc.Write(ethProto, eth.PooledTransactionsMsg,
eth.PooledTransactionsPacket{RequestId: pooledReq.RequestId, List: encTxs}); err != nil {
t.Fatalf("writing pooled tx response failed: %v", err)
}
cellsReq, cc, err := readAnyFrom[eth.GetCellsRequestPacket](conn1, conn2)
if err != nil {
t.Fatalf("failed to read GetCells: %v", err)
}
// Respond with corrupted cells (all zero bytes).
blobCount := len(tx.BlobTxSidecar().Blobs)
corrupted := make([]kzg4844.Cell, blobCount*cellsReq.Mask.OneCount())
badResp := eth.CellsPacket{
RequestId: cellsReq.RequestId,
CellsResponse: eth.CellsResponse{
Hashes: []common.Hash{tx.Hash()},
Cells: [][]kzg4844.Cell{corrupted},
Mask: cellsReq.Mask,
},
}
if err := cc.Write(ethProto, eth.CellsMsg, badResp); err != nil {
t.Fatalf("writing bad cells response failed: %v", err)
}
// The peer that sent corrupted cells must be disconnected.
if !readUntilDisconnect(cc) {
t.Fatalf("expected peer to be disconnected after invalid cells")
}
// The innocent peer must stay connected.
otherConn := conn1
if cc == conn1 {
otherConn = conn2
}
if readUntilDisconnect(otherConn) {
t.Fatalf("innocent peer should not be disconnected")
}
}