Skip to content

Commit fc0cd1e

Browse files
committed
triedb/pathdb, eth: introduce Double-Buffer Mechanism in PathDB
Previously, PathDB used a single buffer to aggregate database writes, which needed to be flushed atomically. However, flushing large amounts of data (e.g., 256MB) caused significant overhead, often blocking the system for around 3 seconds during the flush. To mitigate this overhead and reduce performance spikes, a double-buffer mechanism is introduced. When the active buffer fills up, it is marked as frozen and a background flushing process is triggered. Meanwhile, a new buffer is allocated for incoming writes, allowing operations to continue uninterrupted. This approach reduces system blocking times and provides flexibility in adjusting buffer parameters for improved performance.
1 parent 8032b63 commit fc0cd1e

File tree

12 files changed

+191
-123
lines changed

12 files changed

+191
-123
lines changed

core/state/snapshot/generate_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,16 @@ func (t *testHelper) Commit() common.Hash {
224224
}
225225
t.triedb.Update(root, types.EmptyRootHash, 0, t.nodes, nil)
226226
t.triedb.Commit(root, false)
227+
228+
// re-open the trie database to ensure the frozen buffer
229+
// is not referenced
230+
config := &triedb.Config{}
231+
if t.triedb.Scheme() == rawdb.PathScheme {
232+
config.PathDB = &pathdb.Config{} // disable caching
233+
} else {
234+
config.HashDB = &hashdb.Config{} // disable caching
235+
}
236+
t.triedb = triedb.NewDatabase(t.triedb.Disk(), config)
227237
return root
228238
}
229239

core/state/statedb_test.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -976,19 +976,22 @@ func TestMissingTrieNodes(t *testing.T) {
976976
func testMissingTrieNodes(t *testing.T, scheme string) {
977977
// Create an initial state with a few accounts
978978
var (
979-
tdb *triedb.Database
980-
memDb = rawdb.NewMemoryDatabase()
979+
tdb *triedb.Database
980+
memDb = rawdb.NewMemoryDatabase()
981+
openDb = func() *triedb.Database {
982+
if scheme == rawdb.PathScheme {
983+
return triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
984+
CleanCacheSize: 0,
985+
DirtyCacheSize: 0,
986+
}}) // disable caching
987+
} else {
988+
return triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
989+
CleanCacheSize: 0,
990+
}}) // disable caching
991+
}
992+
}
981993
)
982-
if scheme == rawdb.PathScheme {
983-
tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
984-
CleanCacheSize: 0,
985-
DirtyCacheSize: 0,
986-
}}) // disable caching
987-
} else {
988-
tdb = triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
989-
CleanCacheSize: 0,
990-
}}) // disable caching
991-
}
994+
tdb = openDb()
992995
db := NewDatabase(tdb, nil)
993996

994997
var root common.Hash
@@ -1006,17 +1009,29 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
10061009
tdb.Commit(root, false)
10071010
}
10081011
// Create a new state on the old root
1009-
state, _ = New(root, db)
10101012
// Now we clear out the memdb
10111013
it := memDb.NewIterator(nil, nil)
10121014
for it.Next() {
10131015
k := it.Key()
1016+
10141017
// Leave the root intact
1015-
if !bytes.Equal(k, root[:]) {
1016-
t.Logf("key: %x", k)
1017-
memDb.Delete(k)
1018+
if scheme == rawdb.HashScheme {
1019+
if !bytes.Equal(k, root[:]) {
1020+
t.Logf("key: %x", k)
1021+
memDb.Delete(k)
1022+
}
1023+
}
1024+
if scheme == rawdb.PathScheme {
1025+
rk := k[len(rawdb.TrieNodeAccountPrefix):]
1026+
if len(rk) != 0 {
1027+
t.Logf("key: %x", k)
1028+
memDb.Delete(k)
1029+
}
10181030
}
10191031
}
1032+
tdb = openDb()
1033+
db = NewDatabase(tdb, nil)
1034+
state, _ = New(root, db)
10201035
balance := state.GetBalance(addr)
10211036
// The removed elem should lead to it returning zero balance
10221037
if exp, got := uint64(0), balance.Uint64(); got != exp {

eth/handler.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"github.com/ethereum/go-ethereum/common"
2828
"github.com/ethereum/go-ethereum/core"
2929
"github.com/ethereum/go-ethereum/core/forkid"
30-
"github.com/ethereum/go-ethereum/core/rawdb"
3130
"github.com/ethereum/go-ethereum/core/txpool"
3231
"github.com/ethereum/go-ethereum/core/types"
3332
"github.com/ethereum/go-ethereum/crypto"
@@ -41,7 +40,6 @@ import (
4140
"github.com/ethereum/go-ethereum/metrics"
4241
"github.com/ethereum/go-ethereum/p2p"
4342
"github.com/ethereum/go-ethereum/p2p/enode"
44-
"github.com/ethereum/go-ethereum/triedb/pathdb"
4543
)
4644

4745
const (
@@ -558,7 +556,4 @@ func (h *handler) enableSyncedFeatures() {
558556
log.Info("Snap sync complete, auto disabling")
559557
h.snapSync.Store(false)
560558
}
561-
if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
562-
h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize)
563-
}
564559
}

ethdb/pebble/pebble.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,9 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
207207

208208
// The default compaction concurrency(1 thread),
209209
// Here use all available CPUs for faster compaction.
210-
MaxConcurrentCompactions: runtime.NumCPU,
210+
MaxConcurrentCompactions: func() int {
211+
return runtime.NumCPU() / 2
212+
},
211213

212214
// Per-level options. Options for at least one level must be specified. The
213215
// options for the last level are used for all subsequent levels.

triedb/database.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -314,17 +314,6 @@ func (db *Database) Journal(root common.Hash) error {
314314
return pdb.Journal(root)
315315
}
316316

317-
// SetBufferSize sets the node buffer size to the provided value(in bytes).
318-
// It's only supported by path-based database and will return an error for
319-
// others.
320-
func (db *Database) SetBufferSize(size int) error {
321-
pdb, ok := db.backend.(*pathdb.Database)
322-
if !ok {
323-
return errors.New("not supported")
324-
}
325-
return pdb.SetBufferSize(size)
326-
}
327-
328317
// IsVerkle returns the indicator if the database is holding a verkle tree.
329318
func (db *Database) IsVerkle() bool {
330319
return db.config.IsVerkle

triedb/pathdb/database.go

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ const (
4444
// support is 4GB, node will panic if batch size exceeds this limit.
4545
maxBufferSize = 256 * 1024 * 1024
4646

47-
// DefaultBufferSize is the default memory allowance of node buffer
47+
// defaultBufferSize is the default memory allowance of node buffer
4848
// that aggregates the writes from above until it's flushed into the
4949
// disk. It's meant to be used once the initial sync is finished.
5050
// Do not increase the buffer size arbitrarily, otherwise the system
5151
// pause time will increase when the database writes happen.
52-
DefaultBufferSize = 64 * 1024 * 1024
52+
defaultBufferSize = 64 * 1024 * 1024
5353
)
5454

5555
var (
@@ -111,7 +111,7 @@ func (c *Config) sanitize() *Config {
111111
var Defaults = &Config{
112112
StateHistory: params.FullImmutabilityThreshold,
113113
CleanCacheSize: defaultCleanSize,
114-
DirtyCacheSize: DefaultBufferSize,
114+
DirtyCacheSize: defaultBufferSize,
115115
}
116116

117117
// ReadOnly is the config in order to open database in read only mode.
@@ -341,7 +341,7 @@ func (db *Database) Enable(root common.Hash) error {
341341
}
342342
// Re-construct a new disk layer backed by persistent state
343343
// with **empty clean cache and node buffer**.
344-
db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0)))
344+
db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0), nil))
345345

346346
// Re-enable the database as the final step.
347347
db.waitSync = false
@@ -440,7 +440,13 @@ func (db *Database) Close() error {
440440
db.readOnly = true
441441

442442
// Release the memory held by clean cache.
443-
db.tree.bottom().resetCache()
443+
disk := db.tree.bottom()
444+
if disk.frozen != nil {
445+
if err := disk.frozen.waitFlush(); err != nil {
446+
return err
447+
}
448+
}
449+
disk.resetCache()
444450

445451
// Close the attached state history freezer.
446452
if db.freezer == nil {
@@ -478,19 +484,6 @@ func (db *Database) Initialized(genesisRoot common.Hash) bool {
478484
return inited
479485
}
480486

481-
// SetBufferSize sets the node buffer size to the provided value(in bytes).
482-
func (db *Database) SetBufferSize(size int) error {
483-
db.lock.Lock()
484-
defer db.lock.Unlock()
485-
486-
if size > maxBufferSize {
487-
log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(maxBufferSize))
488-
size = maxBufferSize
489-
}
490-
db.bufferSize = size
491-
return db.tree.bottom().setBufferSize(db.bufferSize)
492-
}
493-
494487
// modifyAllowed returns the indicator if mutation is allowed. This function
495488
// assumes the db.lock is already held.
496489
func (db *Database) modifyAllowed() error {

triedb/pathdb/difflayer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func (dl *diffLayer) update(root common.Hash, id uint64, block uint64, nodes map
125125
}
126126

127127
// persist flushes the diff layer and all its parent layers to disk layer.
128-
func (dl *diffLayer) persist(force bool) (layer, error) {
128+
func (dl *diffLayer) persist(force bool) (*diskLayer, error) {
129129
if parent, ok := dl.parentLayer().(*diffLayer); ok {
130130
// Hold the lock to prevent any read operation until the new
131131
// parent is linked correctly.
@@ -147,7 +147,7 @@ func (dl *diffLayer) persist(force bool) (layer, error) {
147147

148148
// diffToDisk merges a bottom-most diff into the persistent disk layer underneath
149149
// it. The method will panic if called onto a non-bottom-most diff layer.
150-
func diffToDisk(layer *diffLayer, force bool) (layer, error) {
150+
func diffToDisk(layer *diffLayer, force bool) (*diskLayer, error) {
151151
disk, ok := layer.parentLayer().(*diskLayer)
152152
if !ok {
153153
panic(fmt.Sprintf("unknown layer type: %T", layer.parentLayer()))

triedb/pathdb/difflayer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
func emptyLayer() *diskLayer {
3131
return &diskLayer{
3232
db: New(rawdb.NewMemoryDatabase(), nil, false),
33-
buffer: newNodeBuffer(DefaultBufferSize, nil, 0),
33+
buffer: newNodeBuffer(defaultBufferSize, nil, 0),
3434
}
3535
}
3636

triedb/pathdb/disklayer.go

Lines changed: 63 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ type diskLayer struct {
3636
db *Database // Path-based trie database
3737
cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs
3838
buffer *nodebuffer // Node buffer to aggregate writes
39+
frozen *nodebuffer // Frozen node buffer waiting for flushing
3940
stale bool // Signals that the layer became stale (state progressed)
4041
lock sync.RWMutex // Lock used to protect stale flag
4142
}
4243

4344
// newDiskLayer creates a new disk layer based on the passing arguments.
44-
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer) *diskLayer {
45+
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer, frozen *nodebuffer) *diskLayer {
4546
// Initialize a clean cache if the memory allowance is not zero
4647
// or reuse the provided cache if it is not nil (inherited from
4748
// the original disk layer).
@@ -54,6 +55,7 @@ func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.C
5455
db: db,
5556
cleans: cleans,
5657
buffer: buffer,
58+
frozen: frozen,
5759
}
5860
}
5961

@@ -102,16 +104,19 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
102104
if dl.stale {
103105
return nil, common.Hash{}, nil, errSnapshotStale
104106
}
105-
// Try to retrieve the trie node from the not-yet-written
106-
// node buffer first. Note the buffer is lock free since
107-
// it's impossible to mutate the buffer before tagging the
108-
// layer as stale.
109-
n, found := dl.buffer.node(owner, path)
110-
if found {
111-
dirtyHitMeter.Mark(1)
112-
dirtyReadMeter.Mark(int64(len(n.Blob)))
113-
dirtyNodeHitDepthHist.Update(int64(depth))
114-
return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
107+
// Try to retrieve the trie node from the not-yet-written node buffer first
108+
// (both the live one and the frozen one). Note the buffer is lock free since
109+
// it's impossible to mutate the buffer before tagging the layer as stale.
110+
for _, buffer := range []*nodebuffer{dl.buffer, dl.frozen} {
111+
if buffer != nil {
112+
n, found := buffer.node(owner, path)
113+
if found {
114+
dirtyHitMeter.Mark(1)
115+
dirtyReadMeter.Mark(int64(len(n.Blob)))
116+
dirtyNodeHitDepthHist.Update(int64(depth))
117+
return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
118+
}
119+
}
115120
}
116121
dirtyMissMeter.Mark(1)
117122

@@ -135,6 +140,11 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
135140
} else {
136141
blob = rawdb.ReadStorageTrieNode(dl.db.diskdb, owner, path)
137142
}
143+
// Store the resolved data in the clean cache. The background buffer flusher
144+
// may also write to the clean cache concurrently, but two writers cannot
145+
// write the same item with different content. If the item already exists,
146+
// it will be found in the frozen buffer, eliminating the need to check the
147+
// database.
138148
if dl.cleans != nil && len(blob) > 0 {
139149
dl.cleans.Set(key, blob)
140150
cleanWriteMeter.Mark(int64(len(blob)))
@@ -182,29 +192,51 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
182192
// Mark the diskLayer as stale before applying any mutations on top.
183193
dl.stale = true
184194

185-
// Store the root->id lookup afterwards. All stored lookups are identified
195+
// Store the root->id lookup afterward. All stored lookups are identified
186196
// by the **unique** state root. It's impossible that in the same chain
187197
// blocks are not adjacent but have the same root.
188198
if dl.id == 0 {
189199
rawdb.WriteStateID(dl.db.diskdb, dl.root, 0)
190200
}
191201
rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())
192202

193-
// Construct a new disk layer by merging the nodes from the provided diff
194-
// layer, and flush the content in disk layer if there are too many nodes
195-
// cached. The clean cache is inherited from the original disk layer.
196-
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes))
197-
198203
// In a unique scenario where the ID of the oldest history object (after tail
199204
// truncation) surpasses the persisted state ID, we take the necessary action
200-
// of forcibly committing the cached dirty nodes to ensure that the persisted
205+
// of forcibly committing the cached dirty states to ensure that the persisted
201206
// state ID remains higher.
202-
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
207+
persistedID := rawdb.ReadPersistentStateID(dl.db.diskdb)
208+
if !force && persistedID < oldest {
203209
force = true
204210
}
205-
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
206-
return nil, err
211+
// Merge the nodes of the bottom-most diff layer into the buffer as the combined one
212+
combined := dl.buffer.commit(bottom.nodes)
213+
if combined.full() || force {
214+
// Wait until the previous frozen buffer is fully flushed
215+
if dl.frozen != nil {
216+
if err := dl.frozen.waitFlush(); err != nil {
217+
return nil, err
218+
}
219+
}
220+
dl.frozen = nil
221+
222+
// Freeze the live buffer and schedule background flushing
223+
dl.frozen = combined
224+
dl.frozen.flush(dl.db.diskdb, dl.cleans, bottom.stateID())
225+
226+
// Block until the frozen buffer is fully flushed out if the oldest history
227+
// surpasses the persisted state ID.
228+
if persistedID < oldest {
229+
if err := dl.frozen.waitFlush(); err != nil {
230+
return nil, err
231+
}
232+
}
233+
combined = newNodeBuffer(dl.db.bufferSize, nil, 0)
207234
}
235+
// Construct a new disk layer by merging the nodes from the provided diff
236+
// layer, and flush the content in disk layer if there are too many nodes
237+
// cached. The clean cache is inherited from the original disk layer.
238+
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, combined, dl.frozen)
239+
208240
// To remove outdated history objects from the end, we set the 'tail' parameter
209241
// to 'oldest-1' due to the offset between the freezer index and the history ID.
210242
if overflow {
@@ -249,25 +281,23 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
249281
return nil, err
250282
}
251283
} else {
284+
// Block until the frozen buffer is fully flushed
285+
if dl.frozen != nil {
286+
if err := dl.frozen.waitFlush(); err != nil {
287+
return nil, err
288+
}
289+
// Unset the frozen buffer if it exists, otherwise these "reverted"
290+
// states will still be accessible after revert in frozen buffer.
291+
dl.frozen = nil
292+
}
252293
batch := dl.db.diskdb.NewBatch()
253294
writeNodes(batch, nodes, dl.cleans)
254295
rawdb.WritePersistentStateID(batch, dl.id-1)
255296
if err := batch.Write(); err != nil {
256297
log.Crit("Failed to write states", "err", err)
257298
}
258299
}
259-
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer), nil
260-
}
261-
262-
// setBufferSize sets the node buffer size to the provided value.
263-
func (dl *diskLayer) setBufferSize(size int) error {
264-
dl.lock.RLock()
265-
defer dl.lock.RUnlock()
266-
267-
if dl.stale {
268-
return errSnapshotStale
269-
}
270-
return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
300+
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer, dl.frozen), nil
271301
}
272302

273303
// size returns the approximate size of cached nodes in the disk layer.

0 commit comments

Comments
 (0)