Skip to content

Commit b48c0c9

Browse files
committed
triedb/pathdb, eth: introduce Double-Buffer Mechanism in PathDB
Previously, PathDB used a single buffer to aggregate database writes, which needed to be flushed atomically. However, flushing large amounts of data (e.g., 256MB) caused significant overhead, often blocking the system for around 3 seconds during the flush. To mitigate this overhead and reduce performance spikes, a double-buffer mechanism is introduced. When the active buffer fills up, it is marked as frozen and a background flushing process is triggered. Meanwhile, a new buffer is allocated for incoming writes, allowing operations to continue uninterrupted. This approach reduces system blocking times and provides flexibility in adjusting buffer parameters for improved performance.
1 parent 8032b63 commit b48c0c9

File tree

9 files changed

+128
-105
lines changed

9 files changed

+128
-105
lines changed

eth/handler.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"github.com/ethereum/go-ethereum/common"
2828
"github.com/ethereum/go-ethereum/core"
2929
"github.com/ethereum/go-ethereum/core/forkid"
30-
"github.com/ethereum/go-ethereum/core/rawdb"
3130
"github.com/ethereum/go-ethereum/core/txpool"
3231
"github.com/ethereum/go-ethereum/core/types"
3332
"github.com/ethereum/go-ethereum/crypto"
@@ -41,7 +40,6 @@ import (
4140
"github.com/ethereum/go-ethereum/metrics"
4241
"github.com/ethereum/go-ethereum/p2p"
4342
"github.com/ethereum/go-ethereum/p2p/enode"
44-
"github.com/ethereum/go-ethereum/triedb/pathdb"
4543
)
4644

4745
const (
@@ -558,7 +556,4 @@ func (h *handler) enableSyncedFeatures() {
558556
log.Info("Snap sync complete, auto disabling")
559557
h.snapSync.Store(false)
560558
}
561-
if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
562-
h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize)
563-
}
564559
}

triedb/database.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -314,17 +314,6 @@ func (db *Database) Journal(root common.Hash) error {
314314
return pdb.Journal(root)
315315
}
316316

317-
// SetBufferSize sets the node buffer size to the provided value(in bytes).
318-
// It's only supported by path-based database and will return an error for
319-
// others.
320-
func (db *Database) SetBufferSize(size int) error {
321-
pdb, ok := db.backend.(*pathdb.Database)
322-
if !ok {
323-
return errors.New("not supported")
324-
}
325-
return pdb.SetBufferSize(size)
326-
}
327-
328317
// IsVerkle returns the indicator if the database is holding a verkle tree.
329318
func (db *Database) IsVerkle() bool {
330319
return db.config.IsVerkle

triedb/pathdb/database.go

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ const (
4444
// support is 4GB, node will panic if batch size exceeds this limit.
4545
maxBufferSize = 256 * 1024 * 1024
4646

47-
// DefaultBufferSize is the default memory allowance of node buffer
47+
// defaultBufferSize is the default memory allowance of node buffer
4848
// that aggregates the writes from above until it's flushed into the
4949
// disk. It's meant to be used once the initial sync is finished.
5050
// Do not increase the buffer size arbitrarily, otherwise the system
5151
// pause time will increase when the database writes happen.
52-
DefaultBufferSize = 64 * 1024 * 1024
52+
defaultBufferSize = 64 * 1024 * 1024
5353
)
5454

5555
var (
@@ -111,7 +111,7 @@ func (c *Config) sanitize() *Config {
111111
var Defaults = &Config{
112112
StateHistory: params.FullImmutabilityThreshold,
113113
CleanCacheSize: defaultCleanSize,
114-
DirtyCacheSize: DefaultBufferSize,
114+
DirtyCacheSize: defaultBufferSize,
115115
}
116116

117117
// ReadOnly is the config in order to open database in read only mode.
@@ -341,7 +341,7 @@ func (db *Database) Enable(root common.Hash) error {
341341
}
342342
// Re-construct a new disk layer backed by persistent state
343343
// with **empty clean cache and node buffer**.
344-
db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0)))
344+
db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0), nil))
345345

346346
// Re-enable the database as the final step.
347347
db.waitSync = false
@@ -440,7 +440,13 @@ func (db *Database) Close() error {
440440
db.readOnly = true
441441

442442
// Release the memory held by clean cache.
443-
db.tree.bottom().resetCache()
443+
disk := db.tree.bottom()
444+
if disk.frozen != nil {
445+
if err := disk.frozen.flushed(); err != nil {
446+
return err
447+
}
448+
}
449+
disk.resetCache()
444450

445451
// Close the attached state history freezer.
446452
if db.freezer == nil {
@@ -478,19 +484,6 @@ func (db *Database) Initialized(genesisRoot common.Hash) bool {
478484
return inited
479485
}
480486

481-
// SetBufferSize sets the node buffer size to the provided value(in bytes).
482-
func (db *Database) SetBufferSize(size int) error {
483-
db.lock.Lock()
484-
defer db.lock.Unlock()
485-
486-
if size > maxBufferSize {
487-
log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(maxBufferSize))
488-
size = maxBufferSize
489-
}
490-
db.bufferSize = size
491-
return db.tree.bottom().setBufferSize(db.bufferSize)
492-
}
493-
494487
// modifyAllowed returns the indicator if mutation is allowed. This function
495488
// assumes the db.lock is already held.
496489
func (db *Database) modifyAllowed() error {

triedb/pathdb/difflayer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func (dl *diffLayer) update(root common.Hash, id uint64, block uint64, nodes map
125125
}
126126

127127
// persist flushes the diff layer and all its parent layers to disk layer.
128-
func (dl *diffLayer) persist(force bool) (layer, error) {
128+
func (dl *diffLayer) persist(force bool) (*diskLayer, error) {
129129
if parent, ok := dl.parentLayer().(*diffLayer); ok {
130130
// Hold the lock to prevent any read operation until the new
131131
// parent is linked correctly.
@@ -147,7 +147,7 @@ func (dl *diffLayer) persist(force bool) (layer, error) {
147147

148148
// diffToDisk merges a bottom-most diff into the persistent disk layer underneath
149149
// it. The method will panic if called onto a non-bottom-most diff layer.
150-
func diffToDisk(layer *diffLayer, force bool) (layer, error) {
150+
func diffToDisk(layer *diffLayer, force bool) (*diskLayer, error) {
151151
disk, ok := layer.parentLayer().(*diskLayer)
152152
if !ok {
153153
panic(fmt.Sprintf("unknown layer type: %T", layer.parentLayer()))

triedb/pathdb/difflayer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
func emptyLayer() *diskLayer {
3131
return &diskLayer{
3232
db: New(rawdb.NewMemoryDatabase(), nil, false),
33-
buffer: newNodeBuffer(DefaultBufferSize, nil, 0),
33+
buffer: newNodeBuffer(defaultBufferSize, nil, 0),
3434
}
3535
}
3636

triedb/pathdb/disklayer.go

Lines changed: 56 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ type diskLayer struct {
3636
db *Database // Path-based trie database
3737
cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs
3838
buffer *nodebuffer // Node buffer to aggregate writes
39+
frozen *nodebuffer // Frozen node buffer waiting for flushing
3940
stale bool // Signals that the layer became stale (state progressed)
4041
lock sync.RWMutex // Lock used to protect stale flag
4142
}
4243

4344
// newDiskLayer creates a new disk layer based on the passing arguments.
44-
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer) *diskLayer {
45+
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer, frozen *nodebuffer) *diskLayer {
4546
// Initialize a clean cache if the memory allowance is not zero
4647
// or reuse the provided cache if it is not nil (inherited from
4748
// the original disk layer).
@@ -54,6 +55,7 @@ func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.C
5455
db: db,
5556
cleans: cleans,
5657
buffer: buffer,
58+
frozen: frozen,
5759
}
5860
}
5961

@@ -102,16 +104,19 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
102104
if dl.stale {
103105
return nil, common.Hash{}, nil, errSnapshotStale
104106
}
105-
// Try to retrieve the trie node from the not-yet-written
106-
// node buffer first. Note the buffer is lock free since
107-
// it's impossible to mutate the buffer before tagging the
108-
// layer as stale.
109-
n, found := dl.buffer.node(owner, path)
110-
if found {
111-
dirtyHitMeter.Mark(1)
112-
dirtyReadMeter.Mark(int64(len(n.Blob)))
113-
dirtyNodeHitDepthHist.Update(int64(depth))
114-
return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
107+
// Try to retrieve the trie node from the not-yet-written node buffer first
108+
// (both the live one and the frozen one). Note the buffer is lock free since
109+
// it's impossible to mutate the buffer before tagging the layer as stale.
110+
for _, buffer := range []*nodebuffer{dl.buffer, dl.frozen} {
111+
if buffer != nil {
112+
n, found := buffer.node(owner, path)
113+
if found {
114+
dirtyHitMeter.Mark(1)
115+
dirtyReadMeter.Mark(int64(len(n.Blob)))
116+
dirtyNodeHitDepthHist.Update(int64(depth))
117+
return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
118+
}
119+
}
115120
}
116121
dirtyMissMeter.Mark(1)
117122

@@ -182,29 +187,51 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
182187
// Mark the diskLayer as stale before applying any mutations on top.
183188
dl.stale = true
184189

185-
// Store the root->id lookup afterwards. All stored lookups are identified
190+
// Store the root->id lookup afterward. All stored lookups are identified
186191
// by the **unique** state root. It's impossible that in the same chain
187192
// blocks are not adjacent but have the same root.
188193
if dl.id == 0 {
189194
rawdb.WriteStateID(dl.db.diskdb, dl.root, 0)
190195
}
191196
rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())
192197

193-
// Construct a new disk layer by merging the nodes from the provided diff
194-
// layer, and flush the content in disk layer if there are too many nodes
195-
// cached. The clean cache is inherited from the original disk layer.
196-
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes))
197-
198198
// In a unique scenario where the ID of the oldest history object (after tail
199199
// truncation) surpasses the persisted state ID, we take the necessary action
200-
// of forcibly committing the cached dirty nodes to ensure that the persisted
200+
// of forcibly committing the cached dirty states to ensure that the persisted
201201
// state ID remains higher.
202-
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
202+
persistedID := rawdb.ReadPersistentStateID(dl.db.diskdb)
203+
if !force && persistedID < oldest {
203204
force = true
204205
}
205-
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
206-
return nil, err
206+
// Merge the nodes of the bottom-most diff layer into the buffer as the combined one
207+
combined := dl.buffer.commit(bottom.nodes)
208+
if combined.full() || force {
209+
// Wait until the previous frozen buffer is fully flushed
210+
if dl.frozen != nil {
211+
if err := dl.frozen.flushed(); err != nil {
212+
return nil, err
213+
}
214+
}
215+
dl.frozen = nil
216+
217+
// Freeze the live buffer and schedule background flushing
218+
dl.frozen = combined
219+
dl.frozen.flush(dl.db.diskdb, dl.cleans, bottom.stateID())
220+
221+
// Block until the frozen buffer is fully flushed out if the oldest history
222+
// surpasses the persisted state ID.
223+
if persistedID < oldest {
224+
if err := dl.frozen.flushed(); err != nil {
225+
return nil, err
226+
}
227+
}
228+
combined = newNodeBuffer(dl.db.bufferSize, nil, 0)
207229
}
230+
// Construct a new disk layer by merging the nodes from the provided diff
231+
// layer, and flush the content in disk layer if there are too many nodes
232+
// cached. The clean cache is inherited from the original disk layer.
233+
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, combined, dl.frozen)
234+
208235
// To remove outdated history objects from the end, we set the 'tail' parameter
209236
// to 'oldest-1' due to the offset between the freezer index and the history ID.
210237
if overflow {
@@ -249,25 +276,21 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
249276
return nil, err
250277
}
251278
} else {
279+
// Block until the frozen buffer is fully flushed
280+
if dl.frozen != nil {
281+
if err := dl.frozen.flushed(); err != nil {
282+
return nil, err
283+
}
284+
dl.frozen = nil // unset the frozen buffer
285+
}
252286
batch := dl.db.diskdb.NewBatch()
253287
writeNodes(batch, nodes, dl.cleans)
254288
rawdb.WritePersistentStateID(batch, dl.id-1)
255289
if err := batch.Write(); err != nil {
256290
log.Crit("Failed to write states", "err", err)
257291
}
258292
}
259-
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer), nil
260-
}
261-
262-
// setBufferSize sets the node buffer size to the provided value.
263-
func (dl *diskLayer) setBufferSize(size int) error {
264-
dl.lock.RLock()
265-
defer dl.lock.RUnlock()
266-
267-
if dl.stale {
268-
return errSnapshotStale
269-
}
270-
return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
293+
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer, dl.frozen), nil
271294
}
272295

273296
// size returns the approximate size of cached nodes in the disk layer.

triedb/pathdb/journal.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (db *Database) loadLayers() layer {
136136
log.Info("Failed to load journal, discard it", "err", err)
137137
}
138138
// Return single layer with persistent state.
139-
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0))
139+
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0), nil)
140140
}
141141

142142
// loadDiskLayer reads the binary blob from the layer journal, reconstructing
@@ -176,7 +176,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
176176
nodes[entry.Owner] = subset
177177
}
178178
// Calculate the internal state transitions by id difference.
179-
base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored))
179+
base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored), nil)
180180
return base, nil
181181
}
182182

@@ -342,6 +342,11 @@ func (db *Database) Journal(root common.Hash) error {
342342
return fmt.Errorf("triedb layer [%#x] missing", root)
343343
}
344344
disk := db.tree.bottom()
345+
if disk.frozen != nil {
346+
if err := disk.frozen.flushed(); err != nil {
347+
return err
348+
}
349+
}
345350
if l, ok := l.(*diffLayer); ok {
346351
log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers)
347352
} else { // disk layer only on noop runs (likely) or deep reorgs (unlikely)

triedb/pathdb/layertree.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
131131
if err != nil {
132132
return err
133133
}
134+
// Block until the frozen buffer is fully flushed
135+
if base.frozen != nil {
136+
if err := base.frozen.flushed(); err != nil {
137+
return err
138+
}
139+
}
134140
// Replace the entire layer tree with the flat base
135141
tree.layers = map[common.Hash]layer{base.rootHash(): base}
136142
return nil

0 commit comments

Comments
 (0)