mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 13:21:37 +00:00
parent
6fc437394c
commit
59e9c72d94
4 changed files with 87 additions and 87 deletions
|
|
@ -90,70 +90,70 @@ func newTopicTable(db *nodeDB, self *Node) *topicTable {
|
|||
}
|
||||
}
|
||||
|
||||
func (topictab *topicTable) getOrNewTopic(topic Topic) *topicInfo {
|
||||
ti := topictab.topics[topic]
|
||||
func (t *topicTable) getOrNewTopic(topic Topic) *topicInfo {
|
||||
ti := t.topics[topic]
|
||||
if ti == nil {
|
||||
rqItem := &topicRequestQueueItem{
|
||||
topic: topic,
|
||||
priority: topictab.requestCnt,
|
||||
priority: t.requestCnt,
|
||||
}
|
||||
ti = &topicInfo{
|
||||
entries: make(map[uint64]*topicEntry),
|
||||
rqItem: rqItem,
|
||||
}
|
||||
topictab.topics[topic] = ti
|
||||
heap.Push(&topictab.requested, rqItem)
|
||||
t.topics[topic] = ti
|
||||
heap.Push(&t.requested, rqItem)
|
||||
}
|
||||
return ti
|
||||
}
|
||||
|
||||
func (topictab *topicTable) checkDeleteTopic(topic Topic) {
|
||||
ti := topictab.topics[topic]
|
||||
func (t *topicTable) checkDeleteTopic(topic Topic) {
|
||||
ti := t.topics[topic]
|
||||
if ti == nil {
|
||||
return
|
||||
}
|
||||
if len(ti.entries) == 0 && ti.wcl.hasMinimumWaitPeriod() {
|
||||
delete(topictab.topics, topic)
|
||||
heap.Remove(&topictab.requested, ti.rqItem.index)
|
||||
delete(t.topics, topic)
|
||||
heap.Remove(&t.requested, ti.rqItem.index)
|
||||
}
|
||||
}
|
||||
|
||||
func (topictab *topicTable) getOrNewNode(node *Node) *nodeInfo {
|
||||
n := topictab.nodes[node]
|
||||
func (t *topicTable) getOrNewNode(node *Node) *nodeInfo {
|
||||
n := t.nodes[node]
|
||||
if n == nil {
|
||||
//fmt.Printf("newNode %016x %016x\n", t.self.sha[:8], node.sha[:8])
|
||||
var issued, used uint32
|
||||
if topictab.db != nil {
|
||||
issued, used = topictab.db.fetchTopicRegTickets(node.ID)
|
||||
if t.db != nil {
|
||||
issued, used = t.db.fetchTopicRegTickets(node.ID)
|
||||
}
|
||||
n = &nodeInfo{
|
||||
entries: make(map[Topic]*topicEntry),
|
||||
lastIssuedTicket: issued,
|
||||
lastUsedTicket: used,
|
||||
}
|
||||
topictab.nodes[node] = n
|
||||
t.nodes[node] = n
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (topictab *topicTable) checkDeleteNode(node *Node) {
|
||||
if n, ok := topictab.nodes[node]; ok && len(n.entries) == 0 && n.noRegUntil < mclock.Now() {
|
||||
func (t *topicTable) checkDeleteNode(node *Node) {
|
||||
if n, ok := t.nodes[node]; ok && len(n.entries) == 0 && n.noRegUntil < mclock.Now() {
|
||||
//fmt.Printf("deleteNode %016x %016x\n", t.self.sha[:8], node.sha[:8])
|
||||
delete(topictab.nodes, node)
|
||||
delete(t.nodes, node)
|
||||
}
|
||||
}
|
||||
|
||||
func (topictab *topicTable) storeTicketCounters(node *Node) {
|
||||
n := topictab.getOrNewNode(node)
|
||||
if topictab.db != nil {
|
||||
topictab.db.updateTopicRegTickets(node.ID, n.lastIssuedTicket, n.lastUsedTicket)
|
||||
func (t *topicTable) storeTicketCounters(node *Node) {
|
||||
n := t.getOrNewNode(node)
|
||||
if t.db != nil {
|
||||
t.db.updateTopicRegTickets(node.ID, n.lastIssuedTicket, n.lastUsedTicket)
|
||||
}
|
||||
}
|
||||
|
||||
func (topictab *topicTable) getEntries(topic Topic) []*Node {
|
||||
topictab.collectGarbage()
|
||||
func (t *topicTable) getEntries(topic Topic) []*Node {
|
||||
t.collectGarbage()
|
||||
|
||||
te := topictab.topics[topic]
|
||||
te := t.topics[topic]
|
||||
if te == nil {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -163,29 +163,29 @@ func (topictab *topicTable) getEntries(topic Topic) []*Node {
|
|||
nodes[i] = e.node
|
||||
i++
|
||||
}
|
||||
topictab.requestCnt++
|
||||
topictab.requested.update(te.rqItem, topictab.requestCnt)
|
||||
t.requestCnt++
|
||||
t.requested.update(te.rqItem, t.requestCnt)
|
||||
return nodes
|
||||
}
|
||||
|
||||
func (topictab *topicTable) addEntry(node *Node, topic Topic) {
|
||||
n := topictab.getOrNewNode(node)
|
||||
func (t *topicTable) addEntry(node *Node, topic Topic) {
|
||||
n := t.getOrNewNode(node)
|
||||
// clear previous entries by the same node
|
||||
for _, e := range n.entries {
|
||||
topictab.deleteEntry(e)
|
||||
t.deleteEntry(e)
|
||||
}
|
||||
// ***
|
||||
n = topictab.getOrNewNode(node)
|
||||
n = t.getOrNewNode(node)
|
||||
|
||||
tm := mclock.Now()
|
||||
te := topictab.getOrNewTopic(topic)
|
||||
te := t.getOrNewTopic(topic)
|
||||
|
||||
if len(te.entries) == maxEntriesPerTopic {
|
||||
topictab.deleteEntry(te.getFifoTail())
|
||||
t.deleteEntry(te.getFifoTail())
|
||||
}
|
||||
|
||||
if topictab.globalEntries == maxEntries {
|
||||
topictab.deleteEntry(topictab.leastRequested()) // not empty, no need to check for nil
|
||||
if t.globalEntries == maxEntries {
|
||||
t.deleteEntry(t.leastRequested()) // not empty, no need to check for nil
|
||||
}
|
||||
|
||||
fifoIdx := te.fifoHead
|
||||
|
|
@ -197,50 +197,50 @@ func (topictab *topicTable) addEntry(node *Node, topic Topic) {
|
|||
expire: tm.Add(fallbackRegistrationExpiry),
|
||||
}
|
||||
if printTestImgLogs {
|
||||
fmt.Printf("*+ %d %v %016x %016x\n", tm/1000000, topic, topictab.self.sha[:8], node.sha[:8])
|
||||
fmt.Printf("*+ %d %v %016x %016x\n", tm/1000000, topic, t.self.sha[:8], node.sha[:8])
|
||||
}
|
||||
te.entries[fifoIdx] = entry
|
||||
n.entries[topic] = entry
|
||||
topictab.globalEntries++
|
||||
t.globalEntries++
|
||||
te.wcl.registered(tm)
|
||||
}
|
||||
|
||||
// removes least requested element from the fifo
|
||||
func (topictab *topicTable) leastRequested() *topicEntry {
|
||||
for topictab.requested.Len() > 0 && topictab.topics[topictab.requested[0].topic] == nil {
|
||||
heap.Pop(&topictab.requested)
|
||||
func (t *topicTable) leastRequested() *topicEntry {
|
||||
for t.requested.Len() > 0 && t.topics[t.requested[0].topic] == nil {
|
||||
heap.Pop(&t.requested)
|
||||
}
|
||||
if topictab.requested.Len() == 0 {
|
||||
if t.requested.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
return topictab.topics[topictab.requested[0].topic].getFifoTail()
|
||||
return t.topics[t.requested[0].topic].getFifoTail()
|
||||
}
|
||||
|
||||
// entry should exist
|
||||
func (topictab *topicTable) deleteEntry(e *topicEntry) {
|
||||
func (t *topicTable) deleteEntry(e *topicEntry) {
|
||||
if printTestImgLogs {
|
||||
fmt.Printf("*- %d %v %016x %016x\n", mclock.Now()/1000000, e.topic, topictab.self.sha[:8], e.node.sha[:8])
|
||||
fmt.Printf("*- %d %v %016x %016x\n", mclock.Now()/1000000, e.topic, t.self.sha[:8], e.node.sha[:8])
|
||||
}
|
||||
ne := topictab.nodes[e.node].entries
|
||||
ne := t.nodes[e.node].entries
|
||||
delete(ne, e.topic)
|
||||
if len(ne) == 0 {
|
||||
topictab.checkDeleteNode(e.node)
|
||||
t.checkDeleteNode(e.node)
|
||||
}
|
||||
te := topictab.topics[e.topic]
|
||||
te := t.topics[e.topic]
|
||||
delete(te.entries, e.fifoIdx)
|
||||
if len(te.entries) == 0 {
|
||||
topictab.checkDeleteTopic(e.topic)
|
||||
t.checkDeleteTopic(e.topic)
|
||||
}
|
||||
topictab.globalEntries--
|
||||
t.globalEntries--
|
||||
}
|
||||
|
||||
// It is assumed that topics and waitPeriods have the same length.
|
||||
func (topictab *topicTable) useTicket(node *Node, serialNo uint32, topics []Topic, idx int, issueTime uint64, waitPeriods []uint32) (registered bool) {
|
||||
func (t *topicTable) useTicket(node *Node, serialNo uint32, topics []Topic, idx int, issueTime uint64, waitPeriods []uint32) (registered bool) {
|
||||
log.Trace("Using discovery ticket", "serial", serialNo, "topics", topics, "waits", waitPeriods)
|
||||
//fmt.Println("useTicket", serialNo, topics, waitPeriods)
|
||||
topictab.collectGarbage()
|
||||
t.collectGarbage()
|
||||
|
||||
n := topictab.getOrNewNode(node)
|
||||
n := t.getOrNewNode(node)
|
||||
if serialNo < n.lastUsedTicket {
|
||||
return false
|
||||
}
|
||||
|
|
@ -252,7 +252,7 @@ func (topictab *topicTable) useTicket(node *Node, serialNo uint32, topics []Topi
|
|||
if serialNo != n.lastUsedTicket {
|
||||
n.lastUsedTicket = serialNo
|
||||
n.noRegUntil = tm.Add(noRegTimeout())
|
||||
topictab.storeTicketCounters(node)
|
||||
t.storeTicketCounters(node)
|
||||
}
|
||||
|
||||
currTime := uint64(tm / mclock.AbsTime(time.Second))
|
||||
|
|
@ -260,7 +260,7 @@ func (topictab *topicTable) useTicket(node *Node, serialNo uint32, topics []Topi
|
|||
relTime := int64(currTime - regTime)
|
||||
if relTime >= -1 && relTime <= regTimeWindow+1 { // give clients a little security margin on both ends
|
||||
if e := n.entries[topics[idx]]; e == nil {
|
||||
topictab.addEntry(node, topics[idx])
|
||||
t.addEntry(node, topics[idx])
|
||||
} else {
|
||||
// if there is an active entry, don't move to the front of the FIFO but prolong expire time
|
||||
e.expire = tm.Add(fallbackRegistrationExpiry)
|
||||
|
|
@ -271,15 +271,15 @@ func (topictab *topicTable) useTicket(node *Node, serialNo uint32, topics []Topi
|
|||
return false
|
||||
}
|
||||
|
||||
func (topictab *topicTable) getTicket(node *Node, topics []Topic) *ticket {
|
||||
topictab.collectGarbage()
|
||||
func (t *topicTable) getTicket(node *Node, topics []Topic) *ticket {
|
||||
t.collectGarbage()
|
||||
|
||||
now := mclock.Now()
|
||||
n := topictab.getOrNewNode(node)
|
||||
n := t.getOrNewNode(node)
|
||||
n.lastIssuedTicket++
|
||||
topictab.storeTicketCounters(node)
|
||||
t.storeTicketCounters(node)
|
||||
|
||||
t := &ticket{
|
||||
tic := &ticket{
|
||||
issueTime: now,
|
||||
topics: topics,
|
||||
serial: n.lastIssuedTicket,
|
||||
|
|
@ -287,38 +287,38 @@ func (topictab *topicTable) getTicket(node *Node, topics []Topic) *ticket {
|
|||
}
|
||||
for i, topic := range topics {
|
||||
var waitPeriod time.Duration
|
||||
if topic := topictab.topics[topic]; topic != nil {
|
||||
if topic := t.topics[topic]; topic != nil {
|
||||
waitPeriod = topic.wcl.waitPeriod
|
||||
} else {
|
||||
waitPeriod = minWaitPeriod
|
||||
}
|
||||
|
||||
t.regTime[i] = now.Add(waitPeriod)
|
||||
tic.regTime[i] = now.Add(waitPeriod)
|
||||
}
|
||||
return t
|
||||
return tic
|
||||
}
|
||||
|
||||
const gcInterval = time.Minute
|
||||
|
||||
func (topictab *topicTable) collectGarbage() {
|
||||
func (t *topicTable) collectGarbage() {
|
||||
tm := mclock.Now()
|
||||
if time.Duration(tm-topictab.lastGarbageCollection) < gcInterval {
|
||||
if time.Duration(tm-t.lastGarbageCollection) < gcInterval {
|
||||
return
|
||||
}
|
||||
topictab.lastGarbageCollection = tm
|
||||
t.lastGarbageCollection = tm
|
||||
|
||||
for node, n := range topictab.nodes {
|
||||
for node, n := range t.nodes {
|
||||
for _, e := range n.entries {
|
||||
if e.expire <= tm {
|
||||
topictab.deleteEntry(e)
|
||||
t.deleteEntry(e)
|
||||
}
|
||||
}
|
||||
|
||||
topictab.checkDeleteNode(node)
|
||||
t.checkDeleteNode(node)
|
||||
}
|
||||
|
||||
for topic := range topictab.topics {
|
||||
topictab.checkDeleteTopic(topic)
|
||||
for topic := range t.topics {
|
||||
t.checkDeleteTopic(topic)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -272,15 +272,15 @@ func newMsgEventer(rw MsgReadWriter, feed *event.Feed, peerID discover.NodeID, p
|
|||
|
||||
// ReadMsg reads a message from the underlying MsgReadWriter and emits a
|
||||
// "message received" event
|
||||
func (e *msgEventer) ReadMsg() (Msg, error) {
|
||||
msg, err := e.MsgReadWriter.ReadMsg()
|
||||
func (ev *msgEventer) ReadMsg() (Msg, error) {
|
||||
msg, err := ev.MsgReadWriter.ReadMsg()
|
||||
if err != nil {
|
||||
return msg, err
|
||||
}
|
||||
e.feed.Send(&PeerEvent{
|
||||
ev.feed.Send(&PeerEvent{
|
||||
Type: PeerEventTypeMsgRecv,
|
||||
Peer: e.peerID,
|
||||
Protocol: e.Protocol,
|
||||
Peer: ev.peerID,
|
||||
Protocol: ev.Protocol,
|
||||
MsgCode: &msg.Code,
|
||||
MsgSize: &msg.Size,
|
||||
})
|
||||
|
|
@ -289,15 +289,15 @@ func (e *msgEventer) ReadMsg() (Msg, error) {
|
|||
|
||||
// WriteMsg writes a message to the underlying MsgReadWriter and emits a
|
||||
// "message sent" event
|
||||
func (e *msgEventer) WriteMsg(msg Msg) error {
|
||||
err := e.MsgReadWriter.WriteMsg(msg)
|
||||
func (ev *msgEventer) WriteMsg(msg Msg) error {
|
||||
err := ev.MsgReadWriter.WriteMsg(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.feed.Send(&PeerEvent{
|
||||
ev.feed.Send(&PeerEvent{
|
||||
Type: PeerEventTypeMsgSend,
|
||||
Peer: e.peerID,
|
||||
Protocol: e.Protocol,
|
||||
Peer: ev.peerID,
|
||||
Protocol: ev.Protocol,
|
||||
MsgCode: &msg.Code,
|
||||
MsgSize: &msg.Size,
|
||||
})
|
||||
|
|
@ -306,8 +306,8 @@ func (e *msgEventer) WriteMsg(msg Msg) error {
|
|||
|
||||
// Close closes the underlying MsgReadWriter if it implements the io.Closer
|
||||
// interface
|
||||
func (e *msgEventer) Close() error {
|
||||
if v, ok := e.MsgReadWriter.(io.Closer); ok {
|
||||
func (ev *msgEventer) Close() error {
|
||||
if v, ok := ev.MsgReadWriter.(io.Closer); ok {
|
||||
return v.Close()
|
||||
}
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -48,8 +48,8 @@ func newPeerError(code int, format string, v ...interface{}) *peerError {
|
|||
return err
|
||||
}
|
||||
|
||||
func (e *peerError) Error() string {
|
||||
return e.message
|
||||
func (pe *peerError) Error() string {
|
||||
return pe.message
|
||||
}
|
||||
|
||||
var errProtocolReturned = errors.New("protocol returned")
|
||||
|
|
|
|||
|
|
@ -20,12 +20,12 @@ type SimStateStore struct {
|
|||
m map[string][]byte
|
||||
}
|
||||
|
||||
func (sss *SimStateStore) Load(s string) ([]byte, error) {
|
||||
return sss.m[s], nil
|
||||
func (st *SimStateStore) Load(s string) ([]byte, error) {
|
||||
return st.m[s], nil
|
||||
}
|
||||
|
||||
func (sss *SimStateStore) Save(s string, data []byte) error {
|
||||
sss.m[s] = data
|
||||
func (st *SimStateStore) Save(s string, data []byte) error {
|
||||
st.m[s] = data
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue