mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 21:31:37 +00:00
all: add global block logs cache (#25459)
This commit is contained in:
parent
47b7f4726d
commit
ad71d706fb
19 changed files with 291 additions and 164 deletions
|
|
@ -66,7 +66,8 @@ type SimulatedBackend struct {
|
|||
pendingState *state.StateDB // Currently pending state that will be the active on request
|
||||
pendingReceipts types.Receipts // Currently receipts for the pending block
|
||||
|
||||
events *filters.EventSystem // Event system for filtering log events live
|
||||
events *filters.EventSystem // for filtering log events live
|
||||
filterSystem *filters.FilterSystem // for filtering database logs
|
||||
|
||||
config *params.ChainConfig
|
||||
}
|
||||
|
|
@ -95,9 +96,7 @@ func SimulateWalletAddressAndSignFn() (common.Address, func(account accounts.Acc
|
|||
|
||||
// XDC simulated backend for testing purpose.
|
||||
func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfig *params.ChainConfig) *SimulatedBackend {
|
||||
// database := ethdb.NewMemDatabase()
|
||||
database := rawdb.NewMemoryDatabase()
|
||||
|
||||
genesis := core.Genesis{
|
||||
GasLimit: gasLimit, // need this big, support initial smart contract
|
||||
Config: chainConfig,
|
||||
|
|
@ -128,7 +127,11 @@ func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfi
|
|||
blockchain: blockchain,
|
||||
config: genesis.Config,
|
||||
}
|
||||
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)
|
||||
|
||||
filterBackend := &filterBackend{database, blockchain, backend}
|
||||
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
|
||||
backend.events = filters.NewEventSystem(backend.filterSystem, false)
|
||||
|
||||
blockchain.Client = backend
|
||||
backend.rollback()
|
||||
return backend
|
||||
|
|
@ -148,7 +151,11 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend {
|
|||
blockchain: blockchain,
|
||||
config: genesis.Config,
|
||||
}
|
||||
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)
|
||||
|
||||
filterBackend := &filterBackend{database, blockchain, backend}
|
||||
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
|
||||
backend.events = filters.NewEventSystem(backend.filterSystem, false)
|
||||
|
||||
backend.rollback()
|
||||
return backend
|
||||
}
|
||||
|
|
@ -423,7 +430,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt
|
|||
var filter *filters.Filter
|
||||
if query.BlockHash != nil {
|
||||
// Block filter requested, construct a single-shot filter
|
||||
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics)
|
||||
filter = b.filterSystem.NewBlockFilter(*query.BlockHash, query.Addresses, query.Topics)
|
||||
} else {
|
||||
// Initialize unset filter boundaried to run from genesis to chain head
|
||||
from := int64(0)
|
||||
|
|
@ -435,7 +442,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt
|
|||
to = query.ToBlock.Int64()
|
||||
}
|
||||
// Construct the range filter
|
||||
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics)
|
||||
filter = b.filterSystem.NewRangeFilter(from, to, query.Addresses, query.Topics)
|
||||
}
|
||||
// Run the filter and return all the logs
|
||||
logs, err := filter.Logs(ctx)
|
||||
|
|
@ -552,15 +559,8 @@ func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts
|
|||
return fb.backend.pendingBlock, fb.backend.pendingReceipts
|
||||
}
|
||||
|
||||
func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
|
||||
receipts := core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash))
|
||||
if receipts == nil {
|
||||
return nil, nil
|
||||
}
|
||||
logs := make([][]*types.Log, len(receipts))
|
||||
for i, receipt := range receipts {
|
||||
logs[i] = receipt.Logs
|
||||
}
|
||||
func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
|
||||
logs := rawdb.ReadLogs(fb.db, hash, number)
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -93,6 +93,7 @@ var (
|
|||
utils.CacheDatabaseFlag,
|
||||
//utils.CacheGCFlag,
|
||||
//utils.TrieCacheGenFlag,
|
||||
utils.CacheLogSizeFlag,
|
||||
utils.FDLimitFlag,
|
||||
utils.ListenPortFlag,
|
||||
utils.MaxPeersFlag,
|
||||
|
|
|
|||
|
|
@ -42,8 +42,10 @@ import (
|
|||
"github.com/XinFinOrg/XDPoSChain/crypto"
|
||||
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
|
||||
"github.com/XinFinOrg/XDPoSChain/eth/ethconfig"
|
||||
"github.com/XinFinOrg/XDPoSChain/eth/filters"
|
||||
"github.com/XinFinOrg/XDPoSChain/eth/gasprice"
|
||||
"github.com/XinFinOrg/XDPoSChain/ethdb"
|
||||
"github.com/XinFinOrg/XDPoSChain/internal/ethapi"
|
||||
"github.com/XinFinOrg/XDPoSChain/log"
|
||||
"github.com/XinFinOrg/XDPoSChain/metrics"
|
||||
"github.com/XinFinOrg/XDPoSChain/metrics/exp"
|
||||
|
|
@ -54,6 +56,7 @@ import (
|
|||
"github.com/XinFinOrg/XDPoSChain/p2p/nat"
|
||||
"github.com/XinFinOrg/XDPoSChain/p2p/netutil"
|
||||
"github.com/XinFinOrg/XDPoSChain/params"
|
||||
"github.com/XinFinOrg/XDPoSChain/rpc"
|
||||
whisper "github.com/XinFinOrg/XDPoSChain/whisper/whisperv6"
|
||||
gopsutil "github.com/shirou/gopsutil/mem"
|
||||
"gopkg.in/urfave/cli.v1"
|
||||
|
|
@ -316,6 +319,11 @@ var (
|
|||
Usage: "Percentage of cache memory allowance to use for trie pruning",
|
||||
Value: 25,
|
||||
}
|
||||
CacheLogSizeFlag = &cli.IntFlag{
|
||||
Name: "cache.blocklogs",
|
||||
Usage: "Size (in number of blocks) of the log cache for filtering",
|
||||
Value: ethconfig.Defaults.FilterLogCacheSize,
|
||||
}
|
||||
FDLimitFlag = cli.IntFlag{
|
||||
Name: "fdlimit",
|
||||
Usage: "Raise the open file descriptor resource limit (default = system fd limit)",
|
||||
|
|
@ -1244,6 +1252,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
|
|||
if ctx.GlobalIsSet(GasPriceFlag.Name) {
|
||||
cfg.GasPrice = GlobalBig(ctx, GasPriceFlag.Name)
|
||||
}
|
||||
if ctx.IsSet(CacheLogSizeFlag.Name) {
|
||||
cfg.FilterLogCacheSize = ctx.Int(CacheLogSizeFlag.Name)
|
||||
}
|
||||
if ctx.GlobalIsSet(VMEnableDebugFlag.Name) {
|
||||
// TODO(fjl): force-enable this in --dev mode
|
||||
cfg.EnablePreimageRecording = ctx.GlobalBool(VMEnableDebugFlag.Name)
|
||||
|
|
@ -1443,6 +1454,19 @@ func WalkMatch(root, pattern string) ([]string, error) {
|
|||
return matches, nil
|
||||
}
|
||||
|
||||
// RegisterFilterAPI adds the eth log filtering RPC API to the node.
|
||||
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
|
||||
isLightClient := ethcfg.SyncMode == downloader.LightSync
|
||||
filterSystem := filters.NewFilterSystem(backend, filters.Config{
|
||||
LogCacheSize: ethcfg.FilterLogCacheSize,
|
||||
})
|
||||
stack.RegisterAPIs([]rpc.API{{
|
||||
Namespace: "eth",
|
||||
Service: filters.NewFilterAPI(filterSystem, isLightClient),
|
||||
}})
|
||||
return filterSystem
|
||||
}
|
||||
|
||||
func SetupMetrics(ctx *cli.Context) {
|
||||
if metrics.Enabled {
|
||||
log.Info("Enabling metrics collection")
|
||||
|
|
|
|||
|
|
@ -47,8 +47,7 @@ func RegisterShhService(stack *node.Node, cfg *whisper.Config) {
|
|||
}
|
||||
}
|
||||
|
||||
// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to
|
||||
// th egiven node.
|
||||
// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to the node.
|
||||
func RegisterEthStatsService(stack *node.Node, url string) {
|
||||
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
|
||||
// Retrieve both eth and les services
|
||||
|
|
|
|||
|
|
@ -113,6 +113,15 @@ func (b *BlockGen) AddTxWithChain(bc *BlockChain, tx *types.Transaction) {
|
|||
}
|
||||
}
|
||||
|
||||
// AddUncheckedTx forcefully adds a transaction to the block without any
|
||||
// validation.
|
||||
//
|
||||
// AddUncheckedTx will cause consensus failures when used during real
|
||||
// chain processing. This is best used in conjunction with raw block insertion.
|
||||
func (b *BlockGen) AddUncheckedTx(tx *types.Transaction) {
|
||||
b.txs = append(b.txs, tx)
|
||||
}
|
||||
|
||||
// Number returns the block number of the block being generated.
|
||||
func (b *BlockGen) Number() *big.Int {
|
||||
return new(big.Int).Set(b.header.Number)
|
||||
|
|
|
|||
|
|
@ -233,17 +233,8 @@ func (b *EthApiBackend) GetReceipts(ctx context.Context, blockHash common.Hash)
|
|||
return core.GetBlockReceipts(b.eth.chainDb, blockHash, core.GetBlockNumber(b.eth.chainDb, blockHash)), nil
|
||||
}
|
||||
|
||||
func (b *EthApiBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) {
|
||||
db := b.eth.ChainDb()
|
||||
number := rawdb.ReadHeaderNumber(db, blockHash)
|
||||
if number == nil {
|
||||
return nil, errors.New("failed to get block number from hash")
|
||||
}
|
||||
logs := rawdb.ReadLogs(db, blockHash, *number)
|
||||
if logs == nil {
|
||||
return nil, errors.New("failed to get logs for block")
|
||||
}
|
||||
return logs, nil
|
||||
func (b *EthApiBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
|
||||
return rawdb.ReadLogs(b.eth.chainDb, hash, number), nil
|
||||
}
|
||||
|
||||
func (b *EthApiBackend) GetTd(blockHash common.Hash) *big.Int {
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ import (
|
|||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/XDCx"
|
||||
"github.com/XinFinOrg/XDPoSChain/XDCxlending"
|
||||
|
|
@ -406,7 +405,7 @@ func (s *Ethereum) APIs() []rpc.API {
|
|||
}, {
|
||||
Namespace: "eth",
|
||||
Version: "1.0",
|
||||
Service: filters.NewFilterAPI(s.ApiBackend, false, 5*time.Minute),
|
||||
Service: filters.NewFilterAPI(filters.NewFilterSystem(s.ApiBackend, filters.Config{LogCacheSize: s.config.FilterLogCacheSize}), false),
|
||||
Public: true,
|
||||
}, {
|
||||
Namespace: "admin",
|
||||
|
|
|
|||
|
|
@ -60,12 +60,13 @@ var Defaults = Config{
|
|||
DatasetsInMem: 1,
|
||||
DatasetsOnDisk: 2,
|
||||
},
|
||||
NetworkId: 88,
|
||||
LightPeers: 100,
|
||||
DatabaseCache: 768,
|
||||
TrieCache: 256,
|
||||
TrieTimeout: 5 * time.Minute,
|
||||
GasPrice: big.NewInt(0.25 * params.Shannon),
|
||||
NetworkId: 88,
|
||||
LightPeers: 100,
|
||||
DatabaseCache: 768,
|
||||
TrieCache: 256,
|
||||
TrieTimeout: 5 * time.Minute,
|
||||
FilterLogCacheSize: 32,
|
||||
GasPrice: big.NewInt(0.25 * params.Shannon),
|
||||
|
||||
TxPool: core.DefaultTxPoolConfig,
|
||||
RPCGasCap: 25000000,
|
||||
|
|
@ -111,6 +112,9 @@ type Config struct {
|
|||
TrieCache int
|
||||
TrieTimeout time.Duration
|
||||
|
||||
// This is the number of blocks for which logs will be cached in the filter system.
|
||||
FilterLogCacheSize int
|
||||
|
||||
// Mining-related options
|
||||
Etherbase common.Address `toml:",omitempty"`
|
||||
MinerThreads int `toml:",omitempty"`
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
|
|||
MinerThreads int `toml:",omitempty"`
|
||||
ExtraData []byte `toml:",omitempty"`
|
||||
GasPrice *big.Int
|
||||
FilterLogCacheSize int
|
||||
Ethash ethash.Config
|
||||
TxPool core.TxPoolConfig
|
||||
GPO gasprice.Config
|
||||
|
|
@ -55,6 +56,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
|
|||
enc.MinerThreads = c.MinerThreads
|
||||
enc.ExtraData = c.ExtraData
|
||||
enc.GasPrice = c.GasPrice
|
||||
enc.FilterLogCacheSize = c.FilterLogCacheSize
|
||||
enc.Ethash = c.Ethash
|
||||
enc.TxPool = c.TxPool
|
||||
enc.GPO = c.GPO
|
||||
|
|
@ -83,6 +85,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
|
|||
MinerThreads *int `toml:",omitempty"`
|
||||
ExtraData []byte `toml:",omitempty"`
|
||||
GasPrice *big.Int
|
||||
FilterLogCacheSize *int
|
||||
Ethash *ethash.Config
|
||||
TxPool *core.TxPoolConfig
|
||||
GPO *gasprice.Config
|
||||
|
|
@ -140,6 +143,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
|
|||
if dec.GasPrice != nil {
|
||||
c.GasPrice = dec.GasPrice
|
||||
}
|
||||
if dec.FilterLogCacheSize != nil {
|
||||
c.FilterLogCacheSize = *dec.FilterLogCacheSize
|
||||
}
|
||||
if dec.Ethash != nil {
|
||||
c.Ethash = *dec.Ethash
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,8 @@ import (
|
|||
"github.com/XinFinOrg/XDPoSChain/common/hexutil"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/types"
|
||||
"github.com/XinFinOrg/XDPoSChain/ethdb"
|
||||
|
||||
// "github.com/XinFinOrg/XDPoSChain/ethdb"
|
||||
"github.com/XinFinOrg/XDPoSChain/rpc"
|
||||
)
|
||||
|
||||
|
|
@ -54,7 +56,7 @@ type filter struct {
|
|||
// FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
|
||||
// information related to the Ethereum protocol such als blocks, transactions and logs.
|
||||
type FilterAPI struct {
|
||||
backend Backend
|
||||
sys *FilterSystem
|
||||
chainDb ethdb.Database
|
||||
events *EventSystem
|
||||
filtersMu sync.Mutex
|
||||
|
|
@ -63,15 +65,15 @@ type FilterAPI struct {
|
|||
}
|
||||
|
||||
// NewFilterAPI returns a new FilterAPI instance.
|
||||
func NewFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *FilterAPI {
|
||||
func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
|
||||
api := &FilterAPI{
|
||||
backend: backend,
|
||||
chainDb: backend.ChainDb(),
|
||||
events: NewEventSystem(backend, lightMode),
|
||||
sys: system,
|
||||
chainDb: system.backend.ChainDb(),
|
||||
events: NewEventSystem(system, lightMode),
|
||||
filters: make(map[rpc.ID]*filter),
|
||||
timeout: timeout,
|
||||
timeout: system.cfg.Timeout,
|
||||
}
|
||||
go api.timeoutLoop(timeout)
|
||||
go api.timeoutLoop(system.cfg.Timeout)
|
||||
|
||||
return api
|
||||
}
|
||||
|
|
@ -341,7 +343,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
|
|||
var filter *Filter
|
||||
if crit.BlockHash != nil {
|
||||
// Block filter requested, construct a single-shot filter
|
||||
filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
|
||||
filter = api.sys.NewBlockFilter(*crit.BlockHash, crit.Addresses, crit.Topics)
|
||||
} else {
|
||||
// Convert the RPC block numbers into internal representations
|
||||
begin := rpc.LatestBlockNumber.Int64()
|
||||
|
|
@ -353,7 +355,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
|
|||
end = crit.ToBlock.Int64()
|
||||
}
|
||||
// Construct the range filter
|
||||
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
|
||||
filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics)
|
||||
}
|
||||
// Run the filter and return all the logs
|
||||
logs, err := filter.Logs(ctx)
|
||||
|
|
@ -396,7 +398,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
|
|||
var filter *Filter
|
||||
if f.crit.BlockHash != nil {
|
||||
// Block filter requested, construct a single-shot filter
|
||||
filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
|
||||
filter = api.sys.NewBlockFilter(*f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
|
||||
} else {
|
||||
// Convert the RPC block numbers into internal representations
|
||||
begin := rpc.LatestBlockNumber.Int64()
|
||||
|
|
@ -408,7 +410,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
|
|||
end = f.crit.ToBlock.Int64()
|
||||
}
|
||||
// Construct the range filter
|
||||
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
|
||||
filter = api.sys.NewRangeFilter(begin, end, f.crit.Addresses, f.crit.Topics)
|
||||
}
|
||||
// Run the filter and return all the logs
|
||||
logs, err := filter.Logs(ctx)
|
||||
|
|
|
|||
|
|
@ -123,20 +123,25 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
|
|||
|
||||
b.Log("Running filter benchmarks...")
|
||||
start = time.Now()
|
||||
var backend *testBackend
|
||||
|
||||
var (
|
||||
backend *testBackend
|
||||
sys *FilterSystem
|
||||
)
|
||||
|
||||
for i := 0; i < benchFilterCnt; i++ {
|
||||
if i%20 == 0 {
|
||||
db.Close()
|
||||
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
|
||||
backend = &testBackend{db: db, sections: cnt}
|
||||
sys = NewFilterSystem(backend, Config{})
|
||||
}
|
||||
var addr common.Address
|
||||
addr[0] = byte(i)
|
||||
addr[1] = byte(i / 256)
|
||||
filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
|
||||
filter := sys.NewRangeFilter(0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
|
||||
if _, err := filter.Logs(context.Background()); err != nil {
|
||||
b.Error("filter.Find error:", err)
|
||||
b.Error("filter.Logs error:", err)
|
||||
}
|
||||
}
|
||||
d = time.Since(start)
|
||||
|
|
@ -186,10 +191,11 @@ func BenchmarkNoBloomBits(b *testing.B) {
|
|||
|
||||
clearBloomBits(db)
|
||||
|
||||
_, sys := newTestFilterSystem(b, db, Config{})
|
||||
|
||||
b.Log("Running filter benchmarks...")
|
||||
start := time.Now()
|
||||
backend := &testBackend{db: db}
|
||||
filter := NewRangeFilter(backend, 0, int64(headNum), []common.Address{{}}, nil)
|
||||
filter := sys.NewRangeFilter(0, int64(headNum), []common.Address{{}}, nil)
|
||||
filter.Logs(context.Background())
|
||||
d := time.Since(start)
|
||||
b.Log("Finished running filter benchmarks")
|
||||
|
|
|
|||
|
|
@ -22,37 +22,15 @@ import (
|
|||
"math/big"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/common"
|
||||
"github.com/XinFinOrg/XDPoSChain/core"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/bloombits"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/types"
|
||||
"github.com/XinFinOrg/XDPoSChain/ethdb"
|
||||
"github.com/XinFinOrg/XDPoSChain/event"
|
||||
"github.com/XinFinOrg/XDPoSChain/rpc"
|
||||
)
|
||||
|
||||
type Backend interface {
|
||||
ChainDb() ethdb.Database
|
||||
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
|
||||
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
|
||||
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
|
||||
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
|
||||
PendingBlockAndReceipts() (*types.Block, types.Receipts)
|
||||
|
||||
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
|
||||
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
||||
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
|
||||
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
|
||||
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
|
||||
|
||||
BloomStatus() (uint64, uint64)
|
||||
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
||||
}
|
||||
|
||||
// Filter can be used to retrieve and filter logs.
|
||||
type Filter struct {
|
||||
backend Backend
|
||||
sys *FilterSystem
|
||||
|
||||
db ethdb.Database
|
||||
addresses []common.Address
|
||||
topics [][]common.Hash
|
||||
|
||||
|
|
@ -64,7 +42,7 @@ type Filter struct {
|
|||
|
||||
// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
|
||||
// figure out whether a particular block is interesting or not.
|
||||
func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
|
||||
func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
|
||||
// Flatten the address and topic filter clauses into a single bloombits filter
|
||||
// system. Since the bloombits are not positional, nil topics are permitted,
|
||||
// which get flattened into a nil byte slice.
|
||||
|
|
@ -83,10 +61,10 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres
|
|||
}
|
||||
filters = append(filters, filter)
|
||||
}
|
||||
size, _ := backend.BloomStatus()
|
||||
size, _ := sys.backend.BloomStatus()
|
||||
|
||||
// Create a generic filter and convert it into a range filter
|
||||
filter := newFilter(backend, addresses, topics)
|
||||
filter := newFilter(sys, addresses, topics)
|
||||
|
||||
filter.matcher = bloombits.NewMatcher(size, filters)
|
||||
filter.begin = begin
|
||||
|
|
@ -97,21 +75,20 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres
|
|||
|
||||
// NewBlockFilter creates a new filter which directly inspects the contents of
|
||||
// a block to figure out whether it is interesting or not.
|
||||
func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
|
||||
func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
|
||||
// Create a generic filter and convert it into a block filter
|
||||
filter := newFilter(backend, addresses, topics)
|
||||
filter := newFilter(sys, addresses, topics)
|
||||
filter.block = block
|
||||
return filter
|
||||
}
|
||||
|
||||
// newFilter creates a generic filter that can either filter based on a block hash,
|
||||
// or based on range queries. The search criteria needs to be explicitly set.
|
||||
func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter {
|
||||
func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common.Hash) *Filter {
|
||||
return &Filter{
|
||||
backend: backend,
|
||||
sys: sys,
|
||||
addresses: addresses,
|
||||
topics: topics,
|
||||
db: backend.ChainDb(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -120,14 +97,14 @@ func newFilter(backend Backend, addresses []common.Address, topics [][]common.Ha
|
|||
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
|
||||
// If we're doing singleton block filtering, execute and return
|
||||
if f.block != (common.Hash{}) {
|
||||
header, err := f.backend.HeaderByHash(ctx, f.block)
|
||||
header, err := f.sys.backend.HeaderByHash(ctx, f.block)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if header == nil {
|
||||
return nil, errors.New("unknown block")
|
||||
}
|
||||
return f.blockLogs(ctx, header)
|
||||
return f.blockLogs(ctx, header, false)
|
||||
}
|
||||
// Short-cut if all we care about is pending logs
|
||||
if f.begin == rpc.PendingBlockNumber.Int64() {
|
||||
|
|
@ -137,7 +114,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
|
|||
return f.pendingLogs()
|
||||
}
|
||||
// Figure out the limits of the filter range
|
||||
header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
|
||||
header, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
|
||||
if header == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
@ -156,7 +133,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
|
|||
var (
|
||||
logs []*types.Log
|
||||
err error
|
||||
size, sections = f.backend.BloomStatus()
|
||||
size, sections = f.sys.backend.BloomStatus()
|
||||
)
|
||||
if indexed := sections * size; indexed > uint64(f.begin) {
|
||||
if indexed > end {
|
||||
|
|
@ -192,7 +169,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
|
|||
}
|
||||
defer session.Close()
|
||||
|
||||
f.backend.ServiceFilter(ctx, session)
|
||||
f.sys.backend.ServiceFilter(ctx, session)
|
||||
|
||||
// Iterate over the matches until exhausted or context closed
|
||||
var logs []*types.Log
|
||||
|
|
@ -211,11 +188,11 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
|
|||
f.begin = int64(number) + 1
|
||||
|
||||
// Retrieve the suggested block and pull any truly matching logs
|
||||
header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
|
||||
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
|
||||
if header == nil || err != nil {
|
||||
return logs, err
|
||||
}
|
||||
found, err := f.checkMatches(ctx, header)
|
||||
found, err := f.blockLogs(ctx, header, true)
|
||||
if err != nil {
|
||||
return logs, err
|
||||
}
|
||||
|
|
@ -233,11 +210,11 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
|
|||
var logs []*types.Log
|
||||
|
||||
for ; f.begin <= int64(end); f.begin++ {
|
||||
header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
|
||||
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
|
||||
if header == nil || err != nil {
|
||||
return logs, err
|
||||
}
|
||||
found, err := f.blockLogs(ctx, header)
|
||||
found, err := f.blockLogs(ctx, header, false)
|
||||
if err != nil {
|
||||
return logs, err
|
||||
}
|
||||
|
|
@ -247,34 +224,34 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
|
|||
}
|
||||
|
||||
// blockLogs returns the logs matching the filter criteria within a single block.
|
||||
func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
|
||||
if bloomFilter(header.Bloom, f.addresses, f.topics) {
|
||||
found, err := f.checkMatches(ctx, header)
|
||||
func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool) ([]*types.Log, error) {
|
||||
// Fast track: no filtering criteria
|
||||
if len(f.addresses) == 0 && len(f.topics) == 0 {
|
||||
list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
|
||||
if err != nil {
|
||||
return logs, err
|
||||
return nil, err
|
||||
}
|
||||
logs = append(logs, found...)
|
||||
return flatten(list), nil
|
||||
} else if skipBloom || bloomFilter(header.Bloom, f.addresses, f.topics) {
|
||||
return f.checkMatches(ctx, header)
|
||||
}
|
||||
return logs, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// checkMatches checks if the receipts belonging to the given header contain any log events that
|
||||
// match the filter criteria. This function is called when the bloom filter signals a potential match.
|
||||
func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
|
||||
// Get the logs of the block
|
||||
logsList, err := f.backend.GetLogs(ctx, header.Hash())
|
||||
func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*types.Log, error) {
|
||||
logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var unfiltered []*types.Log
|
||||
for _, logs := range logsList {
|
||||
unfiltered = append(unfiltered, logs...)
|
||||
}
|
||||
logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
|
||||
|
||||
unfiltered := flatten(logsList)
|
||||
logs := filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
|
||||
if len(logs) > 0 {
|
||||
// We have matching logs, check if we need to resolve full logs via the light client
|
||||
if logs[0].TxHash == (common.Hash{}) {
|
||||
receipts, err := f.backend.GetReceipts(ctx, header.Hash())
|
||||
receipts, err := f.sys.backend.GetReceipts(ctx, header.Hash())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -291,7 +268,7 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [
|
|||
|
||||
// pendingLogs returns the logs matching the filter criteria within the pending block.
|
||||
func (f *Filter) pendingLogs() ([]*types.Log, error) {
|
||||
block, receipts := f.backend.PendingBlockAndReceipts()
|
||||
block, receipts := f.sys.backend.PendingBlockAndReceipts()
|
||||
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
|
||||
var unfiltered []*types.Log
|
||||
for _, r := range receipts {
|
||||
|
|
@ -376,3 +353,11 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
|
|||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func flatten(list [][]*types.Log) []*types.Log {
|
||||
var flat []*types.Log
|
||||
for _, logs := range list {
|
||||
flat = append(flat, logs...)
|
||||
}
|
||||
return flat
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,18 +21,96 @@ package filters
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
ethereum "github.com/XinFinOrg/XDPoSChain"
|
||||
"github.com/XinFinOrg/XDPoSChain/common"
|
||||
"github.com/XinFinOrg/XDPoSChain/core"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/bloombits"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/types"
|
||||
"github.com/XinFinOrg/XDPoSChain/ethdb"
|
||||
"github.com/XinFinOrg/XDPoSChain/event"
|
||||
"github.com/XinFinOrg/XDPoSChain/log"
|
||||
"github.com/XinFinOrg/XDPoSChain/rpc"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
)
|
||||
|
||||
// Config represents the configuration of the filter system.
|
||||
type Config struct {
|
||||
LogCacheSize int // maximum number of cached blocks (default: 32)
|
||||
Timeout time.Duration // how long filters stay active (default: 5min)
|
||||
}
|
||||
|
||||
func (cfg Config) withDefaults() Config {
|
||||
if cfg.Timeout == 0 {
|
||||
cfg.Timeout = 5 * time.Minute
|
||||
}
|
||||
if cfg.LogCacheSize == 0 {
|
||||
cfg.LogCacheSize = 32
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
||||
type Backend interface {
|
||||
ChainDb() ethdb.Database
|
||||
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
|
||||
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
|
||||
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
|
||||
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
|
||||
PendingBlockAndReceipts() (*types.Block, types.Receipts)
|
||||
|
||||
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
|
||||
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
||||
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
|
||||
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
|
||||
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
|
||||
|
||||
BloomStatus() (uint64, uint64)
|
||||
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
||||
}
|
||||
|
||||
// FilterSystem holds resources shared by all filters.
|
||||
type FilterSystem struct {
|
||||
backend Backend
|
||||
logsCache *lru.Cache
|
||||
cfg *Config
|
||||
}
|
||||
|
||||
// NewFilterSystem creates a filter system.
|
||||
func NewFilterSystem(backend Backend, config Config) *FilterSystem {
|
||||
config = config.withDefaults()
|
||||
|
||||
cache, err := lru.New(config.LogCacheSize)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &FilterSystem{
|
||||
backend: backend,
|
||||
logsCache: cache,
|
||||
cfg: &config,
|
||||
}
|
||||
}
|
||||
|
||||
// cachedGetLogs loads block logs from the backend and caches the result.
|
||||
func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) {
|
||||
cached, ok := sys.logsCache.Get(blockHash)
|
||||
if ok {
|
||||
return cached.([][]*types.Log), nil
|
||||
}
|
||||
|
||||
logs, err := sys.backend.GetLogs(ctx, blockHash, number)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if logs == nil {
|
||||
return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString())
|
||||
}
|
||||
sys.logsCache.Add(blockHash, logs)
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
// Type determines the kind of filter and is used to put the filter in to
|
||||
// the correct bucket when added.
|
||||
type Type byte
|
||||
|
|
@ -83,6 +161,7 @@ type subscription struct {
|
|||
// subscription which match the subscription criteria.
|
||||
type EventSystem struct {
|
||||
backend Backend
|
||||
sys *FilterSystem
|
||||
lightMode bool
|
||||
lastHead *types.Header
|
||||
|
||||
|
|
@ -109,9 +188,10 @@ type EventSystem struct {
|
|||
//
|
||||
// The returned manager has a loop that needs to be stopped with the Stop function
|
||||
// or by stopping the given mux.
|
||||
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
|
||||
func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
|
||||
m := &EventSystem{
|
||||
backend: backend,
|
||||
sys: sys,
|
||||
backend: sys.backend,
|
||||
lightMode: lightMode,
|
||||
install: make(chan *subscription),
|
||||
uninstall: make(chan *subscription),
|
||||
|
|
@ -405,7 +485,7 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
|
|||
// Get the logs of the block
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cancel()
|
||||
logsList, err := es.backend.GetLogs(ctx, header.Hash())
|
||||
logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,10 +39,6 @@ import (
|
|||
"github.com/XinFinOrg/XDPoSChain/rpc"
|
||||
)
|
||||
|
||||
var (
|
||||
deadline = 5 * time.Minute
|
||||
)
|
||||
|
||||
type testBackend struct {
|
||||
mux *event.TypeMux
|
||||
db ethdb.Database
|
||||
|
|
@ -81,14 +77,8 @@ func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (t
|
|||
return core.GetBlockReceipts(b.db, blockHash, number), nil
|
||||
}
|
||||
|
||||
func (b *testBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) {
|
||||
number := core.GetBlockNumber(b.db, blockHash)
|
||||
receipts := core.GetBlockReceipts(b.db, blockHash, number)
|
||||
|
||||
logs := make([][]*types.Log, len(receipts))
|
||||
for i, receipt := range receipts {
|
||||
logs[i] = receipt.Logs
|
||||
}
|
||||
func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
|
||||
logs := rawdb.ReadLogs(b.db, hash, number)
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
|
|
@ -147,6 +137,12 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc
|
|||
}()
|
||||
}
|
||||
|
||||
func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) {
|
||||
backend := &testBackend{db: db}
|
||||
sys := NewFilterSystem(backend, cfg)
|
||||
return backend, sys
|
||||
}
|
||||
|
||||
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
|
||||
// It creates multiple subscriptions:
|
||||
// - one at the start and should receive all posted chain events and a second (blockHashes)
|
||||
|
|
@ -156,12 +152,12 @@ func TestBlockSubscription(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewFilterAPI(backend, false, deadline)
|
||||
genesis = new(core.Genesis).MustCommit(db)
|
||||
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
|
||||
chainEvents = []core.ChainEvent{}
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend, sys = newTestFilterSystem(t, db, Config{})
|
||||
api = NewFilterAPI(sys, false)
|
||||
genesis = new(core.Genesis).MustCommit(db)
|
||||
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
|
||||
chainEvents = []core.ChainEvent{}
|
||||
)
|
||||
|
||||
for _, blk := range chain {
|
||||
|
|
@ -208,9 +204,9 @@ func TestPendingTxFilter(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewFilterAPI(backend, false, deadline)
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend, sys = newTestFilterSystem(t, db, Config{})
|
||||
api = NewFilterAPI(sys, false)
|
||||
|
||||
transactions = []*types.Transaction{
|
||||
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
|
||||
|
|
@ -263,9 +259,9 @@ func TestPendingTxFilter(t *testing.T) {
|
|||
// If not it must return an error.
|
||||
func TestLogFilterCreation(t *testing.T) {
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewFilterAPI(backend, false, deadline)
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
_, sys = newTestFilterSystem(t, db, Config{})
|
||||
api = NewFilterAPI(sys, false)
|
||||
|
||||
testCases = []struct {
|
||||
crit FilterCriteria
|
||||
|
|
@ -307,9 +303,9 @@ func TestInvalidLogFilterCreation(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewFilterAPI(backend, false, deadline)
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
_, sys = newTestFilterSystem(t, db, Config{})
|
||||
api = NewFilterAPI(sys, false)
|
||||
)
|
||||
|
||||
// different situations where log filter creation should fail.
|
||||
|
|
@ -330,8 +326,8 @@ func TestInvalidLogFilterCreation(t *testing.T) {
|
|||
func TestInvalidGetLogsRequest(t *testing.T) {
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewFilterAPI(backend, false, deadline)
|
||||
_, sys = newTestFilterSystem(t, db, Config{})
|
||||
api = NewFilterAPI(sys, false)
|
||||
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
|
||||
)
|
||||
|
||||
|
|
@ -354,9 +350,9 @@ func TestLogFilter(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewFilterAPI(backend, false, deadline)
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend, sys = newTestFilterSystem(t, db, Config{})
|
||||
api = NewFilterAPI(sys, false)
|
||||
|
||||
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
|
||||
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
|
||||
|
|
@ -468,9 +464,9 @@ func TestPendingLogsSubscription(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewFilterAPI(backend, false, deadline)
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend, sys = newTestFilterSystem(t, db, Config{})
|
||||
api = NewFilterAPI(sys, false)
|
||||
|
||||
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
|
||||
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
|
||||
|
|
@ -604,10 +600,10 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
|
|||
timeout := 100 * time.Millisecond
|
||||
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewFilterAPI(backend, false, timeout)
|
||||
done = make(chan struct{})
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout})
|
||||
api = NewFilterAPI(sys, false)
|
||||
done = make(chan struct{})
|
||||
)
|
||||
|
||||
go func() {
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ func BenchmarkFilters(b *testing.B) {
|
|||
|
||||
var (
|
||||
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
|
||||
backend = &testBackend{db: db}
|
||||
_, sys = newTestFilterSystem(b, db, Config{})
|
||||
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
||||
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
|
||||
addr2 = common.BytesToAddress([]byte("jeff"))
|
||||
|
|
@ -91,7 +91,7 @@ func BenchmarkFilters(b *testing.B) {
|
|||
}
|
||||
b.ResetTimer()
|
||||
|
||||
filter := NewRangeFilter(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
|
||||
filter := sys.NewRangeFilter(0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
logs, _ := filter.Logs(context.Background())
|
||||
|
|
@ -110,7 +110,7 @@ func TestFilters(t *testing.T) {
|
|||
|
||||
var (
|
||||
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
|
||||
backend = &testBackend{db: db}
|
||||
_, sys = newTestFilterSystem(t, db, Config{})
|
||||
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
||||
addr = crypto.PubkeyToAddress(key1.PublicKey)
|
||||
|
||||
|
|
@ -133,6 +133,7 @@ func TestFilters(t *testing.T) {
|
|||
},
|
||||
}
|
||||
gen.AddUncheckedReceipt(receipt)
|
||||
gen.AddUncheckedTx(types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(1), 1, big.NewInt(2100), nil))
|
||||
case 2:
|
||||
receipt := types.NewReceipt(nil, false, 0)
|
||||
receipt.Logs = []*types.Log{
|
||||
|
|
@ -142,6 +143,7 @@ func TestFilters(t *testing.T) {
|
|||
},
|
||||
}
|
||||
gen.AddUncheckedReceipt(receipt)
|
||||
gen.AddUncheckedTx(types.NewTransaction(2, common.HexToAddress("0x2"), big.NewInt(2), 2, big.NewInt(2100), nil))
|
||||
case 998:
|
||||
receipt := types.NewReceipt(nil, false, 0)
|
||||
receipt.Logs = []*types.Log{
|
||||
|
|
@ -151,6 +153,7 @@ func TestFilters(t *testing.T) {
|
|||
},
|
||||
}
|
||||
gen.AddUncheckedReceipt(receipt)
|
||||
gen.AddUncheckedTx(types.NewTransaction(998, common.HexToAddress("0x998"), big.NewInt(998), 998, big.NewInt(2100), nil))
|
||||
case 999:
|
||||
receipt := types.NewReceipt(nil, false, 0)
|
||||
receipt.Logs = []*types.Log{
|
||||
|
|
@ -160,6 +163,7 @@ func TestFilters(t *testing.T) {
|
|||
},
|
||||
}
|
||||
gen.AddUncheckedReceipt(receipt)
|
||||
gen.AddUncheckedTx(types.NewTransaction(999, common.HexToAddress("0x999"), big.NewInt(999), 999, big.NewInt(2100), nil))
|
||||
}
|
||||
})
|
||||
for i, block := range chain {
|
||||
|
|
@ -175,14 +179,14 @@ func TestFilters(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
filter := NewRangeFilter(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
|
||||
filter := sys.NewRangeFilter(0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
|
||||
|
||||
logs, _ := filter.Logs(context.Background())
|
||||
if len(logs) != 4 {
|
||||
t.Error("expected 4 log, got", len(logs))
|
||||
}
|
||||
|
||||
filter = NewRangeFilter(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
|
||||
filter = sys.NewRangeFilter(900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
|
||||
logs, _ = filter.Logs(context.Background())
|
||||
if len(logs) != 1 {
|
||||
t.Error("expected 1 log, got", len(logs))
|
||||
|
|
@ -191,7 +195,7 @@ func TestFilters(t *testing.T) {
|
|||
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
|
||||
}
|
||||
|
||||
filter = NewRangeFilter(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
|
||||
filter = sys.NewRangeFilter(990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
|
||||
logs, _ = filter.Logs(context.Background())
|
||||
if len(logs) != 1 {
|
||||
t.Error("expected 1 log, got", len(logs))
|
||||
|
|
@ -200,7 +204,7 @@ func TestFilters(t *testing.T) {
|
|||
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
|
||||
}
|
||||
|
||||
filter = NewRangeFilter(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}})
|
||||
filter = sys.NewRangeFilter(1, 10, nil, [][]common.Hash{{hash1, hash2}})
|
||||
|
||||
logs, _ = filter.Logs(context.Background())
|
||||
if len(logs) != 2 {
|
||||
|
|
@ -208,7 +212,7 @@ func TestFilters(t *testing.T) {
|
|||
}
|
||||
|
||||
failHash := common.BytesToHash([]byte("fail"))
|
||||
filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}})
|
||||
filter = sys.NewRangeFilter(0, -1, nil, [][]common.Hash{{failHash}})
|
||||
|
||||
logs, _ = filter.Logs(context.Background())
|
||||
if len(logs) != 0 {
|
||||
|
|
@ -216,14 +220,14 @@ func TestFilters(t *testing.T) {
|
|||
}
|
||||
|
||||
failAddr := common.BytesToAddress([]byte("failmenow"))
|
||||
filter = NewRangeFilter(backend, 0, -1, []common.Address{failAddr}, nil)
|
||||
filter = sys.NewRangeFilter(0, -1, []common.Address{failAddr}, nil)
|
||||
|
||||
logs, _ = filter.Logs(context.Background())
|
||||
if len(logs) != 0 {
|
||||
t.Error("expected 0 log, got", len(logs))
|
||||
}
|
||||
|
||||
filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
|
||||
filter = sys.NewRangeFilter(0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
|
||||
|
||||
logs, _ = filter.Logs(context.Background())
|
||||
if len(logs) != 0 {
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ import (
|
|||
"github.com/XinFinOrg/XDPoSChain/core/types"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/vm"
|
||||
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
|
||||
"github.com/XinFinOrg/XDPoSChain/eth/filters"
|
||||
"github.com/XinFinOrg/XDPoSChain/ethdb"
|
||||
"github.com/XinFinOrg/XDPoSChain/event"
|
||||
"github.com/XinFinOrg/XDPoSChain/params"
|
||||
|
|
@ -103,6 +104,10 @@ type Backend interface {
|
|||
GetBlocksHashCache(blockNr uint64) []common.Hash
|
||||
AreTwoBlockSamePath(newBlock common.Hash, oldBlock common.Hash) bool
|
||||
GetOrderNonce(address common.Hash) (uint64, error)
|
||||
|
||||
// eth/filters needs to be initialized from this backend type, so methods needed by
|
||||
// it must also be included here.
|
||||
filters.Backend
|
||||
}
|
||||
|
||||
func GetAPIs(apiBackend Backend, chainReader consensus.ChainReader) []rpc.API {
|
||||
|
|
|
|||
|
|
@ -167,8 +167,8 @@ func (b *LesApiBackend) GetReceipts(ctx context.Context, blockHash common.Hash)
|
|||
return light.GetBlockReceipts(ctx, b.eth.odr, blockHash, core.GetBlockNumber(b.eth.chainDb, blockHash))
|
||||
}
|
||||
|
||||
func (b *LesApiBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) {
|
||||
return light.GetBlockLogs(ctx, b.eth.odr, blockHash, core.GetBlockNumber(b.eth.chainDb, blockHash))
|
||||
func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
|
||||
return light.GetBlockLogs(ctx, b.eth.odr, hash, number)
|
||||
}
|
||||
|
||||
func (b *LesApiBackend) GetTd(blockHash common.Hash) *big.Int {
|
||||
|
|
|
|||
|
|
@ -190,7 +190,7 @@ func (s *LightEthereum) APIs() []rpc.API {
|
|||
}, {
|
||||
Namespace: "eth",
|
||||
Version: "1.0",
|
||||
Service: filters.NewFilterAPI(s.ApiBackend, true, 5*time.Minute),
|
||||
Service: filters.NewFilterAPI(filters.NewFilterSystem(s.ApiBackend, filters.Config{LogCacheSize: s.config.FilterLogCacheSize}), true),
|
||||
Public: true,
|
||||
}, {
|
||||
Namespace: "net",
|
||||
|
|
|
|||
16
node/node.go
16
node/node.go
|
|
@ -48,6 +48,7 @@ type Node struct {
|
|||
|
||||
serverConfig p2p.Config
|
||||
server *p2p.Server // Currently running P2P networking layer
|
||||
state int // Tracks state of node lifecycle
|
||||
|
||||
serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
|
||||
services map[reflect.Type]Service // Currently running services
|
||||
|
|
@ -74,6 +75,10 @@ type Node struct {
|
|||
log log.Logger
|
||||
}
|
||||
|
||||
const (
|
||||
initializingState = iota
|
||||
)
|
||||
|
||||
// New creates a new P2P node, ready for protocol registration.
|
||||
func New(conf *Config) (*Node, error) {
|
||||
// Copy config and resolve the datadir so future changes to the current
|
||||
|
|
@ -302,6 +307,17 @@ func (n *Node) stopInProc() {
|
|||
}
|
||||
}
|
||||
|
||||
// RegisterAPIs registers the APIs a service provides on the node.
|
||||
func (n *Node) RegisterAPIs(apis []rpc.API) {
|
||||
n.lock.Lock()
|
||||
defer n.lock.Unlock()
|
||||
|
||||
if n.state != initializingState {
|
||||
panic("can't register APIs on running/stopped node")
|
||||
}
|
||||
n.rpcAPIs = append(n.rpcAPIs, apis...)
|
||||
}
|
||||
|
||||
// startIPC initializes and starts the IPC RPC endpoint.
|
||||
func (n *Node) startIPC(apis []rpc.API) error {
|
||||
// Short circuit if the IPC endpoint isn't being exposed
|
||||
|
|
|
|||
Loading…
Reference in a new issue