core/filtermaps, eth: add debug_getFilterMapsProgress

This commit is contained in:
locoholy 2026-04-08 02:25:52 +05:00
parent 3d1e6aa6c3
commit c7398f9678
5 changed files with 167 additions and 0 deletions

View file

@ -135,6 +135,18 @@ type FilterMaps struct {
testProcessEventsHook func()
}
// IndexProgress reports the currently indexed log range.
type IndexProgress struct {
Disabled bool
Initialized bool
HeadIndexed bool
TailBlock uint64
HeadBlock uint64
MapsFirst uint32
MapsAfterLast uint32
TailPartialEpoch uint32
}
// filterMap is a full or partial in-memory representation of a filter map where
// rows are allowed to have a nil value meaning the row is not stored in the
// structure. Note that therefore a known empty row should be represented with
@ -298,6 +310,30 @@ func (f *FilterMaps) Stop() {
f.closeWg.Wait()
}
// IndexProgress returns the currently indexed log range.
func (f *FilterMaps) IndexProgress() IndexProgress {
f.indexLock.RLock()
defer f.indexLock.RUnlock()
progress := IndexProgress{
Initialized: f.indexedRange.initialized,
HeadIndexed: f.indexedRange.headIndexed,
MapsFirst: f.indexedRange.maps.First(),
MapsAfterLast: f.indexedRange.maps.AfterLast(),
TailPartialEpoch: f.indexedRange.tailPartialEpoch,
}
if f.indexedRange.hasIndexedBlocks() {
progress.TailBlock = f.indexedRange.blocks.First()
progress.HeadBlock = f.indexedRange.blocks.Last()
}
select {
case <-f.disabledCh:
progress.Disabled = true
default:
}
return progress
}
// checkRevertRange checks whether the existing index is consistent with the
// current indexed view and reverts inconsistent maps if necessary.
func (f *FilterMaps) checkRevertRange() {

View file

@ -219,6 +219,51 @@ func testIndexerMatcherView(t *testing.T, concurrentRead bool) {
}
}
func TestIndexProgress(t *testing.T) {
ts := newTestSetup(t)
defer ts.close()
ts.chain.addBlocks(100, 5, 2, 4, false)
ts.setHistory(80, false)
ts.fm.WaitIdle()
want := IndexProgress{
Initialized: ts.fm.indexedRange.initialized,
HeadIndexed: ts.fm.indexedRange.headIndexed,
MapsFirst: ts.fm.indexedRange.maps.First(),
MapsAfterLast: ts.fm.indexedRange.maps.AfterLast(),
TailPartialEpoch: ts.fm.indexedRange.tailPartialEpoch,
}
if ts.fm.indexedRange.hasIndexedBlocks() {
want.TailBlock = ts.fm.indexedRange.blocks.First()
want.HeadBlock = ts.fm.indexedRange.blocks.Last()
}
if got := ts.fm.IndexProgress(); got != want {
t.Fatalf("wrong progress: got %#v want %#v", got, want)
}
}
func TestIndexProgressDisabled(t *testing.T) {
ts := newTestSetup(t)
defer ts.close()
ts.chain.addBlocks(10, 5, 2, 4, false)
ts.setHistory(0, true)
ts.fm.WaitIdle()
want := IndexProgress{
Disabled: true,
Initialized: ts.fm.indexedRange.initialized,
HeadIndexed: ts.fm.indexedRange.headIndexed,
MapsFirst: ts.fm.indexedRange.maps.First(),
MapsAfterLast: ts.fm.indexedRange.maps.AfterLast(),
TailPartialEpoch: ts.fm.indexedRange.tailPartialEpoch,
}
if got := ts.fm.IndexProgress(); got != want {
t.Fatalf("wrong disabled progress: got %#v want %#v", got, want)
}
}
func TestLogsByIndex(t *testing.T) {
ts := newTestSetup(t)
defer func() {

View file

@ -211,6 +211,17 @@ type storageEntry struct {
Value common.Hash `json:"value"`
}
type filterMapsProgressResult struct {
Disabled bool `json:"disabled"`
Initialized bool `json:"initialized"`
HeadIndexed bool `json:"headIndexed"`
TailBlock hexutil.Uint64 `json:"tailBlock"`
HeadBlock hexutil.Uint64 `json:"headBlock"`
MapsFirst hexutil.Uint64 `json:"mapsFirst"`
MapsAfterLast hexutil.Uint64 `json:"mapsAfterLast"`
TailPartialEpoch hexutil.Uint64 `json:"tailPartialEpoch"`
}
// StorageRangeAt returns the storage at the given block height and transaction index.
func (api *DebugAPI) StorageRangeAt(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash, txIndex int, contractAddress common.Address, keyStart hexutil.Bytes, maxResult int) (StorageRangeResult, error) {
var block *types.Block
@ -269,6 +280,24 @@ func storageRangeAt(statedb *state.StateDB, root common.Hash, address common.Add
return result, nil
}
// GetFilterMapsProgress returns the current log indexer progress.
func (api *DebugAPI) GetFilterMapsProgress() (*filterMapsProgressResult, error) {
if api.eth.filterMaps == nil {
return nil, errors.New("filter maps are not initialized")
}
progress := api.eth.filterMaps.IndexProgress()
return &filterMapsProgressResult{
Disabled: progress.Disabled,
Initialized: progress.Initialized,
HeadIndexed: progress.HeadIndexed,
TailBlock: hexutil.Uint64(progress.TailBlock),
HeadBlock: hexutil.Uint64(progress.HeadBlock),
MapsFirst: hexutil.Uint64(progress.MapsFirst),
MapsAfterLast: hexutil.Uint64(progress.MapsAfterLast),
TailPartialEpoch: hexutil.Uint64(progress.TailPartialEpoch),
}, nil
}
// GetModifiedAccountsByNumber returns all accounts that have changed between the
// two blocks specified. A change is defined as a difference in nonce, balance,
// code hash, or storage hash.

View file

@ -29,8 +29,10 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/tracing"
@ -335,3 +337,53 @@ func TestGetModifiedAccounts(t *testing.T) {
}
})
}
func TestGetFilterMapsProgress(t *testing.T) {
t.Parallel()
genesis := &core.Genesis{
Config: params.TestChainConfig,
BaseFee: big.NewInt(params.InitialBaseFee),
}
blockChain := newTestBlockChain(t, 3, genesis, nil)
defer blockChain.Stop()
head := blockChain.CurrentBlock()
view := filtermaps.NewChainView(blockChain, head.Number.Uint64(), head.Hash())
fm, err := filtermaps.NewFilterMaps(rawdb.NewMemoryDatabase(), view, 0, 0, filtermaps.DefaultParams, filtermaps.Config{})
if err != nil {
t.Fatalf("failed to create filtermaps: %v", err)
}
fm.Start()
defer fm.Stop()
fm.WaitIdle()
api := NewDebugAPI(&Ethereum{blockchain: blockChain, filterMaps: fm})
got, err := api.GetFilterMapsProgress()
if err != nil {
t.Fatalf("GetFilterMapsProgress returned error: %v", err)
}
progress := fm.IndexProgress()
want := &filterMapsProgressResult{
Disabled: progress.Disabled,
Initialized: progress.Initialized,
HeadIndexed: progress.HeadIndexed,
TailBlock: hexutil.Uint64(progress.TailBlock),
HeadBlock: hexutil.Uint64(progress.HeadBlock),
MapsFirst: hexutil.Uint64(progress.MapsFirst),
MapsAfterLast: hexutil.Uint64(progress.MapsAfterLast),
TailPartialEpoch: hexutil.Uint64(progress.TailPartialEpoch),
}
if *got != *want {
t.Fatalf("wrong progress result: got %#v want %#v", *got, *want)
}
}
func TestGetFilterMapsProgressUnavailable(t *testing.T) {
t.Parallel()
api := NewDebugAPI(&Ethereum{})
if _, err := api.GetFilterMapsProgress(); err == nil {
t.Fatal("expected error for uninitialized filtermaps")
}
}

View file

@ -427,6 +427,11 @@ web3._extend({
params: 2,
inputFormatter:[null, null],
}),
new web3._extend.Method({
name: 'getFilterMapsProgress',
call: 'debug_getFilterMapsProgress',
params: 0,
}),
new web3._extend.Method({
name: 'getAccessibleState',
call: 'debug_getAccessibleState',