feat(replays): Combined Envelope Items (reopen) (#3035)
Reopening #2170

- We originally closed it because there wasn't a clear use case. Now
  there is: getsentry/sentry#60826


I've resolved merge conflicts, tests pass, and I believe comments from
the previous PR have been addressed.

---------

Co-authored-by: Joris Bayer <joris.bayer@sentry.io>
Co-authored-by: Colton Allen <colton.allen@sentry.io>
3 people authored Feb 15, 2024
1 parent eba85e3 commit ac1ff14
Showing 10 changed files with 224 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@
- Emit a usage metric for total spans. ([#3007](https://github.com/getsentry/relay/pull/3007))
- Drop timestamp from metrics partition key. ([#3025](https://github.com/getsentry/relay/pull/3025))
- Drop spans ending outside the valid timestamp range. ([#3013](https://github.com/getsentry/relay/pull/3013))
- Add support for combining replay envelope items. ([#3035](https://github.com/getsentry/relay/pull/3035))
- Extract INP metrics from spans. ([#2969](https://github.com/getsentry/relay/pull/2969), [#3041](https://github.com/getsentry/relay/pull/3041))
- Add ability to rate limit metric buckets by namespace. ([#2941](https://github.com/getsentry/relay/pull/2941))
- Upgrade sqlparser to 0.43.1. ([#3057](https://github.com/getsentry/relay/pull/3057))
2 changes: 1 addition & 1 deletion README.md
@@ -152,7 +152,7 @@ make build

#### Snapshot tests

We use `insta` for snapshot testing. It will run as part of the `make test` command
We use `insta` for snapshot testing. It will run as part of the `make test` command
to validate schema/protocol changes. To install the `insta` tool for reviewing snapshots run:
```bash
cargo install cargo-insta
```
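For context, this is what an `insta` snapshot assertion looks like inside a Rust test. A minimal sketch, assuming `insta` is available as a dev-dependency; the test body and values are made up for illustration:

```rust
#[cfg(test)]
mod tests {
    #[test]
    fn serializes_example_payload() {
        // On the first run `insta` writes the expected output to a snapshot file;
        // later runs compare against it, and `cargo insta review` is used to
        // accept or reject changed snapshots.
        let payload = format!("{:?}", vec![1, 2, 3]);
        insta::assert_snapshot!(payload);
    }
}
```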
4 changes: 4 additions & 0 deletions relay-dynamic-config/src/feature.rs
@@ -11,6 +11,10 @@ pub enum Feature {
/// Enables data scrubbing of replay recording payloads.
#[serde(rename = "organizations:session-replay-recording-scrubbing")]
SessionReplayRecordingScrubbing,
/// Enables combining session replay envelope items (Replay Recordings and Replay Events)
/// into one Kafka message.
#[serde(rename = "organizations:session-replay-combined-envelope-items")]
SessionReplayCombinedEnvelopeItems,
/// Enables new User Feedback ingest.
///
/// TODO(jferg): rename to UserFeedbackIngest once old UserReport logic is deprecated.
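The `serde(rename = ...)` attribute is what ties the new variant to the feature string delivered in the project config. A minimal sketch of that mapping, assuming `serde` (with derive) and `serde_json` are available; the enum below is a trimmed-down stand-in for the real `Feature` type:

```rust
use serde::Deserialize;

// Trimmed-down stand-in for `Feature`; only the variant added by this commit is shown.
#[derive(Debug, PartialEq, Deserialize)]
enum Feature {
    #[serde(rename = "organizations:session-replay-combined-envelope-items")]
    SessionReplayCombinedEnvelopeItems,
}

fn main() {
    // Project configs carry features as plain strings; serde maps the string
    // back onto the enum variant through the rename attribute.
    let parsed: Feature =
        serde_json::from_str("\"organizations:session-replay-combined-envelope-items\"").unwrap();
    assert_eq!(parsed, Feature::SessionReplayCombinedEnvelopeItems);
}
```

The integration test added in this commit enables exactly this feature string in the project config to exercise the combined path.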
18 changes: 17 additions & 1 deletion relay-server/src/envelope.rs
@@ -504,6 +504,11 @@ pub struct ItemHeaders {
#[serde(default, skip)]
rate_limited: bool,

/// Indicates that this item should be combined into one payload with the other replay item.
/// NOTE: This is internal-only and not exposed in the Envelope.
#[serde(default, skip)]
replay_combined_payload: bool,

/// Contains the amount of events this item was generated and aggregated from.
///
/// A [metrics buckets](`ItemType::MetricBuckets`) item contains metrics extracted and
@@ -591,6 +596,7 @@ impl Item {
filename: None,
routing_hint: None,
rate_limited: false,
replay_combined_payload: false,
source_quantities: None,
sample_rates: None,
other: BTreeMap::new(),
@@ -754,6 +760,17 @@ impl Item {
self.headers.source_quantities = Some(source_quantities);
}

/// Returns whether the payload's replay items should be combined into one Kafka message.
#[cfg(feature = "processing")]
pub fn replay_combined_payload(&self) -> bool {
self.headers.replay_combined_payload
}

/// Sets the replay_combined_payload for this item.
pub fn set_replay_combined_payload(&mut self, combined_payload: bool) {
self.headers.replay_combined_payload = combined_payload;
}

/// Sets sample rates for this item.
pub fn set_sample_rates(&mut self, sample_rates: Value) {
if matches!(sample_rates, Value::Array(ref a) if !a.is_empty()) {
@@ -873,7 +890,6 @@ impl Item {
ItemType::UnrealReport => true,
ItemType::UserReport => true,
ItemType::UserReportV2 => true,

ItemType::ReplayEvent => true,
ItemType::Session => false,
ItemType::Sessions => false,
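Because the new header is declared with `#[serde(default, skip)]`, it is never written to or parsed from envelope item headers; it only travels in memory from the processor to the store service. A small sketch of that behavior, assuming `serde` and `serde_json`; `ItemHeaders` below is a simplified stand-in for the real struct:

```rust
use serde::{Deserialize, Serialize};

// Simplified stand-in for `ItemHeaders`: the flag is skipped by serde, so it
// never appears on the wire and always starts out `false`.
#[derive(Debug, Serialize, Deserialize)]
struct ItemHeaders {
    #[serde(rename = "type")]
    ty: String,
    #[serde(default, skip)]
    replay_combined_payload: bool,
}

fn main() {
    let headers = ItemHeaders {
        ty: "replay_event".to_owned(),
        replay_combined_payload: true,
    };

    // The internal flag is omitted when the headers are serialized...
    let json = serde_json::to_string(&headers).unwrap();
    assert_eq!(json, r#"{"type":"replay_event"}"#);

    // ...and falls back to `false` when headers are parsed from an envelope.
    let parsed: ItemHeaders = serde_json::from_str(&json).unwrap();
    assert!(!parsed.replay_combined_payload);
}
```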
8 changes: 8 additions & 0 deletions relay-server/src/services/processor/replay.rs
@@ -51,12 +51,17 @@ pub fn process(
user_agent: meta.user_agent(),
client_hints: meta.client_hints().as_deref(),
};
let combined_envelope_items =
project_state.has_feature(Feature::SessionReplayCombinedEnvelopeItems);

state.managed_envelope.retain_items(|item| match item.ty() {
ItemType::ReplayEvent => {
if !replays_enabled {
return ItemAction::DropSilently;
}
if combined_envelope_items {
item.set_replay_combined_payload(true);
}

match process_replay_event(&item.payload(), project_config, client_addr, user_agent) {
Ok(replay) => match replay.to_json() {
@@ -87,6 +92,9 @@
if !replays_enabled {
return ItemAction::DropSilently;
}
if combined_envelope_items {
item.set_replay_combined_payload(true);
}

// XXX: Processing is there just for data scrubbing. Skip the entire expensive
// processing step if we do not need to scrub.
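In short: replay items are still dropped when replays are disabled, and they are only tagged for combining when the project has the new feature. A condensed sketch of that per-item decision, with `ItemAction` and the marker flag as simplified stand-ins for the real processor types:

```rust
#[derive(Debug, PartialEq)]
enum ItemAction {
    Keep,
    DropSilently,
}

/// Simplified per-item decision: drop when replays are disabled, otherwise keep
/// the item and mark it for combining when the feature flag is set.
fn handle_replay_item(
    replays_enabled: bool,
    combined_envelope_items: bool,
    mark_combined: &mut bool,
) -> ItemAction {
    if !replays_enabled {
        return ItemAction::DropSilently;
    }
    if combined_envelope_items {
        // Stands in for `item.set_replay_combined_payload(true)` in the real processor.
        *mark_combined = true;
    }
    ItemAction::Keep
}

fn main() {
    let mut marked = false;
    assert_eq!(handle_replay_item(true, true, &mut marked), ItemAction::Keep);
    assert!(marked);

    let mut marked = false;
    assert_eq!(
        handle_replay_item(false, true, &mut marked),
        ItemAction::DropSilently
    );
    assert!(!marked);
}
```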
113 changes: 76 additions & 37 deletions relay-server/src/services/store.rs
@@ -195,6 +195,9 @@ impl StoreService {

let mut attachments = Vec::new();

let mut replay_event = None;
let mut replay_recording = None;

for item in envelope.items() {
match item.ty() {
ItemType::Attachment => {
@@ -244,16 +247,22 @@
item,
)?,
ItemType::ReplayRecording => {
self.produce_replay_recording(event_id, scoping, item, start_time, retention)?
replay_recording = Some(item);
}
ItemType::ReplayEvent => {
if item.replay_combined_payload() {
replay_event = Some(item);
}

self.produce_replay_event(
event_id.ok_or(StoreError::NoEventId)?,
scoping.organization_id,
scoping.project_id,
start_time,
retention,
item,
)?;
}
ItemType::ReplayEvent => self.produce_replay_event(
event_id.ok_or(StoreError::NoEventId)?,
scoping.organization_id,
scoping.project_id,
start_time,
retention,
item,
)?,
ItemType::CheckIn => self.produce_check_in(
scoping.organization_id,
scoping.project_id,
@@ -269,6 +278,17 @@
}
}

if let Some(recording) = replay_recording {
self.produce_replay_recording(
event_id,
scoping,
recording,
replay_event,
start_time,
retention,
)?;
}

if event_item.is_none() && attachments.is_empty() {
// No event-related content. All done.
return Ok(());
@@ -774,40 +794,59 @@ impl StoreService {
event_id: Option<EventId>,
scoping: Scoping,
item: &Item,
replay_event: Option<&Item>,
start_time: Instant,
retention: u16,
) -> Result<(), StoreError> {
// 2000 bytes are reserved for the message metadata.
let max_message_metadata_size = 2000;

// Remaining bytes can be filled by the payload.
let max_payload_size = self.config.max_replay_message_size() - max_message_metadata_size;
// Map the event item to its byte payload value.
let replay_event_payload = replay_event.map(|rv| rv.payload());

if item.payload().len() < max_payload_size {
let message =
KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
replay_id: event_id.ok_or(StoreError::NoEventId)?,
project_id: scoping.project_id,
key_id: scoping.key_id,
org_id: scoping.organization_id,
received: UnixTimestamp::from_instant(start_time).as_secs(),
retention_days: retention,
payload: item.payload(),
});
// Maximum number of bytes accepted by the consumer.
let max_payload_size = self.config.max_replay_message_size();

self.produce(
KafkaTopic::ReplayRecordings,
scoping.organization_id,
message,
)?;
// Size of the consumer message. We can be reasonably sure this won't overflow because
// of the request size validation provided by Nginx and Relay.
let mut payload_size = 2000; // Reserve 2KB for the message metadata.
payload_size += replay_event_payload.as_ref().map_or(0, |b| b.len());
payload_size += item.payload().len();

metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
event_type = "replay_recording_not_chunked"
);
} else {
// If the recording payload cannot fit into the message, do not produce and quit early.
if payload_size >= max_payload_size {
relay_log::warn!("replay_recording over maximum size.");
self.outcome_aggregator.send(TrackOutcome {
category: DataCategory::Replay,
event_id,
outcome: Outcome::Invalid(DiscardReason::TooLarge),
quantity: 1,
remote_addr: None,
scoping,
timestamp: instant_to_date_time(start_time),
});
return Ok(());
}

let message =
KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
replay_id: event_id.ok_or(StoreError::NoEventId)?,
project_id: scoping.project_id,
key_id: scoping.key_id,
org_id: scoping.organization_id,
received: UnixTimestamp::from_instant(start_time).as_secs(),
retention_days: retention,
payload: item.payload(),
replay_event: replay_event_payload,
});

self.produce(
KafkaTopic::ReplayRecordings,
scoping.organization_id,
message,
)?;

metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
event_type = "replay_recording_not_chunked"
);

Ok(())
}
@@ -1072,7 +1111,6 @@ struct ReplayRecordingChunkKafkaMessage {
/// the tuple (id, chunk_index) is the unique identifier for a single chunk.
chunk_index: usize,
}

#[derive(Debug, Serialize)]
struct ReplayRecordingChunkMeta {
/// The attachment ID within the event.
Expand Down Expand Up @@ -1114,6 +1152,7 @@ struct ReplayRecordingNotChunkedKafkaMessage {
received: u64,
retention_days: u16,
payload: Bytes,
replay_event: Option<Bytes>,
}

/// User report for an event wrapped up in a message ready for consumption in Kafka.
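The size gate in `produce_replay_recording` reserves a fixed 2000-byte budget for message metadata, adds the optional replay-event payload and the recording payload, and rejects the message with a `TooLarge` outcome once the total reaches the consumer limit. A standalone sketch of that arithmetic; the 1 MB limit is an assumed value for illustration, the real one comes from `config.max_replay_message_size()`:

```rust
/// Returns whether a combined replay message fits under the consumer's size limit.
/// Mirrors the check above: 2000 bytes are reserved for message metadata, and the
/// optional replay event plus the recording payload fill the remainder.
fn fits_in_consumer_limit(
    recording_len: usize,
    replay_event_len: Option<usize>,
    max_message_size: usize,
) -> bool {
    let mut payload_size = 2000; // reserved for message metadata
    payload_size += replay_event_len.unwrap_or(0);
    payload_size += recording_len;
    payload_size < max_message_size
}

fn main() {
    let max = 1_000_000; // assumed limit; Relay reads the real value from its configuration
    assert!(fits_in_consumer_limit(10_000, Some(2_500), max));
    assert!(!fits_in_consumer_limit(999_000, None, max));
}
```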
4 changes: 2 additions & 2 deletions tests/integration/fixtures/processing.py
@@ -409,8 +409,8 @@ def get_chunked_replay(self):
assert v["type"] == "replay_recording", v["type"]
return v

def get_not_chunked_replay(self):
message = self.poll()
def get_not_chunked_replay(self, timeout=None):
message = self.poll(timeout=timeout)
assert message is not None
assert message.error() is None

111 changes: 111 additions & 0 deletions tests/integration/test_replay_combined_payload.py
@@ -0,0 +1,111 @@
from sentry_sdk.envelope import Envelope, Item, PayloadRef

from .test_replay_recordings import recording_payload
from .test_replay_events import generate_replay_sdk_event
import json


def test_replay_combined_with_processing(
mini_sentry,
relay_with_processing,
replay_recordings_consumer,
replay_events_consumer,
):
project_id = 42
replay_id = "515539018c9b4260a6f999572f1661ee"
relay = relay_with_processing()
mini_sentry.add_basic_project_config(
project_id,
extra={
"config": {
"features": [
"organizations:session-replay",
"organizations:session-replay-combined-envelope-items",
]
}
},
)
replay_recordings_consumer = replay_recordings_consumer()
replay_events_consumer = replay_events_consumer(timeout=10)

envelope = Envelope(
headers=[
[
"event_id",
replay_id,
],
["attachment_type", "replay_recording"],
]
)
payload = recording_payload(b"[]")
envelope.add_item(Item(payload=PayloadRef(bytes=payload), type="replay_recording"))

replay_event = generate_replay_sdk_event(replay_id=replay_id)
envelope.add_item(Item(payload=PayloadRef(json=replay_event), type="replay_event"))

relay.send_envelope(project_id, envelope)

combined_replay_message = replay_recordings_consumer.get_not_chunked_replay(
timeout=10
)

assert combined_replay_message["type"] == "replay_recording_not_chunked"
assert combined_replay_message["replay_id"] == replay_id

assert combined_replay_message["payload"] == payload

replay_event = json.loads(combined_replay_message["replay_event"])

assert replay_event["replay_id"] == replay_id

replay_event, replay_event_message = replay_events_consumer.get_replay_event()
assert replay_event["type"] == "replay_event"
assert replay_event["replay_id"] == replay_id
assert replay_event_message["retention_days"] == 90


def test_replay_combined_with_processing_no_flag_set(
mini_sentry, relay_with_processing, replay_recordings_consumer
):
project_id = 42
replay_id = "515539018c9b4260a6f999572f1661ee"
relay = relay_with_processing()
mini_sentry.add_basic_project_config(
project_id,
extra={
"config": {
"features": [
"organizations:session-replay",
]
}
},
)
replay_recordings_consumer = replay_recordings_consumer()

envelope = Envelope(
headers=[
[
"event_id",
replay_id,
],
["attachment_type", "replay_recording"],
]
)
payload = recording_payload(b"[]")
envelope.add_item(Item(payload=PayloadRef(bytes=payload), type="replay_recording"))

replay_event = generate_replay_sdk_event(replay_id=replay_id)
envelope.add_item(Item(payload=PayloadRef(json=replay_event), type="replay_event"))

relay.send_envelope(project_id, envelope)

replay_recording_message = replay_recordings_consumer.get_not_chunked_replay(
timeout=10
)

assert replay_recording_message["type"] == "replay_recording_not_chunked"
assert replay_recording_message["replay_id"] == replay_id

assert replay_recording_message["payload"] == payload

assert replay_recording_message["replay_event"] is None