Skip to content

Commit ffebefa

Browse files
authored
Merge pull request weaviate#7801 from weaviate/rangeable_index_in_memory
Feature: rangeable index in memory
2 parents a2b7c4c + ee514e8 commit ffebefa

File tree

15 files changed

+351
-14
lines changed

15 files changed

+351
-14
lines changed

adapters/handlers/rest/configure_api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,7 @@ func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *s
373373
SeparateObjectsCompactions: appState.ServerConfig.Config.Persistence.LSMSeparateObjectsCompactions,
374374
MaxSegmentSize: appState.ServerConfig.Config.Persistence.LSMMaxSegmentSize,
375375
CycleManagerRoutinesFactor: appState.ServerConfig.Config.Persistence.LSMCycleManagerRoutinesFactor,
376+
IndexRangeableInMemory: appState.ServerConfig.Config.Persistence.IndexRangeableInMemory,
376377
HNSWMaxLogSize: appState.ServerConfig.Config.Persistence.HNSWMaxLogSize,
377378
HNSWWaitForCachePrefill: appState.ServerConfig.Config.HNSWStartupWaitForVectorCache,
378379
HNSWFlatSearchConcurrency: appState.ServerConfig.Config.HNSWFlatSearchConcurrency,

adapters/repos/db/index.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,7 @@ type IndexConfig struct {
637637
SegmentsCleanupIntervalSeconds int
638638
SeparateObjectsCompactions bool
639639
CycleManagerRoutinesFactor int
640+
IndexRangeableInMemory bool
640641
MaxSegmentSize int64
641642
HNSWMaxLogSize int64
642643
HNSWWaitForCachePrefill bool

adapters/repos/db/init.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ func (db *DB) init(ctx context.Context) error {
9393
SegmentsCleanupIntervalSeconds: db.config.SegmentsCleanupIntervalSeconds,
9494
SeparateObjectsCompactions: db.config.SeparateObjectsCompactions,
9595
CycleManagerRoutinesFactor: db.config.CycleManagerRoutinesFactor,
96+
IndexRangeableInMemory: db.config.IndexRangeableInMemory,
9697
MaxSegmentSize: db.config.MaxSegmentSize,
9798
HNSWMaxLogSize: db.config.HNSWMaxLogSize,
9899
HNSWWaitForCachePrefill: db.config.HNSWWaitForCachePrefill,

adapters/repos/db/inverted/searcher_doc_bitmap.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func (s *Searcher) docBitmap(ctx context.Context, b *lsmkv.Bucket, limit int,
3030
pv *propValuePair,
3131
) (bm docBitmap, err error) {
3232
before := time.Now()
33+
strategy := "geo"
3334
defer func() {
3435
took := time.Since(before)
3536
vals := map[string]any{
@@ -39,6 +40,7 @@ func (s *Searcher) docBitmap(ctx context.Context, b *lsmkv.Bucket, limit int,
3940
"took_string": took.String(),
4041
"value": pv.value,
4142
"count": bm.count(),
43+
"strategy": strategy,
4244
}
4345

4446
helpers.AnnotateSlowQueryLogAppend(ctx, "build_allow_list_doc_bitmap", vals)
@@ -51,6 +53,7 @@ func (s *Searcher) docBitmap(ctx context.Context, b *lsmkv.Bucket, limit int,
5153
bm, err = s.docBitmapGeo(ctx, pv)
5254
return
5355
}
56+
strategy = b.Strategy()
5457

5558
// all other operators perform operations on the inverted index which we
5659
// can serve directly

adapters/repos/db/lsmkv/bucket.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ type Bucket struct {
142142
// introduces latency of segment availability, for the tradeoff of
143143
// ensuring segment files have integrity before reading them.
144144
enableChecksumValidation bool
145+
146+
// keep segments in memory for more performant search
147+
// (currently used by roaringsetrange inverted indexes)
148+
keepSegmentsInMemory bool
145149
}
146150

147151
func NewBucketCreator() *Bucket { return &Bucket{} }
@@ -205,6 +209,7 @@ func (*Bucket) NewBucket(ctx context.Context, dir, rootDir string, logger logrus
205209
maxSegmentSize: b.maxSegmentSize,
206210
cleanupInterval: b.segmentsCleanupInterval,
207211
enableChecksumValidation: b.enableChecksumValidation,
212+
keepSegmentsInMemory: b.keepSegmentsInMemory,
208213
}, b.allocChecker)
209214
if err != nil {
210215
return nil, fmt.Errorf("init disk segments: %w", err)
@@ -1343,23 +1348,34 @@ func (b *Bucket) FlushAndSwitch() error {
13431348
return fmt.Errorf("precompute metadata: %w", err)
13441349
}
13451350

1351+
flushing := b.flushing
13461352
if err := b.atomicallyAddDiskSegmentAndRemoveFlushing(segment); err != nil {
13471353
return fmt.Errorf("add segment and remove flushing: %w", err)
13481354
}
13491355

1350-
if b.strategy == StrategyInverted && !tombstones.IsEmpty() {
1351-
if err = func() error {
1352-
b.disk.maintenanceLock.RLock()
1353-
defer b.disk.maintenanceLock.RUnlock()
1354-
// add flushing memtable tombstones to all segments
1355-
for _, seg := range b.disk.segments {
1356-
if _, err := seg.MergeTombstones(tombstones); err != nil {
1357-
return fmt.Errorf("merge tombstones: %w", err)
1356+
switch b.strategy {
1357+
case StrategyInverted:
1358+
if !tombstones.IsEmpty() {
1359+
if err = func() error {
1360+
b.disk.maintenanceLock.RLock()
1361+
defer b.disk.maintenanceLock.RUnlock()
1362+
// add flushing memtable tombstones to all segments
1363+
for _, seg := range b.disk.segments {
1364+
if _, err := seg.MergeTombstones(tombstones); err != nil {
1365+
return fmt.Errorf("merge tombstones: %w", err)
1366+
}
13581367
}
1368+
return nil
1369+
}(); err != nil {
1370+
return fmt.Errorf("add tombstones: %w", err)
1371+
}
1372+
}
1373+
1374+
case StrategyRoaringSetRange:
1375+
if b.keepSegmentsInMemory {
1376+
if err := b.disk.roaringSetRangeSegmentInMemory.MergeMemtable(flushing.roaringSetRange); err != nil {
1377+
return fmt.Errorf("merge roaringsetrange memtable to segment-in-memory: %w", err)
13591378
}
1360-
return nil
1361-
}(); err != nil {
1362-
return fmt.Errorf("add tombstones: %w", err)
13631379
}
13641380
}
13651381

adapters/repos/db/lsmkv/bucket_options.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,3 +185,10 @@ func WithForceCompaction(opt bool) BucketOption {
185185
return nil
186186
}
187187
}
188+
189+
func WithKeepSegmentsInMemory(keep bool) BucketOption {
190+
return func(b *Bucket) error {
191+
b.keepSegmentsInMemory = keep
192+
return nil
193+
}
194+
}

adapters/repos/db/lsmkv/bucket_roaring_set_range.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,14 @@ func (b *Bucket) ReaderRoaringSetRange() ReaderRoaringSetRange {
5252

5353
b.flushLock.RLock()
5454

55-
readers, releaseSegmentGroup := b.disk.newRoaringSetRangeReaders()
55+
var release func()
56+
var readers []roaringsetrange.InnerReader
57+
if b.keepSegmentsInMemory {
58+
reader, releaseInt := b.disk.roaringSetRangeSegmentInMemory.Reader()
59+
readers, release = []roaringsetrange.InnerReader{reader}, releaseInt
60+
} else {
61+
readers, release = b.disk.newRoaringSetRangeReaders()
62+
}
5663

5764
// we have a flush-RLock, so we have the guarantee that the flushing state
5865
// will not change for the lifetime of the cursor, thus there can only be two
@@ -63,7 +70,7 @@ func (b *Bucket) ReaderRoaringSetRange() ReaderRoaringSetRange {
6370
readers = append(readers, b.active.newRoaringSetRangeReader())
6471

6572
return roaringsetrange.NewCombinedReader(readers, func() {
66-
releaseSegmentGroup()
73+
release()
6774
b.flushLock.RUnlock()
6875
}, concurrency.SROAR_MERGE, b.logger)
6976
}

adapters/repos/db/lsmkv/segment_group.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/sirupsen/logrus"
2626
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv/segmentindex"
2727
"github.com/weaviate/weaviate/adapters/repos/db/roaringset"
28+
"github.com/weaviate/weaviate/adapters/repos/db/roaringsetrange"
2829
"github.com/weaviate/weaviate/entities/cyclemanager"
2930
"github.com/weaviate/weaviate/entities/diskio"
3031
"github.com/weaviate/weaviate/entities/lsmkv"
@@ -83,6 +84,8 @@ type SegmentGroup struct {
8384
cleanupInterval time.Duration
8485
lastCleanupCall time.Time
8586
lastCompactionCall time.Time
87+
88+
roaringSetRangeSegmentInMemory *roaringsetrange.SegmentInMemory
8689
}
8790

8891
type sgConfig struct {
@@ -98,6 +101,7 @@ type sgConfig struct {
98101
maxSegmentSize int64
99102
cleanupInterval time.Duration
100103
enableChecksumValidation bool
104+
keepSegmentsInMemory bool
101105
}
102106

103107
func newSegmentGroup(logger logrus.FieldLogger, metrics *Metrics,
@@ -359,7 +363,8 @@ func newSegmentGroup(logger logrus.FieldLogger, metrics *Metrics,
359363
sg.strategy = StrategyMapCollection
360364
}
361365

362-
if sg.strategy == StrategyInverted {
366+
switch sg.strategy {
367+
case StrategyInverted:
363368
// start with last but one segment, as the last one doesn't need tombstones for now
364369
for i := len(sg.segments) - 2; i >= 0; i-- {
365370
// avoid crashing if segment has no tombstones
@@ -371,6 +376,17 @@ func newSegmentGroup(logger logrus.FieldLogger, metrics *Metrics,
371376
return nil, fmt.Errorf("init segment %s: merge tombstones %w", sg.segments[i].path, err)
372377
}
373378
}
379+
380+
case StrategyRoaringSetRange:
381+
if cfg.keepSegmentsInMemory {
382+
sg.roaringSetRangeSegmentInMemory = roaringsetrange.NewSegmentInMemory()
383+
for _, seg := range sg.segments {
384+
cursor := seg.newRoaringSetRangeCursor()
385+
if err := sg.roaringSetRangeSegmentInMemory.MergeSegmentByCursor(cursor); err != nil {
386+
return nil, fmt.Errorf("build segment-in-memory of strategy '%s': %w", sg.strategy, err)
387+
}
388+
}
389+
}
374390
}
375391

376392
return sg, nil

adapters/repos/db/migrator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ func (m *Migrator) AddClass(ctx context.Context, class *models.Class,
119119
SegmentsCleanupIntervalSeconds: m.db.config.SegmentsCleanupIntervalSeconds,
120120
SeparateObjectsCompactions: m.db.config.SeparateObjectsCompactions,
121121
CycleManagerRoutinesFactor: m.db.config.CycleManagerRoutinesFactor,
122+
IndexRangeableInMemory: m.db.config.IndexRangeableInMemory,
122123
MaxSegmentSize: m.db.config.MaxSegmentSize,
123124
HNSWMaxLogSize: m.db.config.HNSWMaxLogSize,
124125
HNSWWaitForCachePrefill: m.db.config.HNSWWaitForCachePrefill,

adapters/repos/db/repo.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ type Config struct {
219219
Replication replication.GlobalConfig
220220
MaximumConcurrentShardLoads int
221221
CycleManagerRoutinesFactor int
222+
IndexRangeableInMemory bool
222223
}
223224

224225
// GetIndex returns the index if it exists or nil if it doesn't

0 commit comments

Comments
 (0)