light: CHT and bloom trie indexers working in light mode (#16534)

This commit is contained in:
Daniel Liu 2025-03-06 13:25:57 +08:00
parent a898812123
commit f039b26e7a
16 changed files with 214 additions and 83 deletions

View file

@ -17,6 +17,7 @@
package core
import (
"context"
"encoding/binary"
"errors"
"fmt"
@ -38,11 +39,11 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
Reset(section uint64, prevHead common.Hash) error
Reset(ctx context.Context, section uint64, prevHead common.Hash) error
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(header *types.Header)
Process(ctx context.Context, header *types.Header) error
// Commit finalizes the section metadata and stores it into the database.
Commit() error
@ -72,9 +73,11 @@ type ChainIndexer struct {
backend ChainIndexerBackend // Background processor generating the index data content
children []*ChainIndexer // Child indexers to cascade chain updates to
active uint32 // Flag whether the event loop was started
update chan struct{} // Notification channel that headers should be processed
quit chan chan error // Quit channel to tear down running goroutines
active uint32 // Flag whether the event loop was started
update chan struct{} // Notification channel that headers should be processed
quit chan chan error // Quit channel to tear down running goroutines
ctx context.Context
ctxCancel func()
sectionSize uint64 // Number of blocks in a single chain segment to process
confirmsReq uint64 // Number of confirmations before processing a completed segment
@ -106,6 +109,8 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken
}
// Initialize database dependent fields and start the updater
c.loadValidSections()
c.ctx, c.ctxCancel = context.WithCancel(context.Background())
go c.updateLoop()
return c
@ -139,6 +144,8 @@ func (c *ChainIndexer) Start(chain ChainIndexerChain) {
func (c *ChainIndexer) Close() error {
var errs []error
c.ctxCancel()
// Tear down the primary update loop
errc := make(chan error)
c.quit <- errc
@ -298,6 +305,12 @@ func (c *ChainIndexer) updateLoop() {
c.lock.Unlock()
newHead, err := c.processSection(section, oldHead)
if err != nil {
select {
case <-c.ctx.Done():
<-c.quit <- nil
return
default:
}
c.log.Error("Section processing failed", "error", err)
}
c.lock.Lock()
@ -345,7 +358,7 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
// Reset and partial processing
if err := c.backend.Reset(section, lastHead); err != nil {
if err := c.backend.Reset(c.ctx, section, lastHead); err != nil {
c.setValidSections(0)
return common.Hash{}, err
}
@ -361,11 +374,12 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
} else if header.ParentHash != lastHead {
return common.Hash{}, errors.New("chain reorged during section processing")
}
c.backend.Process(header)
if err := c.backend.Process(c.ctx, header); err != nil {
return common.Hash{}, err
}
lastHead = header.Hash()
}
if err := c.backend.Commit(); err != nil {
c.log.Error("Section commit failed", "error", err)
return common.Hash{}, err
}
return lastHead, nil

View file

@ -17,15 +17,15 @@
package core
import (
"context"
"fmt"
"math/big"
"math/rand"
"testing"
"time"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/types"
)
@ -210,13 +210,13 @@ func (b *testChainIndexBackend) reorg(headNum uint64) uint64 {
return b.stored * b.indexer.sectionSize
}
func (b *testChainIndexBackend) Reset(section uint64, prevHead common.Hash) error {
func (b *testChainIndexBackend) Reset(ctx context.Context, section uint64, prevHead common.Hash) error {
b.section = section
b.headerCnt = 0
return nil
}
func (b *testChainIndexBackend) Process(header *types.Header) {
func (b *testChainIndexBackend) Process(ctx context.Context, header *types.Header) error {
b.headerCnt++
if b.headerCnt > b.indexer.sectionSize {
b.t.Error("Processing too many headers")
@ -227,6 +227,7 @@ func (b *testChainIndexBackend) Process(header *types.Header) {
b.t.Fatal("Unexpected call to Process")
case b.processCh <- header.Number.Uint64():
}
return nil
}
func (b *testChainIndexBackend) Commit() error {

View file

@ -155,7 +155,7 @@ func New(ctx *node.ServiceContext, config *ethconfig.Config, XDCXServ *XDCx.XDCX
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms),
}
// Inject XDCX Service into main Eth Service.
if XDCXServ != nil {

View file

@ -17,6 +17,7 @@
package eth
import (
"context"
"time"
"github.com/XinFinOrg/XDPoSChain/common"
@ -92,30 +93,28 @@ const (
// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
// for the Ethereum header bloom filters, permitting blazing fast filtering.
type BloomIndexer struct {
size uint64 // section size to generate bloombits for
db ethdb.Database // database instance to write index data and metadata into
gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
section uint64 // Section is the section number being processed currently
head common.Hash // Head is the hash of the last header processed
size uint64 // section size to generate bloombits for
db ethdb.Database // database instance to write index data and metadata into
gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
section uint64 // Section is the section number being processed currently
head common.Hash // Head is the hash of the last header processed
}
// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
// canonical chain for fast logs filtering.
func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer {
func NewBloomIndexer(db ethdb.Database, size, confReq uint64) *core.ChainIndexer {
backend := &BloomIndexer{
db: db,
size: size,
}
table := rawdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix))
return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits")
return core.NewChainIndexer(db, table, backend, size, confReq, bloomThrottling, "bloombits")
}
// Reset implements core.ChainIndexerBackend, starting a new bloombits index
// section.
func (b *BloomIndexer) Reset(section uint64, lastSectionHead common.Hash) error {
func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
gen, err := bloombits.NewGenerator(uint(b.size))
b.gen, b.section, b.head = gen, section, common.Hash{}
return err
@ -123,9 +122,10 @@ func (b *BloomIndexer) Reset(section uint64, lastSectionHead common.Hash) error
// Process implements core.ChainIndexerBackend, adding a new header's bloom into
// the index.
func (b *BloomIndexer) Process(header *types.Header) {
func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error {
b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom)
b.head = header.Hash()
return nil
}
// Commit implements core.ChainIndexerBackend, finalizing the bloom section and

View file

@ -118,7 +118,7 @@ func New(ctx *node.ServiceContext, config *ethconfig.Config) (*LightEthereum, er
shutdownChan: make(chan bool),
networkId: networkID,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: eth.NewBloomIndexer(chainDb, light.BloomTrieFrequency),
bloomIndexer: eth.NewBloomIndexer(chainDb, light.BloomTrieFrequency, light.HelperTrieConfirmations),
}
leth.relay = NewLesTxRelay(peers, leth.reqDist)
@ -126,8 +126,8 @@ func New(ctx *node.ServiceContext, config *ethconfig.Config) (*LightEthereum, er
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)
leth.odr = NewLesOdr(chainDb, leth.retriever)
leth.chtIndexer = light.NewChtIndexer(chainDb, true)
leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, true)
leth.chtIndexer = light.NewChtIndexer(chainDb, true, leth.odr)
leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, true, leth.odr)
leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer)
// Note: NewLightChain adds the trusted checkpoint so it needs an ODR with
@ -135,6 +135,10 @@ func New(ctx *node.ServiceContext, config *ethconfig.Config) (*LightEthereum, er
if leth.blockchain, err = light.NewLightChain(leth.odr, leth.chainConfig, leth.engine); err != nil {
return nil, err
}
// Note: AddChildIndexer starts the update process for the child
leth.bloomIndexer.AddChildIndexer(leth.bloomTrieIndexer)
leth.chtIndexer.Start(leth.blockchain)
leth.bloomIndexer.Start(leth.blockchain)
// Rewind the chain in case of an incompatible config upgrade.
@ -252,9 +256,6 @@ func (s *LightEthereum) Stop() error {
s.odr.Stop()
s.bloomIndexer.Close()
s.chtIndexer.Close()
if s.bloomTrieIndexer != nil {
s.bloomTrieIndexer.Close()
}
s.blockchain.Stop()
s.protocolManager.Stop()
s.txPool.Stop()

View file

@ -24,6 +24,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/eth/ethconfig"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/light"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/p2p/discover"
"github.com/XinFinOrg/XDPoSChain/params"
@ -40,11 +41,12 @@ type lesCommons struct {
// NodeInfo represents a short summary of the Ethereum sub-protocol metadata
// known about the host peer.
type NodeInfo struct {
Network uint64 `json:"network"` // XDC network ID (50=xinfin, 51=apothem, 551=devnet)
Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain
Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules
Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block
Network uint64 `json:"network"` // XDC network ID (50=xinfin, 51=apothem, 551=devnet)
Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain
Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules
Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block
CHT light.TrustedCheckpoint `json:"cht"` // Trused CHT checkpoint for fast catchup
}
// makeProtocols creates protocol descriptors for the given LES versions.
@ -73,6 +75,23 @@ func (c *lesCommons) makeProtocols(versions []uint) []p2p.Protocol {
// nodeInfo retrieves some protocol metadata about the running host node.
func (c *lesCommons) nodeInfo() interface{} {
var cht light.TrustedCheckpoint
sections, _, sectionHead := c.chtIndexer.Sections()
sections2, _, sectionHead2 := c.bloomTrieIndexer.Sections()
if sections2 < sections {
sections = sections2
sectionHead = sectionHead2
}
if sections > 0 {
sectionIndex := sections - 1
cht = light.TrustedCheckpoint{
SectionIdx: sectionIndex,
SectionHead: sectionHead,
CHTRoot: light.GetChtRoot(c.chainDb, sectionIndex, sectionHead),
BloomRoot: light.GetBloomTrieRoot(c.chainDb, sectionIndex, sectionHead),
}
}
chain := c.protocolManager.blockchain
head := chain.CurrentHeader()
hash := head.Hash()
@ -82,5 +101,6 @@ func (c *lesCommons) nodeInfo() interface{} {
Genesis: chain.Genesis().Hash(),
Config: chain.Config(),
Head: chain.CurrentHeader().Hash(),
CHT: cht,
}
}

View file

@ -20,14 +20,10 @@ package les
import (
"container/list"
"errors"
"sync"
"time"
)
// ErrNoPeers is returned if no peers capable of serving a queued request are available
var ErrNoPeers = errors.New("no suitable peers available")
// requestDistributor implements a mechanism that distributes requests to
// suitable peers, obeying flow control rules and prioritizing them in creation
// order (even when a resend is necessary).

View file

@ -1182,7 +1182,7 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s
}
_, ok := <-pc.manager.reqDist.queue(rq)
if !ok {
return ErrNoPeers
return light.ErrNoPeers
}
return nil
}
@ -1206,7 +1206,7 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip
}
_, ok := <-pc.manager.reqDist.queue(rq)
if !ok {
return ErrNoPeers
return light.ErrNoPeers
}
return nil
}

View file

@ -156,12 +156,12 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
} else {
blockchain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{})
chtIndexer := light.NewChtIndexer(db, false)
chtIndexer := light.NewChtIndexer(db, false, nil)
chtIndexer.Start(blockchain)
bbtIndexer := light.NewBloomTrieIndexer(db, false)
bbtIndexer := light.NewBloomTrieIndexer(db, false, nil)
bloomIndexer := eth.NewBloomIndexer(db, params.BloomBitsBlocks)
bloomIndexer := eth.NewBloomIndexer(db, params.BloomBitsBlocks, light.HelperTrieProcessConfirmations)
bloomIndexer.AddChildIndexer(bbtIndexer)
bloomIndexer.Start(blockchain)

View file

@ -183,7 +183,7 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
db := rawdb.NewMemoryDatabase()
ldb := rawdb.NewMemoryDatabase()
odr := NewLesOdr(ldb, rm)
odr.SetIndexers(light.NewChtIndexer(db, true), light.NewBloomTrieIndexer(db, true), eth.NewBloomIndexer(db, light.BloomTrieFrequency))
odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations))
pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)

View file

@ -92,7 +92,7 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
db := rawdb.NewMemoryDatabase()
ldb := rawdb.NewMemoryDatabase()
odr := NewLesOdr(ldb, rm)
odr.SetIndexers(light.NewChtIndexer(db, true), light.NewBloomTrieIndexer(db, true), eth.NewBloomIndexer(db, light.BloomTrieFrequency))
odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations))
pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb)

View file

@ -27,6 +27,7 @@ import (
"time"
"github.com/XinFinOrg/XDPoSChain/common/mclock"
"github.com/XinFinOrg/XDPoSChain/light"
)
var (
@ -207,7 +208,7 @@ func (r *sentReq) stateRequesting() reqStateFn {
return r.stateNoMorePeers
}
// nothing to wait for, no more peers to ask, return with error
r.stop(ErrNoPeers)
r.stop(light.ErrNoPeers)
// no need to go to stopped state because waiting() already returned false
return nil
}

View file

@ -65,8 +65,8 @@ func NewLesServer(eth *eth.Ethereum, config *ethconfig.Config) (*LesServer, erro
lesCommons: lesCommons{
config: config,
chainDb: eth.ChainDb(),
chtIndexer: light.NewChtIndexer(eth.ChainDb(), false),
bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false),
chtIndexer: light.NewChtIndexer(eth.ChainDb(), false, nil),
bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false, nil),
protocolManager: pm,
},
quitSync: quitSync,

View file

@ -111,19 +111,19 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.
}
// addTrustedCheckpoint adds a trusted checkpoint to the blockchain
func (lc *LightChain) addTrustedCheckpoint(cp trustedCheckpoint) {
func (lc *LightChain) addTrustedCheckpoint(cp TrustedCheckpoint) {
if lc.odr.ChtIndexer() != nil {
StoreChtRoot(lc.chainDb, cp.sectionIdx, cp.sectionHead, cp.chtRoot)
lc.odr.ChtIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead)
StoreChtRoot(lc.chainDb, cp.SectionIdx, cp.SectionHead, cp.CHTRoot)
lc.odr.ChtIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead)
}
if lc.odr.BloomTrieIndexer() != nil {
StoreBloomTrieRoot(lc.chainDb, cp.sectionIdx, cp.sectionHead, cp.bloomTrieRoot)
lc.odr.BloomTrieIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead)
StoreBloomTrieRoot(lc.chainDb, cp.SectionIdx, cp.SectionHead, cp.BloomRoot)
lc.odr.BloomTrieIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead)
}
if lc.odr.BloomIndexer() != nil {
lc.odr.BloomIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead)
lc.odr.BloomIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead)
}
log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.sectionIdx+1)*CHTFrequencyClient-1, "hash", cp.sectionHead)
log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.SectionIdx+1)*CHTFrequencyClient-1, "hash", cp.SectionHead)
}
func (lc *LightChain) getProcInterrupt() bool {

View file

@ -20,6 +20,7 @@ package light
import (
"context"
"errors"
"math/big"
"github.com/XinFinOrg/XDPoSChain/common"
@ -33,6 +34,9 @@ import (
// service is not required.
var NoOdr = context.Background()
// ErrNoPeers is returned if no peers capable of serving a queued request are available
var ErrNoPeers = errors.New("no suitable peers available")
// OdrBackend is an interface to a backend service that handles ODR retrievals type
type OdrBackend interface {
Database() ethdb.Database

View file

@ -17,8 +17,10 @@
package light
import (
"context"
"encoding/binary"
"errors"
"fmt"
"math/big"
"time"
@ -46,17 +48,17 @@ const (
HelperTrieProcessConfirmations = 256 // number of confirmations before a HelperTrie is generated
)
// trustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with
// TrustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with
// the appropriate section index and head hash. It is used to start light syncing from this checkpoint
// and avoid downloading the entire header chain while still being able to securely access old headers/logs.
type trustedCheckpoint struct {
name string
sectionIdx uint64
sectionHead, chtRoot, bloomTrieRoot common.Hash
type TrustedCheckpoint struct {
name string
SectionIdx uint64
SectionHead, CHTRoot, BloomRoot common.Hash
}
// trustedCheckpoints associates each known checkpoint with the genesis hash of the chain it belongs to
var trustedCheckpoints = map[common.Hash]trustedCheckpoint{}
var trustedCheckpoints = map[common.Hash]TrustedCheckpoint{}
var (
ErrNoTrustedCht = errors.New("no trusted canonical hash trie")
@ -97,15 +99,16 @@ func StoreChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common
// ChtIndexerBackend implements core.ChainIndexerBackend
type ChtIndexerBackend struct {
diskdb ethdb.Database
diskdb, trieTable ethdb.Database
odr OdrBackend
triedb *trie.Database
section, sectionSize uint64
lastHash common.Hash
trie *trie.Trie
}
// NewBloomTrieIndexer creates a BloomTrie chain indexer
func NewChtIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer {
// NewChtIndexer creates a BloomTrie chain indexer
func NewChtIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer {
var sectionSize, confirmReq uint64
if clientMode {
sectionSize = CHTFrequencyClient
@ -115,28 +118,64 @@ func NewChtIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer {
confirmReq = HelperTrieProcessConfirmations
}
idb := rawdb.NewTable(db, "chtIndex-")
trieTable := rawdb.NewTable(db, ChtTablePrefix)
backend := &ChtIndexerBackend{
diskdb: db,
triedb: trie.NewDatabase(rawdb.NewTable(db, ChtTablePrefix)),
odr: odr,
trieTable: trieTable,
triedb: trie.NewDatabase(trieTable),
sectionSize: sectionSize,
}
return core.NewChainIndexer(db, idb, backend, sectionSize, confirmReq, time.Millisecond*100, "cht")
}
// fetchMissingNodes tries to retrieve the last entry of the latest trusted CHT from the
// ODR backend in order to be able to add new entries and calculate subsequent root hashes
func (c *ChtIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error {
batch := c.trieTable.NewBatch()
r := &ChtRequest{ChtRoot: root, ChtNum: section - 1, BlockNum: section*c.sectionSize - 1}
for {
err := c.odr.Retrieve(ctx, r)
switch err {
case nil:
r.Proof.Store(batch)
return batch.Write()
case ErrNoPeers:
// if there are no peers to serve, retry later
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second * 10):
// stay in the loop and try again
}
default:
return err
}
}
}
// Reset implements core.ChainIndexerBackend
func (c *ChtIndexerBackend) Reset(section uint64, lastSectionHead common.Hash) error {
func (c *ChtIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
root := types.EmptyRootHash
if section > 0 {
root = GetChtRoot(c.diskdb, section-1, lastSectionHead)
}
var err error
c.trie, err = trie.New(root, c.triedb)
if err != nil && c.odr != nil {
err = c.fetchMissingNodes(ctx, section, root)
if err == nil {
c.trie, err = trie.New(root, c.triedb)
}
}
c.section = section
return err
}
// Process implements core.ChainIndexerBackend
func (c *ChtIndexerBackend) Process(header *types.Header) {
func (c *ChtIndexerBackend) Process(ctx context.Context, header *types.Header) error {
hash, num := header.Hash(), header.Number.Uint64()
c.lastHash = hash
@ -148,6 +187,7 @@ func (c *ChtIndexerBackend) Process(header *types.Header) {
binary.BigEndian.PutUint64(encNumber[:], num)
data, _ := rlp.EncodeToBytes(ChtNode{hash, td})
c.trie.Update(encNumber[:], data)
return nil
}
// Commit implements core.ChainIndexerBackend
@ -159,16 +199,15 @@ func (c *ChtIndexerBackend) Commit() error {
c.triedb.Commit(root, false)
if ((c.section+1)*c.sectionSize)%CHTFrequencyClient == 0 {
log.Info("Storing CHT", "section", c.section*c.sectionSize/CHTFrequencyClient, "head", c.lastHash, "root", root)
log.Info("Storing CHT", "section", c.section*c.sectionSize/CHTFrequencyClient, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root))
}
StoreChtRoot(c.diskdb, c.section, c.lastHash, root)
return nil
}
const (
BloomTrieFrequency = 32768
ethBloomBitsSection = 4096
ethBloomBitsConfirmations = 256
BloomTrieFrequency = 32768
ethBloomBitsSection = 4096
)
var (
@ -193,7 +232,8 @@ func StoreBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root
// BloomTrieIndexerBackend implements core.ChainIndexerBackend
type BloomTrieIndexerBackend struct {
diskdb ethdb.Database
diskdb, trieTable ethdb.Database
odr OdrBackend
triedb *trie.Database
section, parentSectionSize, bloomTrieRatio uint64
trie *trie.Trie
@ -201,44 +241,98 @@ type BloomTrieIndexerBackend struct {
}
// NewBloomTrieIndexer creates a BloomTrie chain indexer
func NewBloomTrieIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer {
func NewBloomTrieIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer {
trieTable := rawdb.NewTable(db, BloomTrieTablePrefix)
backend := &BloomTrieIndexerBackend{
diskdb: db,
triedb: trie.NewDatabase(rawdb.NewTable(db, BloomTrieTablePrefix)),
diskdb: db,
odr: odr,
trieTable: trieTable,
triedb: trie.NewDatabase(trieTable),
}
idb := rawdb.NewTable(db, "bltIndex-")
var confirmReq uint64
if clientMode {
backend.parentSectionSize = BloomTrieFrequency
confirmReq = HelperTrieConfirmations
} else {
backend.parentSectionSize = ethBloomBitsSection
confirmReq = HelperTrieProcessConfirmations
}
backend.bloomTrieRatio = BloomTrieFrequency / backend.parentSectionSize
backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio)
return core.NewChainIndexer(db, idb, backend, BloomTrieFrequency, confirmReq-ethBloomBitsConfirmations, time.Millisecond*100, "bloomtrie")
return core.NewChainIndexer(db, idb, backend, BloomTrieFrequency, 0, time.Millisecond*100, "bloomtrie")
}
// fetchMissingNodes tries to retrieve the last entries of the latest trusted bloom trie from the
// ODR backend in order to be able to add new entries and calculate subsequent root hashes
func (b *BloomTrieIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error {
indexCh := make(chan uint, types.BloomBitLength)
type res struct {
nodes *NodeSet
err error
}
resCh := make(chan res, types.BloomBitLength)
for i := 0; i < 20; i++ {
go func() {
for bitIndex := range indexCh {
r := &BloomRequest{BloomTrieRoot: root, BloomTrieNum: section - 1, BitIdx: bitIndex, SectionIdxList: []uint64{section - 1}}
for {
if err := b.odr.Retrieve(ctx, r); err == ErrNoPeers {
// if there are no peers to serve, retry later
select {
case <-ctx.Done():
resCh <- res{nil, ctx.Err()}
return
case <-time.After(time.Second * 10):
// stay in the loop and try again
}
} else {
resCh <- res{r.Proofs, err}
break
}
}
}
}()
}
for i := uint(0); i < types.BloomBitLength; i++ {
indexCh <- i
}
close(indexCh)
batch := b.trieTable.NewBatch()
for i := uint(0); i < types.BloomBitLength; i++ {
res := <-resCh
if res.err != nil {
return res.err
}
res.nodes.Store(batch)
}
return batch.Write()
}
// Reset implements core.ChainIndexerBackend
func (b *BloomTrieIndexerBackend) Reset(section uint64, lastSectionHead common.Hash) error {
func (b *BloomTrieIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
root := types.EmptyRootHash
if section > 0 {
root = GetBloomTrieRoot(b.diskdb, section-1, lastSectionHead)
}
var err error
b.trie, err = trie.New(root, b.triedb)
if err != nil && b.odr != nil {
err = b.fetchMissingNodes(ctx, section, root)
if err == nil {
b.trie, err = trie.New(root, b.triedb)
}
}
b.section = section
return err
}
// Process implements core.ChainIndexerBackend
func (b *BloomTrieIndexerBackend) Process(header *types.Header) {
func (b *BloomTrieIndexerBackend) Process(ctx context.Context, header *types.Header) error {
num := header.Number.Uint64() - b.section*BloomTrieFrequency
if (num+1)%b.parentSectionSize == 0 {
b.sectionHeads[num/b.parentSectionSize] = header.Hash()
}
return nil
}
// Commit implements core.ChainIndexerBackend
@ -278,7 +372,7 @@ func (b *BloomTrieIndexerBackend) Commit() error {
b.triedb.Commit(root, false)
sectionHead := b.sectionHeads[b.bloomTrieRatio-1]
log.Info("Storing bloom trie", "section", b.section, "head", sectionHead, "root", root, "compression", float64(compSize)/float64(decompSize))
log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize))
StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root)
return nil