Skip to content

Commit e43d131

Browse files
committed
rework how refcounted wantlists work
License: MIT Signed-off-by: Jeromy <[email protected]>
1 parent bda8c3a commit e43d131

File tree

7 files changed

+211
-52
lines changed

7 files changed

+211
-52
lines changed

core/commands/bitswap.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ var unwantCmd = &cmds.Command{
6464
ks = append(ks, c)
6565
}
6666

67-
bs.CancelWants(ks)
67+
// TODO: This should maybe find *all* sessions for this request and cancel them?
68+
// (why): in reality, i think this command should be removed. Its
69+
// messing with the internal state of bitswap. You should cancel wants
70+
// by killing the command that caused the want.
71+
bs.CancelWants(ks, 0)
6872
},
6973
}
7074

exchange/bitswap/bitswap.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ type Bitswap struct {
169169
// Sessions
170170
sessions []*Session
171171
sessLk sync.Mutex
172+
173+
sessID uint64
174+
sessIDLk sync.Mutex
172175
}
173176

174177
type blockRequest struct {
@@ -219,7 +222,9 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
219222
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
220223
}
221224

222-
bs.wm.WantBlocks(ctx, keys, nil)
225+
mses := bs.getNextSessionID()
226+
227+
bs.wm.WantBlocks(ctx, keys, nil, mses)
223228

224229
// NB: Optimization. Assumes that providers of key[0] are likely to
225230
// be able to provide for all keys. This currently holds true in most
@@ -241,7 +246,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
241246
defer close(out)
242247
defer func() {
243248
// can't just defer this call on its own, arguments are resolved *when* the defer is created
244-
bs.CancelWants(remaining.Keys())
249+
bs.CancelWants(remaining.Keys(), mses)
245250
}()
246251
for {
247252
select {
@@ -250,6 +255,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
250255
return
251256
}
252257

258+
bs.CancelWants([]*cid.Cid{blk.Cid()}, mses)
253259
remaining.Remove(blk.Cid())
254260
select {
255261
case out <- blk:
@@ -270,9 +276,16 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
270276
}
271277
}
272278

279+
func (bs *Bitswap) getNextSessionID() uint64 {
280+
bs.sessIDLk.Lock()
281+
defer bs.sessIDLk.Unlock()
282+
bs.sessID++
283+
return bs.sessID
284+
}
285+
273286
// CancelWant removes a given key from the wantlist
274-
func (bs *Bitswap) CancelWants(cids []*cid.Cid) {
275-
bs.wm.CancelWants(context.Background(), cids, nil)
287+
func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) {
288+
bs.wm.CancelWants(context.Background(), cids, nil, ses)
276289
}
277290

278291
// HasBlock announces the existance of a block to this bitswap service. The
@@ -314,7 +327,7 @@ func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session {
314327

315328
var out []*Session
316329
for _, s := range bs.sessions {
317-
if s.InterestedIn(c) {
330+
if s.interestedIn(c) {
318331
out = append(out, s)
319332
}
320333
}
@@ -346,8 +359,6 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
346359
keys = append(keys, block.Cid())
347360
}
348361

349-
bs.wm.CancelWants(context.Background(), keys, nil)
350-
351362
wg := sync.WaitGroup{}
352363
for _, block := range iblocks {
353364
wg.Add(1)
@@ -360,7 +371,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
360371
log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
361372

362373
for _, ses := range bs.SessionsForBlock(k) {
363-
ses.ReceiveBlock(p, b)
374+
ses.receiveBlockFrom(p, b)
375+
bs.CancelWants([]*cid.Cid{k}, ses.id)
364376
}
365377
log.Debugf("got block %s from %s", b, p)
366378
if err := bs.HasBlock(b); err != nil {

exchange/bitswap/bitswap_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,11 @@ func TestBasicBitswap(t *testing.T) {
332332
t.Fatal(err)
333333
}
334334

335+
time.Sleep(time.Millisecond * 20)
336+
if len(instances[1].Exchange.GetWantlist()) != 0 {
337+
t.Fatal("shouldnt have anything in wantlist")
338+
}
339+
335340
st0, err := instances[0].Exchange.Stat()
336341
if err != nil {
337342
t.Fatal(err)

exchange/bitswap/session.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ import (
1616

1717
const activeWantsLimit = 16
1818

19+
// Session holds state for an individual bitswap transfer operation.
20+
// This allows bitswap to make smarter decisions about who to send wantlist
21+
// info to, and who to request blocks from
1922
type Session struct {
2023
ctx context.Context
2124
tofetch []*cid.Cid
@@ -40,8 +43,12 @@ type Session struct {
4043
notif notifications.PubSub
4144

4245
uuid logging.Loggable
46+
47+
id uint64
4348
}
4449

50+
// NewSession creates a new bitswap session whose lifetime is bounded by the
51+
// given context
4552
func (bs *Bitswap) NewSession(ctx context.Context) *Session {
4653
s := &Session{
4754
activePeers: make(map[peer.ID]struct{}),
@@ -54,6 +61,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session {
5461
notif: notifications.New(),
5562
uuid: loggables.Uuid("GetBlockRequest"),
5663
baseTickDelay: time.Millisecond * 500,
64+
id: bs.getNextSessionID(),
5765
}
5866

5967
cache, _ := lru.New(2048)
@@ -73,11 +81,11 @@ type blkRecv struct {
7381
blk blocks.Block
7482
}
7583

76-
func (s *Session) ReceiveBlock(from peer.ID, blk blocks.Block) {
84+
func (s *Session) receiveBlockFrom(from peer.ID, blk blocks.Block) {
7785
s.incoming <- blkRecv{from: from, blk: blk}
7886
}
7987

80-
func (s *Session) InterestedIn(c *cid.Cid) bool {
88+
func (s *Session) interestedIn(c *cid.Cid) bool {
8189
return s.interest.Contains(c.KeyString())
8290
}
8391

@@ -134,14 +142,14 @@ func (s *Session) run(ctx context.Context) {
134142

135143
case <-s.tick.C:
136144
var live []*cid.Cid
137-
for c, _ := range s.liveWants {
145+
for c := range s.liveWants {
138146
cs, _ := cid.Cast([]byte(c))
139147
live = append(live, cs)
140148
s.liveWants[c] = time.Now()
141149
}
142150

143151
// Broadcast these keys to everyone we're connected to
144-
s.bs.wm.WantBlocks(ctx, live, nil)
152+
s.bs.wm.WantBlocks(ctx, live, nil, s.id)
145153

146154
if len(live) > 0 {
147155
go func() {
@@ -181,7 +189,7 @@ func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) {
181189
for _, c := range ks {
182190
s.liveWants[c.KeyString()] = time.Now()
183191
}
184-
s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr)
192+
s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
185193
}
186194

187195
func (s *Session) cancel(keys []*cid.Cid) {
@@ -211,11 +219,15 @@ func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) {
211219
}
212220
}
213221

222+
// GetBlocks fetches a set of blocks within the context of this session and
223+
// returns a channel that found blocks will be returned on. No order is
224+
// guaranteed on the returned blocks.
214225
func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) {
215226
ctx = logging.ContextWithLoggable(ctx, s.uuid)
216227
return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants)
217228
}
218229

230+
// GetBlock fetches a single block
219231
func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
220232
return getBlock(parent, k, s.GetBlocks)
221233
}

exchange/bitswap/wantlist/wantlist.go

Lines changed: 67 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import (
1010
)
1111

1212
type ThreadSafe struct {
13-
lk sync.RWMutex
14-
Wantlist Wantlist
13+
lk sync.RWMutex
14+
set map[string]*Entry
1515
}
1616

1717
// not threadsafe
@@ -23,7 +23,16 @@ type Entry struct {
2323
Cid *cid.Cid
2424
Priority int
2525

26-
RefCnt int
26+
SesTrk map[uint64]struct{}
27+
}
28+
29+
// NewRefEntry creates a new reference tracked wantlist entry
30+
func NewRefEntry(c *cid.Cid, p int) *Entry {
31+
return &Entry{
32+
Cid: c,
33+
Priority: p,
34+
SesTrk: make(map[uint64]struct{}),
35+
}
2736
}
2837

2938
type entrySlice []*Entry
@@ -34,7 +43,7 @@ func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priorit
3443

3544
func NewThreadSafe() *ThreadSafe {
3645
return &ThreadSafe{
37-
Wantlist: *New(),
46+
set: make(map[string]*Entry),
3847
}
3948
}
4049

@@ -44,46 +53,86 @@ func New() *Wantlist {
4453
}
4554
}
4655

47-
func (w *ThreadSafe) Add(k *cid.Cid, priority int) bool {
56+
func (w *ThreadSafe) Add(c *cid.Cid, priority int, ses uint64) bool {
4857
w.lk.Lock()
4958
defer w.lk.Unlock()
50-
return w.Wantlist.Add(k, priority)
59+
k := c.KeyString()
60+
if e, ok := w.set[k]; ok {
61+
e.SesTrk[ses] = struct{}{}
62+
return false
63+
}
64+
65+
w.set[k] = &Entry{
66+
Cid: c,
67+
Priority: priority,
68+
SesTrk: map[uint64]struct{}{ses: struct{}{}},
69+
}
70+
71+
return true
5172
}
5273

53-
func (w *ThreadSafe) AddEntry(e *Entry) bool {
74+
func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool {
5475
w.lk.Lock()
5576
defer w.lk.Unlock()
56-
return w.Wantlist.AddEntry(e)
77+
k := e.Cid.KeyString()
78+
if ex, ok := w.set[k]; ok {
79+
ex.SesTrk[ses] = struct{}{}
80+
return false
81+
}
82+
w.set[k] = e
83+
e.SesTrk[ses] = struct{}{}
84+
return true
5785
}
5886

59-
func (w *ThreadSafe) Remove(k *cid.Cid) bool {
87+
func (w *ThreadSafe) Remove(c *cid.Cid, ses uint64) bool {
6088
w.lk.Lock()
6189
defer w.lk.Unlock()
62-
return w.Wantlist.Remove(k)
90+
k := c.KeyString()
91+
e, ok := w.set[k]
92+
if !ok {
93+
return false
94+
}
95+
96+
delete(e.SesTrk, ses)
97+
if len(e.SesTrk) == 0 {
98+
delete(w.set, k)
99+
return true
100+
}
101+
return false
63102
}
64103

65104
func (w *ThreadSafe) Contains(k *cid.Cid) (*Entry, bool) {
66105
w.lk.RLock()
67106
defer w.lk.RUnlock()
68-
return w.Wantlist.Contains(k)
107+
e, ok := w.set[k.KeyString()]
108+
return e, ok
69109
}
70110

71111
func (w *ThreadSafe) Entries() []*Entry {
72112
w.lk.RLock()
73113
defer w.lk.RUnlock()
74-
return w.Wantlist.Entries()
114+
var es entrySlice
115+
for _, e := range w.set {
116+
es = append(es, e)
117+
}
118+
return es
75119
}
76120

77121
func (w *ThreadSafe) SortedEntries() []*Entry {
78122
w.lk.RLock()
79123
defer w.lk.RUnlock()
80-
return w.Wantlist.SortedEntries()
124+
var es entrySlice
125+
for _, e := range w.set {
126+
es = append(es, e)
127+
}
128+
sort.Sort(es)
129+
return es
81130
}
82131

83132
func (w *ThreadSafe) Len() int {
84133
w.lk.RLock()
85134
defer w.lk.RUnlock()
86-
return w.Wantlist.Len()
135+
return len(w.set)
87136
}
88137

89138
func (w *Wantlist) Len() int {
@@ -92,24 +141,21 @@ func (w *Wantlist) Len() int {
92141

93142
func (w *Wantlist) Add(c *cid.Cid, priority int) bool {
94143
k := c.KeyString()
95-
if e, ok := w.set[k]; ok {
96-
e.RefCnt++
144+
if _, ok := w.set[k]; ok {
97145
return false
98146
}
99147

100148
w.set[k] = &Entry{
101149
Cid: c,
102150
Priority: priority,
103-
RefCnt: 1,
104151
}
105152

106153
return true
107154
}
108155

109156
func (w *Wantlist) AddEntry(e *Entry) bool {
110157
k := e.Cid.KeyString()
111-
if ex, ok := w.set[k]; ok {
112-
ex.RefCnt++
158+
if _, ok := w.set[k]; ok {
113159
return false
114160
}
115161
w.set[k] = e
@@ -118,16 +164,12 @@ func (w *Wantlist) AddEntry(e *Entry) bool {
118164

119165
func (w *Wantlist) Remove(c *cid.Cid) bool {
120166
k := c.KeyString()
121-
e, ok := w.set[k]
167+
_, ok := w.set[k]
122168
if !ok {
123169
return false
124170
}
125171

126-
e.RefCnt--
127-
if e.RefCnt <= 0 {
128-
delete(w.set, k)
129-
return true
130-
}
172+
delete(w.set, k)
131173
return false
132174
}
133175

0 commit comments

Comments
 (0)