Skip to content

Commit 52e92ad

Browse files
committed
Add initialization to wal stripeSeries to prune inactive series
1 parent 01f6937 commit 52e92ad

File tree

3 files changed

+113
-24
lines changed

3 files changed

+113
-24
lines changed

internal/static/metrics/wal/series.go

Lines changed: 87 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/prometheus/prometheus/model/exemplar"
1111
"github.com/prometheus/prometheus/model/labels"
1212
"github.com/prometheus/prometheus/tsdb/chunks"
13+
"go.uber.org/atomic"
1314
)
1415

1516
// memSeries is a chunkless version of tsdb.memSeries.
@@ -80,11 +81,12 @@ func (m seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) {
8081
// Filling the padded space with the maps was profiled to be slower -
8182
// likely due to the additional pointer dereferences.
8283
type stripeSeries struct {
83-
size int
84-
series []map[chunks.HeadSeriesRef]*memSeries
85-
hashes []seriesHashmap
86-
exemplars []map[chunks.HeadSeriesRef]*exemplar.Exemplar
87-
locks []stripeLock
84+
size int
85+
series []map[chunks.HeadSeriesRef]*memSeries
86+
hashes []seriesHashmap
87+
exemplars []map[chunks.HeadSeriesRef]*exemplar.Exemplar
88+
locks []stripeLock
89+
initialized *atomic.Bool
8890

8991
gcMut sync.Mutex
9092
}
@@ -95,13 +97,18 @@ type stripeLock struct {
9597
_ [40]byte
9698
}
9799

100+
// newStripeSeries creates a new stripeSeries with the given stripe size in an uninitialized state.
101+
// When in an uninitialized state, reads and writes are not lock protected. After loading any
102+
// initial data, a call to MarkInitialized() must be made before using the stripeSeries to
103+
// ensure proper function of stripeSeries.gc().
98104
func newStripeSeries(stripeSize int) *stripeSeries {
99105
s := &stripeSeries{
100-
size: stripeSize,
101-
series: make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize),
102-
hashes: make([]seriesHashmap, stripeSize),
103-
exemplars: make([]map[chunks.HeadSeriesRef]*exemplar.Exemplar, stripeSize),
104-
locks: make([]stripeLock, stripeSize),
106+
size: stripeSize,
107+
series: make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize),
108+
hashes: make([]seriesHashmap, stripeSize),
109+
exemplars: make([]map[chunks.HeadSeriesRef]*exemplar.Exemplar, stripeSize),
110+
locks: make([]stripeLock, stripeSize),
111+
initialized: atomic.NewBool(false),
105112
}
106113
for i := range s.series {
107114
s.series[i] = map[chunks.HeadSeriesRef]*memSeries{}
@@ -115,9 +122,63 @@ func newStripeSeries(stripeSize int) *stripeSeries {
115122
return s
116123
}
117124

125+
// MarkInitialized marks the stripeSeries initialized, allowing usage of stripeSeries.gc(). Returns
126+
// true if the stripeSeries was not initialized before, false otherwise.
127+
func (s *stripeSeries) MarkInitialized() bool {
128+
return s.initialized.CompareAndSwap(false, true)
129+
}
130+
131+
// RemoveInactiveSeries removes all series that have a lastTs of 0 while the stripeSeries is still in
132+
// an uninitialized state. If the stripeSeries is already initialized, it returns 0 and false. Otherwise,
133+
// it returns the number of series that were removed and true.
134+
//
135+
// The stripeSeries assumes that a chunks.HeadSeriesRef uniquely refers to a series in the stripeSeries.
136+
// But in practice, a chunks.HeadSeriesRef can remain on a WAL even after it has been removed from the
137+
// stripeSeries. If the series comes back before the original is removed from the WAL we are left with
138+
// multiple chunks.HeadSeriesRef for the same series. If the WAL is reloaded in this state, we end up with a
139+
// series leak. A call to stripeSeries.gc() is only capable of removing one instance of the chunks.HeadSeriesRef
140+
// as it assumes there can only be one chunks.HeadSeriesRef for a series. The remaining chunks.HeadSeriesRefs are
141+
// left in the stripeSeries and overtime can accumulate to consume a very large amount of memory.
142+
func (s *stripeSeries) RemoveInactiveSeries() (int, bool) {
143+
if s.initialized.Load() {
144+
return 0, false
145+
}
146+
147+
inactiveSeries := 0
148+
// Start with hashes first because it's easier to get to a series from the hash than a hash from a series.
149+
for _, hashSeries := range s.hashes {
150+
for hash, seriesForHash := range hashSeries {
151+
for _, series := range seriesForHash {
152+
if series.lastTs == 0 {
153+
hashSeries.Delete(hash, series.ref)
154+
inactiveSeries++
155+
156+
// Get the seriesRef lock to delete the series from s.series.
157+
refLock := s.refLock(series.ref)
158+
delete(s.series[refLock], series.ref)
159+
}
160+
}
161+
}
162+
}
163+
164+
for _, seriesRefs := range s.series {
165+
for head, series := range seriesRefs {
166+
if series.lastTs == 0 {
167+
delete(s.series[inactiveSeries], head)
168+
inactiveSeries++
169+
}
170+
}
171+
}
172+
173+
return inactiveSeries, true
174+
}
175+
118176
// gc garbage collects old chunks that are strictly before mint and removes
119177
// series entirely that have no chunks left.
120178
func (s *stripeSeries) gc(mint int64) map[chunks.HeadSeriesRef]struct{} {
179+
if !s.initialized.Load() {
180+
return nil
181+
}
121182
// NOTE(rfratto): GC will grab two locks, one for the hash and the other for
122183
// series. It's not valid for any other function to grab both locks,
123184
// otherwise a deadlock might occur when running GC in parallel with
@@ -141,7 +202,7 @@ func (s *stripeSeries) gc(mint int64) map[chunks.HeadSeriesRef]struct{} {
141202

142203
// The series is stale. We need to obtain a second lock for the
143204
// ref if it's different than the hash lock.
144-
refLock := int(series.ref) & (s.size - 1)
205+
refLock := int(s.refLock(series.ref))
145206
if hashLock != refLock {
146207
s.locks[refLock].Lock()
147208
}
@@ -168,14 +229,14 @@ func (s *stripeSeries) gc(mint int64) map[chunks.HeadSeriesRef]struct{} {
168229
}
169230

170231
func (s *stripeSeries) GetByID(id chunks.HeadSeriesRef) *memSeries {
171-
refLock := uint64(id) & uint64(s.size-1)
232+
refLock := s.refLock(id)
172233
s.locks[refLock].RLock()
173234
defer s.locks[refLock].RUnlock()
174235
return s.series[refLock][id]
175236
}
176237

177238
func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
178-
hashLock := hash & uint64(s.size-1)
239+
hashLock := s.hashLock(hash)
179240

180241
s.locks[hashLock].RLock()
181242
defer s.locks[hashLock].RUnlock()
@@ -184,8 +245,8 @@ func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
184245

185246
func (s *stripeSeries) Set(hash uint64, series *memSeries) {
186247
var (
187-
hashLock = hash & uint64(s.size-1)
188-
refLock = uint64(series.ref) & uint64(s.size-1)
248+
hashLock = s.hashLock(hash)
249+
refLock = s.refLock(series.ref)
189250
)
190251

191252
// We can't hold both locks at once otherwise we might deadlock with a
@@ -203,7 +264,7 @@ func (s *stripeSeries) Set(hash uint64, series *memSeries) {
203264
}
204265

205266
func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar {
206-
i := uint64(ref) & uint64(s.size-1)
267+
i := s.refLock(ref)
207268

208269
s.locks[i].RLock()
209270
exemplar := s.exemplars[i][ref]
@@ -213,7 +274,7 @@ func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exe
213274
}
214275

215276
func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exemplar.Exemplar) {
216-
i := uint64(ref) & uint64(s.size-1)
277+
i := s.refLock(ref)
217278

218279
// Make sure that's a valid series id and record its latest exemplar
219280
s.locks[i].Lock()
@@ -227,6 +288,14 @@ func (s *stripeSeries) iterator() *stripeSeriesIterator {
227288
return &stripeSeriesIterator{s}
228289
}
229290

291+
func (s *stripeSeries) hashLock(hash uint64) uint64 {
292+
return hash & uint64(s.size-1)
293+
}
294+
295+
func (s *stripeSeries) refLock(ref chunks.HeadSeriesRef) uint64 {
296+
return uint64(ref) & uint64(s.size-1)
297+
}
298+
230299
// stripeSeriesIterator allows to iterate over series through a channel.
231300
// The channel should always be completely consumed to not leak.
232301
type stripeSeriesIterator struct {
@@ -243,7 +312,7 @@ func (it *stripeSeriesIterator) Channel() <-chan *memSeries {
243312
for _, series := range it.s.series[i] {
244313
series.Lock()
245314

246-
j := int(series.lset.Hash()) & (it.s.size - 1)
315+
j := int(it.s.hashLock(series.lset.Hash()))
247316
if i != j {
248317
it.s.locks[j].RLock()
249318
}

internal/static/metrics/wal/wal.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515

1616
"github.com/go-kit/log"
1717
"github.com/go-kit/log/level"
18-
"github.com/grafana/alloy/internal/util"
1918
"github.com/prometheus/client_golang/prometheus"
2019
"github.com/prometheus/prometheus/model/exemplar"
2120
"github.com/prometheus/prometheus/model/histogram"
@@ -29,6 +28,8 @@ import (
2928
"github.com/prometheus/prometheus/tsdb/record"
3029
"github.com/prometheus/prometheus/tsdb/wlog"
3130
"go.uber.org/atomic"
31+
32+
"github.com/grafana/alloy/internal/util"
3233
)
3334

3435
// ErrWALClosed is an error returned when a WAL operation can't run because the
@@ -198,6 +199,10 @@ func NewStorage(logger log.Logger, registerer prometheus.Registerer, path string
198199
}
199200
}
200201

202+
inactiveSeriesRemoved, _ := storage.series.RemoveInactiveSeries()
203+
storage.metrics.numActiveSeries.Sub(float64(inactiveSeriesRemoved))
204+
storage.series.MarkInitialized()
205+
201206
return storage, nil
202207
}
203208

internal/static/metrics/wal/wal_test.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,12 @@ func TestStorage_WillNotLeaveUnGCableSeries(t *testing.T) {
363363
}
364364
require.NoError(t, app.Commit())
365365

366+
// Make sure we have 4 active series left on the WAL
367+
var activeSeries io_prometheus_client.Metric
368+
err = s.metrics.numActiveSeries.Write(&activeSeries)
369+
require.NoError(t, err)
370+
require.Equal(t, 4.0, activeSeries.Gauge.GetValue())
371+
366372
// Forcefully create a bunch of new segments so when we truncate
367373
// there's enough segments to be considered for truncation.
368374
for i := 0; i < 3; i++ {
@@ -375,15 +381,24 @@ func TestStorage_WillNotLeaveUnGCableSeries(t *testing.T) {
375381
err = s.Truncate(keepTs)
376382
require.NoError(t, err)
377383

384+
// Make sure we have no active series left on the WAL
385+
err = s.metrics.numActiveSeries.Write(&activeSeries)
386+
require.NoError(t, err)
387+
require.Equal(t, 0.0, activeSeries.Gauge.GetValue())
388+
378389
// Publish new samples that will create new RefIDs
379390
for _, metric := range payload {
380-
// Write a new sample for the same metric and see what happens
381391
metric.samples = metric.samples[1:]
382392
metric.samples[0].ts = metric.samples[0].ts * 10
383393
metric.Write(t, app)
384394
}
385395
require.NoError(t, app.Commit())
386396

397+
// Make sure we have 4 active series left on the WAL
398+
err = s.metrics.numActiveSeries.Write(&activeSeries)
399+
require.NoError(t, err)
400+
require.Equal(t, 4.0, activeSeries.Gauge.GetValue())
401+
387402
// Close the WAL before we have a chance to remove the first RefIDs
388403
err = s.Close()
389404
require.NoError(t, err)
@@ -401,11 +416,11 @@ func TestStorage_WillNotLeaveUnGCableSeries(t *testing.T) {
401416
err = s.Truncate(keepTs)
402417
require.NoError(t, err)
403418
}
404-
// We should 0 active series but will have 4 instead because the first 4 RefIDs are lost in the WAL
405-
var metric io_prometheus_client.Metric
406-
err = s.metrics.numActiveSeries.Write(&metric)
419+
420+
// Make sure we have no active series left on the WAL
421+
err = s.metrics.numActiveSeries.Write(&activeSeries)
407422
require.NoError(t, err)
408-
require.Equal(t, 0.0, metric.Gauge.GetValue())
423+
require.Equal(t, 0.0, activeSeries.Gauge.GetValue())
409424
}
410425

411426
func TestStorage_WriteStalenessMarkers(t *testing.T) {

0 commit comments

Comments
 (0)