mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-24 23:46:17 +00:00
eth/filters: fix potential deadlock in filter timeout loop (#22178)
This commit is contained in:
parent
83782e5368
commit
acaf943e59
4 changed files with 102 additions and 22 deletions
|
|
@ -24,6 +24,7 @@ import (
|
|||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/XDCx"
|
||||
"github.com/XinFinOrg/XDPoSChain/XDCxlending"
|
||||
|
|
@ -405,7 +406,7 @@ func (s *Ethereum) APIs() []rpc.API {
|
|||
}, {
|
||||
Namespace: "eth",
|
||||
Version: "1.0",
|
||||
Service: filters.NewPublicFilterAPI(s.ApiBackend, false),
|
||||
Service: filters.NewPublicFilterAPI(s.ApiBackend, false, 5*time.Minute),
|
||||
Public: true,
|
||||
}, {
|
||||
Namespace: "admin",
|
||||
|
|
|
|||
|
|
@ -41,10 +41,6 @@ var (
|
|||
// The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0
|
||||
const maxTopics = 4
|
||||
|
||||
var (
|
||||
deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
|
||||
)
|
||||
|
||||
// filter is a helper struct that holds meta information over the filter type
|
||||
// and associated subscription in the event system.
|
||||
type filter struct {
|
||||
|
|
@ -66,38 +62,49 @@ type PublicFilterAPI struct {
|
|||
events *EventSystem
|
||||
filtersMu sync.Mutex
|
||||
filters map[rpc.ID]*filter
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
|
||||
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
|
||||
func NewPublicFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *PublicFilterAPI {
|
||||
api := &PublicFilterAPI{
|
||||
backend: backend,
|
||||
chainDb: backend.ChainDb(),
|
||||
events: NewEventSystem(backend, lightMode),
|
||||
filters: make(map[rpc.ID]*filter),
|
||||
timeout: timeout,
|
||||
}
|
||||
go api.timeoutLoop()
|
||||
go api.timeoutLoop(timeout)
|
||||
|
||||
return api
|
||||
}
|
||||
|
||||
// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
|
||||
// Tt is started when the api is created.
|
||||
func (api *PublicFilterAPI) timeoutLoop() {
|
||||
ticker := time.NewTicker(5 * time.Minute)
|
||||
func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) {
|
||||
var toUninstall []*Subscription
|
||||
ticker := time.NewTicker(timeout)
|
||||
for {
|
||||
<-ticker.C
|
||||
api.filtersMu.Lock()
|
||||
for id, f := range api.filters {
|
||||
select {
|
||||
case <-f.deadline.C:
|
||||
f.s.Unsubscribe()
|
||||
toUninstall = append(toUninstall, f.s)
|
||||
delete(api.filters, id)
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
api.filtersMu.Unlock()
|
||||
|
||||
// Unsubscribes are processed outside the lock to avoid the following scenario:
|
||||
// event loop attempts broadcasting events to still active filters while
|
||||
// Unsubscribe is waiting for it to process the uninstall request.
|
||||
for _, s := range toUninstall {
|
||||
s.Unsubscribe()
|
||||
}
|
||||
toUninstall = nil
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -115,7 +122,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
|
|||
)
|
||||
|
||||
api.filtersMu.Lock()
|
||||
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
|
||||
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub}
|
||||
api.filtersMu.Unlock()
|
||||
|
||||
go func() {
|
||||
|
|
@ -185,7 +192,7 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
|
|||
)
|
||||
|
||||
api.filtersMu.Lock()
|
||||
api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
|
||||
api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub}
|
||||
api.filtersMu.Unlock()
|
||||
|
||||
go func() {
|
||||
|
|
@ -302,7 +309,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
|
|||
}
|
||||
|
||||
api.filtersMu.Lock()
|
||||
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub}
|
||||
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(api.timeout), logs: make([]*types.Log, 0), s: logsSub}
|
||||
api.filtersMu.Unlock()
|
||||
|
||||
go func() {
|
||||
|
|
@ -431,7 +438,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
|
|||
// receive timer value and reset timer
|
||||
<-f.deadline.C
|
||||
}
|
||||
f.deadline.Reset(deadline)
|
||||
f.deadline.Reset(api.timeout)
|
||||
|
||||
switch f.typ {
|
||||
case PendingTransactionsSubscription, BlocksSubscription:
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import (
|
|||
"math/big"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -38,6 +39,10 @@ import (
|
|||
"github.com/XinFinOrg/XDPoSChain/rpc"
|
||||
)
|
||||
|
||||
var (
|
||||
deadline = 5 * time.Minute
|
||||
)
|
||||
|
||||
type testBackend struct {
|
||||
mux *event.TypeMux
|
||||
db ethdb.Database
|
||||
|
|
@ -149,7 +154,7 @@ func TestBlockSubscription(t *testing.T) {
|
|||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewPublicFilterAPI(backend, false)
|
||||
api = NewPublicFilterAPI(backend, false, deadline)
|
||||
genesis = new(core.Genesis).MustCommit(db)
|
||||
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
|
||||
chainEvents = []core.ChainEvent{}
|
||||
|
|
@ -201,7 +206,7 @@ func TestPendingTxFilter(t *testing.T) {
|
|||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewPublicFilterAPI(backend, false)
|
||||
api = NewPublicFilterAPI(backend, false, deadline)
|
||||
|
||||
transactions = []*types.Transaction{
|
||||
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
|
||||
|
|
@ -256,7 +261,7 @@ func TestLogFilterCreation(t *testing.T) {
|
|||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewPublicFilterAPI(backend, false)
|
||||
api = NewPublicFilterAPI(backend, false, deadline)
|
||||
|
||||
testCases = []struct {
|
||||
crit FilterCriteria
|
||||
|
|
@ -300,7 +305,7 @@ func TestInvalidLogFilterCreation(t *testing.T) {
|
|||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewPublicFilterAPI(backend, false)
|
||||
api = NewPublicFilterAPI(backend, false, deadline)
|
||||
)
|
||||
|
||||
// different situations where log filter creation should fail.
|
||||
|
|
@ -322,7 +327,7 @@ func TestInvalidGetLogsRequest(t *testing.T) {
|
|||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewPublicFilterAPI(backend, false)
|
||||
api = NewPublicFilterAPI(backend, false, deadline)
|
||||
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
|
||||
)
|
||||
|
||||
|
|
@ -347,7 +352,7 @@ func TestLogFilter(t *testing.T) {
|
|||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewPublicFilterAPI(backend, false)
|
||||
api = NewPublicFilterAPI(backend, false, deadline)
|
||||
|
||||
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
|
||||
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
|
||||
|
|
@ -461,7 +466,7 @@ func TestPendingLogsSubscription(t *testing.T) {
|
|||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewPublicFilterAPI(backend, false)
|
||||
api = NewPublicFilterAPI(backend, false, deadline)
|
||||
|
||||
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
|
||||
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
|
||||
|
|
@ -587,6 +592,73 @@ func TestPendingLogsSubscription(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestPendingTxFilterDeadlock tests if the event loop hangs when pending
|
||||
// txes arrive at the same time that one of multiple filters is timing out.
|
||||
// Please refer to #22131 for more details.
|
||||
func TestPendingTxFilterDeadlock(t *testing.T) {
|
||||
t.Parallel()
|
||||
timeout := 100 * time.Millisecond
|
||||
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend = &testBackend{db: db}
|
||||
api = NewFilterAPI(backend, false, timeout)
|
||||
done = make(chan struct{})
|
||||
)
|
||||
|
||||
go func() {
|
||||
// Bombard feed with txes until signal was received to stop
|
||||
i := uint64(0)
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
tx := types.NewTransaction(i, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil)
|
||||
backend.txFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx}})
|
||||
i++
|
||||
}
|
||||
}()
|
||||
|
||||
// Create a bunch of filters that will
|
||||
// timeout either in 100ms or 200ms
|
||||
fids := make([]rpc.ID, 20)
|
||||
for i := 0; i < len(fids); i++ {
|
||||
fid := api.NewPendingTransactionFilter()
|
||||
fids[i] = fid
|
||||
// Wait for at least one tx to arrive in filter
|
||||
for {
|
||||
hashes, err := api.GetFilterChanges(fid)
|
||||
if err != nil {
|
||||
t.Fatalf("Filter should exist: %v\n", err)
|
||||
}
|
||||
if len(hashes.([]common.Hash)) > 0 {
|
||||
break
|
||||
}
|
||||
runtime.Gosched()
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until filters have timed out
|
||||
time.Sleep(3 * timeout)
|
||||
|
||||
// If tx loop doesn't consume `done` after a second
|
||||
// it's hanging.
|
||||
select {
|
||||
case done <- struct{}{}:
|
||||
// Check that all filters have been uninstalled
|
||||
for _, fid := range fids {
|
||||
if _, err := api.GetFilterChanges(fid); err == nil {
|
||||
t.Errorf("Filter %s should have been uninstalled\n", fid)
|
||||
}
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Error("Tx sending loop hangs")
|
||||
}
|
||||
}
|
||||
|
||||
func flattenLogs(pl [][]*types.Log) []*types.Log {
|
||||
var logs []*types.Log
|
||||
for _, l := range pl {
|
||||
|
|
|
|||
|
|
@ -190,7 +190,7 @@ func (s *LightEthereum) APIs() []rpc.API {
|
|||
}, {
|
||||
Namespace: "eth",
|
||||
Version: "1.0",
|
||||
Service: filters.NewPublicFilterAPI(s.ApiBackend, true),
|
||||
Service: filters.NewPublicFilterAPI(s.ApiBackend, true, 5*time.Minute),
|
||||
Public: true,
|
||||
}, {
|
||||
Namespace: "net",
|
||||
|
|
|
|||
Loading…
Reference in a new issue