Skip to content

Commit b185d17

Browse files
authored
ref(processing): Pass store handle to forward_store (#5425)
1 parent c1898b1 commit b185d17

File tree

9 files changed

+67
-23
lines changed

9 files changed

+67
-23
lines changed

relay-server/src/processing/check_ins/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,13 @@ impl Forward for CheckInsOutput {
112112
#[cfg(feature = "processing")]
113113
fn forward_store(
114114
self,
115-
s: &relay_system::Addr<crate::services::store::Store>,
115+
s: processing::StoreHandle<'_>,
116116
ctx: processing::ForwardContext<'_>,
117117
) -> Result<(), Rejected<()>> {
118118
let envelope = self.serialize_envelope(ctx)?;
119119
let envelope = ManagedEnvelope::from(envelope).into_processed();
120120

121-
s.send(crate::services::store::StoreEnvelope { envelope });
121+
s.store(crate::services::store::StoreEnvelope { envelope });
122122

123123
Ok(())
124124
}

relay-server/src/processing/common.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::Envelope;
22
use crate::managed::{Managed, Rejected};
33
use crate::processing::ForwardContext;
4+
#[cfg(feature = "processing")]
5+
use crate::processing::StoreHandle;
46
use crate::processing::check_ins::CheckInsProcessor;
57
use crate::processing::logs::LogsProcessor;
68
use crate::processing::sessions::SessionsProcessor;
@@ -30,7 +32,7 @@ macro_rules! outputs {
3032
#[cfg(feature = "processing")]
3133
fn forward_store(
3234
self,
33-
s: &relay_system::Addr<crate::services::store::Store>,
35+
s: StoreHandle<'_>,
3436
ctx: ForwardContext<'_>,
3537
) -> Result<(), Rejected<()>> {
3638
match self {

relay-server/src/processing/forward.rs

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,52 @@ use relay_config::Config;
22
use relay_dynamic_config::GlobalConfig;
33
#[cfg(feature = "processing")]
44
use relay_dynamic_config::{RetentionConfig, RetentionsConfig};
5+
#[cfg(feature = "processing")]
6+
use relay_system::{Addr, FromMessage};
57

68
use crate::Envelope;
79
use crate::managed::{Managed, Rejected};
810
use crate::services::projects::project::ProjectInfo;
11+
#[cfg(feature = "processing")]
12+
use crate::services::store::Store;
13+
#[cfg(feature = "processing")]
14+
use crate::services::upload::Upload;
15+
16+
/// A transparent handle that dispatches between store-like services.
17+
#[cfg(feature = "processing")]
18+
#[derive(Debug, Clone, Copy)]
19+
pub struct StoreHandle<'a> {
20+
store: &'a Addr<Store>,
21+
upload: Option<&'a Addr<Upload>>,
22+
}
23+
24+
#[cfg(feature = "processing")]
25+
impl<'a> StoreHandle<'a> {
26+
pub fn new(store: &'a Addr<Store>, upload: Option<&'a Addr<Upload>>) -> Self {
27+
Self { store, upload }
28+
}
29+
30+
/// Sends a message to the [`Store`] service.
31+
pub fn store<M>(&self, message: M)
32+
where
33+
Store: FromMessage<M>,
34+
{
35+
self.store.send(message);
36+
}
37+
38+
/// Sends a message to the [`Upload`] service.
39+
#[expect(unused)]
40+
pub fn upload<M>(&self, message: M)
41+
where
42+
Upload: FromMessage<M>,
43+
{
44+
if let Some(upload) = self.upload {
45+
upload.send(message);
46+
} else {
47+
relay_log::error!("Upload service not configured. Dropping message.");
48+
}
49+
}
50+
}
951

1052
/// A processor output which can be forwarded to a different destination.
1153
pub trait Forward {
@@ -21,11 +63,8 @@ pub trait Forward {
2163
///
2264
/// This function must only be called when Relay is configured to be in processing mode.
2365
#[cfg(feature = "processing")]
24-
fn forward_store(
25-
self,
26-
s: &relay_system::Addr<crate::services::store::Store>,
27-
ctx: ForwardContext<'_>,
28-
) -> Result<(), Rejected<()>>;
66+
fn forward_store(self, s: StoreHandle<'_>, ctx: ForwardContext<'_>)
67+
-> Result<(), Rejected<()>>;
2968
}
3069

3170
/// Context passed to [`Forward`].
@@ -80,11 +119,7 @@ impl Forward for Nothing {
80119
}
81120

82121
#[cfg(feature = "processing")]
83-
fn forward_store(
84-
self,
85-
_: &relay_system::Addr<crate::services::store::Store>,
86-
_: ForwardContext<'_>,
87-
) -> Result<(), Rejected<()>> {
122+
fn forward_store(self, _: StoreHandle<'_>, _: ForwardContext<'_>) -> Result<(), Rejected<()>> {
88123
match self {}
89124
}
90125
}

relay-server/src/processing/logs/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ impl Forward for LogOutput {
175175
#[cfg(feature = "processing")]
176176
fn forward_store(
177177
self,
178-
s: &relay_system::Addr<crate::services::store::Store>,
178+
s: processing::StoreHandle<'_>,
179179
ctx: processing::ForwardContext<'_>,
180180
) -> Result<(), Rejected<()>> {
181181
let Self(logs) = self;
@@ -188,7 +188,7 @@ impl Forward for LogOutput {
188188

189189
for log in logs.split(|logs| logs.logs) {
190190
if let Ok(log) = log.try_map(|log, _| store::convert(log, &ctx)) {
191-
s.send(log)
191+
s.store(log)
192192
};
193193
}
194194

relay-server/src/processing/sessions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl Forward for SessionsOutput {
123123
#[cfg(feature = "processing")]
124124
fn forward_store(
125125
self,
126-
_: &relay_system::Addr<crate::services::store::Store>,
126+
_: processing::forward::StoreHandle<'_>,
127127
_: processing::ForwardContext<'_>,
128128
) -> Result<(), Rejected<()>> {
129129
let SessionsOutput(sessions) = self;

relay-server/src/processing/spans/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ impl Forward for SpanOutput {
226226
#[cfg(feature = "processing")]
227227
fn forward_store(
228228
self,
229-
s: &relay_system::Addr<crate::services::store::Store>,
229+
s: processing::forward::StoreHandle<'_>,
230230
ctx: processing::ForwardContext<'_>,
231231
) -> Result<(), Rejected<()>> {
232232
let spans = match self {
@@ -255,7 +255,7 @@ impl Forward for SpanOutput {
255255

256256
for span in spans.split(|spans| spans.into_indexed_spans()) {
257257
if let Ok(span) = span.try_map(|span, _| store::convert(span, &ctx)) {
258-
s.send(span)
258+
s.store(span)
259259
};
260260
}
261261

relay-server/src/processing/trace_metrics/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ impl Forward for TraceMetricOutput {
161161
#[cfg(feature = "processing")]
162162
fn forward_store(
163163
self,
164-
s: &relay_system::Addr<crate::services::store::Store>,
164+
s: processing::forward::StoreHandle<'_>,
165165
ctx: processing::ForwardContext<'_>,
166166
) -> Result<(), Rejected<()>> {
167167
let Self(metrics) = self;
@@ -174,7 +174,7 @@ impl Forward for TraceMetricOutput {
174174

175175
for metric in metrics.split(|metrics| metrics.metrics) {
176176
if let Ok(metric) = metric.try_map(|metric, _| store::convert(metric, &ctx)) {
177-
s.send(metric);
177+
s.store(metric);
178178
}
179179
}
180180

relay-server/src/processing/transactions/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ use crate::envelope::{ContentType, EnvelopeHeaders, Item, ItemType, Items};
1919
use crate::managed::{
2020
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
2121
};
22+
#[cfg(feature = "processing")]
23+
use crate::processing;
24+
#[cfg(feature = "processing")]
25+
use crate::processing::forward::StoreHandle;
2226
use crate::processing::transactions::profile::{Profile, ProfileWithHeaders};
2327
use crate::processing::utils::event::{
2428
EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_type,
@@ -592,13 +596,13 @@ impl Forward for TransactionOutput {
592596
#[cfg(feature = "processing")]
593597
fn forward_store(
594598
self,
595-
s: &relay_system::Addr<crate::services::store::Store>,
599+
s: StoreHandle<'_>,
596600
ctx: ForwardContext<'_>,
597601
) -> Result<(), Rejected<()>> {
598602
// TODO: split out spans into a separate message.
599603
let envelope: ManagedEnvelope = self.serialize_envelope(ctx)?.into();
600604

601-
s.send(StoreEnvelope {
605+
s.store(StoreEnvelope {
602606
envelope: envelope.into_processed(),
603607
});
604608

relay-server/src/services/processor.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2305,6 +2305,9 @@ impl EnvelopeProcessorService {
23052305
if self.inner.config.processing_enabled()
23062306
&& let Some(store_forwarder) = &self.inner.addrs.store_forwarder
23072307
{
2308+
use crate::processing::StoreHandle;
2309+
2310+
let upload = self.inner.addrs.upload.as_ref();
23082311
match submit {
23092312
Submit::Envelope(envelope) => {
23102313
let envelope_has_attachments = envelope
@@ -2328,7 +2331,7 @@ impl EnvelopeProcessorService {
23282331
}
23292332
}
23302333
Submit::Output { output, ctx } => output
2331-
.forward_store(store_forwarder, ctx)
2334+
.forward_store(StoreHandle::new(store_forwarder, upload), ctx)
23322335
.unwrap_or_else(|err| err.into_inner()),
23332336
}
23342337
return;

0 commit comments

Comments
 (0)