prometheus.remote_write: prevent unbounded growth of inactive series in WAL #3927
Conversation
Is this relevant to the upstream Prometheus (Agent) WAL too?

@bboreham great question, I believe so, as the code is largely the same between the two, but I haven't run the same test upstream yet. If this fix makes sense and works as I expect, I'm happy to upstream it as well.

I didn't really follow the code in this PR. There is an upstream issue which might be related: WAL Checkpoint holds deleted series for 1 extra compaction cycle #12286

Thanks for pointing out the issue @bboreham; it's absolutely related, but of a different flavor. I'll adapt the flow you originally described in prometheus/prometheus#12286 (comment) with modifications for this problem. edit: The visualization was cleaned up and moved to the issue: #3926 (comment)
Is it necessary to do any additional tests, e.g. in a real environment?
Is there a good place to try to add an integration / acceptance test? Perhaps adding another one similar to `alloy/internal/component/prometheus/remotewrite/remote_write_test.go` (lines 20 to 89 at 2619183):
```go
// Test is an integration-level test which ensures that metrics can get sent to
// a prometheus.remote_write component and forwarded to a
// remote_write-compatible server.
func Test(t *testing.T) {
	writeResult := make(chan *prompb.WriteRequest)

	// Create a remote_write server which forwards any received payloads to the
	// writeResult channel.
	srv := newTestServer(t, writeResult)
	defer srv.Close()

	// Create our component and wait for it to start running, so we can write
	// metrics to the WAL.
	args := testArgsForConfig(t, fmt.Sprintf(`
		external_labels = {
			cluster = "local",
		}

		endpoint {
			name           = "test-url"
			url            = "%s/api/v1/write"
			remote_timeout = "100ms"

			queue_config {
				batch_send_deadline = "100ms"
			}
		}
	`, srv.URL))
	tc, err := componenttest.NewControllerFromID(util.TestLogger(t), "prometheus.remote_write")
	require.NoError(t, err)
	go func() {
		err = tc.Run(componenttest.TestContext(t), args)
		require.NoError(t, err)
	}()
	require.NoError(t, tc.WaitRunning(5*time.Second))

	// We need to use a future timestamp since remote_write will ignore any
	// sample which is earlier than the time when it started. Adding a minute
	// ensures that our samples will never get ignored.
	sampleTimestamp := time.Now().Add(time.Minute).UnixMilli()

	// Send metrics to our component. These will be written to the WAL and
	// subsequently written to our HTTP server.
	sendMetric(t, tc, labels.FromStrings("foo", "bar"), sampleTimestamp, 12)
	sendMetric(t, tc, labels.FromStrings("fizz", "buzz"), sampleTimestamp, 34)

	expect := []prompb.TimeSeries{{
		Labels: []prompb.Label{
			{Name: "cluster", Value: "local"},
			{Name: "foo", Value: "bar"},
		},
		Samples: []prompb.Sample{
			{Timestamp: sampleTimestamp, Value: 12},
		},
	}, {
		Labels: []prompb.Label{
			{Name: "cluster", Value: "local"},
			{Name: "fizz", Value: "buzz"},
		},
		Samples: []prompb.Sample{
			{Timestamp: sampleTimestamp, Value: 34},
		},
	}}

	select {
	case <-time.After(time.Minute):
		require.FailNow(t, "timed out waiting for metrics")
	case res := <-writeResult:
		require.Equal(t, expect, res.Timeseries)
	}
}
```
I did run it against a real WAL that had accumulated ~14 million inactive series in the checkpoint, and it pruned all of them, keeping the ~500k I expected to be active.
@bboreham I went down the rabbit hole to figure out the potential impact upstream, in both the Prometheus agent and the Prometheus TSDB itself (it has very similar WAL replay code). The Prometheus agent is impacted because it makes the same assumption during WAL replay as Alloy: a series whose RefID is unknown is a series to be created (https://github.com/prometheus/prometheus/blob/e83dc66bdb67a01ff06a96e81faf66b0095d9948/tsdb/agent/db.go#L522-L533). The TSDB itself is not impacted because it does not trust the series RefID to decide whether a series should be created; it ends up using the labels hash. I'll see what it looks like to mirror the TSDB behavior into Alloy, as I think it should be a simpler fix than what I proposed here.
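The difference between the two replay strategies can be sketched roughly as follows. This is a toy illustration with hypothetical names (`replayByRef`, `replayByHash`, the `series` struct), not the actual Alloy or Prometheus code paths, which carry far more state:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// series is a minimal stand-in for a WAL series record (names here are
// illustrative, not the real Alloy/Prometheus types).
type series struct {
	ref    uint64
	labels string // canonical label string, e.g. `{foo="bar"}`
}

func hashLabels(ls string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(ls))
	return h.Sum64()
}

// replayByRef mimics the agent/Alloy assumption: any unseen ref is a new
// series, so duplicate records for the same labels each create a series.
func replayByRef(records []series) map[uint64]series {
	byRef := make(map[uint64]series)
	for _, r := range records {
		if _, ok := byRef[r.ref]; !ok {
			byRef[r.ref] = r
		}
	}
	return byRef
}

// replayByHash mimics the TSDB head behaviour: look the series up by its
// labels hash first, and record duplicate refs so later samples can be
// rewritten to point at the surviving series.
func replayByHash(records []series) (byRef map[uint64]series, multiRef map[uint64]uint64) {
	byRef = make(map[uint64]series)
	byHash := make(map[uint64]uint64) // labels hash -> valid ref
	multiRef = make(map[uint64]uint64)
	for _, r := range records {
		h := hashLabels(r.labels)
		if valid, ok := byHash[h]; ok {
			if valid != r.ref {
				multiRef[r.ref] = valid // duplicate ref: remap instead of creating
			}
			continue
		}
		byHash[h] = r.ref
		byRef[r.ref] = r
	}
	return byRef, multiRef
}

func main() {
	// Two records with distinct refs but identical labels, as can accumulate
	// across checkpoints and segment replays.
	recs := []series{{ref: 1, labels: `{foo="bar"}`}, {ref: 2, labels: `{foo="bar"}`}}

	fmt.Println(len(replayByRef(recs))) // ref-based replay keeps both copies

	byRef, multiRef := replayByHash(recs)
	fmt.Println(len(byRef), multiRef[2]) // hash-based replay keeps one, remaps ref 2
}
```

Under this sketch, ref-based replay ends up with two series for the same labels, while hash-based replay keeps one and remembers the remapping, which is why duplicates cannot accumulate across replays.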
force-pushed the …plicate SeriesRefs commit from 52e92ad to 901772e (Compare)
PR Description

This PR fixes an issue, described in #3926, that can cause inactive series in the WAL to grow without bound, by refactoring the `loadWAL` implementation to more closely mirror the Prometheus TSDB implementation it was likely based upon (https://github.com/prometheus/prometheus/blob/e83dc66bdb67a01ff06a96e81faf66b0095d9948/tsdb/head_wal.go#L78). Notes to the Reviewer has a breakdown of the changesets and rationale.

Which issue(s) this PR fixes

Fixes #3926
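The lookup-or-create pattern the refactor leans on can be sketched as below. This is a minimal single-stripe illustration under assumed names (`stripe`, `memSeries`, a simplified `GetOrSet` signature); the real `stripeSeries` shards by hash to reduce lock contention and compares full label sets on hash collision:

```go
package main

import (
	"fmt"
	"sync"
)

// memSeries is a minimal stand-in; the real type carries much more state.
type memSeries struct {
	ref  uint64
	lset string
}

// stripe is a toy single-stripe version of a stripeSeries-style map.
type stripe struct {
	mtx    sync.Mutex
	byHash map[uint64]*memSeries
}

// GetOrSet returns the existing series for hash if present, otherwise stores
// s. The created flag tells the caller whether its ref is the valid one or a
// duplicate whose ref must be remapped to the surviving series.
// (Collision handling is omitted for brevity.)
func (st *stripe) GetOrSet(hash uint64, s *memSeries) (*memSeries, bool) {
	st.mtx.Lock()
	defer st.mtx.Unlock()
	if existing, ok := st.byHash[hash]; ok {
		return existing, false
	}
	st.byHash[hash] = s
	return s, true
}

func main() {
	st := &stripe{byHash: make(map[uint64]*memSeries)}
	a, createdA := st.GetOrSet(42, &memSeries{ref: 1, lset: `{foo="bar"}`})
	b, createdB := st.GetOrSet(42, &memSeries{ref: 2, lset: `{foo="bar"}`})

	// createdB is false: ref 2 is a duplicate of the existing series, so the
	// caller would record the ref 2 -> ref 1 remapping rather than create a
	// second series.
	fmt.Println(createdA, createdB, a == b, b.ref)
}
```

The created flag is what lets the replay path distinguish "new series" from "duplicate ref for an existing series" without trusting the RefID alone.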
Notes to the Reviewer

- `stripeSeries.GetOrSet`
- `multiRef`: the previous implementation used `multiRef` to track all series created, and the change to only track duplicate series breaks the logic in all sample decode steps, resulting in most samples being considered `nonExistentSeriesRefs`
- `multiRef` is used to rewrite the sample SeriesRef if needed
- `multiRef`: based on the short name, I renamed it to `duplicateRefToValidRef`, which IMO helps illustrate its purpose a lot more clearly

PR Checklist