mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-02-26 15:47:21 +00:00
triedb, core/rawdb: implement the partial read in freezer (#32132)
This PR implements partial-read functionality in the freezer, optimizing the state history reader by reading less data from the freezer. --------- Signed-off-by: jsvisa <delweng@gmail.com> Co-authored-by: Gary Rong <garyrong0905@gmail.com>
This commit is contained in:
parent
bc0a21a1d5
commit
a7359ceb69
12 changed files with 214 additions and 42 deletions
|
|
@ -188,24 +188,16 @@ func ReadStateAccountIndex(db ethdb.AncientReaderOp, id uint64) []byte {
|
|||
// data in the concatenated storage data table. Compute the position of state
|
||||
// history in freezer by minus one since the id of first state history starts
|
||||
// from one (zero for initial state).
|
||||
func ReadStateStorageIndex(db ethdb.AncientReaderOp, id uint64) []byte {
|
||||
blob, err := db.Ancient(stateHistoryStorageIndex, id-1)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return blob
|
||||
func ReadStateStorageIndex(db ethdb.AncientReaderOp, id uint64, offset, length int) ([]byte, error) {
|
||||
return db.AncientBytes(stateHistoryStorageIndex, id-1, uint64(offset), uint64(length))
|
||||
}
|
||||
|
||||
// ReadStateAccountHistory retrieves the concatenated account data blob for the
|
||||
// specified state history. Offsets and lengths are resolved via the account
|
||||
// index. Compute the position of state history in freezer by minus one since
|
||||
// the id of first state history starts from one (zero for initial state).
|
||||
func ReadStateAccountHistory(db ethdb.AncientReaderOp, id uint64) []byte {
|
||||
blob, err := db.Ancient(stateHistoryAccountData, id-1)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return blob
|
||||
func ReadStateAccountHistory(db ethdb.AncientReaderOp, id uint64, offset, length int) ([]byte, error) {
|
||||
return db.AncientBytes(stateHistoryAccountData, id-1, uint64(offset), uint64(length))
|
||||
}
|
||||
|
||||
// ReadStateStorageHistory retrieves the concatenated storage slot data blob for
|
||||
|
|
@ -213,12 +205,8 @@ func ReadStateAccountHistory(db ethdb.AncientReaderOp, id uint64) []byte {
|
|||
// storage indexes. Compute the position of state history in freezer by minus
|
||||
// one since the id of first state history starts from one (zero for initial
|
||||
// state).
|
||||
func ReadStateStorageHistory(db ethdb.AncientReaderOp, id uint64) []byte {
|
||||
blob, err := db.Ancient(stateHistoryStorageData, id-1)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return blob
|
||||
func ReadStateStorageHistory(db ethdb.AncientReaderOp, id uint64, offset, length int) ([]byte, error) {
|
||||
return db.AncientBytes(stateHistoryStorageData, id-1, uint64(offset), uint64(length))
|
||||
}
|
||||
|
||||
// ReadStateHistory retrieves the state history from database with provided id.
|
||||
|
|
|
|||
|
|
@ -403,6 +403,10 @@ func (f *chainFreezer) AncientRange(kind string, start, count, maxBytes uint64)
|
|||
return f.ancients.AncientRange(kind, start, count, maxBytes)
|
||||
}
|
||||
|
||||
// AncientBytes forwards the partial-read request for the given kind, id,
// offset and length to the wrapped ancient store and returns its result
// unchanged.
func (f *chainFreezer) AncientBytes(kind string, id, offset, length uint64) ([]byte, error) {
|
||||
return f.ancients.AncientBytes(kind, id, offset, length)
|
||||
}
|
||||
|
||||
// ModifyAncients hands the supplied write function to the wrapped ancient
// store and returns whatever that store reports.
func (f *chainFreezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (int64, error) {
|
||||
return f.ancients.ModifyAncients(fn)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -100,6 +100,12 @@ func (db *nofreezedb) AncientRange(kind string, start, max, maxByteSize uint64)
|
|||
return nil, errNotSupported
|
||||
}
|
||||
|
||||
// AncientBytes returns an error as we don't have a backing chain freezer, so
|
||||
// no value segment can be served; the result is always errNotSupported.
|
||||
func (db *nofreezedb) AncientBytes(kind string, id, offset, length uint64) ([]byte, error) {
|
||||
return nil, errNotSupported
|
||||
}
|
||||
|
||||
// Ancients returns an error as we don't have a backing chain freezer.
|
||||
func (db *nofreezedb) Ancients() (uint64, error) {
|
||||
return 0, errNotSupported
|
||||
|
|
|
|||
|
|
@ -202,6 +202,15 @@ func (f *Freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]
|
|||
return nil, errUnknownTable
|
||||
}
|
||||
|
||||
// AncientBytes retrieves the value segment of the element specified by the id
|
||||
// and value offsets.
|
||||
func (f *Freezer) AncientBytes(kind string, id, offset, length uint64) ([]byte, error) {
|
||||
if table := f.tables[kind]; table != nil {
|
||||
return table.RetrieveBytes(id, offset, length)
|
||||
}
|
||||
return nil, errUnknownTable
|
||||
}
|
||||
|
||||
// Ancients returns the length of the frozen items.
|
||||
func (f *Freezer) Ancients() (uint64, error) {
|
||||
return f.frozen.Load(), nil
|
||||
|
|
|
|||
|
|
@ -412,3 +412,28 @@ func (f *MemoryFreezer) Reset() error {
|
|||
func (f *MemoryFreezer) AncientDatadir() (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// AncientBytes retrieves the value segment of the element specified by the id
|
||||
// and value offsets.
|
||||
func (f *MemoryFreezer) AncientBytes(kind string, id, offset, length uint64) ([]byte, error) {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
|
||||
table := f.tables[kind]
|
||||
if table == nil {
|
||||
return nil, errUnknownTable
|
||||
}
|
||||
entries, err := table.retrieve(id, 1, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return nil, errOutOfBounds
|
||||
}
|
||||
data := entries[0]
|
||||
|
||||
if offset > uint64(len(data)) || offset+length > uint64(len(data)) {
|
||||
return nil, fmt.Errorf("requested range out of bounds: item size %d, offset %d, length %d", len(data), offset, length)
|
||||
}
|
||||
return data[offset : offset+length], nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -126,6 +126,15 @@ func (f *resettableFreezer) AncientRange(kind string, start, count, maxBytes uin
|
|||
return f.freezer.AncientRange(kind, start, count, maxBytes)
|
||||
}
|
||||
|
||||
// AncientBytes retrieves the value segment of the element specified by the id
|
||||
// and value offsets. The read lock is held while the call is delegated to the wrapped freezer.
|
||||
func (f *resettableFreezer) AncientBytes(kind string, id, offset, length uint64) ([]byte, error) {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
|
||||
return f.freezer.AncientBytes(kind, id, offset, length)
|
||||
}
|
||||
|
||||
// Ancients returns the length of the frozen items.
|
||||
func (f *resettableFreezer) Ancients() (uint64, error) {
|
||||
f.lock.RLock()
|
||||
|
|
|
|||
|
|
@ -1107,6 +1107,71 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
|
|||
return output, sizes, nil
|
||||
}
|
||||
|
||||
// RetrieveBytes retrieves the value segment of the element specified by the id
|
||||
// and value offsets.
|
||||
func (t *freezerTable) RetrieveBytes(item, offset, length uint64) ([]byte, error) {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
if t.index == nil || t.head == nil || t.metadata.file == nil {
|
||||
return nil, errClosed
|
||||
}
|
||||
items, hidden := t.items.Load(), t.itemHidden.Load()
|
||||
if items <= item || hidden > item {
|
||||
return nil, errOutOfBounds
|
||||
}
|
||||
|
||||
// Retrieves the index entries for the specified ID and its immediate successor
|
||||
indices, err := t.getIndices(item, 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
index0, index1 := indices[0], indices[1]
|
||||
|
||||
itemStart, itemLimit, fileId := index0.bounds(index1)
|
||||
itemSize := itemLimit - itemStart
|
||||
|
||||
dataFile, exist := t.files[fileId]
|
||||
if !exist {
|
||||
return nil, fmt.Errorf("missing data file %d", fileId)
|
||||
}
|
||||
|
||||
// Perform the partial read if no-compression was enabled upon
|
||||
if t.config.noSnappy {
|
||||
if offset > uint64(itemSize) || offset+length > uint64(itemSize) {
|
||||
return nil, fmt.Errorf("requested range out of bounds: item size %d, offset %d, length %d", itemSize, offset, length)
|
||||
}
|
||||
itemStart += uint32(offset)
|
||||
|
||||
buf := make([]byte, length)
|
||||
_, err = dataFile.ReadAt(buf, int64(itemStart))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.readMeter.Mark(int64(length))
|
||||
return buf, nil
|
||||
} else {
|
||||
// If compressed, read the full item, decompress, then slice.
|
||||
// Unfortunately, in this case, there is no performance gain
|
||||
// by performing the partial read at all.
|
||||
buf := make([]byte, itemSize)
|
||||
_, err = dataFile.ReadAt(buf, int64(itemStart))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.readMeter.Mark(int64(itemSize))
|
||||
|
||||
data, err := snappy.Decode(nil, buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if offset > uint64(len(data)) || offset+length > uint64(len(data)) {
|
||||
return nil, fmt.Errorf("requested range out of bounds: item size %d, offset %d, length %d", len(data), offset, length)
|
||||
}
|
||||
return data[offset : offset+length], nil
|
||||
}
|
||||
}
|
||||
|
||||
// size returns the total data size in the freezer table.
|
||||
func (t *freezerTable) size() (uint64, error) {
|
||||
t.lock.RLock()
|
||||
|
|
|
|||
|
|
@ -1571,3 +1571,65 @@ func TestTailTruncationCrash(t *testing.T) {
|
|||
t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 26*indexEntrySize, f.metadata.flushOffset)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFreezerAncientBytes(t *testing.T) {
|
||||
t.Parallel()
|
||||
types := []struct {
|
||||
name string
|
||||
config freezerTableConfig
|
||||
}{
|
||||
{"uncompressed", freezerTableConfig{noSnappy: true}},
|
||||
{"compressed", freezerTableConfig{noSnappy: false}},
|
||||
}
|
||||
for _, typ := range types {
|
||||
t.Run(typ.name, func(t *testing.T) {
|
||||
f, err := newTable(os.TempDir(), fmt.Sprintf("ancientbytes-%s-%d", typ.name, rand.Uint64()), metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 1000, typ.config, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
data := getChunk(100, i)
|
||||
batch := f.newBatch()
|
||||
require.NoError(t, batch.AppendRaw(uint64(i), data))
|
||||
require.NoError(t, batch.commit())
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
full, err := f.Retrieve(uint64(i))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Full read
|
||||
got, err := f.RetrieveBytes(uint64(i), 0, uint64(len(full)))
|
||||
require.NoError(t, err)
|
||||
if !bytes.Equal(got, full) {
|
||||
t.Fatalf("full read mismatch for entry %d", i)
|
||||
}
|
||||
// Empty read
|
||||
got, err = f.RetrieveBytes(uint64(i), 0, 0)
|
||||
require.NoError(t, err)
|
||||
if !bytes.Equal(got, full[:0]) {
|
||||
t.Fatalf("empty read mismatch for entry %d", i)
|
||||
}
|
||||
// Middle slice
|
||||
got, err = f.RetrieveBytes(uint64(i), 10, 50)
|
||||
require.NoError(t, err)
|
||||
if !bytes.Equal(got, full[10:60]) {
|
||||
t.Fatalf("middle slice mismatch for entry %d", i)
|
||||
}
|
||||
// Single byte
|
||||
got, err = f.RetrieveBytes(uint64(i), 99, 1)
|
||||
require.NoError(t, err)
|
||||
if !bytes.Equal(got, full[99:100]) {
|
||||
t.Fatalf("single byte mismatch for entry %d", i)
|
||||
}
|
||||
// Out of bounds
|
||||
_, err = f.RetrieveBytes(uint64(i), 100, 1)
|
||||
if err == nil {
|
||||
t.Fatalf("expected error for out-of-bounds read for entry %d", i)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,6 +62,12 @@ func (t *table) AncientRange(kind string, start, count, maxBytes uint64) ([][]by
|
|||
return t.db.AncientRange(kind, start, count, maxBytes)
|
||||
}
|
||||
|
||||
// AncientBytes is a noop passthrough that just forwards the request to the underlying
|
||||
// database; kind, id, offset and length are handed over unchanged.
|
||||
func (t *table) AncientBytes(kind string, id, offset, length uint64) ([]byte, error) {
|
||||
return t.db.AncientBytes(kind, id, offset, length)
|
||||
}
|
||||
|
||||
// Ancients is a noop passthrough that just forwards the request to the underlying
|
||||
// database.
|
||||
func (t *table) Ancients() (uint64, error) {
|
||||
|
|
|
|||
|
|
@ -121,6 +121,10 @@ type AncientReaderOp interface {
|
|||
// - if maxBytes is not specified, 'count' items will be returned if they are present
|
||||
AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error)
|
||||
|
||||
// AncientBytes retrieves the value segment of the element specified by the id
|
||||
// and value offsets.
|
||||
AncientBytes(kind string, id, offset, length uint64) ([]byte, error)
|
||||
|
||||
// Ancients returns the ancient item numbers in the ancient store.
|
||||
Ancients() (uint64, error)
|
||||
|
||||
|
|
|
|||
|
|
@ -140,6 +140,10 @@ func (db *Database) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// AncientBytes is not supported by this database: it never returns and always
// panics with "not supported".
func (db *Database) AncientBytes(kind string, id, offset, length uint64) ([]byte, error) {
|
||||
panic("not supported")
|
||||
}
|
||||
|
||||
func New(client *rpc.Client) ethdb.Database {
|
||||
if client == nil {
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -144,25 +144,17 @@ func (r *historyReader) readAccountMetadata(address common.Address, historyID ui
|
|||
// readStorageMetadata resolves the storage slot metadata within the specified
|
||||
// state history.
|
||||
func (r *historyReader) readStorageMetadata(storageKey common.Hash, storageHash common.Hash, historyID uint64, slotOffset, slotNumber int) ([]byte, error) {
|
||||
// TODO(rj493456442) optimize it with partial read
|
||||
blob := rawdb.ReadStateStorageIndex(r.freezer, historyID)
|
||||
if len(blob) == 0 {
|
||||
return nil, fmt.Errorf("storage index is truncated, historyID: %d", historyID)
|
||||
data, err := rawdb.ReadStateStorageIndex(r.freezer, historyID, slotIndexSize*slotOffset, slotIndexSize*slotNumber)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("id: %d, slot-offset: %d, slot-length: %d", historyID, slotOffset, slotNumber)
|
||||
return nil, fmt.Errorf("storage indices corrupted, %s, %w", msg, err)
|
||||
}
|
||||
if len(blob)%slotIndexSize != 0 {
|
||||
return nil, fmt.Errorf("storage indices is corrupted, historyID: %d, size: %d", historyID, len(blob))
|
||||
}
|
||||
if slotIndexSize*(slotOffset+slotNumber) > len(blob) {
|
||||
return nil, fmt.Errorf("storage indices is truncated, historyID: %d, size: %d, offset: %d, length: %d", historyID, len(blob), slotOffset, slotNumber)
|
||||
}
|
||||
subSlice := blob[slotIndexSize*slotOffset : slotIndexSize*(slotOffset+slotNumber)]
|
||||
|
||||
// TODO(rj493456442) get rid of the metadata resolution
|
||||
var (
|
||||
m meta
|
||||
target common.Hash
|
||||
)
|
||||
blob = rawdb.ReadStateHistoryMeta(r.freezer, historyID)
|
||||
blob := rawdb.ReadStateHistoryMeta(r.freezer, historyID)
|
||||
if err := m.decode(blob); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -172,17 +164,17 @@ func (r *historyReader) readStorageMetadata(storageKey common.Hash, storageHash
|
|||
target = storageKey
|
||||
}
|
||||
pos := sort.Search(slotNumber, func(i int) bool {
|
||||
slotID := subSlice[slotIndexSize*i : slotIndexSize*i+common.HashLength]
|
||||
slotID := data[slotIndexSize*i : slotIndexSize*i+common.HashLength]
|
||||
return bytes.Compare(slotID, target.Bytes()) >= 0
|
||||
})
|
||||
if pos == slotNumber {
|
||||
return nil, fmt.Errorf("storage metadata is not found, slot key: %#x, historyID: %d", storageKey, historyID)
|
||||
}
|
||||
offset := slotIndexSize * pos
|
||||
if target != common.BytesToHash(subSlice[offset:offset+common.HashLength]) {
|
||||
if target != common.BytesToHash(data[offset:offset+common.HashLength]) {
|
||||
return nil, fmt.Errorf("storage metadata is not found, slot key: %#x, historyID: %d", storageKey, historyID)
|
||||
}
|
||||
return subSlice[offset : slotIndexSize*(pos+1)], nil
|
||||
return data[offset : slotIndexSize*(pos+1)], nil
|
||||
}
|
||||
|
||||
// readAccount retrieves the account data from the specified state history.
|
||||
|
|
@ -194,12 +186,11 @@ func (r *historyReader) readAccount(address common.Address, historyID uint64) ([
|
|||
length := int(metadata[common.AddressLength]) // one byte for account data length
|
||||
offset := int(binary.BigEndian.Uint32(metadata[common.AddressLength+1 : common.AddressLength+5])) // four bytes for the account data offset
|
||||
|
||||
// TODO(rj493456442) optimize it with partial read
|
||||
data := rawdb.ReadStateAccountHistory(r.freezer, historyID)
|
||||
if len(data) < length+offset {
|
||||
data, err := rawdb.ReadStateAccountHistory(r.freezer, historyID, offset, length)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("account data is truncated, address: %#x, historyID: %d, size: %d, offset: %d, len: %d", address, historyID, len(data), offset, length)
|
||||
}
|
||||
return data[offset : offset+length], nil
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// readStorage retrieves the storage slot data from the specified state history.
|
||||
|
|
@ -222,12 +213,11 @@ func (r *historyReader) readStorage(address common.Address, storageKey common.Ha
|
|||
length := int(slotMetadata[common.HashLength]) // one byte for slot data length
|
||||
offset := int(binary.BigEndian.Uint32(slotMetadata[common.HashLength+1 : common.HashLength+5])) // four bytes for slot data offset
|
||||
|
||||
// TODO(rj493456442) optimize it with partial read
|
||||
data := rawdb.ReadStateStorageHistory(r.freezer, historyID)
|
||||
if len(data) < offset+length {
|
||||
data, err := rawdb.ReadStateStorageHistory(r.freezer, historyID, offset, length)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("storage data is truncated, address: %#x, key: %#x, historyID: %d, size: %d, offset: %d, len: %d", address, storageKey, historyID, len(data), offset, length)
|
||||
}
|
||||
return data[offset : offset+length], nil
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// read retrieves the state element data associated with the stateID.
|
||||
|
|
|
|||
Loading…
Reference in a new issue