From d73bfeb3d91753f6692092650770a8ed79f8e270 Mon Sep 17 00:00:00 2001 From: Kyrin Date: Tue, 21 Oct 2025 15:41:38 +0800 Subject: [PATCH] core/txpool: Initialize journal writer for tx tracker (#32921) Previously, the journal writer is nil until the first time rejournal (default 1h), which means during this period, txs submitted to this node are not written into journal file (transactions.rlp). If this node is shutdown before the first time rejournal, then txs in pending or queue will get lost. Here, this PR initializes the journal writer soon after launch to solve this issue. --------- Co-authored-by: Gary Rong --- core/txpool/locals/journal.go | 20 +++++++++- core/txpool/locals/tx_tracker.go | 38 +++++++++++------- core/txpool/locals/tx_tracker_test.go | 55 ++++++++++++++++++++++++--- 3 files changed, 92 insertions(+), 21 deletions(-) diff --git a/core/txpool/locals/journal.go b/core/txpool/locals/journal.go index 46fd6de346..cd2be8a794 100644 --- a/core/txpool/locals/journal.go +++ b/core/txpool/locals/journal.go @@ -117,6 +117,25 @@ func (journal *journal) load(add func([]*types.Transaction) []error) error { return failure } +func (journal *journal) setupWriter() error { + if journal.writer != nil { + if err := journal.writer.Close(); err != nil { + return err + } + journal.writer = nil + } + + // Re-open the journal file for appending + // Use O_APPEND to ensure we always write to the end of the file + sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return err + } + journal.writer = sink + + return nil +} + // insert adds the specified transaction to the local disk journal. func (journal *journal) insert(tx *types.Transaction) error { if journal.writer == nil { @@ -177,7 +196,6 @@ func (journal *journal) rotate(all map[common.Address]types.Transactions) error // close flushes the transaction journal contents to disk and closes the file. func (journal *journal) close() error { var err error - if journal.writer != nil { err = journal.writer.Close() journal.writer = nil diff --git a/core/txpool/locals/tx_tracker.go b/core/txpool/locals/tx_tracker.go index e08384ce71..bb178f175e 100644 --- a/core/txpool/locals/tx_tracker.go +++ b/core/txpool/locals/tx_tracker.go @@ -114,13 +114,14 @@ func (tracker *TxTracker) TrackAll(txs []*types.Transaction) { } // recheck checks and returns any transactions that needs to be resubmitted. -func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) { +func (tracker *TxTracker) recheck(journalCheck bool) []*types.Transaction { tracker.mu.Lock() defer tracker.mu.Unlock() var ( numStales = 0 numOk = 0 + resubmits []*types.Transaction ) for sender, txs := range tracker.byAddr { // Wipe the stales @@ -141,7 +142,7 @@ func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transac } if journalCheck { // rejournal - rejournal = make(map[common.Address]types.Transactions) + rejournal := make(map[common.Address]types.Transactions) for _, tx := range tracker.all { addr, _ := types.Sender(tracker.signer, tx) rejournal[addr] = append(rejournal[addr], tx) @@ -153,10 +154,18 @@ func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transac return int(a.Nonce() - b.Nonce()) }) } + // Rejournal the tracker while holding the lock. No new transactions will + // be added to the old journal during this period, preventing any potential + // transaction loss. + if tracker.journal != nil { + if err := tracker.journal.rotate(rejournal); err != nil { + log.Warn("Transaction journal rotation failed", "err", err) + } + } } localGauge.Update(int64(len(tracker.all))) log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", numStales, "ok", numOk) - return resubmits, rejournal + return resubmits } // Start implements node.Lifecycle interface @@ -185,6 +194,12 @@ func (tracker *TxTracker) loop() { tracker.TrackAll(transactions) return nil }) + + // Setup the writer for the upcoming transactions + if err := tracker.journal.setupWriter(); err != nil { + log.Error("Failed to setup the journal writer", "err", err) + return + } defer tracker.journal.close() } var ( @@ -196,20 +211,15 @@ func (tracker *TxTracker) loop() { case <-tracker.shutdownCh: return case <-timer.C: - checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal - resubmits, rejournal := tracker.recheck(checkJournal) + var rejournal bool + if tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal { + rejournal, lastJournal = true, time.Now() + log.Debug("Rejournal the transaction tracker") + } + resubmits := tracker.recheck(rejournal) if len(resubmits) > 0 { tracker.pool.Add(resubmits, false) } - if checkJournal { - // Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts - tracker.mu.Lock() - lastJournal = time.Now() - if err := tracker.journal.rotate(rejournal); err != nil { - log.Warn("Transaction journal rotation failed", "err", err) - } - tracker.mu.Unlock() - } timer.Reset(recheckInterval) } } diff --git a/core/txpool/locals/tx_tracker_test.go b/core/txpool/locals/tx_tracker_test.go index 367fb6b6da..dde8754605 100644 --- a/core/txpool/locals/tx_tracker_test.go +++ b/core/txpool/locals/tx_tracker_test.go @@ -17,7 +17,11 @@ package locals import ( + "fmt" + "maps" "math/big" + "math/rand" + "path/filepath" "testing" "time" @@ -146,20 +150,59 @@ func TestResubmit(t *testing.T) { txsA := txs[:len(txs)/2] txsB := txs[len(txs)/2:] env.pool.Add(txsA, true) + pending, queued := env.pool.ContentFrom(address) if len(pending) != len(txsA) || len(queued) != 0 { t.Fatalf("Unexpected txpool content: %d, %d", len(pending), len(queued)) } env.tracker.TrackAll(txs) - resubmit, all := env.tracker.recheck(true) + resubmit := env.tracker.recheck(true) if len(resubmit) != len(txsB) { t.Fatalf("Unexpected transactions to resubmit, got: %d, want: %d", len(resubmit), len(txsB)) } - if len(all) == 0 || len(all[address]) == 0 { - t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", 0, len(txs)) - } - if len(all[address]) != len(txs) { - t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(all[address]), len(txs)) + env.tracker.mu.Lock() + allCopy := maps.Clone(env.tracker.all) + env.tracker.mu.Unlock() + + if len(allCopy) != len(txs) { + t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(allCopy), len(txs)) + } +} + +func TestJournal(t *testing.T) { + journalPath := filepath.Join(t.TempDir(), fmt.Sprintf("%d", rand.Int63())) + env := newTestEnv(t, 10, 0, journalPath) + defer env.close() + + env.tracker.Start() + defer env.tracker.Stop() + + txs := env.makeTxs(10) + txsA := txs[:len(txs)/2] + txsB := txs[len(txs)/2:] + env.pool.Add(txsA, true) + + pending, queued := env.pool.ContentFrom(address) + if len(pending) != len(txsA) || len(queued) != 0 { + t.Fatalf("Unexpected txpool content: %d, %d", len(pending), len(queued)) + } + env.tracker.TrackAll(txsA) + env.tracker.TrackAll(txsB) + env.tracker.recheck(true) // manually rejournal the tracker + + // Make sure all the transactions are properly journalled + trackerB := New(journalPath, time.Minute, gspec.Config, env.pool) + trackerB.journal.load(func(transactions []*types.Transaction) []error { + trackerB.TrackAll(transactions) + return nil + }) + + trackerB.mu.Lock() + allCopy := maps.Clone(trackerB.all) + trackerB.mu.Unlock() + + if len(allCopy) != len(txs) { + t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(allCopy), len(txs)) } }