Skip to content

Commit 87b23e6

Browse files
cmanallenjjbayer
andauthored
feat(replays): Reject replay envelopes if any item contained within is invalid (#3201)
Closes: #3180 Drop the envelope in its entirety if any component fails validation. This prevents confusing states where is shown a replay which was never ingested. --------- Co-authored-by: Joris Bayer <[email protected]>
1 parent ae75d25 commit 87b23e6

File tree

6 files changed

+133
-51
lines changed

6 files changed

+133
-51
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
- Adds support for dynamic metric bucket encoding. ([#3137](https://github.com/getsentry/relay/pull/3137))
3030
- Use statsdproxy to pre-aggregate metrics. ([#2425](https://github.com/getsentry/relay/pull/2425))
3131
- Add SDK information to spans. ([#3178](https://github.com/getsentry/relay/pull/3178))
32+
- Drop replay envelopes if any item fails. ([#3201](https://github.com/getsentry/relay/pull/3201))
3233
- Filter null values from metrics summary tags. ([#3204](https://github.com/getsentry/relay/pull/3204))
3334
- Emit a usage metric for every span seen. ([#3209](https://github.com/getsentry/relay/pull/3209))
3435
- Add namespace for profile metrics. ([#3229](https://github.com/getsentry/relay/pull/3229))

relay-server/src/envelope.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1319,6 +1319,11 @@ impl Envelope {
13191319
self.items.retain(f)
13201320
}
13211321

1322+
/// Drops every item in the envelope.
1323+
pub fn drop_items_silently(&mut self) {
1324+
self.items.clear()
1325+
}
1326+
13221327
/// Serializes this envelope into the given writer.
13231328
pub fn serialize<W>(&self, mut writer: W) -> Result<(), EnvelopeError>
13241329
where

relay-server/src/services/processor.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,9 @@ pub enum ProcessingError {
419419

420420
#[error("invalid processing group type")]
421421
InvalidProcessingGroup(#[from] InvalidProcessingGroupType),
422+
423+
#[error("invalid replay")]
424+
InvalidReplay(DiscardReason),
422425
}
423426

424427
impl ProcessingError {
@@ -460,6 +463,8 @@ impl ProcessingError {
460463
Self::MissingProjectId => None,
461464
Self::EventFiltered(_) => None,
462465
Self::InvalidProcessingGroup(_) => None,
466+
467+
Self::InvalidReplay(reason) => Some(Outcome::Invalid(reason)),
463468
}
464469
}
465470

relay-server/src/services/processor/replay.rs

Lines changed: 42 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@ use rmp_serde;
1717
use serde::{Deserialize, Serialize};
1818

1919
use crate::envelope::{ContentType, ItemType};
20-
use crate::services::outcome::{DiscardReason, Outcome};
20+
use crate::services::outcome::DiscardReason;
2121
use crate::services::processor::{ProcessEnvelopeState, ProcessingError, ReplayGroup};
2222
use crate::statsd::RelayTimers;
23-
use crate::utils::ItemAction;
2423

2524
/// Removes replays if the feature flag is not enabled.
2625
pub fn process(
@@ -55,64 +54,57 @@ pub fn process(
5554
let combined_envelope_items =
5655
project_state.has_feature(Feature::SessionReplayCombinedEnvelopeItems);
5756

58-
state.managed_envelope.retain_items(|item| {
59-
// If replays aren't enabled or an item was dropped - drop the remainder of the
60-
// envelope.
61-
if !replays_enabled {
62-
return ItemAction::DropSilently;
63-
}
57+
// If the replay feature is not enabled drop the items silenty.
58+
if !replays_enabled {
59+
state.managed_envelope.drop_items_silently();
60+
return Ok(());
61+
}
6462

63+
for item in state.managed_envelope.envelope_mut().items_mut() {
6564
// Set the combined payload header to the value of the combined feature.
6665
item.set_replay_combined_payload(combined_envelope_items);
6766

6867
match item.ty() {
6968
ItemType::ReplayEvent => {
70-
match handle_replay_event_item(
69+
let replay_event = handle_replay_event_item(
7170
item.payload(),
7271
&event_id,
7372
project_config,
7473
client_addr,
7574
user_agent,
76-
) {
77-
Err(outcome) => ItemAction::Drop(outcome),
78-
Ok(replay_event) => {
79-
item.set_payload(ContentType::Json, replay_event);
80-
ItemAction::Keep
81-
}
82-
}
75+
)
76+
.map_err(ProcessingError::InvalidReplay)?;
77+
78+
item.set_payload(ContentType::Json, replay_event);
8379
}
8480
ItemType::ReplayRecording => {
85-
match handle_replay_recording_item(
81+
let replay_recording = handle_replay_recording_item(
8682
item.payload(),
8783
&event_id,
8884
scrubbing_enabled,
8985
&mut scrubber,
90-
) {
91-
Err(outcome) => ItemAction::Drop(outcome),
92-
Ok(replay_recording) => {
93-
item.set_payload(ContentType::OctetStream, replay_recording);
94-
ItemAction::Keep
95-
}
96-
}
86+
)
87+
.map_err(ProcessingError::InvalidReplay)?;
88+
89+
item.set_payload(ContentType::OctetStream, replay_recording);
9790
}
98-
ItemType::ReplayVideo => match handle_replay_video_item(
99-
item.payload(),
100-
&event_id,
101-
project_config,
102-
client_addr,
103-
user_agent,
104-
scrubbing_enabled,
105-
&mut scrubber,
106-
) {
107-
Err(outcome) => ItemAction::Drop(outcome),
108-
Ok(payload) => {
109-
item.set_payload(ContentType::OctetStream, payload);
110-
ItemAction::Keep
111-
}
112-
},
113-
_ => ItemAction::Keep,
91+
ItemType::ReplayVideo => {
92+
let replay_video = handle_replay_video_item(
93+
item.payload(),
94+
&event_id,
95+
project_config,
96+
client_addr,
97+
user_agent,
98+
scrubbing_enabled,
99+
&mut scrubber,
100+
)
101+
.map_err(ProcessingError::InvalidReplay)?;
102+
103+
item.set_payload(ContentType::OctetStream, replay_video);
104+
}
105+
_ => {}
114106
}
115-
});
107+
}
116108

117109
Ok(())
118110
}
@@ -125,7 +117,7 @@ fn handle_replay_event_item(
125117
config: &ProjectConfig,
126118
client_ip: Option<IpAddr>,
127119
user_agent: &RawUserAgentInfo<&str>,
128-
) -> Result<Bytes, Outcome> {
120+
) -> Result<Bytes, DiscardReason> {
129121
match process_replay_event(&payload, config, client_ip, user_agent) {
130122
Ok(replay) => match replay.to_json() {
131123
Ok(json) => Ok(json.into_bytes().into()),
@@ -144,12 +136,12 @@ fn handle_replay_event_item(
144136
?event_id,
145137
"invalid replay event"
146138
);
147-
Err(Outcome::Invalid(match error {
139+
Err(match error {
148140
ReplayError::NoContent => DiscardReason::InvalidReplayEventNoPayload,
149141
ReplayError::CouldNotScrub(_) => DiscardReason::InvalidReplayEventPii,
150142
ReplayError::CouldNotParse(_) => DiscardReason::InvalidReplayEvent,
151143
ReplayError::InvalidPayload(_) => DiscardReason::InvalidReplayEvent,
152-
}))
144+
})
153145
}
154146
}
155147
}
@@ -197,7 +189,7 @@ fn handle_replay_recording_item(
197189
event_id: &Option<EventId>,
198190
scrubbing_enabled: bool,
199191
scrubber: &mut RecordingScrubber,
200-
) -> Result<Bytes, Outcome> {
192+
) -> Result<Bytes, DiscardReason> {
201193
// XXX: Processing is there just for data scrubbing. Skip the entire expensive
202194
// processing step if we do not need to scrub.
203195
if !scrubbing_enabled || scrubber.is_empty() {
@@ -216,7 +208,7 @@ fn handle_replay_recording_item(
216208
Ok(recording) => Ok(recording.into()),
217209
Err(e) => {
218210
relay_log::warn!("replay-recording-event: {e} {event_id:?}");
219-
Err(Outcome::Invalid(DiscardReason::InvalidReplayRecordingEvent))
211+
Err(DiscardReason::InvalidReplayRecordingEvent)
220212
}
221213
}
222214
}
@@ -238,7 +230,7 @@ fn handle_replay_video_item(
238230
user_agent: &RawUserAgentInfo<&str>,
239231
scrubbing_enabled: bool,
240232
scrubber: &mut RecordingScrubber,
241-
) -> Result<Bytes, Outcome> {
233+
) -> Result<Bytes, DiscardReason> {
242234
let ReplayVideoEvent {
243235
replay_event,
244236
replay_recording,
@@ -247,7 +239,7 @@ fn handle_replay_video_item(
247239
Ok(result) => result,
248240
Err(e) => {
249241
relay_log::warn!("replay-video-event: {e} {event_id:?}");
250-
return Err(Outcome::Invalid(DiscardReason::InvalidReplayVideoEvent));
242+
return Err(DiscardReason::InvalidReplayVideoEvent);
251243
}
252244
};
253245

@@ -261,7 +253,7 @@ fn handle_replay_video_item(
261253

262254
// Verify the replay-video payload is not empty.
263255
if replay_video.is_empty() {
264-
return Err(Outcome::Invalid(DiscardReason::InvalidReplayVideoEvent));
256+
return Err(DiscardReason::InvalidReplayVideoEvent);
265257
}
266258

267259
match rmp_serde::to_vec_named(&ReplayVideoEvent {
@@ -270,6 +262,6 @@ fn handle_replay_video_item(
270262
replay_video,
271263
}) {
272264
Ok(payload) => Ok(payload.into()),
273-
Err(_) => Err(Outcome::Invalid(DiscardReason::InvalidReplayVideoEvent)),
265+
Err(_) => Err(DiscardReason::InvalidReplayVideoEvent),
274266
}
275267
}

relay-server/src/utils/managed_envelope.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,11 @@ impl ManagedEnvelope {
315315
// TODO: once `update` is private, it should be called here.
316316
}
317317

318+
/// Drops every item in the envelope.
319+
pub fn drop_items_silently(&mut self) {
320+
self.envelope.drop_items_silently();
321+
}
322+
318323
/// Record that event metrics have been extracted.
319324
///
320325
/// This is usually done automatically as part of `EnvelopeContext::new` or `update`. However,
@@ -460,13 +465,21 @@ impl ManagedEnvelope {
460465
// (see: `Self::event_category()`).
461466
if self.context.summary.secondary_transaction_quantity > 0 {
462467
self.track_outcome(
463-
outcome,
468+
outcome.clone(),
464469
// Secondary transaction counts are never indexed transactions
465470
DataCategory::Transaction,
466471
self.context.summary.secondary_transaction_quantity,
467472
);
468473
}
469474

475+
if self.context.summary.replay_quantity > 0 {
476+
self.track_outcome(
477+
outcome.clone(),
478+
DataCategory::Replay,
479+
self.context.summary.replay_quantity,
480+
);
481+
}
482+
470483
self.finish(RelayCounters::EnvelopeRejected, handling);
471484
}
472485

tests/integration/test_outcome.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2041,3 +2041,69 @@ def send_buckets(n, name, value, ty):
20412041
outcomes = outcomes_consumer.get_outcomes()
20422042
assert len(outcomes) == 1
20432043
assert outcomes[0]["reason"] == global_reason_code
2044+
2045+
2046+
def test_replay_outcomes_item_failed(
2047+
mini_sentry,
2048+
relay_with_processing,
2049+
outcomes_consumer,
2050+
metrics_consumer,
2051+
):
2052+
"""
2053+
Assert Relay records a single outcome even though both envelope items fail.
2054+
"""
2055+
outcomes_consumer = outcomes_consumer(timeout=2)
2056+
metrics_consumer = metrics_consumer()
2057+
2058+
project_id = 42
2059+
mini_sentry.add_basic_project_config(
2060+
project_id, extra={"config": {"features": ["organizations:session-replay"]}}
2061+
)
2062+
2063+
config = {
2064+
"outcomes": {
2065+
"emit_outcomes": True,
2066+
"batch_size": 1,
2067+
"batch_interval": 1,
2068+
"aggregator": {
2069+
"bucket_interval": 1,
2070+
"flush_interval": 1,
2071+
},
2072+
"source": "pop-relay",
2073+
},
2074+
"aggregator": {"bucket_interval": 1, "initial_delay": 0, "debounce_delay": 0},
2075+
}
2076+
2077+
upstream = relay_with_processing(config)
2078+
2079+
def make_envelope():
2080+
envelope = Envelope(headers=[["event_id", "515539018c9b4260a6f999572f1661ee"]])
2081+
envelope.add_item(
2082+
Item(payload=PayloadRef(bytes=b"not valid"), type="replay_event")
2083+
)
2084+
envelope.add_item(
2085+
Item(payload=PayloadRef(bytes=b"still not valid"), type="replay_recording")
2086+
)
2087+
2088+
return envelope
2089+
2090+
envelope = make_envelope()
2091+
upstream.send_envelope(project_id, envelope)
2092+
2093+
outcomes = outcomes_consumer.get_outcomes()
2094+
2095+
assert len(outcomes) == 1
2096+
2097+
expected = {
2098+
"category": 7,
2099+
"event_id": "515539018c9b4260a6f999572f1661ee",
2100+
"key_id": 123,
2101+
"outcome": 3,
2102+
"project_id": 42,
2103+
"quantity": 2,
2104+
"reason": "invalid_replay",
2105+
"remote_addr": "127.0.0.1",
2106+
"source": "pop-relay",
2107+
}
2108+
expected["timestamp"] = outcomes[0]["timestamp"]
2109+
assert outcomes[0] == expected

0 commit comments

Comments
 (0)