
Commit 901772e

Ensure WAL replay considers the fact that the same labels can have duplicate SeriesRefs
1 parent 01f6937

File tree: 3 files changed, +82 −45 lines
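The change in a nutshell, sketched as a small standalone Go program (illustrative only, not code from this commit): after restarts the WAL can hold several RefSeries records that carry the same label set under different SeriesRefs, and replay has to fold the later refs onto the series that is actually kept in memory instead of creating duplicates. The dedup key and map names below are simplified stand-ins for what loadWAL does with GetOrSet and multiRef.

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/tsdb/chunks"
    "github.com/prometheus/prometheus/tsdb/record"
)

func main() {
    lbls := labels.FromStrings("__name__", "up", "job", "demo")

    // Two WAL series records with identical labels but different refs,
    // as can happen when the same series is re-created across restarts.
    recs := []record.RefSeries{
        {Ref: 1, Labels: lbls},
        {Ref: 7, Labels: lbls},
    }

    // Simplified replay bookkeeping: the first ref seen for a label hash
    // wins, later refs are recorded as aliases (multiRef).
    seen := map[uint64]chunks.HeadSeriesRef{}
    multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
    for _, s := range recs {
        h := s.Labels.Hash()
        if canonical, ok := seen[h]; ok {
            multiRef[s.Ref] = canonical
            continue
        }
        seen[h] = s.Ref
    }

    fmt.Println(multiRef) // map[7:1]
}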

internal/static/metrics/wal/series.go
39 additions & 8 deletions

@@ -141,7 +141,7 @@ func (s *stripeSeries) gc(mint int64) map[chunks.HeadSeriesRef]struct{} {
 
            // The series is stale. We need to obtain a second lock for the
            // ref if it's different than the hash lock.
-           refLock := int(series.ref) & (s.size - 1)
+           refLock := int(s.refLock(series.ref))
            if hashLock != refLock {
                s.locks[refLock].Lock()
            }
@@ -168,14 +168,14 @@ func (s *stripeSeries) gc(mint int64) map[chunks.HeadSeriesRef]struct{} {
 }
 
 func (s *stripeSeries) GetByID(id chunks.HeadSeriesRef) *memSeries {
-    refLock := uint64(id) & uint64(s.size-1)
+    refLock := s.refLock(id)
     s.locks[refLock].RLock()
     defer s.locks[refLock].RUnlock()
     return s.series[refLock][id]
 }
 
 func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
-    hashLock := hash & uint64(s.size-1)
+    hashLock := s.hashLock(hash)
 
     s.locks[hashLock].RLock()
     defer s.locks[hashLock].RUnlock()
@@ -184,8 +184,8 @@ func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
 
 func (s *stripeSeries) Set(hash uint64, series *memSeries) {
     var (
-        hashLock = hash & uint64(s.size-1)
-        refLock  = uint64(series.ref) & uint64(s.size-1)
+        hashLock = s.hashLock(hash)
+        refLock  = s.refLock(series.ref)
     )
 
     // We can't hold both locks at once otherwise we might deadlock with a
@@ -202,8 +202,31 @@ func (s *stripeSeries) Set(hash uint64, series *memSeries) {
     s.locks[hashLock].Unlock()
 }
 
+// GetOrSet returns the existing series for the given hash and label set, or sets it if it does not exist.
+// It returns the series and a boolean indicating whether it was newly created.
+func (s *stripeSeries) GetOrSet(hash uint64, lset labels.Labels, series *memSeries) (*memSeries, bool) {
+    hashLock := s.hashLock(hash)
+
+    s.locks[hashLock].Lock()
+    // If it already exists in hashes, return it.
+    if prev := s.hashes[hashLock].Get(hash, lset); prev != nil {
+        s.locks[hashLock].Unlock()
+        return prev, false
+    }
+    s.hashes[hashLock].Set(hash, series)
+    s.locks[hashLock].Unlock()
+
+    refLock := s.refLock(series.ref)
+
+    s.locks[refLock].Lock()
+    s.series[refLock][series.ref] = series
+    s.locks[refLock].Unlock()
+
+    return series, true
+}
+
 func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar {
-    i := uint64(ref) & uint64(s.size-1)
+    i := s.refLock(ref)
 
     s.locks[i].RLock()
     exemplar := s.exemplars[i][ref]
@@ -213,7 +236,7 @@ func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exe
 }
 
 func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exemplar.Exemplar) {
-    i := uint64(ref) & uint64(s.size-1)
+    i := s.refLock(ref)
 
     // Make sure that's a valid series id and record its latest exemplar
     s.locks[i].Lock()
@@ -227,6 +250,14 @@ func (s *stripeSeries) iterator() *stripeSeriesIterator {
     return &stripeSeriesIterator{s}
 }
 
+func (s *stripeSeries) hashLock(hash uint64) uint64 {
+    return hash & uint64(s.size-1)
+}
+
+func (s *stripeSeries) refLock(ref chunks.HeadSeriesRef) uint64 {
+    return uint64(ref) & uint64(s.size-1)
+}
+
 // stripeSeriesIterator allows to iterate over series through a channel.
 // The channel should always be completely consumed to not leak.
 type stripeSeriesIterator struct {
@@ -243,7 +274,7 @@ func (it *stripeSeriesIterator) Channel() <-chan *memSeries {
        for _, series := range it.s.series[i] {
            series.Lock()
 
-           j := int(series.lset.Hash()) & (it.s.size - 1)
+           j := int(it.s.hashLock(series.lset.Hash()))
            if i != j {
                it.s.locks[j].RLock()
            }
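A short usage sketch of the new GetOrSet (not part of the commit; the helper name replaySeriesRecord is invented and assumes the snippet lives in the same package as the unexported stripeSeries and memSeries types above):

func replaySeriesRecord(stripe *stripeSeries, s record.RefSeries,
    multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) {

    candidate := &memSeries{ref: s.Ref, lset: s.Labels, lastTs: 0}
    existing, created := stripe.GetOrSet(s.Labels.Hash(), s.Labels, candidate)
    if !created {
        // The label set was already registered under a different ref, so
        // record the alias; samples written against s.Ref will then resolve
        // to the series that is actually kept in memory.
        multiRef[s.Ref] = existing.ref
    }
}

Compared with the previous GetByID check followed by Set, the lookup and the insert now happen under the same hash-stripe lock, so two callers cannot both decide that the series is new.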

internal/static/metrics/wal/wal.go
28 additions & 29 deletions

@@ -15,7 +15,6 @@ import (
 
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
-    "github.com/grafana/alloy/internal/util"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/prometheus/model/exemplar"
     "github.com/prometheus/prometheus/model/histogram"
@@ -29,6 +28,8 @@ import (
     "github.com/prometheus/prometheus/tsdb/record"
     "github.com/prometheus/prometheus/tsdb/wlog"
     "go.uber.org/atomic"
+
+    "github.com/grafana/alloy/internal/util"
 )
 
 // ErrWALClosed is an error returned when a WAL operation can't run because the
@@ -210,14 +211,19 @@ func (w *Storage) replayWAL() error {
     }
 
     level.Info(w.logger).Log("msg", "replaying WAL, this may take a while", "dir", w.wal.Dir())
-    dir, startFrom, err := wlog.LastCheckpoint(w.wal.Dir())
-    if err != nil && err != record.ErrNotFound {
-        return fmt.Errorf("find last checkpoint: %w", err)
+    dir, startFrom, cpErr := wlog.LastCheckpoint(w.wal.Dir())
+    if cpErr != nil && !errors.Is(cpErr, record.ErrNotFound) {
+        return fmt.Errorf("find last checkpoint: %w", cpErr)
     }
 
-    multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
+    // Find the last segment.
+    _, lastSegment, err := wlog.Segments(w.wal.Dir())
+    if err != nil {
+        return fmt.Errorf("finding WAL segments: %w", err)
+    }
 
-    if err == nil {
+    multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
+    if cpErr == nil {
        sr, err := wlog.NewSegmentsReader(dir)
        if err != nil {
            return fmt.Errorf("open checkpoint: %w", err)
@@ -230,41 +236,35 @@ func (w *Storage) replayWAL() error {
 
        // A corrupted checkpoint is a hard error for now and requires user
        // intervention. There's likely little data that can be recovered anyway.
-       if err := w.loadWAL(wlog.NewReader(sr), multiRef); err != nil {
+       if err := w.loadWAL(wlog.NewReader(sr), multiRef, lastSegment); err != nil {
            return fmt.Errorf("backfill checkpoint: %w", err)
        }
        startFrom++
        level.Info(w.logger).Log("msg", "WAL checkpoint loaded")
     }
 
-    // Find the last segment.
-    _, last, err := wlog.Segments(w.wal.Dir())
-    if err != nil {
-        return fmt.Errorf("finding WAL segments: %w", err)
-    }
-
     // Backfill segments from the most recent checkpoint onwards.
-    for i := startFrom; i <= last; i++ {
+    for i := startFrom; i <= lastSegment; i++ {
        s, err := wlog.OpenReadSegment(wlog.SegmentName(w.wal.Dir(), i))
        if err != nil {
            return fmt.Errorf("open WAL segment %d: %w", i, err)
        }
 
        sr := wlog.NewSegmentBufReader(s)
-       err = w.loadWAL(wlog.NewReader(sr), multiRef)
+       err = w.loadWAL(wlog.NewReader(sr), multiRef, lastSegment)
        if err := sr.Close(); err != nil {
            level.Warn(w.logger).Log("msg", "error while closing the wal segments reader", "err", err)
        }
        if err != nil {
            return err
        }
-       level.Info(w.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
+       level.Info(w.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", lastSegment)
     }
 
     return nil
 }
 
-func (w *Storage) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
+func (w *Storage) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastSegment int) (err error) {
     var (
        dec     record.Decoder
        lastRef = chunks.HeadSeriesRef(w.nextRef.Load())
@@ -367,21 +367,20 @@ func (w *Storage) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chun
        switch v := d.(type) {
        case []record.RefSeries:
            for _, s := range v {
-               // If this is a new series, create it in memory without a timestamp.
-               // If we read in a sample for it, we'll use the timestamp of the latest
-               // sample. Otherwise, the series is stale and will be deleted once
-               // the truncation is performed.
-               if w.series.GetByID(s.Ref) == nil {
-                   series := &memSeries{ref: s.Ref, lset: s.Labels, lastTs: 0}
-                   w.series.Set(s.Labels.Hash(), series)
-                   multiRef[s.Ref] = series.ref
+               // Make sure we don't try to reuse a Ref that already exists in the WAL.
+               if s.Ref > lastRef {
+                   lastRef = s.Ref
+               }
 
+               series := &memSeries{ref: s.Ref, lset: s.Labels, lastTs: 0}
+               series, created := w.series.GetOrSet(s.Labels.Hash(), s.Labels, series)
+               if !created {
+                   multiRef[s.Ref] = series.ref
+                   // Keep the duplicate series in the checkpoint until the latest segment.
+                   w.deleted[series.ref] = lastSegment
+               } else {
                    w.metrics.numActiveSeries.Inc()
                    w.metrics.totalCreatedSeries.Inc()
-
-                   if s.Ref > lastRef {
-                       lastRef = s.Ref
-                   }
                }
            }
 
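The comment "Keep the duplicate series in the checkpoint until the latest segment" relies on w.deleted mapping a ref to the last segment that may still reference it. A minimal sketch of that retention rule under that assumption; the helper below is hypothetical and not the storage's actual truncation code:

// canDropDeletedRef is a hypothetical illustration: a ref marked deleted with
// w.deleted[ref] = lastSegment must be kept by every checkpoint that has not
// yet consumed lastSegment, so older records pointing at it stay resolvable.
func canDropDeletedRef(deleted map[chunks.HeadSeriesRef]int, ref chunks.HeadSeriesRef, checkpointedUpTo int) bool {
    keepUntil, ok := deleted[ref]
    if !ok {
        return true // never marked as deleted, nothing to retain here
    }
    return keepUntil < checkpointedUpTo
}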

internal/static/metrics/wal/wal_test.go
15 additions & 8 deletions

@@ -9,14 +9,15 @@ import (
     "time"
 
     "github.com/go-kit/log"
-    io_prometheus_client "github.com/prometheus/client_model/go"
+    "github.com/prometheus/client_golang/prometheus/testutil"
     "github.com/prometheus/prometheus/model/exemplar"
     "github.com/prometheus/prometheus/model/labels"
     "github.com/prometheus/prometheus/model/value"
     "github.com/prometheus/prometheus/storage"
     "github.com/prometheus/prometheus/tsdb"
     "github.com/prometheus/prometheus/tsdb/chunks"
     "github.com/prometheus/prometheus/tsdb/record"
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 
     "github.com/grafana/alloy/internal/util"
@@ -339,7 +340,7 @@ func TestStorage_Truncate(t *testing.T) {
     require.Equal(t, expectedExemplars, actualExemplars)
 }
 
-func TestStorage_WillNotLeaveUnGCableSeries(t *testing.T) {
+func TestStorage_NewWillNotLoadDuplicateSeries(t *testing.T) {
     walDir := t.TempDir()
 
     s, err := NewStorage(log.NewLogfmtLogger(os.Stdout), nil, walDir)
@@ -369,20 +370,25 @@ func TestStorage_WillNotLeaveUnGCableSeries(t *testing.T) {
        _, err := s.wal.NextSegmentSync()
        require.NoError(t, err)
     }
+    // Series are still active
+    require.Equal(t, 4.0, testutil.ToFloat64(s.metrics.numActiveSeries))
 
     // Force GC of all the series, but they will stay in the checkpoint
     keepTs := payload[len(payload)-1].samples[1].ts + 1
     err = s.Truncate(keepTs)
     require.NoError(t, err)
+    // No more active series because they were GC'ed with Truncate
+    require.Equal(t, 0.0, testutil.ToFloat64(s.metrics.numActiveSeries))
 
     // Publish new samples that will create new RefIDs
     for _, metric := range payload {
-       // Write a new sample for the same metric and see what happens
        metric.samples = metric.samples[1:]
        metric.samples[0].ts = metric.samples[0].ts * 10
        metric.Write(t, app)
     }
     require.NoError(t, app.Commit())
+    // We should be back to 4 active series now
+    require.Equal(t, 4.0, testutil.ToFloat64(s.metrics.numActiveSeries))
 
     // Close the WAL before we have a chance to remove the first RefIDs
     err = s.Close()
@@ -391,6 +397,9 @@ func TestStorage_WillNotLeaveUnGCableSeries(t *testing.T) {
     s, err = NewStorage(log.NewLogfmtLogger(os.Stdout), nil, walDir)
     require.NoError(t, err)
 
+    // There should only be 4 active series after we reload the WAL
+    assert.Equal(t, 4.0, testutil.ToFloat64(s.metrics.numActiveSeries))
+
     // Force multiple GC's that should purge all active series
     for i := 0; i < 3; i++ {
        for j := 0; j < 3; j++ {
@@ -401,11 +410,9 @@ func TestStorage_WillNotLeaveUnGCableSeries(t *testing.T) {
        err = s.Truncate(keepTs)
        require.NoError(t, err)
     }
-    // We should 0 active series but will have 4 instead because the first 4 RefIDs are lost in the WAL
-    var metric io_prometheus_client.Metric
-    err = s.metrics.numActiveSeries.Write(&metric)
-    require.NoError(t, err)
-    require.Equal(t, 0.0, metric.Gauge.GetValue())
+
+    // We should have zero active series now
+    require.Equal(t, 0.0, testutil.ToFloat64(s.metrics.numActiveSeries))
 }
 
 func TestStorage_WriteStalenessMarkers(t *testing.T) {
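For reference, the helper the test now uses to read gauge values: testutil.ToFloat64 from prometheus/client_golang collects a single-metric Collector and returns its current value. A minimal standalone example (the gauge name here is made up):

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    // A gauge standing in for s.metrics.numActiveSeries in the test.
    g := prometheus.NewGauge(prometheus.GaugeOpts{Name: "active_series"})
    g.Add(4)

    fmt.Println(testutil.ToFloat64(g)) // 4
}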
