
core/bloombits: remove old bloombits logic and chain indexer (#31081)

This PR is #3 of a 3-part series that implements the new log index
intended to replace core/bloombits.
Based on https://github.com/ethereum/go-ethereum/pull/31079 and
https://github.com/ethereum/go-ethereum/pull/31080
Replaces https://github.com/ethereum/go-ethereum/pull/30370

This part removes the old bloombits package and the chain indexer that
was only used by bloombits. It also deletes the old bloombits database.

FilterMaps data structure explanation:
https://gist.github.com/zsfelfoldi/a60795f9da7ae6422f28c7a34e02a07e

Log index generator code overview:
https://gist.github.com/zsfelfoldi/97105dff0b1a4f5ed557924a24b9b9e7

Search pattern matcher code overview:
https://gist.github.com/zsfelfoldi/5981735641c956afb18065e84f8aff34

Note that a tree hashing scheme and a remote proof protocol are mentioned
in the documents above, but they are not fully specified yet. These specs
are WIP and will be finalized after the local log indexer/filter code is
finalized and merged.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
Felföldi Zsolt 2025-03-21 10:47:58 +01:00 committed by GitHub
parent 7fed9584b5
commit 07cca7ab9f
21 changed files with 80 additions and 2561 deletions


@@ -906,7 +906,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
}
// Todo(rjl493456442) txlookup, bloombits, etc
// Todo(rjl493456442) txlookup, log index, etc
}
// If SetHead was only called as a chain reparation method, try to skip
// touching the header chain altogether, unless the freezer is broken


@@ -1,92 +0,0 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"context"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
)
const (
// bloomThrottling is the time to wait between processing two consecutive index
// sections. It's useful during chain upgrades to prevent disk overload.
bloomThrottling = 100 * time.Millisecond
)
// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
// for the Ethereum header bloom filters, permitting blazing fast filtering.
type BloomIndexer struct {
size uint64 // section size to generate bloombits for
db ethdb.Database // database instance to write index data and metadata into
gen *bloombits.Generator // generator to rotate the bloom bits creating the bloom index
section uint64 // Section is the section number being processed currently
head common.Hash // Head is the hash of the last header processed
}
// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
// canonical chain for fast logs filtering.
func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *ChainIndexer {
backend := &BloomIndexer{
db: db,
size: size,
}
table := rawdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix))
return NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits")
}
// Reset implements core.ChainIndexerBackend, starting a new bloombits index
// section.
func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
gen, err := bloombits.NewGenerator(uint(b.size))
b.gen, b.section, b.head = gen, section, common.Hash{}
return err
}
// Process implements core.ChainIndexerBackend, adding a new header's bloom into
// the index.
func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error {
b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom)
b.head = header.Hash()
return nil
}
// Commit implements core.ChainIndexerBackend, finalizing the bloom section and
// writing it out into the database.
func (b *BloomIndexer) Commit() error {
batch := b.db.NewBatchWithSize((int(b.size) / 8) * types.BloomBitLength)
for i := 0; i < types.BloomBitLength; i++ {
bits, err := b.gen.Bitset(uint(i))
if err != nil {
return err
}
rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits))
}
return batch.Write()
}
// Prune returns an empty error since we don't support pruning here.
func (b *BloomIndexer) Prune(threshold uint64) error {
return nil
}

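For context, a minimal read-side sketch of what the index written by Commit above enabled: one database read yields a per-block match vector for a single bloom bit across a whole section, instead of touching every header in it. The helper below is hypothetical; rawdb.ReadBloomBits and bitutil.DecompressBytes are the read-side counterparts of the write-side calls visible above, and they are removed together with this code.

package core

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/bitutil"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// readSectionBits is a hypothetical counterpart to Commit above: one read
// returns a vector with one bit per block of the section, telling which
// blocks may have the given bloom bit set.
func readSectionBits(db ethdb.KeyValueReader, bit uint, section uint64, head common.Hash, size uint64) ([]byte, error) {
	compressed, err := rawdb.ReadBloomBits(db, bit, section, head)
	if err != nil {
		return nil, err
	}
	// Commit stored the vectors compressed; expand back to size/8 bytes,
	// i.e. one bit per block of the section.
	return bitutil.DecompressBytes(compressed, int(size/8))
}

With 4096-block sections, this replaces up to 4096 header reads with one compressed read per bloom bit, which is why the doc comment above calls the filtering "blazing fast".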

@@ -1,18 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package bloombits implements bloom filtering on batches of data.
package bloombits


@@ -1,98 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"errors"
"github.com/ethereum/go-ethereum/core/types"
)
var (
// errSectionOutOfBounds is returned if the user tried to add more bloom filters
// to the batch than available space, or tried to retrieve above the capacity.
errSectionOutOfBounds = errors.New("section out of bounds")
// errBloomBitOutOfBounds is returned if the user tried to retrieve a bloom
// bit above the capacity.
errBloomBitOutOfBounds = errors.New("bloom bit out of bounds")
)
// Generator takes a number of bloom filters and generates the rotated bloom bits
// to be used for batched filtering.
type Generator struct {
blooms [types.BloomBitLength][]byte // Rotated blooms for per-bit matching
sections uint // Number of sections to batch together
nextSec uint // Next section to set when adding a bloom
}
// NewGenerator creates a rotated bloom generator that can iteratively fill a
// batched bloom filter's bits.
func NewGenerator(sections uint) (*Generator, error) {
if sections%8 != 0 {
return nil, errors.New("section count not multiple of 8")
}
b := &Generator{sections: sections}
for i := 0; i < types.BloomBitLength; i++ {
b.blooms[i] = make([]byte, sections/8)
}
return b, nil
}
// AddBloom takes a single bloom filter and sets the corresponding bit column
// in memory accordingly.
func (b *Generator) AddBloom(index uint, bloom types.Bloom) error {
// Make sure we're not adding more bloom filters than our capacity
if b.nextSec >= b.sections {
return errSectionOutOfBounds
}
if b.nextSec != index {
return errors.New("bloom filter with unexpected index")
}
// Rotate the bloom and insert into our collection
byteIndex := b.nextSec / 8
bitIndex := byte(7 - b.nextSec%8)
for byt := 0; byt < types.BloomByteLength; byt++ {
bloomByte := bloom[types.BloomByteLength-1-byt]
if bloomByte == 0 {
continue
}
base := 8 * byt
b.blooms[base+7][byteIndex] |= ((bloomByte >> 7) & 1) << bitIndex
b.blooms[base+6][byteIndex] |= ((bloomByte >> 6) & 1) << bitIndex
b.blooms[base+5][byteIndex] |= ((bloomByte >> 5) & 1) << bitIndex
b.blooms[base+4][byteIndex] |= ((bloomByte >> 4) & 1) << bitIndex
b.blooms[base+3][byteIndex] |= ((bloomByte >> 3) & 1) << bitIndex
b.blooms[base+2][byteIndex] |= ((bloomByte >> 2) & 1) << bitIndex
b.blooms[base+1][byteIndex] |= ((bloomByte >> 1) & 1) << bitIndex
b.blooms[base][byteIndex] |= (bloomByte & 1) << bitIndex
}
b.nextSec++
return nil
}
// Bitset returns the bit vector belonging to the given bit index after all
// blooms have been added.
func (b *Generator) Bitset(idx uint) ([]byte, error) {
if b.nextSec != b.sections {
return nil, errors.New("bloom not fully generated yet")
}
if idx >= types.BloomBitLength {
return nil, errBloomBitOutOfBounds
}
return b.blooms[idx], nil
}

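To make the rotation concrete, here is a short sketch of how the Generator above was driven for one section. rotateSection is a hypothetical helper; the bloombits and types identifiers are exactly the ones deleted in this commit.

package core

import (
	"github.com/ethereum/go-ethereum/core/bloombits"
	"github.com/ethereum/go-ethereum/core/types"
)

// rotateSection mirrors what BloomIndexer.Process/Commit did per section:
// feed one header bloom per block, in order, then read back one bit-vector
// per bloom bit index. Each returned vector packs one bit per block.
func rotateSection(blooms []types.Bloom) ([][]byte, error) {
	gen, err := bloombits.NewGenerator(uint(len(blooms))) // count must be a multiple of 8
	if err != nil {
		return nil, err
	}
	for i, bloom := range blooms {
		if err := gen.AddBloom(uint(i), bloom); err != nil {
			return nil, err
		}
	}
	vectors := make([][]byte, types.BloomBitLength)
	for bit := 0; bit < types.BloomBitLength; bit++ {
		if vectors[bit], err = gen.Bitset(uint(bit)); err != nil {
			return nil, err
		}
	}
	return vectors, nil
}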

@@ -1,100 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"bytes"
crand "crypto/rand"
"math/rand"
"testing"
"github.com/ethereum/go-ethereum/core/types"
)
// Tests that batched bloom bits are correctly rotated from the input bloom
// filters.
func TestGenerator(t *testing.T) {
// Generate the input and the rotated output
var input, output [types.BloomBitLength][types.BloomByteLength]byte
for i := 0; i < types.BloomBitLength; i++ {
for j := 0; j < types.BloomBitLength; j++ {
bit := byte(rand.Int() % 2)
input[i][j/8] |= bit << byte(7-j%8)
output[types.BloomBitLength-1-j][i/8] |= bit << byte(7-i%8)
}
}
// Crunch the input through the generator and verify the result
gen, err := NewGenerator(types.BloomBitLength)
if err != nil {
t.Fatalf("failed to create bloombit generator: %v", err)
}
for i, bloom := range input {
if err := gen.AddBloom(uint(i), bloom); err != nil {
t.Fatalf("bloom %d: failed to add: %v", i, err)
}
}
for i, want := range output {
have, err := gen.Bitset(uint(i))
if err != nil {
t.Fatalf("output %d: failed to retrieve bits: %v", i, err)
}
if !bytes.Equal(have, want[:]) {
t.Errorf("output %d: bit vector mismatch have %x, want %x", i, have, want)
}
}
}
func BenchmarkGenerator(b *testing.B) {
var input [types.BloomBitLength][types.BloomByteLength]byte
b.Run("empty", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Crunch the input through the generator and verify the result
gen, err := NewGenerator(types.BloomBitLength)
if err != nil {
b.Fatalf("failed to create bloombit generator: %v", err)
}
for j, bloom := range &input {
if err := gen.AddBloom(uint(j), bloom); err != nil {
b.Fatalf("bloom %d: failed to add: %v", i, err)
}
}
}
})
for i := 0; i < types.BloomBitLength; i++ {
crand.Read(input[i][:])
}
b.Run("random", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Crunch the input through the generator and verify the result
gen, err := NewGenerator(types.BloomBitLength)
if err != nil {
b.Fatalf("failed to create bloombit generator: %v", err)
}
for j, bloom := range &input {
if err := gen.AddBloom(uint(j), bloom); err != nil {
b.Fatalf("bloom %d: failed to add: %v", i, err)
}
}
}
})
}


@@ -1,649 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"bytes"
"context"
"errors"
"math"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/crypto"
)
// bloomIndexes represents the bit indexes inside the bloom filter that belong
// to some key.
type bloomIndexes [3]uint
// calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
func calcBloomIndexes(b []byte) bloomIndexes {
b = crypto.Keccak256(b)
var idxs bloomIndexes
for i := 0; i < len(idxs); i++ {
idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
}
return idxs
}
// partialMatches with a non-nil vector represents a section in which some sub-
// matchers have already found potential matches. Subsequent sub-matchers will
// binary AND their matches with this vector. If vector is nil, it represents a
// section to be processed by the first sub-matcher.
type partialMatches struct {
section uint64
bitset []byte
}
// Retrieval represents a request for retrieval task assignments for a given
// bit with the given number of fetch elements, or a response for such a request.
// It can also have the actual results set to be used as a delivery data struct.
//
// The context and error fields are used by the light client to terminate matching
// early if an error is encountered on some path of the pipeline.
type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
Context context.Context
Error error
}
// Matcher is a pipelined system of schedulers and logic matchers which perform
// binary AND/OR operations on the bit-streams, creating a stream of potential
// blocks to inspect for data content.
type Matcher struct {
sectionSize uint64 // Size of the data batches to filter on
filters [][]bloomIndexes // Filter the system is matching for
schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
retrievers chan chan uint // Retriever processes waiting for bit allocations
counters chan chan uint // Retriever processes waiting for task count reports
retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
deliveries chan *Retrieval // Retriever processes waiting for task response deliveries
running atomic.Bool // Atomic flag whether a session is live or not
}
// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
// address and topic filtering on them. Setting a filter component to `nil` is
// allowed and will result in that filter rule being skipped (OR 0x11...1).
func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
// Create the matcher instance
m := &Matcher{
sectionSize: sectionSize,
schedulers: make(map[uint]*scheduler),
retrievers: make(chan chan uint),
counters: make(chan chan uint),
retrievals: make(chan chan *Retrieval),
deliveries: make(chan *Retrieval),
}
// Calculate the bloom bit indexes for the groups we're interested in
m.filters = nil
for _, filter := range filters {
// Gather the bit indexes of the filter rule, special casing the nil filter
if len(filter) == 0 {
continue
}
bloomBits := make([]bloomIndexes, len(filter))
for i, clause := range filter {
if clause == nil {
bloomBits = nil
break
}
bloomBits[i] = calcBloomIndexes(clause)
}
// Accumulate the filter rules if no nil rule was within
if bloomBits != nil {
m.filters = append(m.filters, bloomBits)
}
}
// For every bit, create a scheduler to load/download the bit vectors
for _, bloomIndexLists := range m.filters {
for _, bloomIndexList := range bloomIndexLists {
for _, bloomIndex := range bloomIndexList {
m.addScheduler(bloomIndex)
}
}
}
return m
}
// addScheduler adds a bit stream retrieval scheduler for the given bit index if
// one does not exist yet. If the bit is already selected for filtering, the
// existing scheduler can be used.
func (m *Matcher) addScheduler(idx uint) {
if _, ok := m.schedulers[idx]; ok {
return
}
m.schedulers[idx] = newScheduler(idx)
}
// Start starts the matching process and returns a stream of bloom matches in
// a given range of blocks. If there are no more matches in the range, the result
// channel is closed.
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions
if m.running.Swap(true) {
return nil, errors.New("matcher already running")
}
defer m.running.Store(false)
// Initiate a new matching round
session := &MatcherSession{
matcher: m,
quit: make(chan struct{}),
ctx: ctx,
}
for _, scheduler := range m.schedulers {
scheduler.reset()
}
sink := m.run(begin, end, cap(results), session)
// Read the output from the result sink and deliver to the user
session.pend.Add(1)
go func() {
defer session.pend.Done()
defer close(results)
for {
select {
case <-session.quit:
return
case res, ok := <-sink:
// New match result found
if !ok {
return
}
// Calculate the first and last blocks of the section
sectionStart := res.section * m.sectionSize
first := sectionStart
if begin > first {
first = begin
}
last := sectionStart + m.sectionSize - 1
if end < last {
last = end
}
// Iterate over all the blocks in the section and return the matching ones
for i := first; i <= last; i++ {
// Skip the entire byte if no matches are found inside (and we're processing an entire byte!)
next := res.bitset[(i-sectionStart)/8]
if next == 0 {
if i%8 == 0 {
i += 7
}
continue
}
// Some bit is set, do the actual submatching
if bit := 7 - i%8; next&(1<<bit) != 0 {
select {
case <-session.quit:
return
case results <- i:
}
}
}
}
}
}()
return session, nil
}
// run creates a daisy-chain of sub-matchers, one for the address set and one
// for each topic set, each sub-matcher receiving a section only if the previous
// ones have all found a potential match in one of the blocks of the section,
// then binary AND-ing its own matches and forwarding the result to the next one.
//
// The method starts feeding the section indexes into the first sub-matcher on a
// new goroutine and returns a sink channel receiving the results.
func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) chan *partialMatches {
// Create the source channel and feed section indexes into
source := make(chan *partialMatches, buffer)
session.pend.Add(1)
go func() {
defer session.pend.Done()
defer close(source)
for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ {
select {
case <-session.quit:
return
case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
}
}
}()
// Assemble the daisy-chained filtering pipeline
next := source
dist := make(chan *request, buffer)
for _, bloom := range m.filters {
next = m.subMatch(next, dist, bloom, session)
}
// Start the request distribution
session.pend.Add(1)
go m.distributor(dist, session)
return next
}
// subMatch creates a sub-matcher that filters for a set of addresses or topics, binary OR-s those matches, then
// binary AND-s the result to the daisy-chain input (source) and forwards it to the daisy-chain output.
// The matches of each address/topic are calculated by fetching the given sections of the three bloom bit indexes belonging to
// that address/topic, and binary AND-ing those vectors together.
func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloom []bloomIndexes, session *MatcherSession) chan *partialMatches {
// Start the concurrent schedulers for each bit required by the bloom filter
sectionSources := make([][3]chan uint64, len(bloom))
sectionSinks := make([][3]chan []byte, len(bloom))
for i, bits := range bloom {
for j, bit := range bits {
sectionSources[i][j] = make(chan uint64, cap(source))
sectionSinks[i][j] = make(chan []byte, cap(source))
m.schedulers[bit].run(sectionSources[i][j], dist, sectionSinks[i][j], session.quit, &session.pend)
}
}
process := make(chan *partialMatches, cap(source)) // entries from source are forwarded here after fetches have been initiated
results := make(chan *partialMatches, cap(source))
session.pend.Add(2)
go func() {
// Tear down the goroutine and terminate all source channels
defer session.pend.Done()
defer close(process)
defer func() {
for _, bloomSources := range sectionSources {
for _, bitSource := range bloomSources {
close(bitSource)
}
}
}()
// Read sections from the source channel and multiplex into all bit-schedulers
for {
select {
case <-session.quit:
return
case subres, ok := <-source:
// New subresult from previous link
if !ok {
return
}
// Multiplex the section index to all bit-schedulers
for _, bloomSources := range sectionSources {
for _, bitSource := range bloomSources {
select {
case <-session.quit:
return
case bitSource <- subres.section:
}
}
}
// Notify the processor that this section will become available
select {
case <-session.quit:
return
case process <- subres:
}
}
}
}()
go func() {
// Tear down the goroutine and terminate the final sink channel
defer session.pend.Done()
defer close(results)
// Read the source notifications and collect the delivered results
for {
select {
case <-session.quit:
return
case subres, ok := <-process:
// Notified of a section being retrieved
if !ok {
return
}
// Gather all the sub-results and merge them together
var orVector []byte
for _, bloomSinks := range sectionSinks {
var andVector []byte
for _, bitSink := range bloomSinks {
var data []byte
select {
case <-session.quit:
return
case data = <-bitSink:
}
if andVector == nil {
andVector = make([]byte, int(m.sectionSize/8))
copy(andVector, data)
} else {
bitutil.ANDBytes(andVector, andVector, data)
}
}
if orVector == nil {
orVector = andVector
} else {
bitutil.ORBytes(orVector, orVector, andVector)
}
}
if orVector == nil {
orVector = make([]byte, int(m.sectionSize/8))
}
if subres.bitset != nil {
bitutil.ANDBytes(orVector, orVector, subres.bitset)
}
if bitutil.TestBytes(orVector) {
select {
case <-session.quit:
return
case results <- &partialMatches{subres.section, orVector}:
}
}
}
}
}()
return results
}
// distributor receives requests from the schedulers and queues them into a set
// of pending requests, which are assigned to retrievers wanting to fulfil them.
func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
defer session.pend.Done()
var (
requests = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
unallocs = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
retrievers chan chan uint // Waiting retrievers (toggled to nil if unallocs is empty)
allocs int // Number of active allocations to handle graceful shutdown requests
shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
)
// assign is a helper method to try to assign a pending bit an actively
// listening servicer, or schedule it up for later when one arrives.
assign := func(bit uint) {
select {
case fetcher := <-m.retrievers:
allocs++
fetcher <- bit
default:
// No retrievers active, start listening for new ones
retrievers = m.retrievers
unallocs[bit] = struct{}{}
}
}
for {
select {
case <-shutdown:
// Shutdown requested. No more retrievers can be allocated,
// but we still need to wait until all pending requests have returned.
shutdown = nil
if allocs == 0 {
return
}
case req := <-dist:
// New retrieval request arrived to be distributed to some fetcher process
queue := requests[req.bit]
index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section })
requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...)
// If it's a new bit and we have waiting fetchers, allocate to them
if len(queue) == 0 {
assign(req.bit)
}
case fetcher := <-retrievers:
// New retriever arrived, find the bit with the lowest pending section to assign
bit, best := uint(0), uint64(math.MaxUint64)
for idx := range unallocs {
if requests[idx][0] < best {
bit, best = idx, requests[idx][0]
}
}
// Stop tracking this bit (and alloc notifications if no more work is available)
delete(unallocs, bit)
if len(unallocs) == 0 {
retrievers = nil
}
allocs++
fetcher <- bit
case fetcher := <-m.counters:
// New task count request arrives, return number of items
fetcher <- uint(len(requests[<-fetcher]))
case fetcher := <-m.retrievals:
// New fetcher waiting for tasks to retrieve, assign
task := <-fetcher
if want := len(task.Sections); want >= len(requests[task.Bit]) {
task.Sections = requests[task.Bit]
delete(requests, task.Bit)
} else {
task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...)
requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...)
}
fetcher <- task
// If anything was left unallocated, try to assign to someone else
if len(requests[task.Bit]) > 0 {
assign(task.Bit)
}
case result := <-m.deliveries:
// New retrieval task response from fetcher, split out missing sections and
// deliver complete ones
var (
sections = make([]uint64, 0, len(result.Sections))
bitsets = make([][]byte, 0, len(result.Bitsets))
missing = make([]uint64, 0, len(result.Sections))
)
for i, bitset := range result.Bitsets {
if len(bitset) == 0 {
missing = append(missing, result.Sections[i])
continue
}
sections = append(sections, result.Sections[i])
bitsets = append(bitsets, bitset)
}
m.schedulers[result.Bit].deliver(sections, bitsets)
allocs--
// Reschedule missing sections and allocate bit if newly available
if len(missing) > 0 {
queue := requests[result.Bit]
for _, section := range missing {
index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section })
queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...)
}
requests[result.Bit] = queue
if len(queue) == len(missing) {
assign(result.Bit)
}
}
// End the session when all pending deliveries have arrived.
if shutdown == nil && allocs == 0 {
return
}
}
}
}
// MatcherSession is returned by a started matcher to be used as a terminator
// for the actively running matching operation.
type MatcherSession struct {
matcher *Matcher
closer sync.Once // Sync object to ensure we only ever close once
quit chan struct{} // Quit channel to request pipeline termination
ctx context.Context // Context used by the light client to abort filtering
err error // Global error to track retrieval failures deep in the chain
errLock sync.Mutex
pend sync.WaitGroup
}
// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
func (s *MatcherSession) Close() {
s.closer.Do(func() {
// Signal termination and wait for all goroutines to tear down
close(s.quit)
s.pend.Wait()
})
}
// Error returns any failure encountered during the matching session.
func (s *MatcherSession) Error() error {
s.errLock.Lock()
defer s.errLock.Unlock()
return s.err
}
// allocateRetrieval assigns a bloom bit index to a client process that can either
// immediately request and fetch the section contents assigned to this bit or wait
// a little while for more sections to be requested.
func (s *MatcherSession) allocateRetrieval() (uint, bool) {
fetcher := make(chan uint)
select {
case <-s.quit:
return 0, false
case s.matcher.retrievers <- fetcher:
bit, ok := <-fetcher
return bit, ok
}
}
// pendingSections returns the number of pending section retrievals belonging to
// the given bloom bit index.
func (s *MatcherSession) pendingSections(bit uint) int {
fetcher := make(chan uint)
select {
case <-s.quit:
return 0
case s.matcher.counters <- fetcher:
fetcher <- bit
return int(<-fetcher)
}
}
// allocateSections assigns all or part of an already allocated bit-task queue
// to the requesting process.
func (s *MatcherSession) allocateSections(bit uint, count int) []uint64 {
fetcher := make(chan *Retrieval)
select {
case <-s.quit:
return nil
case s.matcher.retrievals <- fetcher:
task := &Retrieval{
Bit: bit,
Sections: make([]uint64, count),
}
fetcher <- task
return (<-fetcher).Sections
}
}
// deliverSections delivers a batch of section bit-vectors for a specific bloom
// bit index to be injected into the processing pipeline.
func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [][]byte) {
s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}
}
// Multiplex polls the matcher session for retrieval tasks and multiplexes it into
// the requested retrieval queue to be serviced together with other sessions.
//
// This method will block for the lifetime of the session. Even after termination
// of the session, any in-flight requests need to be responded to! Empty responses
// are fine though in that case.
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
waitTimer := time.NewTimer(wait)
defer waitTimer.Stop()
for {
// Allocate a new bloom bit index to retrieve data for, stopping when done
bit, ok := s.allocateRetrieval()
if !ok {
return
}
// Bit allocated, throttle a bit if we're below our batch limit
if s.pendingSections(bit) < batch {
waitTimer.Reset(wait)
select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.allocateSections(bit, 0)
s.deliverSections(bit, []uint64{}, [][]byte{})
return
case <-waitTimer.C:
// Throttling up, fetch whatever is available
}
}
// Allocate as much as we can handle and request servicing
sections := s.allocateSections(bit, batch)
request := make(chan *Retrieval)
select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.deliverSections(bit, sections, make([][]byte, len(sections)))
return
case mux <- request:
// Retrieval accepted, something must arrive before we're aborting
request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}
result := <-request
// Deliver a result before s.Close() to avoid a deadlock
s.deliverSections(result.Bit, result.Sections, result.Bitsets)
if result.Error != nil {
s.errLock.Lock()
s.err = result.Error
s.errLock.Unlock()
s.Close()
}
}
}
}

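For context, a condensed sketch of the full pipeline as a client such as eth/filters drove it before this change: start a session over a block range, service its retrieval tasks via Multiplex, and read candidate block numbers from the results channel. readBitset and filterRange are hypothetical stand-ins, not the actual eth/filters code.

package filters

import (
	"context"
	"time"

	"github.com/ethereum/go-ethereum/core/bloombits"
)

const sectionSize = 4096

// readBitset is a hypothetical loader for one stored bit-vector; a zeroed
// vector is a valid "no matches in this section" delivery (an empty one
// would be treated as missing and rescheduled by the distributor).
func readBitset(bit uint, section uint64) []byte {
	return make([]byte, sectionSize/8)
}

// filterRange sketches the client side of the pipeline above.
func filterRange(ctx context.Context, address, topic []byte, begin, end uint64) ([]uint64, error) {
	// One group per criterion: a block must match the address AND the topic,
	// with OR semantics inside each group.
	matcher := bloombits.NewMatcher(sectionSize, [][][]byte{{address}, {topic}})

	results := make(chan uint64, 64)
	session, err := matcher.Start(ctx, begin, end, results)
	if err != nil {
		return nil, err
	}
	defer session.Close()

	// Servicer: pull Retrieval tasks out of the session and answer them.
	mux := make(chan chan *bloombits.Retrieval)
	go session.Multiplex(16, 100*time.Microsecond, mux)
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case request := <-mux:
				task := <-request
				task.Bitsets = make([][]byte, len(task.Sections))
				for i, section := range task.Sections {
					task.Bitsets[i] = readBitset(task.Bit, section)
				}
				request <- task
			}
		}
	}()

	// Blooms are probabilistic, so these are only candidates: the caller
	// still has to check the receipts of each returned block number.
	var candidates []uint64
	for number := range results {
		candidates = append(candidates, number)
	}
	return candidates, nil
}

In the real code the servicing goroutines were long-lived workers shared across sessions; the inline goroutine here only illustrates the request/response handshake.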

@@ -1,292 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"context"
"math/rand"
"sync/atomic"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
)
const testSectionSize = 4096
// Tests that wildcard filter rules (nil) can be specified and are handled well.
func TestMatcherWildcards(t *testing.T) {
t.Parallel()
matcher := NewMatcher(testSectionSize, [][][]byte{
{common.Address{}.Bytes(), common.Address{0x01}.Bytes()}, // Default address is not a wildcard
{common.Hash{}.Bytes(), common.Hash{0x01}.Bytes()}, // Default hash is not a wildcard
{common.Hash{0x01}.Bytes()}, // Plain rule, sanity check
{common.Hash{0x01}.Bytes(), nil}, // Wildcard suffix, drop rule
{nil, common.Hash{0x01}.Bytes()}, // Wildcard prefix, drop rule
{nil, nil}, // Wildcard combo, drop rule
{}, // Inited wildcard rule, drop rule
nil, // Proper wildcard rule, drop rule
})
if len(matcher.filters) != 3 {
t.Fatalf("filter system size mismatch: have %d, want %d", len(matcher.filters), 3)
}
if len(matcher.filters[0]) != 2 {
t.Fatalf("address clause size mismatch: have %d, want %d", len(matcher.filters[0]), 2)
}
if len(matcher.filters[1]) != 2 {
t.Fatalf("combo topic clause size mismatch: have %d, want %d", len(matcher.filters[1]), 2)
}
if len(matcher.filters[2]) != 1 {
t.Fatalf("singletone topic clause size mismatch: have %d, want %d", len(matcher.filters[2]), 1)
}
}
// Tests the matcher pipeline on a single continuous workflow without interrupts.
func TestMatcherContinuous(t *testing.T) {
t.Parallel()
testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, false, 75)
testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, false, 81)
testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, false, 36)
}
// Tests the matcher pipeline on a constantly interrupted and resumed work pattern
// with the aim of ensuring data items are requested only once.
func TestMatcherIntermittent(t *testing.T) {
t.Parallel()
testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, true, 75)
testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, true, 81)
testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, true, 36)
}
// Tests the matcher pipeline on random input to hopefully catch anomalies.
func TestMatcherRandom(t *testing.T) {
t.Parallel()
for i := 0; i < 10; i++ {
testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 0, 10000, 0)
testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 0, 10000, 0)
testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 0, 10000, 0)
testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 0, 10000, 0)
testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 0, 10000, 0)
}
}
// Tests that the matcher can properly find matches if the starting block is
// shifted from a multiple of 8. This is needed to cover an optimisation with
// bitset matching https://github.com/ethereum/go-ethereum/issues/15309.
func TestMatcherShifted(t *testing.T) {
t.Parallel()
// Block 0 always matches in the tests, skip ahead of the first 8 blocks with the
// start to get a potential zero byte in the matcher bitset.
// To keep the second bitset byte zero, the filter must only match for the first
// time in block 16, so doing an all-16 bit filter should suffice.
// To keep the starting block not divisible by 8, block number 9 is the first
// that would introduce a shift and not match block 0.
testMatcherBothModes(t, [][]bloomIndexes{{{16, 16, 16}}}, 9, 64, 0)
}
// Tests that matching on everything doesn't crash (special case internally).
func TestWildcardMatcher(t *testing.T) {
t.Parallel()
testMatcherBothModes(t, nil, 0, 10000, 0)
}
// makeRandomIndexes generates a random filter system, composed of multiple filter
// criteria, each having one bloom list component for the address and arbitrarily
// many topic bloom list components.
func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes {
res := make([][]bloomIndexes, len(lengths))
for i, topics := range lengths {
res[i] = make([]bloomIndexes, topics)
for j := 0; j < topics; j++ {
for k := 0; k < len(res[i][j]); k++ {
res[i][j][k] = uint(rand.Intn(max-1) + 2)
}
}
}
return res
}
// testMatcherDiffBatches runs the given matcher test in single-delivery and also
// in batched delivery mode, verifying that all kinds of deliveries are handled
// correctly within.
func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32) {
singleton := testMatcher(t, filter, start, blocks, intermittent, retrievals, 1)
batched := testMatcher(t, filter, start, blocks, intermittent, retrievals, 16)
if singleton != batched {
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in singleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched)
}
}
// testMatcherBothModes runs the given matcher test in both continuous as well as
// in intermittent mode, verifying that the request counts match each other.
func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, retrievals uint32) {
continuous := testMatcher(t, filter, start, blocks, false, retrievals, 16)
intermittent := testMatcher(t, filter, start, blocks, true, retrievals, 16)
if continuous != intermittent {
t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent)
}
}
// testMatcher is a generic tester to run the given matcher test and return the
// number of requests made for cross validation between different modes.
func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 {
// Create a new matcher and simulate our explicit random bitsets
matcher := NewMatcher(testSectionSize, nil)
matcher.filters = filter
for _, rule := range filter {
for _, topic := range rule {
for _, bit := range topic {
matcher.addScheduler(bit)
}
}
}
// Track the number of retrieval requests made
var requested atomic.Uint32
// Start the matching session for the filter and the retriever goroutines
quit := make(chan struct{})
matches := make(chan uint64, 16)
session, err := matcher.Start(context.Background(), start, blocks-1, matches)
if err != nil {
t.Fatalf("failed to stat matcher session: %v", err)
}
startRetrievers(session, quit, &requested, maxReqCount)
// Iterate over all the blocks and verify that the pipeline produces the correct matches
for i := start; i < blocks; i++ {
if expMatch3(filter, i) {
match, ok := <-matches
if !ok {
t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, results channel closed", filter, blocks, intermittent, i)
return 0
}
if match != i {
t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, got #%v", filter, blocks, intermittent, i, match)
}
// If we're testing intermittent mode, abort and restart the pipeline
if intermittent {
session.Close()
close(quit)
quit = make(chan struct{})
matches = make(chan uint64, 16)
session, err = matcher.Start(context.Background(), i+1, blocks-1, matches)
if err != nil {
t.Fatalf("failed to stat matcher session: %v", err)
}
startRetrievers(session, quit, &requested, maxReqCount)
}
}
}
// Ensure the result channel is torn down after the last block
match, ok := <-matches
if ok {
t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match)
}
// Clean up the session and ensure we match the expected retrieval count
session.Close()
close(quit)
if retrievals != 0 && requested.Load() != retrievals {
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested.Load(), retrievals)
}
return requested.Load()
}
// startRetrievers starts a batch of goroutines listening for section requests
// and serving them.
func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *atomic.Uint32, batch int) {
requests := make(chan chan *Retrieval)
for i := 0; i < 10; i++ {
// Start a multiplexer to test multiple threaded execution
go session.Multiplex(batch, 100*time.Microsecond, requests)
// Start a servicer to match the above multiplexer
go func() {
for {
// Wait for a service request or a shutdown
select {
case <-quit:
return
case request := <-requests:
task := <-request
task.Bitsets = make([][]byte, len(task.Sections))
for i, section := range task.Sections {
if rand.Int()%4 != 0 { // Handle occasional missing deliveries
task.Bitsets[i] = generateBitset(task.Bit, section)
retrievals.Add(1)
}
}
request <- task
}
}
}()
}
}
// generateBitset generates the rotated bitset for the given bloom bit and section
// numbers.
func generateBitset(bit uint, section uint64) []byte {
bitset := make([]byte, testSectionSize/8)
for i := 0; i < len(bitset); i++ {
for b := 0; b < 8; b++ {
blockIdx := section*testSectionSize + uint64(i*8+b)
bitset[i] += bitset[i]
if (blockIdx % uint64(bit)) == 0 {
bitset[i]++
}
}
}
return bitset
}
func expMatch1(filter bloomIndexes, i uint64) bool {
for _, ii := range filter {
if (i % uint64(ii)) != 0 {
return false
}
}
return true
}
func expMatch2(filter []bloomIndexes, i uint64) bool {
for _, ii := range filter {
if expMatch1(ii, i) {
return true
}
}
return false
}
func expMatch3(filter [][]bloomIndexes, i uint64) bool {
for _, ii := range filter {
if !expMatch2(ii, i) {
return false
}
}
return true
}


@@ -1,181 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"sync"
)
// request represents a bloom retrieval task to prioritize and pull from the local
// database or remotely from the network.
type request struct {
section uint64 // Section index to retrieve the bit-vector from
bit uint // Bit index within the section to retrieve the vector of
}
// response represents the state of a requested bit-vector through a scheduler.
type response struct {
cached []byte // Cached bits to dedup multiple requests
done chan struct{} // Channel to allow waiting for completion
}
// scheduler handles the scheduling of bloom-filter retrieval operations for
entire section-batches belonging to a single bloom bit. Besides scheduling the
// retrieval operations, this struct also deduplicates the requests and caches
// the results to minimize network/database overhead even in complex filtering
// scenarios.
type scheduler struct {
bit uint // Index of the bit in the bloom filter this scheduler is responsible for
responses map[uint64]*response // Currently pending retrieval requests or already cached responses
lock sync.Mutex // Lock protecting the responses from concurrent access
}
// newScheduler creates a new bloom-filter retrieval scheduler for a specific
// bit index.
func newScheduler(idx uint) *scheduler {
return &scheduler{
bit: idx,
responses: make(map[uint64]*response),
}
}
// run creates a retrieval pipeline, receiving section indexes from sections and
// returning the results in the same order through the done channel. Concurrent
// runs of the same scheduler are allowed, leading to retrieval task deduplication.
func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
// Create a forwarder channel between requests and responses of the same size as
// the distribution channel (since that will block the pipeline anyway).
pend := make(chan uint64, cap(dist))
// Start the pipeline schedulers to forward between user -> distributor -> user
wg.Add(2)
go s.scheduleRequests(sections, dist, pend, quit, wg)
go s.scheduleDeliveries(pend, done, quit, wg)
}
// reset cleans up any leftovers from previous runs. This is required before a
// restart to ensure that no previously requested but never delivered state will
// cause a lockup.
func (s *scheduler) reset() {
s.lock.Lock()
defer s.lock.Unlock()
for section, res := range s.responses {
if res.cached == nil {
delete(s.responses, section)
}
}
}
// scheduleRequests reads section retrieval requests from the input channel,
// deduplicates the stream and pushes unique retrieval tasks into the distribution
// channel for a database or network layer to honour.
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) {
// Clean up the goroutine and pipeline when done
defer wg.Done()
defer close(pend)
// Keep reading and scheduling section requests
for {
select {
case <-quit:
return
case section, ok := <-reqs:
// New section retrieval requested
if !ok {
return
}
// Deduplicate retrieval requests
unique := false
s.lock.Lock()
if s.responses[section] == nil {
s.responses[section] = &response{
done: make(chan struct{}),
}
unique = true
}
s.lock.Unlock()
// Schedule the section for retrieval and notify the deliverer to expect this section
if unique {
select {
case <-quit:
return
case dist <- &request{bit: s.bit, section: section}:
}
}
select {
case <-quit:
return
case pend <- section:
}
}
}
}
// scheduleDeliveries reads section acceptance notifications and waits for them
// to be delivered, pushing them into the output data buffer.
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
// Clean up the goroutine and pipeline when done
defer wg.Done()
defer close(done)
// Keep reading notifications and scheduling deliveries
for {
select {
case <-quit:
return
case idx, ok := <-pend:
// New section retrieval pending
if !ok {
return
}
// Wait until the request is honoured
s.lock.Lock()
res := s.responses[idx]
s.lock.Unlock()
select {
case <-quit:
return
case <-res.done:
}
// Deliver the result
select {
case <-quit:
return
case done <- res.cached:
}
}
}
}
// deliver is called by the request distributor when a reply to a request arrives.
func (s *scheduler) deliver(sections []uint64, data [][]byte) {
s.lock.Lock()
defer s.lock.Unlock()
for i, section := range sections {
if res := s.responses[section]; res != nil && res.cached == nil { // Avoid non-requests and double deliveries
res.cached = data[i]
close(res.done)
}
}
}

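The deduplication is easiest to see in miniature: two concurrent runs of one scheduler request the same section, the dist channel sees a single request, and both pipelines receive the delivered vector. A minimal sketch (hypothetical, in-package since the types are unexported; the test file below exercises the same property at scale):

package bloombits

import (
	"fmt"
	"sync"
)

// ExampleSchedulerDedup: two pipelines on the same scheduler request section 7;
// the dist channel sees it once, and both sinks get the delivered bytes.
func ExampleSchedulerDedup() {
	s := newScheduler(42)
	quit := make(chan struct{})
	dist := make(chan *request, 1)
	var wg sync.WaitGroup

	in1, out1 := make(chan uint64, 1), make(chan []byte, 1)
	in2, out2 := make(chan uint64, 1), make(chan []byte, 1)
	s.run(in1, dist, out1, quit, &wg)
	s.run(in2, dist, out2, quit, &wg)

	in1 <- 7
	in2 <- 7 // duplicate: deduplicated against the first request

	req := <-dist // only one request is emitted for section 7
	s.deliver([]uint64{req.section}, [][]byte{{0xaa}})

	fmt.Println(<-out1, <-out2)
	close(in1)
	close(in2)
	wg.Wait()
	// Output: [170] [170]
}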

@@ -1,103 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"bytes"
"math/big"
"sync"
"sync/atomic"
"testing"
)
// Tests that the scheduler can deduplicate and forward retrieval requests to
// underlying fetchers and serve responses back, regardless of the concurrency
// of the requesting clients or serving data fetchers.
func TestSchedulerSingleClientSingleFetcher(t *testing.T) { testScheduler(t, 1, 1, 5000) }
func TestSchedulerSingleClientMultiFetcher(t *testing.T) { testScheduler(t, 1, 10, 5000) }
func TestSchedulerMultiClientSingleFetcher(t *testing.T) { testScheduler(t, 10, 1, 5000) }
func TestSchedulerMultiClientMultiFetcher(t *testing.T) { testScheduler(t, 10, 10, 5000) }
func testScheduler(t *testing.T, clients int, fetchers int, requests int) {
t.Parallel()
f := newScheduler(0)
// Create a batch of handler goroutines that respond to bloom bit requests and
// deliver them to the scheduler.
var fetchPend sync.WaitGroup
fetchPend.Add(fetchers)
defer fetchPend.Wait()
fetch := make(chan *request, 16)
defer close(fetch)
var delivered atomic.Uint32
for i := 0; i < fetchers; i++ {
go func() {
defer fetchPend.Done()
for req := range fetch {
delivered.Add(1)
f.deliver([]uint64{
req.section + uint64(requests), // Non-requested data (ensure it doesn't go out of bounds)
req.section, // Requested data
req.section, // Duplicated data (ensure it doesn't double close anything)
}, [][]byte{
{},
new(big.Int).SetUint64(req.section).Bytes(),
new(big.Int).SetUint64(req.section).Bytes(),
})
}
}()
}
// Start a batch of goroutines to concurrently run scheduling tasks
quit := make(chan struct{})
var pend sync.WaitGroup
pend.Add(clients)
for i := 0; i < clients; i++ {
go func() {
defer pend.Done()
in := make(chan uint64, 16)
out := make(chan []byte, 16)
f.run(in, fetch, out, quit, &pend)
go func() {
for j := 0; j < requests; j++ {
in <- uint64(j)
}
close(in)
}()
b := new(big.Int)
for j := 0; j < requests; j++ {
bits := <-out
if want := b.SetUint64(uint64(j)).Bytes(); !bytes.Equal(bits, want) {
t.Errorf("vector %d: delivered content mismatch: have %x, want %x", j, bits, want)
}
}
}()
}
pend.Wait()
if have := delivered.Load(); int(have) != requests {
t.Errorf("request count mismatch: have %v, want %v", have, requests)
}
}


@@ -1,522 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
// ChainIndexerBackend defines the methods needed to process chain segments in
// the background and write the segment results into the database. These can be
// used to create filter blooms or CHTs.
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
Reset(ctx context.Context, section uint64, prevHead common.Hash) error
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(ctx context.Context, header *types.Header) error
// Commit finalizes the section metadata and stores it into the database.
Commit() error
// Prune deletes the chain index older than the given threshold.
Prune(threshold uint64) error
}
// ChainIndexerChain interface is used for connecting the indexer to a blockchain
type ChainIndexerChain interface {
// CurrentHeader retrieves the latest locally known header.
CurrentHeader() *types.Header
// SubscribeChainHeadEvent subscribes to new head header notifications.
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
}
// ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BloomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a
// ChainHeadEventLoop in a goroutine.
//
// Further child ChainIndexers can be added which use the output of the parent
// section indexer. These child indexers receive new head notifications only
// after an entire section has been finished or in case of rollbacks that might
// affect already finished sections.
type ChainIndexer struct {
chainDb ethdb.Database // Chain database to index the data from
indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into
backend ChainIndexerBackend // Background processor generating the index data content
children []*ChainIndexer // Child indexers to cascade chain updates to
active atomic.Bool // Flag whether the event loop was started
update chan struct{} // Notification channel that headers should be processed
quit chan chan error // Quit channel to tear down running goroutines
ctx context.Context
ctxCancel func()
sectionSize uint64 // Number of blocks in a single chain segment to process
confirmsReq uint64 // Number of confirmations before processing a completed segment
storedSections uint64 // Number of sections successfully indexed into the database
knownSections uint64 // Number of sections known to be complete (block wise)
cascadedHead uint64 // Block number of the last completed section cascaded to subindexers
checkpointSections uint64 // Number of sections covered by the checkpoint
checkpointHead common.Hash // Section head belonging to the checkpoint
throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources
log log.Logger
lock sync.Mutex
}
// NewChainIndexer creates a new chain indexer to do background processing on
// chain segments of a given size after a certain number of confirmations have passed.
// The throttling parameter might be used to prevent database thrashing.
func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
c := &ChainIndexer{
chainDb: chainDb,
indexDb: indexDb,
backend: backend,
update: make(chan struct{}, 1),
quit: make(chan chan error),
sectionSize: section,
confirmsReq: confirm,
throttling: throttling,
log: log.New("type", kind),
}
// Initialize database dependent fields and start the updater
c.loadValidSections()
c.ctx, c.ctxCancel = context.WithCancel(context.Background())
go c.updateLoop()
return c
}
// AddCheckpoint adds a checkpoint. Sections are never processed and the chain
// is not expected to be available before this point. The indexer assumes that
// the backend has sufficient information available to process subsequent sections.
//
// Note: knownSections == 0 and storedSections == checkpointSections until
// syncing reaches the checkpoint
func (c *ChainIndexer) AddCheckpoint(section uint64, shead common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()
// Short circuit if the given checkpoint is below the local one.
if c.checkpointSections >= section+1 || section < c.storedSections {
return
}
c.checkpointSections = section + 1
c.checkpointHead = shead
c.setSectionHead(section, shead)
c.setValidSections(section + 1)
}
// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
func (c *ChainIndexer) Start(chain ChainIndexerChain) {
events := make(chan ChainHeadEvent, 10)
sub := chain.SubscribeChainHeadEvent(events)
go c.eventLoop(chain.CurrentHeader(), events, sub)
}
// Close tears down all goroutines belonging to the indexer and returns any error
// that might have occurred internally.
func (c *ChainIndexer) Close() error {
var errs []error
c.ctxCancel()
// Tear down the primary update loop
errc := make(chan error)
c.quit <- errc
if err := <-errc; err != nil {
errs = append(errs, err)
}
// If needed, tear down the secondary event loop
if c.active.Load() {
c.quit <- errc
if err := <-errc; err != nil {
errs = append(errs, err)
}
}
// Close all children
for _, child := range c.children {
if err := child.Close(); err != nil {
errs = append(errs, err)
}
}
// Return any failures
switch {
case len(errs) == 0:
return nil
case len(errs) == 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
}
}
// eventLoop is a secondary (optional) event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
c.active.Store(true)
defer sub.Unsubscribe()
// Fire the initial new head event to start any outstanding processing
c.newHead(currentHeader.Number.Uint64(), false)
var (
prevHeader = currentHeader
prevHash = currentHeader.Hash()
)
for {
select {
case errc := <-c.quit:
// Chain indexer terminating, report no failure and abort
errc <- nil
return
case ev, ok := <-events:
// Received a new event, ensure it's not nil (closing) and update
if !ok {
errc := <-c.quit
errc <- nil
return
}
if ev.Header.ParentHash != prevHash {
// Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then)
// TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?
if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash {
if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, ev.Header); h != nil {
c.newHead(h.Number.Uint64(), true)
}
}
}
c.newHead(ev.Header.Number.Uint64(), false)
prevHeader, prevHash = ev.Header, ev.Header.Hash()
}
}
}
// newHead notifies the indexer about new chain heads and/or reorgs.
func (c *ChainIndexer) newHead(head uint64, reorg bool) {
c.lock.Lock()
defer c.lock.Unlock()
// If a reorg happened, invalidate all sections until that point
if reorg {
// Revert the known section number to the reorg point
known := (head + 1) / c.sectionSize
stored := known
if known < c.checkpointSections {
known = 0
}
if stored < c.checkpointSections {
stored = c.checkpointSections
}
if known < c.knownSections {
c.knownSections = known
}
// Revert the stored sections from the database to the reorg point
if stored < c.storedSections {
c.setValidSections(stored)
}
// Update the new head number to the finalized section end and notify children
head = known * c.sectionSize
if head < c.cascadedHead {
c.cascadedHead = head
for _, child := range c.children {
child.newHead(c.cascadedHead, true)
}
}
return
}
// No reorg, calculate the number of newly known sections and update if high enough
var sections uint64
if head >= c.confirmsReq {
sections = (head + 1 - c.confirmsReq) / c.sectionSize
if sections < c.checkpointSections {
sections = 0
}
if sections > c.knownSections {
if c.knownSections < c.checkpointSections {
// syncing reached the checkpoint, verify section head
syncedHead := rawdb.ReadCanonicalHash(c.chainDb, c.checkpointSections*c.sectionSize-1)
if syncedHead != c.checkpointHead {
c.log.Error("Synced chain does not match checkpoint", "number", c.checkpointSections*c.sectionSize-1, "expected", c.checkpointHead, "synced", syncedHead)
return
}
}
c.knownSections = sections
select {
case c.update <- struct{}{}:
default:
}
}
}
}
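// Worked example of the section arithmetic above (illustrative numbers): with
// sectionSize = 4096 and confirmsReq = 256, a head at block 8447 yields
// sections = (8447 + 1 - 256) / 4096 = 8192 / 4096 = 2, i.e. sections 0 and 1
// are complete and confirmed. On a reorg, known = (head + 1) / sectionSize, so
// a reorg to head 8190 reverts to known = 8191 / 4096 = 1 completed section.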
// updateLoop is the main event loop of the indexer which pushes chain segments
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
var (
updating bool
updated time.Time
)
for {
select {
case errc := <-c.quit:
// Chain indexer terminating, report no failure and abort
errc <- nil
return
case <-c.update:
// Section headers completed (or rolled back), update the index
c.lock.Lock()
if c.knownSections > c.storedSections {
// Periodically print an upgrade log message to the user
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
updating = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
}
// Cache the current section count and head to allow unlocking the mutex
c.verifyLastHead()
section := c.storedSections
var oldHead common.Hash
if section > 0 {
oldHead = c.SectionHead(section - 1)
}
// Process the newly defined section in the background
c.lock.Unlock()
newHead, err := c.processSection(section, oldHead)
if err != nil {
select {
case <-c.ctx.Done():
<-c.quit <- nil
return
default:
}
c.log.Error("Section processing failed", "error", err)
}
c.lock.Lock()
// If processing succeeded and no reorgs occurred, mark the section completed
if err == nil && (section == 0 || oldHead == c.SectionHead(section-1)) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
if c.storedSections == c.knownSections && updating {
updating = false
c.log.Info("Finished upgrading chain index")
}
c.cascadedHead = c.storedSections*c.sectionSize - 1
for _, child := range c.children {
c.log.Trace("Cascading chain index update", "head", c.cascadedHead)
child.newHead(c.cascadedHead, false)
}
} else {
// If processing failed, don't retry until further notification
c.log.Debug("Chain index processing failed", "section", section, "err", err)
c.verifyLastHead()
c.knownSections = c.storedSections
}
}
// If there are still further sections to process, reschedule
if c.knownSections > c.storedSections {
time.AfterFunc(c.throttling, func() {
select {
case c.update <- struct{}{}:
default:
}
})
}
c.lock.Unlock()
}
}
}
// processSection processes an entire section by calling backend functions while
// ensuring the continuity of the passed headers. Since the chain mutex is not
// held while processing, the continuity can be broken by a long reorg, in which
// case the function returns with an error.
func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
c.log.Trace("Processing new chain section", "section", section)
// Reset the backend, restarting the section from scratch and discarding any partial processing
if err := c.backend.Reset(c.ctx, section, lastHead); err != nil {
c.setValidSections(0)
return common.Hash{}, err
}
for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := rawdb.ReadCanonicalHash(c.chainDb, number)
if hash == (common.Hash{}) {
return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number)
}
header := rawdb.ReadHeader(c.chainDb, hash, number)
if header == nil {
return common.Hash{}, fmt.Errorf("block #%d [%x..] not found", number, hash[:4])
} else if header.ParentHash != lastHead {
return common.Hash{}, errors.New("chain reorged during section processing")
}
if err := c.backend.Process(c.ctx, header); err != nil {
return common.Hash{}, err
}
lastHead = header.Hash()
}
if err := c.backend.Commit(); err != nil {
return common.Hash{}, err
}
return lastHead, nil
}
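// A minimal sketch (an assumption, not part of this change) of a backend
// satisfying the lifecycle processSection drives: Reset once per section,
// Process once per canonical header, Commit once at the end.
//
//	type countingBackend struct {
//		section uint64
//		headers uint64
//	}
//
//	func (b *countingBackend) Reset(ctx context.Context, section uint64, prevHead common.Hash) error {
//		b.section, b.headers = section, 0 // start the section from scratch
//		return nil
//	}
//
//	func (b *countingBackend) Process(ctx context.Context, header *types.Header) error {
//		b.headers++ // exactly one call per canonical header of the section
//		return nil
//	}
//
//	func (b *countingBackend) Commit() error                { return nil }
//	func (b *countingBackend) Prune(threshold uint64) error { return nil }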
// verifyLastHead compares the last stored section head with the corresponding
// block hash in the actual canonical chain and rolls back reorged sections if
// necessary to ensure that the stored sections are all valid.
func (c *ChainIndexer) verifyLastHead() {
for c.storedSections > 0 && c.storedSections > c.checkpointSections {
if c.SectionHead(c.storedSections-1) == rawdb.ReadCanonicalHash(c.chainDb, c.storedSections*c.sectionSize-1) {
return
}
c.setValidSections(c.storedSections - 1)
}
}
// Sections returns the number of processed sections maintained by the indexer
// and also the information about the last header indexed for potential canonical
// verifications.
func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()
c.verifyLastHead()
return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1)
}
// AddChildIndexer adds a child ChainIndexer that can use the output of this one
func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) {
if indexer == c {
panic("can't add indexer as a child of itself")
}
c.lock.Lock()
defer c.lock.Unlock()
c.children = append(c.children, indexer)
// Cascade any pending updates to new children too
sections := c.storedSections
if c.knownSections < sections {
// if a section is "stored" but not "known" then it is a checkpoint without
// available chain data so we should not cascade it yet
sections = c.knownSections
}
if sections > 0 {
indexer.newHead(sections*c.sectionSize-1, false)
}
}
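// Illustrative usage (mirroring the unit tests below): children are attached
// before any heads are cascaded, e.g.
//
//	parent.AddChildIndexer(child) // child.newHead is now driven by parent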
// Prune deletes all chain data older than given threshold.
func (c *ChainIndexer) Prune(threshold uint64) error {
return c.backend.Prune(threshold)
}
// loadValidSections reads the number of valid sections from the index database
// and caches it in the local state.
func (c *ChainIndexer) loadValidSections() {
data, _ := c.indexDb.Get([]byte("count"))
if len(data) == 8 {
c.storedSections = binary.BigEndian.Uint64(data)
}
}
// setValidSections writes the number of valid sections to the index database
func (c *ChainIndexer) setValidSections(sections uint64) {
// Set the current number of valid sections in the database
var data [8]byte
binary.BigEndian.PutUint64(data[:], sections)
c.indexDb.Put([]byte("count"), data[:])
// Remove any reorged sections, caching the valid ones in the meantime
for c.storedSections > sections {
c.storedSections--
c.removeSectionHead(c.storedSections)
}
c.storedSections = sections // needed if new > old
}
// SectionHead retrieves the last block hash of a processed section from the
// index database.
func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)
hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...))
if len(hash) == len(common.Hash{}) {
return common.BytesToHash(hash)
}
return common.Hash{}
}
// setSectionHead writes the last block hash of a processed section to the index
// database.
func (c *ChainIndexer) setSectionHead(section uint64, hash common.Hash) {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)
c.indexDb.Put(append([]byte("shead"), data[:]...), hash.Bytes())
}
// removeSectionHead removes the reference to a processed section from the index
// database.
func (c *ChainIndexer) removeSectionHead(section uint64) {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)
c.indexDb.Delete(append([]byte("shead"), data[:]...))
}
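// For clarity, the index database schema used by the helpers above, with
// illustrative values for section 3 out of 4 valid sections:
//
//	"count"                                -> uint64 big endian: 0x0000000000000004
//	"shead" + 0x0000000000000003 (8 bytes) -> 32 byte section head hash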

View file

@ -1,246 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"context"
"errors"
"fmt"
"math/big"
"math/rand"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
)
// Runs multiple tests with randomized parameters.
func TestChainIndexerSingle(t *testing.T) {
for i := 0; i < 10; i++ {
testChainIndexer(t, 1)
}
}
// Runs multiple tests with randomized parameters and different numbers of
// chain backends.
func TestChainIndexerWithChildren(t *testing.T) {
for i := 2; i < 8; i++ {
testChainIndexer(t, i)
}
}
// testChainIndexer runs a test with either a single chain indexer or a chain of
// multiple backends. The section size and required confirmation count parameters
// are randomized.
func testChainIndexer(t *testing.T, count int) {
db := rawdb.NewMemoryDatabase()
defer db.Close()
// Create a chain of indexers and ensure they all report empty
backends := make([]*testChainIndexBackend, count)
for i := 0; i < count; i++ {
var (
sectionSize = uint64(rand.Intn(100) + 1)
confirmsReq = uint64(rand.Intn(10))
)
backends[i] = &testChainIndexBackend{t: t, processCh: make(chan uint64)}
backends[i].indexer = NewChainIndexer(db, rawdb.NewTable(db, string([]byte{byte(i)})), backends[i], sectionSize, confirmsReq, 0, fmt.Sprintf("indexer-%d", i))
if sections, _, _ := backends[i].indexer.Sections(); sections != 0 {
t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, 0)
}
if i > 0 {
backends[i-1].indexer.AddChildIndexer(backends[i].indexer)
}
}
defer backends[0].indexer.Close() // parent indexer shuts down children
// notify pings the root indexer about a new head or reorg, then expects
// processed blocks if a section is processable
notify := func(headNum, failNum uint64, reorg bool) {
backends[0].indexer.newHead(headNum, reorg)
if reorg {
for _, backend := range backends {
headNum = backend.reorg(headNum)
backend.assertSections()
}
return
}
var cascade bool
for _, backend := range backends {
headNum, cascade = backend.assertBlocks(headNum, failNum)
if !cascade {
break
}
backend.assertSections()
}
}
// inject inserts a new random canonical header into the database directly
inject := func(number uint64) {
header := &types.Header{Number: big.NewInt(int64(number)), Extra: big.NewInt(rand.Int63()).Bytes()}
if number > 0 {
header.ParentHash = rawdb.ReadCanonicalHash(db, number-1)
}
rawdb.WriteHeader(db, header)
rawdb.WriteCanonicalHash(db, header.Hash(), number)
}
// Start indexer with an already existing chain
for i := uint64(0); i <= 100; i++ {
inject(i)
}
notify(100, 100, false)
// Add new blocks one by one
for i := uint64(101); i <= 1000; i++ {
inject(i)
notify(i, i, false)
}
// Do a reorg
notify(500, 500, true)
// Create new fork
for i := uint64(501); i <= 1000; i++ {
inject(i)
notify(i, i, false)
}
for i := uint64(1001); i <= 1500; i++ {
inject(i)
}
// Failed processing scenario where fewer blocks are available than notified
notify(2000, 1500, false)
// Notify about a reorg (which could have caused the missing blocks if it happened during processing)
notify(1500, 1500, true)
// Create new fork
for i := uint64(1501); i <= 2000; i++ {
inject(i)
notify(i, i, false)
}
}
// testChainIndexBackend implements ChainIndexerBackend
type testChainIndexBackend struct {
t *testing.T
indexer *ChainIndexer
section, headerCnt, stored uint64
processCh chan uint64
}
// assertSections verifies that a chain indexer has the correct number of sections.
func (b *testChainIndexBackend) assertSections() {
// Keep trying for 3 seconds if it does not match
var sections uint64
for i := 0; i < 300; i++ {
sections, _, _ = b.indexer.Sections()
if sections == b.stored {
return
}
time.Sleep(10 * time.Millisecond)
}
b.t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, b.stored)
}
// assertBlocks expects processing calls after new blocks have arrived. If
// failNum < headNum, we are simulating a scenario where a reorg happened
// after processing started and the processing of a section fails.
func (b *testChainIndexBackend) assertBlocks(headNum, failNum uint64) (uint64, bool) {
var sections uint64
if headNum >= b.indexer.confirmsReq {
sections = (headNum + 1 - b.indexer.confirmsReq) / b.indexer.sectionSize
if sections > b.stored {
// expect processed blocks
for expectd := b.stored * b.indexer.sectionSize; expectd < sections*b.indexer.sectionSize; expectd++ {
if expectd > failNum {
// rolled back after processing started, no more process calls expected
// wait until updating is done to make sure that processing actually fails
var updating bool
for i := 0; i < 300; i++ {
b.indexer.lock.Lock()
updating = b.indexer.knownSections > b.indexer.storedSections
b.indexer.lock.Unlock()
if !updating {
break
}
time.Sleep(10 * time.Millisecond)
}
if updating {
b.t.Fatalf("update did not finish")
}
sections = expectd / b.indexer.sectionSize
break
}
select {
case <-time.After(10 * time.Second):
b.t.Fatalf("Expected processed block #%d, got nothing", expectd)
case processed := <-b.processCh:
if processed != expectd {
b.t.Errorf("Expected processed block #%d, got #%d", expectd, processed)
}
}
}
b.stored = sections
}
}
if b.stored == 0 {
return 0, false
}
return b.stored*b.indexer.sectionSize - 1, true
}
func (b *testChainIndexBackend) reorg(headNum uint64) uint64 {
firstChanged := (headNum + 1) / b.indexer.sectionSize
if firstChanged < b.stored {
b.stored = firstChanged
}
return b.stored * b.indexer.sectionSize
}
func (b *testChainIndexBackend) Reset(ctx context.Context, section uint64, prevHead common.Hash) error {
b.section = section
b.headerCnt = 0
return nil
}
func (b *testChainIndexBackend) Process(ctx context.Context, header *types.Header) error {
b.headerCnt++
if b.headerCnt > b.indexer.sectionSize {
b.t.Error("Processing too many headers")
}
//t.processCh <- header.Number.Uint64()
select {
case <-time.After(10 * time.Second):
b.t.Error("Unexpected call to Process")
// Can't use Fatal since this is not the test's goroutine.
// Returning error stops the chainIndexer's updateLoop
return errors.New("unexpected call to Process")
case b.processCh <- header.Number.Uint64():
}
return nil
}
func (b *testChainIndexBackend) Commit() error {
if b.headerCnt != b.indexer.sectionSize {
b.t.Error("Not enough headers processed")
}
return nil
}
func (b *testChainIndexBackend) Prune(threshold uint64) error {
return nil
}

View file

@ -17,7 +17,6 @@
package filtermaps
import (
"bytes"
"errors"
"fmt"
"os"
@ -242,7 +241,8 @@ func (f *FilterMaps) Start() {
log.Error("Could not load head filter map snapshot", "error", err)
}
}
f.closeWg.Add(1)
f.closeWg.Add(2)
go f.removeBloomBits()
go f.indexerLoop()
}
@ -292,7 +292,7 @@ func (f *FilterMaps) reset() bool {
// deleting the range first ensures that resetDb will be called again at next
// startup and any leftover data will be removed even if it cannot finish now.
rawdb.DeleteFilterMapsRange(f.db)
return f.removeDbWithPrefix([]byte(rawdb.FilterMapsPrefix), "Resetting log index database")
return f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database")
}
// init initializes an empty log index according to the current targetView.
@ -335,24 +335,25 @@ func (f *FilterMaps) init() error {
return batch.Write()
}
// removeDbWithPrefix removes data with the given prefix from the database and
// returns true if everything was successfully removed.
func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool {
it := f.db.NewIterator(prefix, nil)
hasData := it.Next()
it.Release()
if !hasData {
return true
}
// removeBloomBits removes old bloom bits data from the database.
func (f *FilterMaps) removeBloomBits() {
f.safeDeleteRange(rawdb.DeleteBloomBitsDb, "Removing old bloom bits database")
f.closeWg.Done()
}
end := bytes.Clone(prefix)
end[len(end)-1]++
// safeDeleteRange calls the specified database range deleter function
// repeatedly as long as it returns leveldb.ErrTooManyKeys.
// This wrapper is necessary because of the leveldb fallback implementation
// of DeleteRange.
func (f *FilterMaps) safeDeleteRange(removeFn func(ethdb.KeyValueRangeDeleter) error, action string) bool {
start := time.Now()
var retry bool
for {
err := f.db.DeleteRange(prefix, end)
err := removeFn(f.db)
if err == nil {
if retry {
log.Info(action+" finished", "elapsed", time.Since(start))
}
return true
}
if err != leveldb.ErrTooManyKeys {
@ -365,7 +366,7 @@ func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool {
default:
}
if !retry {
log.Info(action + " in progress...")
log.Info(action+" in progress...", "elapsed", time.Since(start))
retry = true
}
}

View file

@ -44,13 +44,18 @@ func (f *FilterMaps) indexerLoop() {
if !f.indexedRange.initialized {
if err := f.init(); err != nil {
log.Error("Error initializing log index", "error", err)
f.waitForEvent()
// unexpected error; there is not a lot we can do here, maybe it
// recovers, maybe not. Calling event processing here ensures
// that we can still shut down properly in case of an infinite loop.
f.processSingleEvent(true)
continue
}
}
if !f.targetHeadIndexed() {
if !f.tryIndexHead() {
f.waitForEvent()
// either shutdown or unexpected error; in the latter case ensure
// that proper shutdown is still possible.
f.processSingleEvent(true)
}
} else {
if f.finalBlock != f.lastFinal {
@ -60,7 +65,7 @@ func (f *FilterMaps) indexerLoop() {
f.lastFinal = f.finalBlock
}
if f.tryIndexTail() && f.tryUnindexTail() {
f.waitForEvent()
f.waitForNewHead()
}
}
}
@ -119,8 +124,9 @@ func (f *FilterMaps) WaitIdle() {
}
}
// waitForEvent blocks until an event happens that the indexer might react to.
func (f *FilterMaps) waitForEvent() {
// waitForNewHead blocks until there is a new target head to index and block
// processing has been finished.
func (f *FilterMaps) waitForNewHead() {
for !f.stop && (f.blockProcessing || f.targetHeadIndexed()) {
f.processSingleEvent(true)
}
@ -129,13 +135,16 @@ func (f *FilterMaps) waitForEvent() {
// processEvents processes all events, blocking only if a block processing is
// happening and indexing should be suspended.
func (f *FilterMaps) processEvents() {
for !f.stop && f.processSingleEvent(f.blockProcessing) {
for f.processSingleEvent(f.blockProcessing) {
}
}
// processSingleEvent processes a single event either in a blocking or
// non-blocking manner.
// non-blocking manner. It returns true if it did process an event.
func (f *FilterMaps) processSingleEvent(blocking bool) bool {
if f.stop {
return false
}
if !f.hasTempRange {
for _, mb := range f.matcherSyncRequests {
mb.synced()
@ -356,16 +365,18 @@ func (f *FilterMaps) needTailEpoch(epoch uint32) bool {
if epoch+1 < firstEpoch {
return false
}
var lastBlockOfPrevEpoch uint64
if epoch > 0 {
lastBlockOfPrevEpoch, _, err := f.getLastBlockOfMap(epoch<<f.logMapsPerEpoch - 1)
var err error
lastBlockOfPrevEpoch, _, err = f.getLastBlockOfMap(epoch<<f.logMapsPerEpoch - 1)
if err != nil {
log.Error("Could not get last block of previous epoch", "epoch", epoch-1, "error", err)
return epoch >= firstEpoch
}
}
if f.historyCutoff > lastBlockOfPrevEpoch {
return false
}
}
lastBlockOfEpoch, _, err := f.getLastBlockOfMap((epoch+1)<<f.logMapsPerEpoch - 1)
if err != nil {
log.Error("Could not get last block of epoch", "epoch", epoch, "error", err)

View file

@ -147,41 +147,6 @@ func ReadReceipt(db ethdb.Reader, hash common.Hash, config *params.ChainConfig)
return nil, common.Hash{}, 0, 0
}
// ReadBloomBits retrieves the compressed bloom bit vector belonging to the given
// section and bit index from the database.
func ReadBloomBits(db ethdb.KeyValueReader, bit uint, section uint64, head common.Hash) ([]byte, error) {
return db.Get(bloomBitsKey(bit, section, head))
}
// WriteBloomBits stores the compressed bloom bits vector belonging to the given
// section and bit index.
func WriteBloomBits(db ethdb.KeyValueWriter, bit uint, section uint64, head common.Hash, bits []byte) {
if err := db.Put(bloomBitsKey(bit, section, head), bits); err != nil {
log.Crit("Failed to store bloom bits", "err", err)
}
}
// DeleteBloombits removes all compressed bloom bits vectors belonging to the
// given section range and bit index.
func DeleteBloombits(db ethdb.Database, bit uint, from uint64, to uint64) {
start, end := bloomBitsKey(bit, from, common.Hash{}), bloomBitsKey(bit, to, common.Hash{})
it := db.NewIterator(nil, start)
defer it.Release()
for it.Next() {
if bytes.Compare(it.Key(), end) >= 0 {
break
}
if len(it.Key()) != len(bloomBitsPrefix)+2+8+32 {
continue
}
db.Delete(it.Key())
}
if it.Error() != nil {
log.Crit("Failed to delete bloom bits", "err", it.Error())
}
}
// ReadFilterMapRow retrieves a filter map row at the given mapRowIndex
// (see filtermaps.mapRowIndex for the storage index encoding).
// Note that zero length rows are not stored in the database and therefore all
@ -485,3 +450,24 @@ func DeleteFilterMapsRange(db ethdb.KeyValueWriter) {
log.Crit("Failed to delete filter maps range", "err", err)
}
}
// deletePrefixRange deletes everything with the given prefix from the database.
func deletePrefixRange(db ethdb.KeyValueRangeDeleter, prefix []byte) error {
end := bytes.Clone(prefix)
end[len(end)-1]++
return db.DeleteRange(prefix, end)
}
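// Worked example (illustrative): the exclusive end key is derived by
// incrementing the last prefix byte, so prefix "fm-" (0x66 0x6d 0x2d) yields
// end "fm." (0x66 0x6d 0x2e), and DeleteRange covers every key starting with
// "fm-". This assumes the prefix does not end in 0xff.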
// DeleteFilterMapsDb removes the entire filter maps database
func DeleteFilterMapsDb(db ethdb.KeyValueRangeDeleter) error {
return deletePrefixRange(db, []byte(filterMapsPrefix))
}
// DeleteBloomBitsDb removes the old bloombits database and the associated
// chain indexer database.
func DeleteBloomBitsDb(db ethdb.KeyValueRangeDeleter) error {
if err := deletePrefixRange(db, bloomBitsPrefix); err != nil {
return err
}
return deletePrefixRange(db, bloomBitsIndexPrefix)
}

View file

@ -17,7 +17,6 @@
package rawdb
import (
"bytes"
"math/big"
"testing"
@ -25,7 +24,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/internal/blocktest"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)
@ -111,54 +109,3 @@ func TestLookupStorage(t *testing.T) {
})
}
}
func TestDeleteBloomBits(t *testing.T) {
// Prepare testing data
db := NewMemoryDatabase()
for i := uint(0); i < 2; i++ {
for s := uint64(0); s < 2; s++ {
WriteBloomBits(db, i, s, params.MainnetGenesisHash, []byte{0x01, 0x02})
WriteBloomBits(db, i, s, params.SepoliaGenesisHash, []byte{0x01, 0x02})
WriteBloomBits(db, i, s, params.HoodiGenesisHash, []byte{0x01, 0x02})
}
}
check := func(bit uint, section uint64, head common.Hash, exist bool) {
bits, _ := ReadBloomBits(db, bit, section, head)
if exist && !bytes.Equal(bits, []byte{0x01, 0x02}) {
t.Fatalf("Bloombits mismatch")
}
if !exist && len(bits) > 0 {
t.Fatalf("Bloombits should be removed")
}
}
// Check the existence of written data.
check(0, 0, params.MainnetGenesisHash, true)
check(0, 0, params.SepoliaGenesisHash, true)
check(0, 0, params.HoodiGenesisHash, true)
// Check the existence of deleted data.
DeleteBloombits(db, 0, 0, 1)
check(0, 0, params.MainnetGenesisHash, false)
check(0, 0, params.SepoliaGenesisHash, false)
check(0, 0, params.HoodiGenesisHash, false)
check(0, 1, params.MainnetGenesisHash, true)
check(0, 1, params.SepoliaGenesisHash, true)
check(0, 1, params.HoodiGenesisHash, true)
// Check the existence of deleted data.
DeleteBloombits(db, 0, 0, 2)
check(0, 0, params.MainnetGenesisHash, false)
check(0, 0, params.SepoliaGenesisHash, false)
check(0, 0, params.HoodiGenesisHash, false)
check(0, 1, params.MainnetGenesisHash, false)
check(0, 1, params.SepoliaGenesisHash, false)
check(0, 1, params.HoodiGenesisHash, false)
// Bit 1 shouldn't be affected.
check(1, 0, params.MainnetGenesisHash, true)
check(1, 0, params.SepoliaGenesisHash, true)
check(1, 0, params.HoodiGenesisHash, true)
check(1, 1, params.MainnetGenesisHash, true)
check(1, 1, params.SepoliaGenesisHash, true)
check(1, 1, params.HoodiGenesisHash, true)
}

View file

@ -375,7 +375,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
accountSnaps stat
storageSnaps stat
preimages stat
bloomBits stat
filterMaps stat
beaconHeaders stat
cliqueSnaps stat
@ -437,11 +436,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
metadata.Add(size)
case bytes.HasPrefix(key, genesisPrefix) && len(key) == (len(genesisPrefix)+common.HashLength):
metadata.Add(size)
case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength):
bloomBits.Add(size)
case bytes.HasPrefix(key, BloomBitsIndexPrefix):
bloomBits.Add(size)
case bytes.HasPrefix(key, []byte(FilterMapsPrefix)):
case bytes.HasPrefix(key, []byte(filterMapsPrefix)):
filterMaps.Add(size)
case bytes.HasPrefix(key, skeletonHeaderPrefix) && len(key) == (len(skeletonHeaderPrefix)+8):
beaconHeaders.Add(size)
@ -507,7 +502,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
{"Key-Value store", "Block number->hash", numHashPairings.Size(), numHashPairings.Count()},
{"Key-Value store", "Block hash->number", hashNumPairings.Size(), hashNumPairings.Count()},
{"Key-Value store", "Transaction index", txLookups.Size(), txLookups.Count()},
{"Key-Value store", "Bloombit index", bloomBits.Size(), bloomBits.Count()},
{"Key-Value store", "Log search index", filterMaps.Size(), filterMaps.Count()},
{"Key-Value store", "Contract codes", codes.Size(), codes.Count()},
{"Key-Value store", "Hash trie nodes", legacyTries.Size(), legacyTries.Count()},

View file

@ -128,8 +128,8 @@ var (
configPrefix = []byte("ethereum-config-") // config prefix for the db
genesisPrefix = []byte("ethereum-genesis-") // genesis state prefix for the db
// BloomBitsIndexPrefix is the data table of a chain indexer to track its progress
BloomBitsIndexPrefix = []byte("iB")
// bloomBitsIndexPrefix is the data table of a chain indexer to track its progress
bloomBitsIndexPrefix = []byte("iB")
ChtPrefix = []byte("chtRootV2-") // ChtPrefix + chtNum (uint64 big endian) -> trie root hash
ChtTablePrefix = []byte("cht-")
@ -145,11 +145,11 @@ var (
FixedCommitteeRootKey = []byte("fixedRoot-") // bigEndian64(syncPeriod) -> committee root hash
SyncCommitteeKey = []byte("committee-") // bigEndian64(syncPeriod) -> serialized committee
FilterMapsPrefix = "fm-"
filterMapsRangeKey = []byte(FilterMapsPrefix + "R")
filterMapRowPrefix = []byte(FilterMapsPrefix + "r") // filterMapRowPrefix + mapRowIndex (uint64 big endian) -> filter row
filterMapLastBlockPrefix = []byte(FilterMapsPrefix + "b") // filterMapLastBlockPrefix + mapIndex (uint32 big endian) -> block number (uint64 big endian)
filterMapBlockLVPrefix = []byte(FilterMapsPrefix + "p") // filterMapBlockLVPrefix + num (uint64 big endian) -> log value pointer (uint64 big endian)
filterMapsPrefix = "fm-"
filterMapsRangeKey = []byte(filterMapsPrefix + "R")
filterMapRowPrefix = []byte(filterMapsPrefix + "r") // filterMapRowPrefix + mapRowIndex (uint64 big endian) -> filter row
filterMapLastBlockPrefix = []byte(filterMapsPrefix + "b") // filterMapLastBlockPrefix + mapIndex (uint32 big endian) -> block number (uint64 big endian)
filterMapBlockLVPrefix = []byte(filterMapsPrefix + "p") // filterMapBlockLVPrefix + num (uint64 big endian) -> log value pointer (uint64 big endian)
preimageCounter = metrics.NewRegisteredCounter("db/preimage/total", nil)
preimageHitsCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil)
@ -225,16 +225,6 @@ func storageSnapshotsKey(accountHash common.Hash) []byte {
return append(SnapshotStoragePrefix, accountHash.Bytes()...)
}
// bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash
func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte {
key := append(append(bloomBitsPrefix, make([]byte, 10)...), hash.Bytes()...)
binary.BigEndian.PutUint16(key[1:], uint16(bit))
binary.BigEndian.PutUint64(key[3:], section)
return key
}
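// Byte layout of the key built above, shown with illustrative values bit = 5
// and section = 2 (the prefix is a single byte, since the bit field is
// written at key[1:]):
//
//	[prefix 1B][bit uint16 BE 0x0005][section uint64 BE 0x..02][head hash 32B]
//
// i.e. 1 + 2 + 8 + 32 = 43 bytes, matching the len(bloomBitsPrefix)+2+8+32
// length check in DeleteBloombits.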
// skeletonHeaderKey = skeletonHeaderPrefix + num (uint64 big endian)
func skeletonHeaderKey(number uint64) []byte {
return append(skeletonHeaderPrefix, encodeBlockNumber(number)...)

View file

@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
@ -403,17 +402,6 @@ func (b *EthAPIBackend) RPCTxFeeCap() float64 {
return b.eth.config.RPCTxFeeCap
}
func (b *EthAPIBackend) BloomStatus() (uint64, uint64) {
sections, _, _ := b.eth.bloomIndexer.Sections()
return params.BloomBitsBlocks, sections
}
func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
for i := 0; i < bloomFilterThreads; i++ {
go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests)
}
}
func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend {
return b.eth.filterMaps.NewMatcherBackend()
}

View file

@ -30,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/pruner"
@ -85,10 +84,6 @@ type Ethereum struct {
engine consensus.Engine
accountManager *accounts.Manager
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
closeBloomHandler chan struct{}
filterMaps *filtermaps.FilterMaps
closeFilterMaps chan chan struct{}
@ -181,11 +176,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eventMux: stack.EventMux(),
accountManager: stack.AccountManager(),
engine: engine,
closeBloomHandler: make(chan struct{}),
networkID: networkID,
gasPrice: config.Miner.GasPrice,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
p2pServer: stack.Server(),
discmix: enode.NewFairMix(0),
shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),
@ -247,7 +239,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
eth.bloomIndexer.Start(eth.blockchain)
fmConfig := filtermaps.Config{History: config.LogHistory, Disabled: config.LogNoHistory, ExportFileName: config.LogExportCheckpoints}
chainView := eth.newChainView(eth.blockchain.CurrentBlock())
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, chainView, 0, 0, filtermaps.DefaultParams, fmConfig)
@ -379,7 +370,6 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downlo
func (s *Ethereum) Synced() bool { return s.handler.synced.Load() }
func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() }
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }
// Protocols returns all the currently configured
// network protocols to start.
@ -398,9 +388,6 @@ func (s *Ethereum) Start() error {
return err
}
// Start the bloom bits servicing goroutines
s.startBloomHandlers(params.BloomBitsBlocks)
// Regularly update shutdown marker
s.shutdownTracker.Start()
@ -511,8 +498,6 @@ func (s *Ethereum) Stop() error {
s.handler.Stop()
// Then stop everything else.
s.bloomIndexer.Close()
close(s.closeBloomHandler)
ch := make(chan struct{})
s.closeFilterMaps <- ch
<-ch

View file

@ -1,74 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"time"
"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core/rawdb"
)
const (
// bloomServiceThreads is the number of goroutines used globally by an Ethereum
// instance to service bloombits lookups for all running filters.
bloomServiceThreads = 16
// bloomFilterThreads is the number of goroutines used locally per filter to
// multiplex requests onto the global servicing goroutines.
bloomFilterThreads = 3
// bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
// in a single batch.
bloomRetrievalBatch = 16
// bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
// to accumulate before requesting an entire batch (avoiding hysteresis).
bloomRetrievalWait = time.Duration(0)
)
// startBloomHandlers starts a batch of goroutines to accept bloom bit database
// retrievals from possibly a range of filters and serve the data to satisfy them.
func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {
for i := 0; i < bloomServiceThreads; i++ {
go func() {
for {
select {
case <-eth.closeBloomHandler:
return
case request := <-eth.bloomRequests:
task := <-request
task.Bitsets = make([][]byte, len(task.Sections))
for i, section := range task.Sections {
head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*sectionSize-1)
if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil {
if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil {
task.Bitsets[i] = blob
} else {
task.Error = err
}
} else {
task.Error = err
}
}
request <- task
}
}
}()
}
}
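// For reference, a minimal sketch (illustrative sizes) of the compression
// round trip the handler above relies on; bitutil.CompressBytes and
// bitutil.DecompressBytes are the real helpers used here:
//
//	sectionSize := uint64(4096)
//	bits := make([]byte, sectionSize/8) // 512 byte bit vector, one bit per block
//	comp := bitutil.CompressBytes(bits) // sparse vectors compress well
//	blob, err := bitutil.DecompressBytes(comp, int(sectionSize/8))
//	// err == nil and len(blob) == 512 recovers the original vector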

View file

@ -20,14 +20,6 @@ package params
// aren't necessarily consensus related.
const (
// BloomBitsBlocks is the number of blocks a single bloom bit section vector
// contains on the server side.
BloomBitsBlocks uint64 = 4096
// BloomConfirms is the number of confirmation blocks before a bloom section is
// considered probably final and its rotated bits are calculated.
BloomConfirms = 256
// FullImmutabilityThreshold is the number of blocks after which a chain segment is
// considered immutable (i.e. soft finality). It is used by the downloader as a
// hard limit against deep ancestors, by the blockchain against deep reorgs, by