Skip to content

Commit 7d5c499

Browse files
envestccCoderZhi
andauthored
cow cache for staking view (#4655)
* cow cache for staking view * cow v1 staking view * add mutex for v1 staking view * add mutex for v2 staking view * address comment --------- Co-authored-by: CoderZhi <[email protected]>
1 parent e530ecc commit 7d5c499

File tree

12 files changed

+249
-29
lines changed

12 files changed

+249
-29
lines changed

action/protocol/staking/bucket_pool_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func TestBucketPool(t *testing.T) {
7979
}
8080

8181
view, _, err := CreateBaseView(sm, false)
82+
view.contractsStake = &contractStakeView{}
8283
r.NoError(err)
8384
r.NoError(sm.WriteView(_protocolID, view))
8485
pool = view.bucketPool

action/protocol/staking/candidate_statereader.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,9 @@ func CreateBaseView(sr protocol.StateReader, enableSMStorage bool) (*ViewData, u
176176
}
177177

178178
return &ViewData{
179-
candCenter: center,
180-
bucketPool: pool,
179+
candCenter: center,
180+
bucketPool: pool,
181+
contractsStake: &contractStakeView{},
181182
}, height, nil
182183
}
183184

action/protocol/staking/viewdata.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ import (
1010
"math/big"
1111

1212
"github.com/iotexproject/iotex-address/address"
13+
"github.com/pkg/errors"
14+
1315
"github.com/iotexproject/iotex-core/v2/action"
1416
"github.com/iotexproject/iotex-core/v2/action/protocol"
15-
"github.com/pkg/errors"
1617
)
1718

1819
type (
@@ -21,6 +22,7 @@ type (
2122
Clone() ContractStakeView
2223
CreatePreStates(ctx context.Context) error
2324
Handle(ctx context.Context, receipt *action.Receipt) error
25+
Commit()
2426
BucketsByCandidate(ownerAddr address.Address) ([]*VoteBucket, error)
2527
}
2628
// ViewData is the data that need to be stored in protocol's view
@@ -79,6 +81,7 @@ func (v *ViewData) Commit(ctx context.Context, sr protocol.StateReader) error {
7981
if err := v.bucketPool.Commit(sr); err != nil {
8082
return err
8183
}
84+
v.contractsStake.Commit()
8285
v.snapshots = []Snapshot{}
8386

8487
return nil
@@ -172,3 +175,15 @@ func (csv *contractStakeView) Handle(ctx context.Context, receipt *action.Receip
172175
}
173176
return nil
174177
}
178+
179+
func (csv *contractStakeView) Commit() {
180+
if csv.v1 != nil {
181+
csv.v1.Commit()
182+
}
183+
if csv.v2 != nil {
184+
csv.v2.Commit()
185+
}
186+
if csv.v3 != nil {
187+
csv.v3.Commit()
188+
}
189+
}

action/protocol/staking/viewdata_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ import (
66
"testing"
77

88
"github.com/golang/mock/gomock"
9+
"github.com/stretchr/testify/require"
10+
911
"github.com/iotexproject/iotex-core/v2/test/identityset"
1012
"github.com/iotexproject/iotex-core/v2/test/mock/mock_chainmanager"
11-
"github.com/stretchr/testify/require"
1213
)
1314

1415
func TestViewData_Clone(t *testing.T) {
@@ -61,9 +62,10 @@ func prepareViewData(t *testing.T) (*ViewData, int) {
6162
},
6263
}
6364
viewData := &ViewData{
64-
candCenter: candCenter,
65-
bucketPool: bucketPool,
66-
snapshots: []Snapshot{},
65+
candCenter: candCenter,
66+
bucketPool: bucketPool,
67+
snapshots: []Snapshot{},
68+
contractsStake: &contractStakeView{},
6769
}
6870
return viewData, viewData.Snapshot()
6971
}

blockindex/contractstaking/indexer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (s *Indexer) StartView(ctx context.Context) (staking.ContractStakeView, err
8787
}
8888
return &stakeView{
8989
helper: s,
90-
cache: s.cache.Clone(),
90+
clean: s.cache.Clone(),
9191
height: s.cache.Height(),
9292
}, nil
9393
}

blockindex/contractstaking/stakeview.go

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,59 +2,96 @@ package contractstaking
22

33
import (
44
"context"
5+
"sync"
56

67
"github.com/iotexproject/iotex-address/address"
8+
"github.com/iotexproject/iotex-proto/golang/iotextypes"
9+
710
"github.com/iotexproject/iotex-core/v2/action"
811
"github.com/iotexproject/iotex-core/v2/action/protocol"
912
"github.com/iotexproject/iotex-core/v2/action/protocol/staking"
10-
"github.com/iotexproject/iotex-proto/golang/iotextypes"
1113
)
1214

1315
type stakeView struct {
1416
helper *Indexer
15-
cache *contractStakingCache
17+
clean *contractStakingCache
18+
dirty *contractStakingCache
1619
height uint64
20+
mu sync.RWMutex
1721
}
1822

1923
func (s *stakeView) Clone() staking.ContractStakeView {
20-
return &stakeView{
24+
s.mu.Lock()
25+
defer s.mu.Unlock()
26+
clone := &stakeView{
2127
helper: s.helper,
22-
cache: s.cache.Clone(),
28+
clean: s.clean,
29+
dirty: nil,
2330
height: s.height,
2431
}
32+
if s.dirty != nil {
33+
clone.clean = s.dirty.Clone()
34+
}
35+
return clone
2536
}
2637

2738
func (s *stakeView) BucketsByCandidate(candidate address.Address) ([]*Bucket, error) {
28-
return s.cache.bucketsByCandidate(candidate, s.height)
39+
s.mu.RLock()
40+
defer s.mu.RUnlock()
41+
if s.dirty != nil {
42+
return s.dirty.bucketsByCandidate(candidate, s.height)
43+
}
44+
return s.clean.bucketsByCandidate(candidate, s.height)
2945
}
3046

3147
func (s *stakeView) CreatePreStates(ctx context.Context) error {
48+
s.mu.Lock()
49+
defer s.mu.Unlock()
3250
blkCtx := protocol.MustGetBlockCtx(ctx)
3351
s.height = blkCtx.BlockHeight
3452
return nil
3553
}
3654

3755
func (s *stakeView) Handle(ctx context.Context, receipt *action.Receipt) error {
38-
blkCtx := protocol.MustGetBlockCtx(ctx)
39-
// new event handler for this receipt
40-
handler := newContractStakingEventHandler(s.cache)
41-
42-
// handle events of receipt
4356
if receipt.Status != uint64(iotextypes.ReceiptStatus_Success) {
4457
return nil
4558
}
59+
var (
60+
blkCtx = protocol.MustGetBlockCtx(ctx)
61+
handler *contractStakingEventHandler
62+
)
4663
for _, log := range receipt.Logs() {
4764
if log.Address != s.helper.config.ContractAddress {
4865
continue
4966
}
67+
if handler == nil {
68+
s.mu.Lock()
69+
// new event handler for this receipt
70+
if s.dirty == nil {
71+
s.dirty = s.clean.Clone()
72+
}
73+
handler = newContractStakingEventHandler(s.dirty)
74+
s.mu.Unlock()
75+
}
5076
if err := handler.HandleEvent(ctx, blkCtx.BlockHeight, log); err != nil {
5177
return err
5278
}
5379
}
80+
if handler == nil {
81+
return nil
82+
}
5483
_, delta := handler.Result()
5584
// update cache
56-
if err := s.cache.Merge(delta, blkCtx.BlockHeight); err != nil {
57-
return err
85+
s.mu.Lock()
86+
defer s.mu.Unlock()
87+
return s.dirty.Merge(delta, blkCtx.BlockHeight)
88+
}
89+
90+
func (s *stakeView) Commit() {
91+
s.mu.Lock()
92+
defer s.mu.Unlock()
93+
if s.dirty != nil {
94+
s.clean = s.dirty
95+
s.dirty = nil
5896
}
59-
return nil
6097
}

systemcontractindex/stakingindex/cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (s *cache) Load(kvstore db.KVStore) error {
5656
return nil
5757
}
5858

59-
func (s *cache) Copy() *cache {
59+
func (s *cache) Copy() bucketCache {
6060
c := newCache(s.ns, s.bucketNS)
6161
for k, v := range s.buckets {
6262
c.buckets[k] = v.Clone()
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package stakingindex
2+
3+
import (
4+
"errors"
5+
"sync"
6+
7+
"github.com/iotexproject/iotex-address/address"
8+
9+
"github.com/iotexproject/iotex-core/v2/db"
10+
)
11+
12+
type (
13+
bucketCache interface {
14+
Load(kvstore db.KVStore) error
15+
Copy() bucketCache
16+
PutBucket(id uint64, bkt *Bucket)
17+
DeleteBucket(id uint64)
18+
BucketIdxs() []uint64
19+
Bucket(id uint64) *Bucket
20+
Buckets(indices []uint64) []*Bucket
21+
BucketIdsByCandidate(candidate address.Address) []uint64
22+
TotalBucketCount() uint64
23+
}
24+
25+
cowCache struct {
26+
cache bucketCache
27+
dirty bool
28+
mu sync.Mutex
29+
}
30+
)
31+
32+
func newCowCache(cache bucketCache) *cowCache {
33+
return &cowCache{
34+
cache: cache,
35+
dirty: false,
36+
}
37+
}
38+
39+
func (cow *cowCache) Copy() bucketCache {
40+
cow.mu.Lock()
41+
defer cow.mu.Unlock()
42+
if cow.dirty {
43+
cow.dirty = false
44+
}
45+
return &cowCache{
46+
cache: cow.cache,
47+
dirty: false,
48+
}
49+
}
50+
51+
func (cow *cowCache) Load(kvstore db.KVStore) error {
52+
return errors.New("not supported in cowCache")
53+
}
54+
55+
func (cow *cowCache) BucketIdsByCandidate(candidate address.Address) []uint64 {
56+
cow.mu.Lock()
57+
defer cow.mu.Unlock()
58+
return cow.cache.BucketIdsByCandidate(candidate)
59+
}
60+
61+
func (cow *cowCache) PutBucket(id uint64, bkt *Bucket) {
62+
cow.mu.Lock()
63+
defer cow.mu.Unlock()
64+
cow.ensureCopied()
65+
cow.cache.PutBucket(id, bkt)
66+
}
67+
68+
func (cow *cowCache) DeleteBucket(id uint64) {
69+
cow.mu.Lock()
70+
defer cow.mu.Unlock()
71+
cow.ensureCopied()
72+
cow.cache.DeleteBucket(id)
73+
}
74+
75+
func (cow *cowCache) BucketIdxs() []uint64 {
76+
cow.mu.Lock()
77+
defer cow.mu.Unlock()
78+
return cow.cache.BucketIdxs()
79+
}
80+
81+
func (cow *cowCache) Bucket(id uint64) *Bucket {
82+
cow.mu.Lock()
83+
defer cow.mu.Unlock()
84+
return cow.cache.Bucket(id)
85+
}
86+
87+
func (cow *cowCache) Buckets(indices []uint64) []*Bucket {
88+
cow.mu.Lock()
89+
defer cow.mu.Unlock()
90+
return cow.cache.Buckets(indices)
91+
}
92+
93+
func (cow *cowCache) TotalBucketCount() uint64 {
94+
cow.mu.Lock()
95+
defer cow.mu.Unlock()
96+
return cow.cache.TotalBucketCount()
97+
}
98+
99+
func (cow *cowCache) ensureCopied() {
100+
if !cow.dirty {
101+
cow.cache = cow.cache.Copy()
102+
cow.dirty = true
103+
}
104+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package stakingindex
2+
3+
import (
4+
"math/big"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/iotexproject/iotex-core/v2/test/identityset"
10+
)
11+
12+
func TestCowCache(t *testing.T) {
13+
r := require.New(t)
14+
buckets := []*Bucket{
15+
{Candidate: identityset.Address(1), Owner: identityset.Address(2), StakedAmount: big.NewInt(1000), Timestamped: true, StakedDuration: 3600, CreatedAt: 1622548800, UnlockedAt: 1622552400, UnstakedAt: 1622556000, Muted: false},
16+
{Candidate: identityset.Address(3), Owner: identityset.Address(4), StakedAmount: big.NewInt(2000), Timestamped: false, StakedDuration: 7200, CreatedAt: 1622548801, UnlockedAt: 1622552401, UnstakedAt: 1622556001, Muted: true},
17+
{Candidate: identityset.Address(5), Owner: identityset.Address(6), StakedAmount: big.NewInt(3000), Timestamped: true, StakedDuration: 10800, CreatedAt: 1622548802, UnlockedAt: 1622552402, UnstakedAt: 1622556002, Muted: false},
18+
{Candidate: identityset.Address(7), Owner: identityset.Address(8), StakedAmount: big.NewInt(4000), Timestamped: true, StakedDuration: 10800, CreatedAt: 1622548802, UnlockedAt: 1622552402, UnstakedAt: 1622556002, Muted: false},
19+
}
20+
original := newCache("testNS", "testBucketNS")
21+
original.PutBucket(0, buckets[0])
22+
// case 1: read cowCache without modification
23+
cow := newCowCache(original)
24+
r.Equal(buckets[0], cow.Bucket(0))
25+
26+
// case 2: modify cowCache but not affect original cache
27+
cow.PutBucket(1, buckets[1])
28+
r.Equal(buckets[1], cow.Bucket(1))
29+
r.Nil(original.Bucket(1))
30+
cow.DeleteBucket(0)
31+
r.Nil(cow.Bucket(0))
32+
r.Equal(buckets[0], original.Bucket(0))
33+
34+
// case 3: not real copy before modification
35+
copi := cow.Copy()
36+
r.Equal(buckets[1], copi.Bucket(1))
37+
r.Equal(cow.cache, copi.(*cowCache).cache)
38+
39+
// case 4: copied not affected by original modification
40+
cow.PutBucket(2, buckets[2])
41+
r.Equal(buckets[2], cow.Bucket(2))
42+
r.Nil(copi.Bucket(2))
43+
44+
// case 5: original not affected by copied modification
45+
copi.PutBucket(3, buckets[3])
46+
r.Equal(buckets[3], copi.Bucket(3))
47+
r.Nil(cow.Bucket(3))
48+
}

systemcontractindex/stakingindex/event_handler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ var (
3232

3333
type eventHandler struct {
3434
stakingBucketNS string
35-
dirty *cache // dirty cache, a view for current block
35+
dirty bucketCache // dirty cache, a view for current block
3636
delta batch.KVStoreBatch // delta for db to store buckets of current block
3737
tokenOwner map[uint64]address.Address
3838
// context for event handler
@@ -49,7 +49,7 @@ func init() {
4949
}
5050
}
5151

52-
func newEventHandler(bucketNS string, dirty *cache, blkCtx protocol.BlockCtx, timestamped, muted bool) *eventHandler {
52+
func newEventHandler(bucketNS string, dirty bucketCache, blkCtx protocol.BlockCtx, timestamped, muted bool) *eventHandler {
5353
return &eventHandler{
5454
stakingBucketNS: bucketNS,
5555
dirty: dirty,
@@ -284,7 +284,7 @@ func (eh *eventHandler) HandleDonatedEvent(event *abiutil.EventParam) error {
284284
return nil
285285
}
286286

287-
func (eh *eventHandler) Finalize() (batch.KVStoreBatch, *cache) {
287+
func (eh *eventHandler) Finalize() (batch.KVStoreBatch, bucketCache) {
288288
delta, dirty := eh.delta, eh.dirty
289289
eh.delta, eh.dirty = nil, nil
290290
return delta, dirty

0 commit comments

Comments
 (0)