From f039b26e7a5c42be3426be37a13896d2a7349c78 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Thu, 6 Mar 2025 13:25:57 +0800 Subject: [PATCH] light: CHT and bloom trie indexers working in light mode (#16534) --- core/chain_indexer.go | 30 ++++++-- core/chain_indexer_test.go | 9 ++- eth/backend.go | 2 +- eth/bloombits.go | 22 +++--- les/backend.go | 13 ++-- les/commons.go | 30 ++++++-- les/distributor.go | 4 - les/handler.go | 4 +- les/helper_test.go | 6 +- les/odr_test.go | 2 +- les/request_test.go | 2 +- les/retrieve.go | 3 +- les/server.go | 4 +- light/lightchain.go | 14 ++-- light/odr.go | 4 + light/postprocess.go | 148 ++++++++++++++++++++++++++++++------- 16 files changed, 214 insertions(+), 83 deletions(-) diff --git a/core/chain_indexer.go b/core/chain_indexer.go index d497d760f6..39de6d56a3 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -17,6 +17,7 @@ package core import ( + "context" "encoding/binary" "errors" "fmt" @@ -38,11 +39,11 @@ import ( type ChainIndexerBackend interface { // Reset initiates the processing of a new chain segment, potentially terminating // any partially completed operations (in case of a reorg). - Reset(section uint64, prevHead common.Hash) error + Reset(ctx context.Context, section uint64, prevHead common.Hash) error // Process crunches through the next header in the chain segment. The caller // will ensure a sequential order of headers. - Process(header *types.Header) + Process(ctx context.Context, header *types.Header) error // Commit finalizes the section metadata and stores it into the database. Commit() error @@ -72,9 +73,11 @@ type ChainIndexer struct { backend ChainIndexerBackend // Background processor generating the index data content children []*ChainIndexer // Child indexers to cascade chain updates to - active uint32 // Flag whether the event loop was started - update chan struct{} // Notification channel that headers should be processed - quit chan chan error // Quit channel to tear down running goroutines + active uint32 // Flag whether the event loop was started + update chan struct{} // Notification channel that headers should be processed + quit chan chan error // Quit channel to tear down running goroutines + ctx context.Context + ctxCancel func() sectionSize uint64 // Number of blocks in a single chain segment to process confirmsReq uint64 // Number of confirmations before processing a completed segment @@ -106,6 +109,8 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken } // Initialize database dependent fields and start the updater c.loadValidSections() + c.ctx, c.ctxCancel = context.WithCancel(context.Background()) + go c.updateLoop() return c @@ -139,6 +144,8 @@ func (c *ChainIndexer) Start(chain ChainIndexerChain) { func (c *ChainIndexer) Close() error { var errs []error + c.ctxCancel() + // Tear down the primary update loop errc := make(chan error) c.quit <- errc @@ -298,6 +305,12 @@ func (c *ChainIndexer) updateLoop() { c.lock.Unlock() newHead, err := c.processSection(section, oldHead) if err != nil { + select { + case <-c.ctx.Done(): + <-c.quit <- nil + return + default: + } c.log.Error("Section processing failed", "error", err) } c.lock.Lock() @@ -345,7 +358,7 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com // Reset and partial processing - if err := c.backend.Reset(section, lastHead); err != nil { + if err := c.backend.Reset(c.ctx, section, lastHead); err != nil { c.setValidSections(0) return common.Hash{}, err } @@ -361,11 +374,12 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com } else if header.ParentHash != lastHead { return common.Hash{}, errors.New("chain reorged during section processing") } - c.backend.Process(header) + if err := c.backend.Process(c.ctx, header); err != nil { + return common.Hash{}, err + } lastHead = header.Hash() } if err := c.backend.Commit(); err != nil { - c.log.Error("Section commit failed", "error", err) return common.Hash{}, err } return lastHead, nil diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go index d276a99a2a..4a677e52fc 100644 --- a/core/chain_indexer_test.go +++ b/core/chain_indexer_test.go @@ -17,15 +17,15 @@ package core import ( + "context" "fmt" "math/big" "math/rand" "testing" "time" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" - "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/types" ) @@ -210,13 +210,13 @@ func (b *testChainIndexBackend) reorg(headNum uint64) uint64 { return b.stored * b.indexer.sectionSize } -func (b *testChainIndexBackend) Reset(section uint64, prevHead common.Hash) error { +func (b *testChainIndexBackend) Reset(ctx context.Context, section uint64, prevHead common.Hash) error { b.section = section b.headerCnt = 0 return nil } -func (b *testChainIndexBackend) Process(header *types.Header) { +func (b *testChainIndexBackend) Process(ctx context.Context, header *types.Header) error { b.headerCnt++ if b.headerCnt > b.indexer.sectionSize { b.t.Error("Processing too many headers") @@ -227,6 +227,7 @@ func (b *testChainIndexBackend) Process(header *types.Header) { b.t.Fatal("Unexpected call to Process") case b.processCh <- header.Number.Uint64(): } + return nil } func (b *testChainIndexBackend) Commit() error { diff --git a/eth/backend.go b/eth/backend.go index d04b091d6a..be286c900d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -155,7 +155,7 @@ func New(ctx *node.ServiceContext, config *ethconfig.Config, XDCXServ *XDCx.XDCX gasPrice: config.GasPrice, etherbase: config.Etherbase, bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks), + bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms), } // Inject XDCX Service into main Eth Service. if XDCXServ != nil { diff --git a/eth/bloombits.go b/eth/bloombits.go index 4fa02c9d74..69ea792768 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -17,6 +17,7 @@ package eth import ( + "context" "time" "github.com/XinFinOrg/XDPoSChain/common" @@ -92,30 +93,28 @@ const ( // BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index // for the Ethereum header bloom filters, permitting blazing fast filtering. type BloomIndexer struct { - size uint64 // section size to generate bloombits for - - db ethdb.Database // database instance to write index data and metadata into - gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index - - section uint64 // Section is the section number being processed currently - head common.Hash // Head is the hash of the last header processed + size uint64 // section size to generate bloombits for + db ethdb.Database // database instance to write index data and metadata into + gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index + section uint64 // Section is the section number being processed currently + head common.Hash // Head is the hash of the last header processed } // NewBloomIndexer returns a chain indexer that generates bloom bits data for the // canonical chain for fast logs filtering. -func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer { +func NewBloomIndexer(db ethdb.Database, size, confReq uint64) *core.ChainIndexer { backend := &BloomIndexer{ db: db, size: size, } table := rawdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix)) - return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits") + return core.NewChainIndexer(db, table, backend, size, confReq, bloomThrottling, "bloombits") } // Reset implements core.ChainIndexerBackend, starting a new bloombits index // section. -func (b *BloomIndexer) Reset(section uint64, lastSectionHead common.Hash) error { +func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { gen, err := bloombits.NewGenerator(uint(b.size)) b.gen, b.section, b.head = gen, section, common.Hash{} return err @@ -123,9 +122,10 @@ func (b *BloomIndexer) Reset(section uint64, lastSectionHead common.Hash) error // Process implements core.ChainIndexerBackend, adding a new header's bloom into // the index. -func (b *BloomIndexer) Process(header *types.Header) { +func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error { b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom) b.head = header.Hash() + return nil } // Commit implements core.ChainIndexerBackend, finalizing the bloom section and diff --git a/les/backend.go b/les/backend.go index 9ccc76a7af..6f023a5ed6 100644 --- a/les/backend.go +++ b/les/backend.go @@ -118,7 +118,7 @@ func New(ctx *node.ServiceContext, config *ethconfig.Config) (*LightEthereum, er shutdownChan: make(chan bool), networkId: networkID, bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: eth.NewBloomIndexer(chainDb, light.BloomTrieFrequency), + bloomIndexer: eth.NewBloomIndexer(chainDb, light.BloomTrieFrequency, light.HelperTrieConfirmations), } leth.relay = NewLesTxRelay(peers, leth.reqDist) @@ -126,8 +126,8 @@ func New(ctx *node.ServiceContext, config *ethconfig.Config) (*LightEthereum, er leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool) leth.odr = NewLesOdr(chainDb, leth.retriever) - leth.chtIndexer = light.NewChtIndexer(chainDb, true) - leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, true) + leth.chtIndexer = light.NewChtIndexer(chainDb, true, leth.odr) + leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, true, leth.odr) leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer) // Note: NewLightChain adds the trusted checkpoint so it needs an ODR with @@ -135,6 +135,10 @@ func New(ctx *node.ServiceContext, config *ethconfig.Config) (*LightEthereum, er if leth.blockchain, err = light.NewLightChain(leth.odr, leth.chainConfig, leth.engine); err != nil { return nil, err } + + // Note: AddChildIndexer starts the update process for the child + leth.bloomIndexer.AddChildIndexer(leth.bloomTrieIndexer) + leth.chtIndexer.Start(leth.blockchain) leth.bloomIndexer.Start(leth.blockchain) // Rewind the chain in case of an incompatible config upgrade. @@ -252,9 +256,6 @@ func (s *LightEthereum) Stop() error { s.odr.Stop() s.bloomIndexer.Close() s.chtIndexer.Close() - if s.bloomTrieIndexer != nil { - s.bloomTrieIndexer.Close() - } s.blockchain.Stop() s.protocolManager.Stop() s.txPool.Stop() diff --git a/les/commons.go b/les/commons.go index 5d30dde56a..3ccaef5fd8 100644 --- a/les/commons.go +++ b/les/commons.go @@ -24,6 +24,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/core" "github.com/XinFinOrg/XDPoSChain/eth/ethconfig" "github.com/XinFinOrg/XDPoSChain/ethdb" + "github.com/XinFinOrg/XDPoSChain/light" "github.com/XinFinOrg/XDPoSChain/p2p" "github.com/XinFinOrg/XDPoSChain/p2p/discover" "github.com/XinFinOrg/XDPoSChain/params" @@ -40,11 +41,12 @@ type lesCommons struct { // NodeInfo represents a short summary of the Ethereum sub-protocol metadata // known about the host peer. type NodeInfo struct { - Network uint64 `json:"network"` // XDC network ID (50=xinfin, 51=apothem, 551=devnet) - Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain - Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block - Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules - Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block + Network uint64 `json:"network"` // XDC network ID (50=xinfin, 51=apothem, 551=devnet) + Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain + Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block + Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules + Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block + CHT light.TrustedCheckpoint `json:"cht"` // Trused CHT checkpoint for fast catchup } // makeProtocols creates protocol descriptors for the given LES versions. @@ -73,6 +75,23 @@ func (c *lesCommons) makeProtocols(versions []uint) []p2p.Protocol { // nodeInfo retrieves some protocol metadata about the running host node. func (c *lesCommons) nodeInfo() interface{} { + var cht light.TrustedCheckpoint + sections, _, sectionHead := c.chtIndexer.Sections() + sections2, _, sectionHead2 := c.bloomTrieIndexer.Sections() + if sections2 < sections { + sections = sections2 + sectionHead = sectionHead2 + } + if sections > 0 { + sectionIndex := sections - 1 + cht = light.TrustedCheckpoint{ + SectionIdx: sectionIndex, + SectionHead: sectionHead, + CHTRoot: light.GetChtRoot(c.chainDb, sectionIndex, sectionHead), + BloomRoot: light.GetBloomTrieRoot(c.chainDb, sectionIndex, sectionHead), + } + } + chain := c.protocolManager.blockchain head := chain.CurrentHeader() hash := head.Hash() @@ -82,5 +101,6 @@ func (c *lesCommons) nodeInfo() interface{} { Genesis: chain.Genesis().Hash(), Config: chain.Config(), Head: chain.CurrentHeader().Hash(), + CHT: cht, } } diff --git a/les/distributor.go b/les/distributor.go index 159fa4c73f..d3f6b21d18 100644 --- a/les/distributor.go +++ b/les/distributor.go @@ -20,14 +20,10 @@ package les import ( "container/list" - "errors" "sync" "time" ) -// ErrNoPeers is returned if no peers capable of serving a queued request are available -var ErrNoPeers = errors.New("no suitable peers available") - // requestDistributor implements a mechanism that distributes requests to // suitable peers, obeying flow control rules and prioritizing them in creation // order (even when a resend is necessary). diff --git a/les/handler.go b/les/handler.go index a411bf9346..6837a5b2e3 100644 --- a/les/handler.go +++ b/les/handler.go @@ -1182,7 +1182,7 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s } _, ok := <-pc.manager.reqDist.queue(rq) if !ok { - return ErrNoPeers + return light.ErrNoPeers } return nil } @@ -1206,7 +1206,7 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip } _, ok := <-pc.manager.reqDist.queue(rq) if !ok { - return ErrNoPeers + return light.ErrNoPeers } return nil } diff --git a/les/helper_test.go b/les/helper_test.go index 07e069b996..be8edde68f 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -156,12 +156,12 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor } else { blockchain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}) - chtIndexer := light.NewChtIndexer(db, false) + chtIndexer := light.NewChtIndexer(db, false, nil) chtIndexer.Start(blockchain) - bbtIndexer := light.NewBloomTrieIndexer(db, false) + bbtIndexer := light.NewBloomTrieIndexer(db, false, nil) - bloomIndexer := eth.NewBloomIndexer(db, params.BloomBitsBlocks) + bloomIndexer := eth.NewBloomIndexer(db, params.BloomBitsBlocks, light.HelperTrieProcessConfirmations) bloomIndexer.AddChildIndexer(bbtIndexer) bloomIndexer.Start(blockchain) diff --git a/les/odr_test.go b/les/odr_test.go index b5628c41d2..723ccaa7e5 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -183,7 +183,7 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { db := rawdb.NewMemoryDatabase() ldb := rawdb.NewMemoryDatabase() odr := NewLesOdr(ldb, rm) - odr.SetIndexers(light.NewChtIndexer(db, true), light.NewBloomTrieIndexer(db, true), eth.NewBloomIndexer(db, light.BloomTrieFrequency)) + odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations)) pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) diff --git a/les/request_test.go b/les/request_test.go index 5c9595a589..fb3f2bbf94 100644 --- a/les/request_test.go +++ b/les/request_test.go @@ -92,7 +92,7 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { db := rawdb.NewMemoryDatabase() ldb := rawdb.NewMemoryDatabase() odr := NewLesOdr(ldb, rm) - odr.SetIndexers(light.NewChtIndexer(db, true), light.NewBloomTrieIndexer(db, true), eth.NewBloomIndexer(db, light.BloomTrieFrequency)) + odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations)) pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb) diff --git a/les/retrieve.go b/les/retrieve.go index f73f4fe6c9..a3ee4275ea 100644 --- a/les/retrieve.go +++ b/les/retrieve.go @@ -27,6 +27,7 @@ import ( "time" "github.com/XinFinOrg/XDPoSChain/common/mclock" + "github.com/XinFinOrg/XDPoSChain/light" ) var ( @@ -207,7 +208,7 @@ func (r *sentReq) stateRequesting() reqStateFn { return r.stateNoMorePeers } // nothing to wait for, no more peers to ask, return with error - r.stop(ErrNoPeers) + r.stop(light.ErrNoPeers) // no need to go to stopped state because waiting() already returned false return nil } diff --git a/les/server.go b/les/server.go index 0a67426be1..ae3891edec 100644 --- a/les/server.go +++ b/les/server.go @@ -65,8 +65,8 @@ func NewLesServer(eth *eth.Ethereum, config *ethconfig.Config) (*LesServer, erro lesCommons: lesCommons{ config: config, chainDb: eth.ChainDb(), - chtIndexer: light.NewChtIndexer(eth.ChainDb(), false), - bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false), + chtIndexer: light.NewChtIndexer(eth.ChainDb(), false, nil), + bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false, nil), protocolManager: pm, }, quitSync: quitSync, diff --git a/light/lightchain.go b/light/lightchain.go index 02f3210d02..2c52cf7989 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -111,19 +111,19 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus. } // addTrustedCheckpoint adds a trusted checkpoint to the blockchain -func (lc *LightChain) addTrustedCheckpoint(cp trustedCheckpoint) { +func (lc *LightChain) addTrustedCheckpoint(cp TrustedCheckpoint) { if lc.odr.ChtIndexer() != nil { - StoreChtRoot(lc.chainDb, cp.sectionIdx, cp.sectionHead, cp.chtRoot) - lc.odr.ChtIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead) + StoreChtRoot(lc.chainDb, cp.SectionIdx, cp.SectionHead, cp.CHTRoot) + lc.odr.ChtIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead) } if lc.odr.BloomTrieIndexer() != nil { - StoreBloomTrieRoot(lc.chainDb, cp.sectionIdx, cp.sectionHead, cp.bloomTrieRoot) - lc.odr.BloomTrieIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead) + StoreBloomTrieRoot(lc.chainDb, cp.SectionIdx, cp.SectionHead, cp.BloomRoot) + lc.odr.BloomTrieIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead) } if lc.odr.BloomIndexer() != nil { - lc.odr.BloomIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead) + lc.odr.BloomIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead) } - log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.sectionIdx+1)*CHTFrequencyClient-1, "hash", cp.sectionHead) + log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.SectionIdx+1)*CHTFrequencyClient-1, "hash", cp.SectionHead) } func (lc *LightChain) getProcInterrupt() bool { diff --git a/light/odr.go b/light/odr.go index 9eed46f143..51affeb969 100644 --- a/light/odr.go +++ b/light/odr.go @@ -20,6 +20,7 @@ package light import ( "context" + "errors" "math/big" "github.com/XinFinOrg/XDPoSChain/common" @@ -33,6 +34,9 @@ import ( // service is not required. var NoOdr = context.Background() +// ErrNoPeers is returned if no peers capable of serving a queued request are available +var ErrNoPeers = errors.New("no suitable peers available") + // OdrBackend is an interface to a backend service that handles ODR retrievals type type OdrBackend interface { Database() ethdb.Database diff --git a/light/postprocess.go b/light/postprocess.go index 1fada4b059..af553c657d 100644 --- a/light/postprocess.go +++ b/light/postprocess.go @@ -17,8 +17,10 @@ package light import ( + "context" "encoding/binary" "errors" + "fmt" "math/big" "time" @@ -46,17 +48,17 @@ const ( HelperTrieProcessConfirmations = 256 // number of confirmations before a HelperTrie is generated ) -// trustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with +// TrustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with // the appropriate section index and head hash. It is used to start light syncing from this checkpoint // and avoid downloading the entire header chain while still being able to securely access old headers/logs. -type trustedCheckpoint struct { - name string - sectionIdx uint64 - sectionHead, chtRoot, bloomTrieRoot common.Hash +type TrustedCheckpoint struct { + name string + SectionIdx uint64 + SectionHead, CHTRoot, BloomRoot common.Hash } // trustedCheckpoints associates each known checkpoint with the genesis hash of the chain it belongs to -var trustedCheckpoints = map[common.Hash]trustedCheckpoint{} +var trustedCheckpoints = map[common.Hash]TrustedCheckpoint{} var ( ErrNoTrustedCht = errors.New("no trusted canonical hash trie") @@ -97,15 +99,16 @@ func StoreChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common // ChtIndexerBackend implements core.ChainIndexerBackend type ChtIndexerBackend struct { - diskdb ethdb.Database + diskdb, trieTable ethdb.Database + odr OdrBackend triedb *trie.Database section, sectionSize uint64 lastHash common.Hash trie *trie.Trie } -// NewBloomTrieIndexer creates a BloomTrie chain indexer -func NewChtIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer { +// NewChtIndexer creates a BloomTrie chain indexer +func NewChtIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer { var sectionSize, confirmReq uint64 if clientMode { sectionSize = CHTFrequencyClient @@ -115,28 +118,64 @@ func NewChtIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer { confirmReq = HelperTrieProcessConfirmations } idb := rawdb.NewTable(db, "chtIndex-") + trieTable := rawdb.NewTable(db, ChtTablePrefix) backend := &ChtIndexerBackend{ diskdb: db, - triedb: trie.NewDatabase(rawdb.NewTable(db, ChtTablePrefix)), + odr: odr, + trieTable: trieTable, + triedb: trie.NewDatabase(trieTable), sectionSize: sectionSize, } return core.NewChainIndexer(db, idb, backend, sectionSize, confirmReq, time.Millisecond*100, "cht") } +// fetchMissingNodes tries to retrieve the last entry of the latest trusted CHT from the +// ODR backend in order to be able to add new entries and calculate subsequent root hashes +func (c *ChtIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error { + batch := c.trieTable.NewBatch() + r := &ChtRequest{ChtRoot: root, ChtNum: section - 1, BlockNum: section*c.sectionSize - 1} + for { + err := c.odr.Retrieve(ctx, r) + switch err { + case nil: + r.Proof.Store(batch) + return batch.Write() + case ErrNoPeers: + // if there are no peers to serve, retry later + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second * 10): + // stay in the loop and try again + } + default: + return err + } + } +} + // Reset implements core.ChainIndexerBackend -func (c *ChtIndexerBackend) Reset(section uint64, lastSectionHead common.Hash) error { +func (c *ChtIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { root := types.EmptyRootHash if section > 0 { root = GetChtRoot(c.diskdb, section-1, lastSectionHead) } var err error c.trie, err = trie.New(root, c.triedb) + + if err != nil && c.odr != nil { + err = c.fetchMissingNodes(ctx, section, root) + if err == nil { + c.trie, err = trie.New(root, c.triedb) + } + } + c.section = section return err } // Process implements core.ChainIndexerBackend -func (c *ChtIndexerBackend) Process(header *types.Header) { +func (c *ChtIndexerBackend) Process(ctx context.Context, header *types.Header) error { hash, num := header.Hash(), header.Number.Uint64() c.lastHash = hash @@ -148,6 +187,7 @@ func (c *ChtIndexerBackend) Process(header *types.Header) { binary.BigEndian.PutUint64(encNumber[:], num) data, _ := rlp.EncodeToBytes(ChtNode{hash, td}) c.trie.Update(encNumber[:], data) + return nil } // Commit implements core.ChainIndexerBackend @@ -159,16 +199,15 @@ func (c *ChtIndexerBackend) Commit() error { c.triedb.Commit(root, false) if ((c.section+1)*c.sectionSize)%CHTFrequencyClient == 0 { - log.Info("Storing CHT", "section", c.section*c.sectionSize/CHTFrequencyClient, "head", c.lastHash, "root", root) + log.Info("Storing CHT", "section", c.section*c.sectionSize/CHTFrequencyClient, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root)) } StoreChtRoot(c.diskdb, c.section, c.lastHash, root) return nil } const ( - BloomTrieFrequency = 32768 - ethBloomBitsSection = 4096 - ethBloomBitsConfirmations = 256 + BloomTrieFrequency = 32768 + ethBloomBitsSection = 4096 ) var ( @@ -193,7 +232,8 @@ func StoreBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root // BloomTrieIndexerBackend implements core.ChainIndexerBackend type BloomTrieIndexerBackend struct { - diskdb ethdb.Database + diskdb, trieTable ethdb.Database + odr OdrBackend triedb *trie.Database section, parentSectionSize, bloomTrieRatio uint64 trie *trie.Trie @@ -201,44 +241,98 @@ type BloomTrieIndexerBackend struct { } // NewBloomTrieIndexer creates a BloomTrie chain indexer -func NewBloomTrieIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer { +func NewBloomTrieIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer { + trieTable := rawdb.NewTable(db, BloomTrieTablePrefix) backend := &BloomTrieIndexerBackend{ - diskdb: db, - triedb: trie.NewDatabase(rawdb.NewTable(db, BloomTrieTablePrefix)), + diskdb: db, + odr: odr, + trieTable: trieTable, + triedb: trie.NewDatabase(trieTable), } idb := rawdb.NewTable(db, "bltIndex-") - var confirmReq uint64 if clientMode { backend.parentSectionSize = BloomTrieFrequency - confirmReq = HelperTrieConfirmations } else { backend.parentSectionSize = ethBloomBitsSection - confirmReq = HelperTrieProcessConfirmations } backend.bloomTrieRatio = BloomTrieFrequency / backend.parentSectionSize backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio) - return core.NewChainIndexer(db, idb, backend, BloomTrieFrequency, confirmReq-ethBloomBitsConfirmations, time.Millisecond*100, "bloomtrie") + return core.NewChainIndexer(db, idb, backend, BloomTrieFrequency, 0, time.Millisecond*100, "bloomtrie") +} + +// fetchMissingNodes tries to retrieve the last entries of the latest trusted bloom trie from the +// ODR backend in order to be able to add new entries and calculate subsequent root hashes +func (b *BloomTrieIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error { + indexCh := make(chan uint, types.BloomBitLength) + type res struct { + nodes *NodeSet + err error + } + resCh := make(chan res, types.BloomBitLength) + for i := 0; i < 20; i++ { + go func() { + for bitIndex := range indexCh { + r := &BloomRequest{BloomTrieRoot: root, BloomTrieNum: section - 1, BitIdx: bitIndex, SectionIdxList: []uint64{section - 1}} + for { + if err := b.odr.Retrieve(ctx, r); err == ErrNoPeers { + // if there are no peers to serve, retry later + select { + case <-ctx.Done(): + resCh <- res{nil, ctx.Err()} + return + case <-time.After(time.Second * 10): + // stay in the loop and try again + } + } else { + resCh <- res{r.Proofs, err} + break + } + } + } + }() + } + + for i := uint(0); i < types.BloomBitLength; i++ { + indexCh <- i + } + close(indexCh) + batch := b.trieTable.NewBatch() + for i := uint(0); i < types.BloomBitLength; i++ { + res := <-resCh + if res.err != nil { + return res.err + } + res.nodes.Store(batch) + } + return batch.Write() } // Reset implements core.ChainIndexerBackend -func (b *BloomTrieIndexerBackend) Reset(section uint64, lastSectionHead common.Hash) error { +func (b *BloomTrieIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { root := types.EmptyRootHash if section > 0 { root = GetBloomTrieRoot(b.diskdb, section-1, lastSectionHead) } var err error b.trie, err = trie.New(root, b.triedb) + if err != nil && b.odr != nil { + err = b.fetchMissingNodes(ctx, section, root) + if err == nil { + b.trie, err = trie.New(root, b.triedb) + } + } b.section = section return err } // Process implements core.ChainIndexerBackend -func (b *BloomTrieIndexerBackend) Process(header *types.Header) { +func (b *BloomTrieIndexerBackend) Process(ctx context.Context, header *types.Header) error { num := header.Number.Uint64() - b.section*BloomTrieFrequency if (num+1)%b.parentSectionSize == 0 { b.sectionHeads[num/b.parentSectionSize] = header.Hash() } + return nil } // Commit implements core.ChainIndexerBackend @@ -278,7 +372,7 @@ func (b *BloomTrieIndexerBackend) Commit() error { b.triedb.Commit(root, false) sectionHead := b.sectionHeads[b.bloomTrieRatio-1] - log.Info("Storing bloom trie", "section", b.section, "head", sectionHead, "root", root, "compression", float64(compSize)/float64(decompSize)) + log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize)) StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root) return nil