File tree Expand file tree Collapse file tree 3 files changed +46
-0
lines changed Expand file tree Collapse file tree 3 files changed +46
-0
lines changed Original file line number Diff line number Diff line change @@ -28,6 +28,14 @@ func (t *Tr) DeleteItems(values ...interface{}) ([]*Item, error) {
28
28
keys = append (keys , & Key {Value : item .Value , Payload : item .Payload })
29
29
}, values ... )
30
30
31
+ if err != nil {
32
+ return nil , err
33
+ }
34
+
35
+ // we need to sort the keys to ensure that a multidelete
36
+ // distributes deletes across a single node correctly
37
+ keys = keys .sort (t .config .Comparator )
38
+
31
39
err = t .delete (keys )
32
40
if err != nil {
33
41
return nil , err
Original file line number Diff line number Diff line change @@ -59,6 +59,32 @@ func (k Keys) toItems() items {
59
59
return items
60
60
}
61
61
62
+ func (k Keys ) sort (comparator Comparator ) Keys {
63
+ return (& keySortWrapper {comparator , k }).sort ()
64
+ }
65
+
66
+ type keySortWrapper struct {
67
+ comparator Comparator
68
+ keys Keys
69
+ }
70
+
71
+ func (sw * keySortWrapper ) Len () int {
72
+ return len (sw .keys )
73
+ }
74
+
75
+ func (sw * keySortWrapper ) Swap (i , j int ) {
76
+ sw .keys [i ], sw .keys [j ] = sw .keys [j ], sw .keys [i ]
77
+ }
78
+
79
+ func (sw * keySortWrapper ) Less (i , j int ) bool {
80
+ return sw .comparator (sw .keys [i ].Value , sw .keys [j ].Value ) < 0
81
+ }
82
+
83
+ func (sw * keySortWrapper ) sort () Keys {
84
+ sort .Sort (sw )
85
+ return sw .keys
86
+ }
87
+
62
88
func splitKeys (keys Keys , numParts int ) []Keys {
63
89
parts := make ([]Keys , numParts )
64
90
for i := int64 (0 ); i < int64 (numParts ); i ++ {
Original file line number Diff line number Diff line change @@ -587,6 +587,18 @@ func TestDeleteAfterSplitIncreasing(t *testing.T) {
587
587
}
588
588
}
589
589
590
+ func TestDeleteMultipleLevelsRandomlyBulk (t * testing.T ) {
591
+ num := 200
592
+ cfg := defaultConfig ()
593
+ rt := New (cfg )
594
+ mutable := rt .AsMutable ()
595
+ items := generateRandomItems (num )
596
+ mutable .AddItems (items ... )
597
+ mutable .DeleteItems (itemsToValues (items [:100 ]... )... )
598
+ result , _ := mutable .(* Tr ).toList (itemsToValues (items ... )... )
599
+ assert .Len (t , result , 100 )
600
+ }
601
+
590
602
func TestDeleteAfterSplitDecreasing (t * testing.T ) {
591
603
num := 11
592
604
cfg := defaultConfig ()
You can’t perform that action at this time.
0 commit comments