eth/filters: remove use of event.TypeMux for pending logs (#20312)

This commit is contained in:
JukLee0ira 2024-07-10 15:30:39 +08:00 committed by Daniel Liu
parent 99c2e18b88
commit 83782e5368
13 changed files with 257 additions and 266 deletions

View file

@ -126,7 +126,7 @@ func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfi
database: database,
blockchain: blockchain,
config: genesis.Config,
events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false),
events: filters.NewEventSystem(&filterBackend{database, blockchain}, false),
}
blockchain.Client = backend
backend.rollback()
@ -146,7 +146,7 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend {
database: database,
blockchain: blockchain,
config: genesis.Config,
events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false),
events: filters.NewEventSystem(&filterBackend{database, blockchain}, false),
}
backend.rollback()
return backend
@ -558,22 +558,34 @@ func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*ty
}
func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return nullSubscription()
}
func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return fb.bc.SubscribeChainEvent(ch)
}
func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return fb.bc.SubscribeRemovedLogsEvent(ch)
}
func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return fb.bc.SubscribeLogsEvent(ch)
}
func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
return nullSubscription()
}
func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
panic("not supported")
}
func nullSubscription() event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
})
}
func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return fb.bc.SubscribeChainEvent(ch)
}
func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return fb.bc.SubscribeRemovedLogsEvent(ch)
}
func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return fb.bc.SubscribeLogsEvent(ch)
}
func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
panic("not supported")
}

View file

@ -30,11 +30,6 @@ type OrderTxPreEvent struct{ Tx *types.OrderTransaction }
// LendingTxPreEvent is posted when a order transaction enters the order transaction pool.
type LendingTxPreEvent struct{ Tx *types.LendingTransaction }
// PendingLogsEvent is posted pre mining and notifies of pending logs.
type PendingLogsEvent struct {
Logs []*types.Log
}
// PendingStateEvent is posted pre mining and notifies of pending state changes.
type PendingStateEvent struct{}

View file

@ -258,6 +258,10 @@ func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
}
func (b *EthApiBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.eth.miner.SubscribePendingLogs(ch)
}
func (b *EthApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChainEvent(ch)
}

View file

@ -72,9 +72,8 @@ type PublicFilterAPI struct {
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
api := &PublicFilterAPI{
backend: backend,
mux: backend.EventMux(),
chainDb: backend.ChainDb(),
events: NewEventSystem(backend.EventMux(), backend, lightMode),
events: NewEventSystem(backend, lightMode),
filters: make(map[rpc.ID]*filter),
}
go api.timeoutLoop()
@ -439,7 +438,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
case LogsSubscription:
case LogsSubscription, MinedAndPendingLogsSubscription:
logs := f.logs
f.logs = nil
return returnLogs(logs), nil

View file

@ -30,7 +30,6 @@ import (
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/node"
)
@ -124,14 +123,13 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
b.Log("Running filter benchmarks...")
start = time.Now()
mux := new(event.TypeMux)
var backend *testBackend
for i := 0; i < benchFilterCnt; i++ {
if i%20 == 0 {
db.Close()
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
backend = &testBackend{db: db, sections: cnt}
}
var addr common.Address
addr[0] = byte(i)
@ -190,8 +188,7 @@ func BenchmarkNoBloomBits(b *testing.B) {
b.Log("Running filter benchmarks...")
start := time.Now()
mux := new(event.TypeMux)
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
backend := &testBackend{db: db}
filter := NewRangeFilter(backend, 0, int64(headNum), []common.Address{{}}, nil)
filter.Logs(context.Background())
d := time.Since(start)

View file

@ -32,7 +32,6 @@ import (
type Backend interface {
ChainDb() ethdb.Database
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
@ -42,6 +41,7 @@ type Backend interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)

View file

@ -67,10 +67,6 @@ const (
chainEvChanSize = 10
)
var (
ErrInvalidSubscriptionID = errors.New("invalid id")
)
type subscription struct {
id rpc.ID
typ Type
@ -86,25 +82,25 @@ type subscription struct {
// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscription which match the subscription criteria.
type EventSystem struct {
mux *event.TypeMux
backend Backend
lightMode bool
lastHead *types.Header
// Subscriptions
txsSub event.Subscription // Subscription for new transaction event
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
chainSub event.Subscription // Subscription for new chain event
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
txsSub event.Subscription // Subscription for new transaction event
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
pendingLogsSub event.Subscription // Subscription for pending log event
chainSub event.Subscription // Subscription for new chain event
// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
pendingLogsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
}
// NewEventSystem creates a new manager that listens for event on the given mux,
@ -113,17 +109,17 @@ type EventSystem struct {
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
m := &EventSystem{
mux: mux,
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
}
// Subscribe events
@ -131,12 +127,10 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
// TODO(rjl493456442): use feed to subscribe pending log event
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
// Make sure none of the subscriptions are empty
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
m.pendingLogSub.Closed() {
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
log.Crit("Subscribe for event system failed")
}
@ -314,58 +308,61 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript
type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria.
func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
if ev == nil {
func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
if len(ev) == 0 {
return
}
for _, f := range filters[LogsSubscription] {
matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}
switch e := ev.(type) {
case []*types.Log:
if len(e) > 0 {
func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
if len(ev) == 0 {
return
}
for _, f := range filters[PendingLogsSubscription] {
matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}
func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) {
for _, f := range filters[LogsSubscription] {
matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
hashes := make([]common.Hash, 0, len(ev.Txs))
for _, tx := range ev.Txs {
hashes = append(hashes, tx.Hash())
}
for _, f := range filters[PendingTransactionsSubscription] {
f.hashes <- hashes
}
}
func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}
case core.RemovedLogsEvent:
for _, f := range filters[LogsSubscription] {
if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
case *event.TypeMuxEvent:
if muxe, ok := e.Data.(core.PendingLogsEvent); ok {
for _, f := range filters[PendingLogsSubscription] {
if e.Time.After(f.created) {
if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}
}
case core.NewTxsEvent:
hashes := make([]common.Hash, 0, len(e.Txs))
for _, tx := range e.Txs {
hashes = append(hashes, tx.Hash())
}
for _, f := range filters[PendingTransactionsSubscription] {
f.hashes <- hashes
}
case core.ChainEvent:
for _, f := range filters[BlocksSubscription] {
f.headers <- e.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
})
}
})
}
}
@ -446,10 +443,10 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
func (es *EventSystem) eventLoop() {
// Ensure all subscriptions get cleaned up
defer func() {
es.pendingLogSub.Unsubscribe()
es.txsSub.Unsubscribe()
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
es.pendingLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
}()
@ -460,20 +457,16 @@ func (es *EventSystem) eventLoop() {
for {
select {
// Handle subscribed events
case ev := <-es.txsCh:
es.broadcast(index, ev)
es.handleTxsEvent(index, ev)
case ev := <-es.logsCh:
es.broadcast(index, ev)
es.handleLogs(index, ev)
case ev := <-es.rmLogsCh:
es.broadcast(index, ev)
es.handleRemovedLogs(index, ev)
case ev := <-es.pendingLogsCh:
es.handlePendingLogs(index, ev)
case ev := <-es.chainCh:
es.broadcast(index, ev)
case ev, active := <-es.pendingLogSub.Chan():
if !active { // system stopped
return
}
es.broadcast(index, ev)
es.handleChainEvent(index, ev)
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {

View file

@ -39,23 +39,20 @@ import (
)
type testBackend struct {
mux *event.TypeMux
db ethdb.Database
sections uint64
txFeed *event.Feed
rmLogsFeed *event.Feed
logsFeed *event.Feed
chainFeed *event.Feed
mux *event.TypeMux
db ethdb.Database
sections uint64
txFeed event.Feed
logsFeed event.Feed
rmLogsFeed event.Feed
pendingLogsFeed event.Feed
chainFeed event.Feed
}
func (b *testBackend) ChainDb() ethdb.Database {
return b.db
}
func (b *testBackend) EventMux() *event.TypeMux {
return b.mux
}
func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
var hash common.Hash
var num uint64
@ -102,6 +99,10 @@ func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
return b.logsFeed.Subscribe(ch)
}
func (b *testBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.pendingLogsFeed.Subscribe(ch)
}
func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return b.chainFeed.Subscribe(ch)
}
@ -146,13 +147,8 @@ func TestBlockSubscription(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
@ -191,7 +187,7 @@ func TestBlockSubscription(t *testing.T) {
time.Sleep(1 * time.Second)
for _, e := range chainEvents {
chainFeed.Send(e)
backend.chainFeed.Send(e)
}
<-sub0.Err()
@ -203,14 +199,9 @@ func TestPendingTxFilter(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
@ -226,7 +217,7 @@ func TestPendingTxFilter(t *testing.T) {
fid0 := api.NewPendingTransactionFilter()
time.Sleep(1 * time.Second)
txFeed.Send(core.NewTxsEvent{Txs: transactions})
backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})
timeout := time.Now().Add(1 * time.Second)
for {
@ -263,14 +254,9 @@ func TestPendingTxFilter(t *testing.T) {
// If not it must return an error.
func TestLogFilterCreation(t *testing.T) {
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
testCases = []struct {
crit FilterCriteria
@ -312,14 +298,9 @@ func TestInvalidLogFilterCreation(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
)
// different situations where log filter creation should fail.
@ -339,15 +320,10 @@ func TestInvalidLogFilterCreation(t *testing.T) {
func TestInvalidGetLogsRequest(t *testing.T) {
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
)
// Reason: Cannot specify both BlockHash and FromBlock/ToBlock)
@ -369,14 +345,9 @@ func TestLogFilter(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@ -386,7 +357,7 @@ func TestLogFilter(t *testing.T) {
secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
// posted twice, once as vm.Logs and once as core.PendingLogsEvent
// posted twice, once as regular logs and once as pending logs.
allLogs = []*types.Log{
{Address: firstAddr},
{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1},
@ -439,11 +410,11 @@ func TestLogFilter(t *testing.T) {
// raise events
time.Sleep(1 * time.Second)
if nsend := logsFeed.Send(allLogs); nsend == 0 {
t.Fatal("Shoud have at least one subscription")
if nsend := backend.logsFeed.Send(allLogs); nsend == 0 {
t.Fatal("Logs event not delivered")
}
if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil {
t.Fatal(err)
if nsend := backend.pendingLogsFeed.Send(allLogs); nsend == 0 {
t.Fatal("Pending logs event not delivered")
}
for i, tt := range testCases {
@ -488,14 +459,9 @@ func TestPendingLogsSubscription(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@ -507,26 +473,18 @@ func TestPendingLogsSubscription(t *testing.T) {
fourthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
allLogs = []core.PendingLogsEvent{
{Logs: []*types.Log{{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}}},
{Logs: []*types.Log{{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}}},
{Logs: []*types.Log{{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}}},
{Logs: []*types.Log{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}}},
{Logs: []*types.Log{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}}},
{Logs: []*types.Log{
allLogs = [][]*types.Log{
{{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}},
{{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}},
{{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}},
{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}},
{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}},
{
{Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
{Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5},
{Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5},
{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
}},
}
convertLogs = func(pl []core.PendingLogsEvent) []*types.Log {
var logs []*types.Log
for _, l := range pl {
logs = append(logs, l.Logs...)
}
return logs
},
}
testCases = []struct {
@ -536,21 +494,52 @@ func TestPendingLogsSubscription(t *testing.T) {
sub *Subscription
}{
// match all
{ethereum.FilterQuery{}, convertLogs(allLogs), nil, nil},
{
ethereum.FilterQuery{}, flattenLogs(allLogs),
nil, nil,
},
// match none due to no matching addresses
{ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, nil, nil},
{
ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}},
nil,
nil, nil,
},
// match logs based on addresses, ignore topics
{ethereum.FilterQuery{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
{
ethereum.FilterQuery{Addresses: []common.Address{firstAddr}},
append(flattenLogs(allLogs[:2]), allLogs[5][3]),
nil, nil,
},
// match none due to no matching topics (match with address)
{ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}}, []*types.Log{}, nil, nil},
{
ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}},
nil, nil, nil,
},
// match logs based on addresses and topics
{ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[3:5]), allLogs[5].Logs[0]), nil, nil},
{
ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}},
append(flattenLogs(allLogs[3:5]), allLogs[5][0]),
nil, nil,
},
// match logs based on multiple addresses and "or" topics
{ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[2:5]), allLogs[5].Logs[0]), nil, nil},
{
ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}},
append(flattenLogs(allLogs[2:5]), allLogs[5][0]),
nil,
nil,
},
// block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes
{ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
{
ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)},
append(flattenLogs(allLogs[:2]), allLogs[5][3]),
nil, nil,
},
// multiple pending logs, should match only 2 topics from the logs in block 5
{ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil},
{
ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}},
[]*types.Log{allLogs[5][0], allLogs[5][2]},
nil, nil,
},
}
)
@ -593,10 +582,15 @@ func TestPendingLogsSubscription(t *testing.T) {
// raise events
time.Sleep(1 * time.Second)
// allLogs are type of core.PendingLogsEvent
for _, l := range allLogs {
if err := mux.Post(l); err != nil {
t.Fatal(err)
}
for _, ev := range allLogs {
backend.pendingLogsFeed.Send(ev)
}
}
func flattenLogs(pl [][]*types.Log) []*types.Log {
var logs []*types.Log
for _, l := range pl {
logs = append(logs, l...)
}
return logs
}

View file

@ -29,7 +29,6 @@ import (
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/params"
)
@ -50,18 +49,13 @@ func BenchmarkFilters(b *testing.B) {
defer os.RemoveAll(dir)
var (
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
mux = new(event.TypeMux)
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
addr3 = common.BytesToAddress([]byte("ethereum"))
addr4 = common.BytesToAddress([]byte("random addresses please"))
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
backend = &testBackend{db: db}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
addr3 = common.BytesToAddress([]byte("ethereum"))
addr4 = common.BytesToAddress([]byte("random addresses please"))
)
defer db.Close()
@ -115,15 +109,10 @@ func TestFilters(t *testing.T) {
defer os.RemoveAll(dir)
var (
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
mux = new(event.TypeMux)
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
backend = &testBackend{db: db}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
hash1 = common.BytesToHash([]byte("topic1"))
hash2 = common.BytesToHash([]byte("topic2"))

View file

@ -49,9 +49,8 @@ type Backend interface {
ProtocolVersion() int
SuggestPrice(ctx context.Context) (*big.Int, error)
ChainDb() ethdb.Database
EventMux() *event.TypeMux
AccountManager() *accounts.Manager
RPCGasCap() uint64 // global gas cap for eth_call over rpc: DoS protection
RPCGasCap() uint64 // global gas cap for eth_call over rpc: DoS protection
RPCTxFeeCap() float64 // global tx fee cap for all transaction related APIs
XDCxService() *XDCx.XDCX
LendingService() *XDCxlending.Lending
@ -88,6 +87,7 @@ type Backend interface {
OrderTxPoolContent() (map[common.Address]types.OrderTransactions, map[common.Address]types.OrderTransactions)
OrderStats() (pending int, queued int)
SendLendingTx(ctx context.Context, signedTx *types.LendingTransaction) error
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
ChainConfig() *params.ChainConfig
CurrentBlock() *types.Block

View file

@ -241,6 +241,13 @@ func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
return b.eth.blockchain.SubscribeLogsEvent(ch)
}
func (b *LesApiBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
})
}
func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return b.eth.blockchain.SubscribeRemovedLogsEvent(ch)
}
@ -261,10 +268,6 @@ func (b *LesApiBackend) ChainDb() ethdb.Database {
return b.eth.chainDb
}
func (b *LesApiBackend) EventMux() *event.TypeMux {
return b.eth.eventMux
}
func (b *LesApiBackend) AccountManager() *accounts.Manager {
return b.eth.accountManager
}

View file

@ -182,3 +182,9 @@ func (self *Miner) SetEtherbase(addr common.Address) {
self.coinbase = addr
self.worker.setEtherbase(addr)
}
// SubscribePendingLogs starts delivering logs from pending transactions
// to the given channel.
func (self *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
return self.worker.pendingLogsFeed.Subscribe(ch)
}

View file

@ -104,6 +104,9 @@ type worker struct {
mu sync.Mutex
// Feeds
pendingLogsFeed event.Feed
// update loop
mux *event.TypeMux
txsCh chan core.NewTxsEvent
@ -322,7 +325,7 @@ func (self *worker) update() {
}
feeCapacity := state.GetTRC21FeeCapacityFromState(self.current.state)
txset, specialTxs := types.NewTransactionsByPriceAndNonce(self.current.signer, txs, nil, feeCapacity)
self.current.commitTransactions(self.mux, feeCapacity, txset, specialTxs, self.chain, self.coinbase)
self.current.commitTransactions(self.mux, feeCapacity, txset, specialTxs, self.chain, self.coinbase, &self.pendingLogsFeed)
self.currentMu.Unlock()
} else {
// If we're mining, but nothing is being processed, wake on new transactions
@ -781,7 +784,7 @@ func (self *worker) commitNewWork() {
specialTxs = append(specialTxs, txStateRoot)
}
}
work.commitTransactions(self.mux, feeCapacity, txs, specialTxs, self.chain, self.coinbase)
work.commitTransactions(self.mux, feeCapacity, txs, specialTxs, self.chain, self.coinbase, &self.pendingLogsFeed)
// compute uncles for the new block.
var (
uncles []*types.Header
@ -801,7 +804,7 @@ func (self *worker) commitNewWork() {
self.push(work)
}
func (env *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Address]*big.Int, txs *types.TransactionsByPriceAndNonce, specialTxs types.Transactions, bc *core.BlockChain, coinbase common.Address) {
func (env *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Address]*big.Int, txs *types.TransactionsByPriceAndNonce, specialTxs types.Transactions, bc *core.BlockChain, coinbase common.Address, pendingLogsFeed *event.Feed) {
gp := new(core.GasPool).AddGas(env.header.GasLimit)
balanceUpdated := map[common.Address]*big.Int{}
totalFeeUsed := big.NewInt(0)
@ -1028,29 +1031,25 @@ func (env *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Ad
}
}
state.UpdateTRC21Fee(env.state, balanceUpdated, totalFeeUsed)
if len(coalescedLogs) > 0 || env.tcount > 0 {
// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
// logs by filling in the block hash when the block was mined by the local miner. This can
// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
// logs by filling in the block hash when the block was mined by the local miner. This can
// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
if len(coalescedLogs) > 0 {
cpy := make([]*types.Log, len(coalescedLogs))
for i, l := range coalescedLogs {
cpy[i] = new(types.Log)
*cpy[i] = *l
}
go func(logs []*types.Log, tcount int) {
if len(logs) > 0 {
err := mux.Post(core.PendingLogsEvent{Logs: logs})
if err != nil {
log.Warn("[commitTransactions] Error when sending PendingLogsEvent", "LogLength", len(logs))
}
pendingLogsFeed.Send(cpy)
}
if env.tcount > 0 {
go func(tcount int) {
err := mux.Post(core.PendingStateEvent{})
if err != nil {
log.Warn("[commitTransactions] Error when sending PendingStateEvent", "tcount", tcount)
}
if tcount > 0 {
err := mux.Post(core.PendingStateEvent{})
if err != nil {
log.Warn("[commitTransactions] Error when sending PendingStateEvent", "tcount", tcount)
}
}
}(cpy, env.tcount)
}(env.tcount)
}
}