mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 21:31:37 +00:00
core: improve shutdown synchronization in BlockChain (#22853)
This commit is contained in:
parent
2d5dc550d0
commit
2baadc6e87
3 changed files with 193 additions and 62 deletions
|
|
@ -48,6 +48,7 @@ import (
|
|||
"github.com/XinFinOrg/XDPoSChain/ethclient"
|
||||
"github.com/XinFinOrg/XDPoSChain/ethdb"
|
||||
"github.com/XinFinOrg/XDPoSChain/event"
|
||||
"github.com/XinFinOrg/XDPoSChain/internal/syncx"
|
||||
"github.com/XinFinOrg/XDPoSChain/log"
|
||||
"github.com/XinFinOrg/XDPoSChain/metrics"
|
||||
"github.com/XinFinOrg/XDPoSChain/params"
|
||||
|
|
@ -81,6 +82,9 @@ var (
|
|||
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
|
||||
blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
|
||||
|
||||
errInsertionInterrupted = errors.New("insertion is interrupted")
|
||||
errChainStopped = errors.New("blockchain is stopped")
|
||||
|
||||
CheckpointCh = make(chan int)
|
||||
)
|
||||
|
||||
|
|
@ -149,8 +153,11 @@ type BlockChain struct {
|
|||
scope event.SubscriptionScope
|
||||
genesisBlock *types.Block
|
||||
|
||||
chainmu sync.RWMutex // blockchain insertion lock
|
||||
procmu sync.RWMutex // block processor lock
|
||||
// This mutex synchronizes chain write operations.
|
||||
// Readers don't need to take it, they can just read the database.
|
||||
chainmu *syncx.ClosableMutex
|
||||
|
||||
procmu sync.RWMutex // block processor lock
|
||||
|
||||
currentBlock atomic.Value // Current head of the block chain
|
||||
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
|
||||
|
|
@ -169,10 +176,10 @@ type BlockChain struct {
|
|||
// future blocks are blocks added for later processing
|
||||
futureBlocks *lru.Cache[common.Hash, *types.Block]
|
||||
|
||||
wg sync.WaitGroup // chain processing wait group for shutting down
|
||||
quit chan struct{} // shutdown signal, closed in Stop.
|
||||
running int32 // 0 if chain is running, 1 when stopped
|
||||
procInterrupt int32 // interrupt signaler for block processing
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{} // shutdown signal, closed in Stop.
|
||||
running int32 // 0 if chain is running, 1 when stopped
|
||||
procInterrupt int32 // interrupt signaler for block processing
|
||||
|
||||
engine consensus.Engine
|
||||
processor Processor // block processor interface
|
||||
|
|
@ -211,6 +218,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
|
|||
triegc: prque.New[int64, common.Hash](nil),
|
||||
stateCache: state.NewDatabase(db),
|
||||
quit: make(chan struct{}),
|
||||
chainmu: syncx.NewClosableMutex(),
|
||||
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
|
||||
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
|
||||
receiptsCache: lru.NewCache[common.Hash, types.Receipts](receiptsCacheLimit),
|
||||
|
|
@ -261,8 +269,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
|
|||
}
|
||||
}
|
||||
}
|
||||
// Take ownership of this particular state
|
||||
go bc.update()
|
||||
|
||||
// Start future block processor.
|
||||
bc.wg.Add(1)
|
||||
go bc.futureBlocksLoop()
|
||||
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
|
|
@ -410,7 +421,9 @@ func (bc *BlockChain) loadLastState() error {
|
|||
func (bc *BlockChain) SetHead(head uint64) error {
|
||||
log.Warn("Rewinding blockchain", "target", head)
|
||||
|
||||
bc.chainmu.Lock()
|
||||
if !bc.chainmu.TryLock() {
|
||||
return errChainStopped
|
||||
}
|
||||
defer bc.chainmu.Unlock()
|
||||
|
||||
updateFn := func(db ethdb.KeyValueWriter, header *types.Header) {
|
||||
|
|
@ -499,8 +512,11 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
|
|||
if _, err := trie.NewSecure(block.Root(), bc.stateCache.TrieDB()); err != nil {
|
||||
return err
|
||||
}
|
||||
// If all checks out, manually set the head block
|
||||
bc.chainmu.Lock()
|
||||
|
||||
// If all checks out, manually set the head block.
|
||||
if !bc.chainmu.TryLock() {
|
||||
return errChainStopped
|
||||
}
|
||||
bc.currentBlock.Store(block)
|
||||
headBlockGauge.Update(int64(block.NumberU64()))
|
||||
bc.chainmu.Unlock()
|
||||
|
|
@ -622,7 +638,9 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
|
|||
if err := bc.SetHead(0); err != nil {
|
||||
return err
|
||||
}
|
||||
bc.chainmu.Lock()
|
||||
if !bc.chainmu.TryLock() {
|
||||
return errChainStopped
|
||||
}
|
||||
defer bc.chainmu.Unlock()
|
||||
|
||||
// Prepare the genesis block and reinitialise the chain
|
||||
|
|
@ -698,8 +716,10 @@ func (bc *BlockChain) Export(w io.Writer) error {
|
|||
|
||||
// ExportN writes a subset of the active chain to the given writer.
|
||||
func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
|
||||
bc.chainmu.RLock()
|
||||
defer bc.chainmu.RUnlock()
|
||||
if !bc.chainmu.TryLock() {
|
||||
return errChainStopped
|
||||
}
|
||||
defer bc.chainmu.Unlock()
|
||||
|
||||
if first > last {
|
||||
return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
|
||||
|
|
@ -1033,15 +1053,38 @@ func (bc *BlockChain) Stop() {
|
|||
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
|
||||
return
|
||||
}
|
||||
// Unsubscribe all subscriptions registered from blockchain
|
||||
|
||||
// Unsubscribe all subscriptions registered from blockchain.
|
||||
bc.scope.Close()
|
||||
|
||||
// Signal shutdown to all goroutines.
|
||||
close(bc.quit)
|
||||
atomic.StoreInt32(&bc.procInterrupt, 1)
|
||||
bc.StopInsert()
|
||||
|
||||
// Now wait for all chain modifications to end and persistent goroutines to exit.
|
||||
//
|
||||
// Note: Close waits for the mutex to become available, i.e. any running chain
|
||||
// modification will have exited when Close returns. Since we also called StopInsert,
|
||||
// the mutex should become available quickly. It cannot be taken again after Close has
|
||||
// returned.
|
||||
bc.chainmu.Close()
|
||||
bc.wg.Wait()
|
||||
bc.saveData()
|
||||
log.Info("Blockchain manager stopped")
|
||||
}
|
||||
|
||||
// StopInsert interrupts all insertion methods, causing them to return
|
||||
// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
|
||||
// calling this method.
|
||||
func (bc *BlockChain) StopInsert() {
|
||||
atomic.StoreInt32(&bc.procInterrupt, 1)
|
||||
}
|
||||
|
||||
// insertStopped returns true after StopInsert has been called.
|
||||
func (bc *BlockChain) insertStopped() bool {
|
||||
return atomic.LoadInt32(&bc.procInterrupt) == 1
|
||||
}
|
||||
|
||||
func (bc *BlockChain) procFutureBlocks() {
|
||||
blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
|
||||
for _, hash := range bc.futureBlocks.Keys() {
|
||||
|
|
@ -1085,7 +1128,9 @@ const (
|
|||
// Rollback is designed to remove a chain of links from the database that aren't
|
||||
// certain enough to be valid.
|
||||
func (bc *BlockChain) Rollback(chain []common.Hash) {
|
||||
bc.chainmu.Lock()
|
||||
if !bc.chainmu.TryLock() {
|
||||
return
|
||||
}
|
||||
defer bc.chainmu.Unlock()
|
||||
|
||||
batch := bc.db.NewBatch()
|
||||
|
|
@ -1124,6 +1169,8 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
|
|||
// InsertReceiptChain attempts to complete an already existing header chain with
|
||||
// transaction and receipt data.
|
||||
func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
|
||||
// We don't require the chainMu here since we want to maximize the
|
||||
// concurrency of header insertion and receipt insertion.
|
||||
bc.wg.Add(1)
|
||||
defer bc.wg.Done()
|
||||
|
||||
|
|
@ -1195,7 +1242,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
|
|||
}
|
||||
|
||||
// Update the head fast sync block if better
|
||||
bc.chainmu.Lock()
|
||||
if !bc.chainmu.TryLock() {
|
||||
return 0, errChainStopped
|
||||
}
|
||||
head := blockChain[len(blockChain)-1]
|
||||
if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case
|
||||
currentFastBlock := bc.CurrentFastBlock()
|
||||
|
|
@ -1221,12 +1270,13 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
|
|||
|
||||
var lastWrite uint64
|
||||
|
||||
// WriteBlockWithoutState writes only the block and its metadata to the database,
|
||||
// writeBlockWithoutState writes only the block and its metadata to the database,
|
||||
// but does not write any state. This is used to construct competing side forks
|
||||
// up to the point where they exceed the canonical total difficulty.
|
||||
func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) {
|
||||
bc.wg.Add(1)
|
||||
defer bc.wg.Done()
|
||||
func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) {
|
||||
if bc.insertStopped() {
|
||||
return errInsertionInterrupted
|
||||
}
|
||||
|
||||
batch := bc.db.NewBatch()
|
||||
rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td)
|
||||
|
|
@ -1239,17 +1289,19 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e
|
|||
|
||||
// WriteBlockWithState writes the block and all associated state to the database.
|
||||
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) {
|
||||
bc.chainmu.Lock()
|
||||
if !bc.chainmu.TryLock() {
|
||||
return NonStatTy, errInsertionInterrupted
|
||||
}
|
||||
defer bc.chainmu.Unlock()
|
||||
|
||||
return bc.writeBlockWithState(block, receipts, state, tradingState, lendingState)
|
||||
}
|
||||
|
||||
// writeBlockWithState writes the block and all associated state to the database,
|
||||
// but is expects the chain mutex to be held.
|
||||
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) {
|
||||
bc.wg.Add(1)
|
||||
defer bc.wg.Done()
|
||||
if bc.insertStopped() {
|
||||
return NonStatTy, errInsertionInterrupted
|
||||
}
|
||||
|
||||
// Calculate the total difficulty of the block
|
||||
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
|
||||
|
|
@ -1470,38 +1522,51 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
|||
//
|
||||
// After insertion is done, all accumulated events will be fired.
|
||||
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
|
||||
// Sanity check that we have something meaningful to import
|
||||
if len(chain) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Do a sanity check that the provided chain is actually ordered and linked
|
||||
for i := 1; i < len(chain); i++ {
|
||||
block, prev := chain[i], chain[i-1]
|
||||
if block.NumberU64() != chain[i-1].NumberU64()+1 || block.ParentHash() != chain[i-1].Hash() {
|
||||
// Chain broke ancestry, log a messge (programming error) and skip insertion
|
||||
log.Error("Non contiguous block insert",
|
||||
"number", block.Number(),
|
||||
"hash", block.Hash(),
|
||||
"parent", block.ParentHash(),
|
||||
"prevnumber", prev.Number(),
|
||||
"prevhash", prev.Hash())
|
||||
|
||||
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, prev.NumberU64(),
|
||||
prev.Hash().Bytes()[:4], i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4])
|
||||
}
|
||||
}
|
||||
|
||||
// Pre-check passed, start the full block imports.
|
||||
if !bc.chainmu.TryLock() {
|
||||
return 0, errChainStopped
|
||||
}
|
||||
defer bc.chainmu.Unlock()
|
||||
n, events, logs, err := bc.insertChain(chain, true)
|
||||
bc.PostChainEvents(events, logs)
|
||||
return n, err
|
||||
}
|
||||
|
||||
// insertChain will execute the actual chain insertion and event aggregation. The
|
||||
// only reason this method exists as a separate one is to make locking cleaner
|
||||
// with deferred statements.
|
||||
// insertChain is the internal implementation of InsertChain, which assumes that
|
||||
// 1) chains are contiguous, and 2) The chain mutex is held.
|
||||
//
|
||||
// This method is split out so that import batches that require re-injecting
|
||||
// historical blocks can do so without releasing the lock, which could lead to
|
||||
// racey behaviour. If a sidechain import is in progress, and the historic state
|
||||
// is imported, but then new canon-head is added before the actual sidechain
|
||||
// completes, then the historic state could be pruned again
|
||||
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
|
||||
// Sanity check that we have something meaningful to import
|
||||
if len(chain) == 0 {
|
||||
// If the chain is terminating, don't even bother starting up.
|
||||
if bc.insertStopped() {
|
||||
return 0, nil, nil, nil
|
||||
}
|
||||
engine, _ := bc.Engine().(*XDPoS.XDPoS)
|
||||
|
||||
// Do a sanity check that the provided chain is actually ordered and linked
|
||||
for i := 1; i < len(chain); i++ {
|
||||
if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
|
||||
// Chain broke ancestry, log a messge (programming error) and skip insertion
|
||||
log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
|
||||
"parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())
|
||||
|
||||
return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, chain[i-1].NumberU64(),
|
||||
chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
|
||||
}
|
||||
}
|
||||
// Pre-checks passed, start the full block imports
|
||||
bc.wg.Add(1)
|
||||
defer bc.wg.Done()
|
||||
|
||||
bc.chainmu.Lock()
|
||||
defer bc.chainmu.Unlock()
|
||||
|
||||
// A queued approach to delivering events. This is generally
|
||||
// faster than direct delivery and requires much less mutex
|
||||
|
|
@ -1578,7 +1643,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
|
||||
externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
|
||||
if localTd.Cmp(externTd) > 0 {
|
||||
if err = bc.WriteBlockWithoutState(block, externTd); err != nil {
|
||||
if err = bc.writeBlockWithoutState(block, externTd); err != nil {
|
||||
return i, events, coalescedLogs, err
|
||||
}
|
||||
continue
|
||||
|
|
@ -1596,10 +1661,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||
}
|
||||
log.Debug("Number block need calculated again", "number", block.NumberU64(), "hash", block.Hash().Hex(), "winners", len(winner))
|
||||
// Import all the pruned blocks to make the state available
|
||||
bc.chainmu.Unlock()
|
||||
// During reorg, we use verifySeals=false
|
||||
_, evs, logs, err := bc.insertChain(winner, false)
|
||||
bc.chainmu.Lock()
|
||||
events, coalescedLogs = evs, logs
|
||||
|
||||
if err != nil {
|
||||
|
|
@ -1628,6 +1691,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||
var tradingService utils.TradingService
|
||||
var lendingService utils.LendingService
|
||||
isSDKNode := false
|
||||
engine, _ := bc.Engine().(*XDPoS.XDPoS)
|
||||
if bc.Config().IsTIPXDCXReceiver(block.Number()) && bc.chainConfig.XDPoS != nil && engine != nil && block.NumberU64() > bc.chainConfig.XDPoS.Epoch {
|
||||
author, err := bc.Engine().Author(block.Header()) // Ignore error, we're past header validation
|
||||
if err != nil {
|
||||
|
|
@ -2091,7 +2155,9 @@ func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.L
|
|||
bc.wg.Add(1)
|
||||
defer bc.wg.Done()
|
||||
// Write the block to the chain and get the status.
|
||||
bc.chainmu.Lock()
|
||||
if !bc.chainmu.TryLock() {
|
||||
return nil, nil, errChainStopped
|
||||
}
|
||||
defer bc.chainmu.Unlock()
|
||||
if bc.HasBlockAndFullState(block.Hash(), block.NumberU64()) {
|
||||
return events, coalescedLogs, nil
|
||||
|
|
@ -2387,7 +2453,10 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
|
|||
}
|
||||
}
|
||||
|
||||
func (bc *BlockChain) update() {
|
||||
// futureBlocksLoop processes the 'future block' queue.
|
||||
func (bc *BlockChain) futureBlocksLoop() {
|
||||
defer bc.wg.Done()
|
||||
|
||||
futureTimer := time.NewTicker(10 * time.Millisecond)
|
||||
defer futureTimer.Stop()
|
||||
for {
|
||||
|
|
@ -2471,13 +2540,11 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
|
|||
return i, err
|
||||
}
|
||||
|
||||
// Make sure only one thread manipulates the chain at once
|
||||
bc.chainmu.Lock()
|
||||
if !bc.chainmu.TryLock() {
|
||||
return 0, errChainStopped
|
||||
}
|
||||
defer bc.chainmu.Unlock()
|
||||
|
||||
bc.wg.Add(1)
|
||||
defer bc.wg.Done()
|
||||
|
||||
whFunc := func(header *types.Header) error {
|
||||
_, err := bc.hc.WriteHeader(header)
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -128,7 +128,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
|
|||
blockchain.reportBlock(block, receipts, err)
|
||||
return err
|
||||
}
|
||||
blockchain.chainmu.Lock()
|
||||
blockchain.chainmu.MustLock()
|
||||
WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash())))
|
||||
rawdb.WriteBlock(blockchain.db, block)
|
||||
statedb.Commit(true)
|
||||
|
|
@ -146,7 +146,7 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error
|
|||
return err
|
||||
}
|
||||
// Manually insert the header into the database, but don't reorganise (allows subsequent testing)
|
||||
blockchain.chainmu.Lock()
|
||||
blockchain.chainmu.MustLock()
|
||||
WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash)))
|
||||
rawdb.WriteHeader(blockchain.db, header)
|
||||
blockchain.chainmu.Unlock()
|
||||
|
|
|
|||
64
internal/syncx/mutex.go
Normal file
64
internal/syncx/mutex.go
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
// Copyright 2021 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// Package syncx contains exotic synchronization primitives.
|
||||
package syncx
|
||||
|
||||
// ClosableMutex is a mutex that can also be closed.
|
||||
// Once closed, it can never be taken again.
|
||||
type ClosableMutex struct {
|
||||
ch chan struct{}
|
||||
}
|
||||
|
||||
func NewClosableMutex() *ClosableMutex {
|
||||
ch := make(chan struct{}, 1)
|
||||
ch <- struct{}{}
|
||||
return &ClosableMutex{ch}
|
||||
}
|
||||
|
||||
// TryLock attempts to lock cm.
|
||||
// If the mutex is closed, TryLock returns false.
|
||||
func (cm *ClosableMutex) TryLock() bool {
|
||||
_, ok := <-cm.ch
|
||||
return ok
|
||||
}
|
||||
|
||||
// MustLock locks cm.
|
||||
// If the mutex is closed, MustLock panics.
|
||||
func (cm *ClosableMutex) MustLock() {
|
||||
_, ok := <-cm.ch
|
||||
if !ok {
|
||||
panic("mutex closed")
|
||||
}
|
||||
}
|
||||
|
||||
// Unlock unlocks cm.
|
||||
func (cm *ClosableMutex) Unlock() {
|
||||
select {
|
||||
case cm.ch <- struct{}{}:
|
||||
default:
|
||||
panic("Unlock of already-unlocked ClosableMutex")
|
||||
}
|
||||
}
|
||||
|
||||
// Close locks the mutex, then closes it.
|
||||
func (cm *ClosableMutex) Close() {
|
||||
_, ok := <-cm.ch
|
||||
if !ok {
|
||||
panic("Close of already-closed ClosableMutex")
|
||||
}
|
||||
close(cm.ch)
|
||||
}
|
||||
Loading…
Reference in a new issue