prometheus.remote_write: prevent unbounded growth of inactive series in WAL #3927

Open · wants to merge 4 commits into base: main
47 changes: 39 additions & 8 deletions internal/static/metrics/wal/series.go
@@ -141,7 +141,7 @@ func (s *stripeSeries) gc(mint int64) map[chunks.HeadSeriesRef]struct{} {

// The series is stale. We need to obtain a second lock for the
// ref if it's different than the hash lock.
- refLock := int(series.ref) & (s.size - 1)
+ refLock := int(s.refLock(series.ref))
if hashLock != refLock {
s.locks[refLock].Lock()
}
@@ -168,14 +168,14 @@ func (s *stripeSeries) gc(mint int64) map[chunks.HeadSeriesRef]struct{} {
}

func (s *stripeSeries) GetByID(id chunks.HeadSeriesRef) *memSeries {
- refLock := uint64(id) & uint64(s.size-1)
+ refLock := s.refLock(id)
s.locks[refLock].RLock()
defer s.locks[refLock].RUnlock()
return s.series[refLock][id]
}

func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
- hashLock := hash & uint64(s.size-1)
+ hashLock := s.hashLock(hash)

s.locks[hashLock].RLock()
defer s.locks[hashLock].RUnlock()
@@ -184,8 +184,8 @@ func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {

func (s *stripeSeries) Set(hash uint64, series *memSeries) {
var (
- hashLock = hash & uint64(s.size-1)
- refLock  = uint64(series.ref) & uint64(s.size-1)
+ hashLock = s.hashLock(hash)
+ refLock  = s.refLock(series.ref)
)

// We can't hold both locks at once otherwise we might deadlock with a
@@ -202,8 +202,31 @@ func (s *stripeSeries) Set(hash uint64, series *memSeries) {
s.locks[hashLock].Unlock()
}

+ // GetOrSet returns the existing series for the given hash and label set, or sets it if it does not exist.
+ // It returns the series and a boolean indicating whether it was newly created.
+ func (s *stripeSeries) GetOrSet(hash uint64, lset labels.Labels, series *memSeries) (*memSeries, bool) {
+ hashLock := s.hashLock(hash)
+
+ s.locks[hashLock].Lock()
+ // If it already exists in hashes, return it.
+ if prev := s.hashes[hashLock].Get(hash, lset); prev != nil {
+ s.locks[hashLock].Unlock()
+ return prev, false
+ }
+ s.hashes[hashLock].Set(hash, series)
+ s.locks[hashLock].Unlock()
+
+ refLock := s.refLock(series.ref)
+
+ s.locks[refLock].Lock()
+ s.series[refLock][series.ref] = series
+ s.locks[refLock].Unlock()
+
+ return series, true
+ }
+
func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar {
- i := uint64(ref) & uint64(s.size-1)
+ i := s.refLock(ref)

s.locks[i].RLock()
exemplar := s.exemplars[i][ref]
@@ -213,7 +236,7 @@ func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar {
}

func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exemplar.Exemplar) {
- i := uint64(ref) & uint64(s.size-1)
+ i := s.refLock(ref)

// Make sure that's a valid series id and record its latest exemplar
s.locks[i].Lock()
@@ -227,6 +250,14 @@ func (s *stripeSeries) iterator() *stripeSeriesIterator {
return &stripeSeriesIterator{s}
}

+ func (s *stripeSeries) hashLock(hash uint64) uint64 {
+ return hash & uint64(s.size-1)
+ }
+
+ func (s *stripeSeries) refLock(ref chunks.HeadSeriesRef) uint64 {
+ return uint64(ref) & uint64(s.size-1)
+ }
+
// stripeSeriesIterator allows to iterate over series through a channel.
// The channel should always be completely consumed to not leak.
type stripeSeriesIterator struct {
@@ -243,7 +274,7 @@ func (it *stripeSeriesIterator) Channel() <-chan *memSeries {
for _, series := range it.s.series[i] {
series.Lock()

- j := int(series.lset.Hash()) & (it.s.size - 1)
+ j := int(it.s.hashLock(series.lset.Hash()))
if i != j {
it.s.locks[j].RLock()
}
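The two new helpers above centralize the lock-striping index math. `x & (size-1)` is only equivalent to `x % size` when the stripe count is a power of two, and `GetOrSet` performs the lookup and the insert under a single hash-stripe lock so that two concurrent callers can never both create a series for the same labels; note it still releases the hash lock before taking the ref lock, consistent with the deadlock warning in `Set`. A minimal, self-contained sketch of the pattern (not the Alloy code; `striped`, `getOrSet`, and the `string` payload are invented for illustration):

```go
package main

import (
	"fmt"
	"sync"
)

const stripes = 16 // must be a power of two for the mask trick below

type striped struct {
	locks  [stripes]sync.Mutex
	series [stripes]map[uint64]string // one map per stripe, keyed by ref
}

func newStriped() *striped {
	s := &striped{}
	for i := range s.series {
		s.series[i] = map[uint64]string{}
	}
	return s
}

// refLock mirrors stripeSeries.refLock: ref&(stripes-1) == ref%stripes
// because stripes is a power of two.
func (s *striped) refLock(ref uint64) uint64 { return ref & (stripes - 1) }

// getOrSet does the check and the insert under one stripe lock, so two
// concurrent callers can never both insert a value for the same ref.
func (s *striped) getOrSet(ref uint64, v string) (string, bool) {
	i := s.refLock(ref)
	s.locks[i].Lock()
	defer s.locks[i].Unlock()
	if prev, ok := s.series[i][ref]; ok {
		return prev, false
	}
	s.series[i][ref] = v
	return v, true
}

func main() {
	s := newStriped()
	v, created := s.getOrSet(42, "first")
	fmt.Println(v, created) // first true
	v, created = s.getOrSet(42, "second")
	fmt.Println(v, created) // first false: the earlier value wins
}
```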
79 changes: 45 additions & 34 deletions internal/static/metrics/wal/wal.go
@@ -15,7 +15,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/alloy/internal/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
@@ -29,6 +28,8 @@ import (
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wlog"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/util"
)

// ErrWALClosed is an error returned when a WAL operation can't run because the
@@ -211,12 +212,11 @@ func (w *Storage) replayWAL() error {

level.Info(w.logger).Log("msg", "replaying WAL, this may take a while", "dir", w.wal.Dir())
dir, startFrom, err := wlog.LastCheckpoint(w.wal.Dir())
- if err != nil && err != record.ErrNotFound {
+ if err != nil && !errors.Is(err, record.ErrNotFound) {
return fmt.Errorf("find last checkpoint: %w", err)
}

- multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
-
+ duplicateRefToValidRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
if err == nil {
sr, err := wlog.NewSegmentsReader(dir)
if err != nil {
@@

// A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway.
- if err := w.loadWAL(wlog.NewReader(sr), multiRef); err != nil {
+ if err := w.loadWAL(wlog.NewReader(sr), duplicateRefToValidRef, startFrom); err != nil {
return fmt.Errorf("backfill checkpoint: %w", err)
}
startFrom++
level.Info(w.logger).Log("msg", "WAL checkpoint loaded")
}

// Find the last segment.
- _, last, err := wlog.Segments(w.wal.Dir())
+ _, lastSegment, err := wlog.Segments(w.wal.Dir())
if err != nil {
return fmt.Errorf("finding WAL segments: %w", err)
}

// Backfill segments from the most recent checkpoint onwards.
- for i := startFrom; i <= last; i++ {
+ for i := startFrom; i <= lastSegment; i++ {
s, err := wlog.OpenReadSegment(wlog.SegmentName(w.wal.Dir(), i))
if err != nil {
return fmt.Errorf("open WAL segment %d: %w", i, err)
}

sr := wlog.NewSegmentBufReader(s)
- err = w.loadWAL(wlog.NewReader(sr), multiRef)
+ err = w.loadWAL(wlog.NewReader(sr), duplicateRefToValidRef, i)
if err := sr.Close(); err != nil {
level.Warn(w.logger).Log("msg", "error while closing the wal segments reader", "err", err)
}
if err != nil {
return err
}
level.Info(w.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
level.Info(w.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", lastSegment)
}

return nil
}

- func (w *Storage) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
+ // loadWAL reads the WAL and populates the in-memory series.
+ // duplicateRefToValidRef tracks SeriesRefs that are duplicates by their labels and maps them to the valid SeriesRef
+ // that should be used instead. Duplicate SeriesRefs for the same labels can happen when a series has been gc'ed from
+ // memory but not yet fully removed from the WAL by a wlog.Checkpoint.
+ func (w *Storage) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, currentSegmentOrCheckpoint int) (err error) {
var (
dec record.Decoder
lastRef = chunks.HeadSeriesRef(w.nextRef.Load())
@@ -367,36 +371,37 @@ func (w *Storage) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
switch v := d.(type) {
case []record.RefSeries:
for _, s := range v {
- // If this is a new series, create it in memory without a timestamp.
- // If we read in a sample for it, we'll use the timestamp of the latest
- // sample. Otherwise, the series is stale and will be deleted once
- // the truncation is performed.
- if w.series.GetByID(s.Ref) == nil {
- series := &memSeries{ref: s.Ref, lset: s.Labels, lastTs: 0}
- w.series.Set(s.Labels.Hash(), series)
- multiRef[s.Ref] = series.ref
- // Make sure we don't try to reuse a Ref that already exists in the WAL.
- if s.Ref > lastRef {
- lastRef = s.Ref
- }
-
+ series := &memSeries{ref: s.Ref, lset: s.Labels, lastTs: 0}
+ series, created := w.series.GetOrSet(s.Labels.Hash(), s.Labels, series)
+ if !created {
+ duplicateRefToValidRef[s.Ref] = series.ref
+ // Keep the duplicate SeriesRef in w.deleted so checkpoints retain it while it might still exist in the WAL.
+ w.deleted[s.Ref] = currentSegmentOrCheckpoint
+ } else {
+ w.metrics.numActiveSeries.Inc()
+ w.metrics.totalCreatedSeries.Inc()
+
+ if s.Ref > lastRef {
+ lastRef = s.Ref
+ }
+ }
}

//nolint:staticcheck
seriesPool.Put(v)
case []record.RefSample:
for _, s := range v {
- // Update the lastTs for the series based
- ref, ok := multiRef[s.Ref]
- if !ok {
+ if ref, ok := duplicateRefToValidRef[s.Ref]; ok {
+ s.Ref = ref
+ }
+ series := w.series.GetByID(s.Ref)
+ if series == nil {
nonExistentSeriesRefs.Inc()
continue
}

- series := w.series.GetByID(ref)
+ // Update the lastTs for the series if this sample is newer
if s.T > series.lastTs {
series.lastTs = s.T
}
@@
samplesPool.Put(v)
case []record.RefHistogramSample:
for _, entry := range v {
- // Update the lastTs for the series based
- ref, ok := multiRef[entry.Ref]
- if !ok {
+ if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
+ entry.Ref = ref
+ }
+ series := w.series.GetByID(entry.Ref)
+ if series == nil {
nonExistentSeriesRefs.Inc()
continue
}
- series := w.series.GetByID(ref)

+ // Update the lastTs for the series if this sample is newer
if entry.T > series.lastTs {
series.lastTs = entry.T
}
@@
histogramsPool.Put(v)
case []record.RefFloatHistogramSample:
for _, entry := range v {
- // Update the lastTs for the series based
- ref, ok := multiRef[entry.Ref]
- if !ok {
+ if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
+ entry.Ref = ref
+ }
+ series := w.series.GetByID(entry.Ref)
+ if series == nil {
nonExistentSeriesRefs.Inc()
continue
}
- series := w.series.GetByID(ref)

+ // Update the lastTs for the series if this sample is newer
if entry.T > series.lastTs {
series.lastTs = entry.T
}
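This replay change is the heart of the fix. Previously, every series record with an unknown ref created a fresh `memSeries`, so when the WAL still contained two refs for the same labels (a series GC'ed and then re-created before a checkpoint cleaned up the old record), both copies were resurrected on restart and the stale one never went away. Now `GetOrSet` keeps a single series per label hash, duplicates are remapped through `duplicateRefToValidRef`, and the new `currentSegmentOrCheckpoint` argument parks the duplicate ref in `w.deleted` so a later `wlog.Checkpoint` can drop its records once no segment references it. A rough, self-contained model of the remapping, with invented `seriesRec`/`sampleRec` types standing in for the WAL record types:

```go
package main

import "fmt"

type ref uint64

type seriesRec struct {
	ref    ref
	labels string // stands in for labels.Labels
}

type sampleRec struct {
	ref ref
	t   int64
}

func main() {
	// Two series records with identical labels but different refs, as a WAL
	// can contain after a GC'ed series is re-created before a checkpoint.
	series := []seriesRec{{1, `{job="a"}`}, {7, `{job="a"}`}}
	samples := []sampleRec{{1, 100}, {7, 200}}

	byLabels := map[string]ref{}      // labels -> first (valid) ref
	duplicateToValid := map[ref]ref{} // duplicate ref -> valid ref
	lastTs := map[ref]int64{}         // valid ref -> newest sample timestamp

	for _, r := range series {
		if valid, ok := byLabels[r.labels]; ok {
			// Duplicate by labels: remap it instead of creating a second series.
			duplicateToValid[r.ref] = valid
			continue
		}
		byLabels[r.labels] = r.ref
		lastTs[r.ref] = 0
	}
	for _, smp := range samples {
		if valid, ok := duplicateToValid[smp.ref]; ok {
			smp.ref = valid // redirect the sample to the surviving series
		}
		if smp.t > lastTs[smp.ref] {
			lastTs[smp.ref] = smp.t
		}
	}
	fmt.Println(lastTs) // map[1:200]: one series survives with the newest timestamp
}
```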
87 changes: 85 additions & 2 deletions internal/static/metrics/wal/wal_test.go
@@ -9,15 +9,18 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/util"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/util"
)

func TestStorage_InvalidSeries(t *testing.T) {
@@ -337,6 +340,82 @@ func TestStorage_Truncate(t *testing.T) {
require.Equal(t, expectedExemplars, actualExemplars)
}

+ func TestStorage_HandlesDuplicateSeriesRefsByHash(t *testing.T) {
+ // Ensure the WAL can handle duplicate SeriesRefs by hash when being loaded.
+ walDir := t.TempDir()
+
+ s, err := NewStorage(log.NewLogfmtLogger(os.Stdout), nil, walDir)
+ require.NoError(t, err)
+
+ app := s.Appender(t.Context())
+
+ var payload seriesList
+ for i, metricName := range []string{"foo", "bar", "baz", "blerg"} {
+ payload = append(payload, &series{
+ name: metricName,
+ samples: []sample{
+ {int64(i), float64(i * 10.0)},
+ {int64(i * 10), float64(i * 100.0)},
+ },
+ })
+ }
+
+ originalSeriesRefs := make([]chunks.HeadSeriesRef, 0, len(payload))
+ for _, metric := range payload {
+ metric.Write(t, app)
+ originalSeriesRefs = append(originalSeriesRefs, chunks.HeadSeriesRef(*metric.ref))
+ }
+ require.NoError(t, app.Commit())
+
+ // Forcefully create a bunch of new segments so that, when we truncate,
+ // there are enough segments to be considered for truncation.
+ for i := 0; i < 3; i++ {
+ _, err := s.wal.NextSegmentSync()
+ require.NoError(t, err)
+ }
+ // Series are still active.
+ require.Equal(t, 4.0, testutil.ToFloat64(s.metrics.numActiveSeries))
+
+ // Force GC of all the series; they will stay in the checkpoint.
+ keepTs := payload[len(payload)-1].samples[1].ts + 1
+ err = s.Truncate(keepTs)
+ require.NoError(t, err)
+ // No more active series because they were GC'ed by Truncate.
+ require.Equal(t, 0.0, testutil.ToFloat64(s.metrics.numActiveSeries))
+
+ // Publish new samples that will create new SeriesRefs for the same labels.
+ duplicateSeriesRefs := make([]chunks.HeadSeriesRef, 0, len(payload))
+ for _, metric := range payload {
+ metric.samples = metric.samples[1:]
+ metric.samples[0].ts = metric.samples[0].ts * 10
+ metric.Write(t, app)
+
+ duplicateSeriesRefs = append(duplicateSeriesRefs, chunks.HeadSeriesRef(*metric.ref))
+ }
+ require.NoError(t, app.Commit())
+ // We should be back to 4 active series now.
+ require.Equal(t, 4.0, testutil.ToFloat64(s.metrics.numActiveSeries))
+
+ // Close the WAL before we have a chance to remove the first RefIDs.
+ err = s.Close()
+ require.NoError(t, err)
+
+ s, err = NewStorage(log.NewLogfmtLogger(os.Stdout), nil, walDir)
+ require.NoError(t, err)
+
+ // There should only be 4 active series after we reload the WAL.
+ assert.Equal(t, 4.0, testutil.ToFloat64(s.metrics.numActiveSeries))
+ // The original SeriesRefs should still be present in the series map.
+ for _, ref := range originalSeriesRefs {
+ assert.NotNil(t, s.series.GetByID(ref))
+ }
+
+ // The duplicated SeriesRefs should be considered deleted.
+ for _, ref := range duplicateSeriesRefs {
+ assert.Contains(t, s.deleted, ref)
+ }
+ }

func TestStorage_WriteStalenessMarkers(t *testing.T) {
walDir := t.TempDir()

@@ -519,7 +598,11 @@ func (s *series) Write(t *testing.T, app storage.Appender) {

// Write other data points with AddFast
for _, sample := range s.samples[offset:] {
- _, err := app.Append(*s.ref, lbls, sample.ts, sample.val)
+ ref, err := app.Append(*s.ref, lbls, sample.ts, sample.val)
+ // The ref we had changed; stop using the old value.
+ if *s.ref != ref {
+ s.ref = &ref
+ }
require.NoError(t, err)
}

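The `series.Write` helper fix encodes a contract worth remembering: `storage.Appender.Append` returns the `SeriesRef` the caller should use for subsequent appends, and it can differ from the cached one once the old series has been garbage collected. A toy stand-in (not the Prometheus interface; `fakeAppender` is invented for illustration):

```go
package main

import "fmt"

// fakeAppender is a stand-in for storage.Appender, reduced to the one
// behavior the test helper now handles: Append may hand back a new ref.
type fakeAppender struct {
	next uint64
	live map[uint64]bool
}

// Append reuses ref if it still points at a live series; otherwise it
// allocates a fresh ref, which the caller must adopt for later appends.
func (a *fakeAppender) Append(ref uint64, t int64, v float64) uint64 {
	if ref == 0 || !a.live[ref] {
		a.next++
		ref = a.next
		a.live[ref] = true
	}
	return ref
}

func main() {
	app := &fakeAppender{live: map[uint64]bool{}}
	ref := app.Append(0, 1, 1.0) // first append assigns ref 1
	delete(app.live, ref)        // simulate the series being GC'ed
	newRef := app.Append(ref, 2, 2.0)
	fmt.Println(ref, newRef) // 1 2: keep using newRef, not the cached ref
}
```

Running `go test -run TestStorage_HandlesDuplicateSeriesRefsByHash ./internal/static/metrics/wal` should exercise the full truncate, re-append, and reload cycle end to end.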