Skip to content

Commit dba50e1

Browse files
authored
feat: add before_send hooks (#135)
* feat: add before_send hooks * chore: simplify before_send hook type * address pr review feedback
1 parent af311cc commit dba50e1

9 files changed

Lines changed: 301 additions & 26 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
cargo/posthog-rs: minor
3+
---
4+
5+
Add before_send hooks for mutating or dropping events before capture.

src/client/async_client.rs

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@ use crate::feature_flags::{match_feature_flag, FeatureFlag, FeatureFlagsResponse
2323
use crate::local_evaluation::{AsyncFlagPoller, FlagCache, LocalEvaluationConfig, LocalEvaluator};
2424
use crate::{Error, Event};
2525

26+
#[cfg(feature = "capture-v1")]
27+
use super::common::apply_capture_defaults;
2628
use super::common::{
27-
already_reported, build_dedup_key, extract_flag_details, flag_called_event,
28-
flag_event_dedup_cache, local_record, remote_record_from_detail, DetailedFlagsResponse,
29-
FlagEventDedupCache,
29+
already_reported, apply_before_send_hooks, build_dedup_key, extract_flag_details,
30+
flag_called_event, flag_event_dedup_cache, local_record, remote_record_from_detail,
31+
DetailedFlagsResponse, FlagEventDedupCache,
3032
};
31-
use super::ClientOptions;
33+
use super::{BeforeSendHook, ClientOptions};
3234

3335
#[cfg(not(feature = "capture-v1"))]
3436
async fn check_response(response: reqwest::Response) -> Result<(), Error> {
@@ -64,6 +66,7 @@ struct AsyncFlagEventHost {
6466
http_client: HttpClient,
6567
options: ClientOptions,
6668
capture_url: String,
69+
before_send: Vec<BeforeSendHook>,
6770
dedup_cache: FlagEventDedupCache,
6871
/// Tokio runtime handle captured at host construction (which always runs
6972
/// inside the runtime that hosts `evaluate_flags`). This lets snapshot
@@ -85,6 +88,7 @@ impl AsyncFlagEventHost {
8588
http_client,
8689
options: options.clone(),
8790
capture_url,
91+
before_send: options.before_send.clone(),
8892
dedup_cache: flag_event_dedup_cache(),
8993
runtime: tokio::runtime::Handle::current(),
9094
}
@@ -146,6 +150,9 @@ impl AsyncFlagEventHost {
146150
#[cfg(not(feature = "capture-v1"))]
147151
fn spawn_ship_v0(&self, mut event: Event) {
148152
event.prepare_for_v0();
153+
let Some(event) = apply_before_send_hooks(&self.before_send, event) else {
154+
return;
155+
};
149156
let inner_event = InnerEvent::new(event, self.options.api_key.clone());
150157
let payload = match serde_json::to_string(&inner_event) {
151158
Ok(p) => p,
@@ -284,6 +291,12 @@ impl Client {
284291

285292
#[cfg(feature = "capture-v1")]
286293
{
294+
let mut event = event;
295+
let defaults = self.options.capture_defaults();
296+
apply_capture_defaults(&mut event, &defaults);
297+
let Some(event) = apply_before_send_hooks(&self.options.before_send, event) else {
298+
return Ok(());
299+
};
287300
return self.capture_v1(vec![event], false).await.map(|_| ());
288301
}
289302

@@ -318,6 +331,17 @@ impl Client {
318331

319332
#[cfg(feature = "capture-v1")]
320333
{
334+
let defaults = self.options.capture_defaults();
335+
let events: Vec<_> = events
336+
.into_iter()
337+
.filter_map(|mut event| {
338+
apply_capture_defaults(&mut event, &defaults);
339+
apply_before_send_hooks(&self.options.before_send, event)
340+
})
341+
.collect();
342+
if events.is_empty() {
343+
return Ok(());
344+
}
321345
return self
322346
.capture_v1(events, historical_migration)
323347
.await
@@ -332,6 +356,9 @@ impl Client {
332356
async fn capture_v0(&self, mut event: Event) -> Result<(), Error> {
333357
let defaults = self.options.capture_defaults();
334358
super::v0_capture::prepare_event(&mut event, &defaults);
359+
let Some(event) = apply_before_send_hooks(&self.options.before_send, event) else {
360+
return Ok(());
361+
};
335362
let payload =
336363
super::v0_capture::build_capture_payload(event, self.options.api_key.clone())?;
337364
let url = self.options.endpoints().build_url(Endpoint::Capture);
@@ -346,12 +373,16 @@ impl Client {
346373
historical_migration: bool,
347374
) -> Result<(), Error> {
348375
let defaults = self.options.capture_defaults();
349-
let payload = super::v0_capture::build_batch_payload(
376+
let Some(payload) = super::v0_capture::build_batch_payload(
350377
events,
351378
self.options.api_key.clone(),
352379
historical_migration,
353380
&defaults,
354-
)?;
381+
&self.options.before_send,
382+
)?
383+
else {
384+
return Ok(());
385+
};
355386
let url = self.options.endpoints().build_url(Endpoint::Batch);
356387
let (body, encoding) = super::v0_capture::encode_body(&self.options, payload);
357388
self.send_v0_with_retry(&url, body, encoding).await

src/client/blocking.rs

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@ use crate::feature_flags::{match_feature_flag, FeatureFlag, FeatureFlagsResponse
2323
use crate::local_evaluation::{FlagCache, FlagPoller, LocalEvaluationConfig, LocalEvaluator};
2424
use crate::{Error, Event};
2525

26+
#[cfg(feature = "capture-v1")]
27+
use super::common::apply_capture_defaults;
2628
use super::common::{
27-
already_reported, build_dedup_key, extract_flag_details, flag_called_event,
28-
flag_event_dedup_cache, local_record, remote_record_from_detail, DetailedFlagsResponse,
29-
FlagEventDedupCache,
29+
already_reported, apply_before_send_hooks, build_dedup_key, extract_flag_details,
30+
flag_called_event, flag_event_dedup_cache, local_record, remote_record_from_detail,
31+
DetailedFlagsResponse, FlagEventDedupCache,
3032
};
31-
use super::ClientOptions;
33+
use super::{BeforeSendHook, ClientOptions};
3234

3335
#[cfg(not(feature = "capture-v1"))]
3436
fn check_response(response: reqwest::blocking::Response) -> Result<(), Error> {
@@ -63,6 +65,7 @@ struct BlockingFlagEventHost {
6365
http_client: HttpClient,
6466
options: ClientOptions,
6567
capture_url: String,
68+
before_send: Vec<BeforeSendHook>,
6669
dedup_cache: FlagEventDedupCache,
6770
}
6871

@@ -78,6 +81,7 @@ impl BlockingFlagEventHost {
7881
http_client,
7982
options: options.clone(),
8083
capture_url,
84+
before_send: options.before_send.clone(),
8185
dedup_cache: flag_event_dedup_cache(),
8286
}
8387
}
@@ -130,6 +134,9 @@ impl BlockingFlagEventHost {
130134
#[cfg(not(feature = "capture-v1"))]
131135
fn ship_event_v0(&self, mut event: Event) {
132136
event.prepare_for_v0();
137+
let Some(event) = apply_before_send_hooks(&self.before_send, event) else {
138+
return;
139+
};
133140
let inner_event = InnerEvent::new(event, self.options.api_key.clone());
134141
let payload = match serde_json::to_string(&inner_event) {
135142
Ok(p) => p,
@@ -260,6 +267,12 @@ impl Client {
260267

261268
#[cfg(feature = "capture-v1")]
262269
{
270+
let mut event = event;
271+
let defaults = self.options.capture_defaults();
272+
apply_capture_defaults(&mut event, &defaults);
273+
let Some(event) = apply_before_send_hooks(&self.options.before_send, event) else {
274+
return Ok(());
275+
};
263276
return self.capture_v1(vec![event], false).map(|_| ());
264277
}
265278

@@ -296,6 +309,17 @@ impl Client {
296309

297310
#[cfg(feature = "capture-v1")]
298311
{
312+
let defaults = self.options.capture_defaults();
313+
let events: Vec<_> = events
314+
.into_iter()
315+
.filter_map(|mut event| {
316+
apply_capture_defaults(&mut event, &defaults);
317+
apply_before_send_hooks(&self.options.before_send, event)
318+
})
319+
.collect();
320+
if events.is_empty() {
321+
return Ok(());
322+
}
299323
return self.capture_v1(events, historical_migration).map(|_| ());
300324
}
301325

@@ -307,6 +331,9 @@ impl Client {
307331
fn capture_v0(&self, mut event: Event) -> Result<(), Error> {
308332
let defaults = self.options.capture_defaults();
309333
super::v0_capture::prepare_event(&mut event, &defaults);
334+
let Some(event) = apply_before_send_hooks(&self.options.before_send, event) else {
335+
return Ok(());
336+
};
310337
let payload =
311338
super::v0_capture::build_capture_payload(event, self.options.api_key.clone())?;
312339
let url = self.options.endpoints().build_url(Endpoint::Capture);
@@ -321,12 +348,16 @@ impl Client {
321348
historical_migration: bool,
322349
) -> Result<(), Error> {
323350
let defaults = self.options.capture_defaults();
324-
let payload = super::v0_capture::build_batch_payload(
351+
let Some(payload) = super::v0_capture::build_batch_payload(
325352
events,
326353
self.options.api_key.clone(),
327354
historical_migration,
328355
&defaults,
329-
)?;
356+
&self.options.before_send,
357+
)?
358+
else {
359+
return Ok(());
360+
};
330361
let url = self.options.endpoints().build_url(Endpoint::Batch);
331362
let (body, encoding) = super::v0_capture::encode_body(&self.options, payload);
332363
self.send_v0_with_retry(&url, body, encoding)

src/client/common.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
use std::collections::{HashMap, HashSet};
22
use std::sync::Mutex;
33

4+
use crate::client::BeforeSendHook;
5+
use crate::client::CaptureDefaults;
46
use crate::feature_flag_evaluations::{EvaluatedFlagRecord, FlagCalledEventParams};
57
use crate::feature_flags::{FeatureFlagsResponse, FlagDetail, FlagMetadata, FlagValue};
68
use crate::Event;
9+
use tracing::error;
710

811
/// Cap on the number of `distinct_id` entries in the `$feature_flag_called`
912
/// dedup cache. On overflow the entire map is reset (matches the JS SDK).
@@ -15,6 +18,33 @@ pub(super) fn flag_event_dedup_cache() -> FlagEventDedupCache {
1518
Mutex::new(HashMap::new())
1619
}
1720

21+
pub(super) fn apply_capture_defaults(event: &mut Event, defaults: &CaptureDefaults) {
22+
if defaults.disable_geoip {
23+
event.insert_prop_default("$geoip_disable", serde_json::Value::Bool(true));
24+
}
25+
if defaults.is_server {
26+
event.insert_prop_default("$is_server", serde_json::Value::Bool(true));
27+
}
28+
}
29+
30+
pub(super) fn apply_before_send_hooks(hooks: &[BeforeSendHook], event: Event) -> Option<Event> {
31+
let mut current = Some(event);
32+
33+
for hook in hooks {
34+
let event = current.take().expect("event is present between hooks");
35+
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| hook.apply(event))) {
36+
Ok(Some(next)) => current = Some(next),
37+
Ok(None) => return None,
38+
Err(_) => {
39+
error!("panic in PostHog before_send hook; dropping event");
40+
return None;
41+
}
42+
}
43+
}
44+
45+
current
46+
}
47+
1848
/// Returns `true` when the helper has already shipped this
1949
/// `(distinct_id, key, response)` combination and the caller should skip.
2050
pub(super) fn already_reported(
@@ -295,4 +325,44 @@ mod tests {
295325
assert_eq!(event.properties().get("$is_server"), Some(&json!(true)));
296326
assert_eq!(event.properties().get("$geoip_disable"), Some(&json!(true)));
297327
}
328+
329+
#[test]
330+
fn before_send_hooks_mutate_and_drop_events() {
331+
let options = crate::ClientOptionsBuilder::default()
332+
.api_key("test-key".to_string())
333+
.before_send(|mut event| {
334+
event.insert_prop("from_hook", true).unwrap();
335+
Some(event)
336+
})
337+
.before_send(|event| {
338+
if event.event_name() == "drop" {
339+
None
340+
} else {
341+
Some(event)
342+
}
343+
})
344+
.build()
345+
.unwrap();
346+
347+
let event = apply_before_send_hooks(&options.before_send, Event::new("keep", "user-1"))
348+
.expect("event should be kept");
349+
assert_eq!(event.properties().get("from_hook"), Some(&json!(true)));
350+
351+
assert!(
352+
apply_before_send_hooks(&options.before_send, Event::new("drop", "user-1")).is_none()
353+
);
354+
}
355+
356+
#[test]
357+
fn before_send_hook_panic_drops_event() {
358+
let options = crate::ClientOptionsBuilder::default()
359+
.api_key("test-key".to_string())
360+
.before_send(|_event| panic!("boom"))
361+
.build()
362+
.unwrap();
363+
364+
assert!(
365+
apply_before_send_hooks(&options.before_send, Event::new("test", "user-1")).is_none()
366+
);
367+
}
298368
}

src/client/mod.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
use std::sync::{Arc, Mutex};
2+
13
use crate::endpoints::{EndpointManager, DEFAULT_HOST};
4+
use crate::event::Event;
25
use derive_builder::Builder;
36
use tracing::warn;
47

@@ -49,6 +52,38 @@ pub use async_client::client;
4952
#[cfg(feature = "async-client")]
5053
pub use async_client::Client;
5154

55+
type BeforeSendFn = dyn FnMut(Event) -> Option<Event> + Send + 'static;
56+
type SharedBeforeSendHook = Arc<Mutex<Box<BeforeSendFn>>>;
57+
58+
/// Hook that can modify or discard events before they are sent.
59+
///
60+
/// Hooks run before serialization. Return `Some(event)` to continue sending the
61+
/// event, or `None` to drop it.
62+
///
63+
/// Hook panics are caught and cause the current event to be dropped. If a hook
64+
/// keeps mutable state, a panic can leave that state partially updated; the SDK
65+
/// recovers the hook mutex and subsequent events continue through the same hook.
66+
#[derive(Clone)]
67+
pub struct BeforeSendHook(SharedBeforeSendHook);
68+
69+
impl BeforeSendHook {
70+
/// Create a new before-send hook.
71+
pub fn new<F>(hook: F) -> Self
72+
where
73+
F: FnMut(Event) -> Option<Event> + Send + 'static,
74+
{
75+
Self(Arc::new(Mutex::new(Box::new(hook))))
76+
}
77+
78+
pub(crate) fn apply(&self, event: Event) -> Option<Event> {
79+
let mut hook = self
80+
.0
81+
.lock()
82+
.unwrap_or_else(|poisoned| poisoned.into_inner());
83+
(hook)(event)
84+
}
85+
}
86+
5287
/// Configuration options for the PostHog client.
5388
///
5489
/// Use [`ClientOptionsBuilder`] to construct options with custom settings, or
@@ -147,6 +182,10 @@ pub struct ClientOptions {
147182
#[builder(default, setter(strip_option))]
148183
pub(crate) capture_compression: Option<CaptureCompression>,
149184

185+
/// Hooks to modify, filter, or sample events before they are sent.
186+
#[builder(default, setter(custom))]
187+
pub(crate) before_send: Vec<BeforeSendHook>,
188+
150189
/// Extra HTTP headers injected into every outbound capture request.
151190
/// Used by the SDK test harness adapter to attach `X-Test-Id` for
152191
/// parallel test isolation.
@@ -229,6 +268,21 @@ impl ClientOptions {
229268
}
230269

231270
impl ClientOptionsBuilder {
271+
/// Add a hook that can modify or discard events before they are sent.
272+
///
273+
/// Hooks should avoid panicking. Panics are caught and drop the current event,
274+
/// but any mutable state captured by the hook may be left partially updated
275+
/// and will be reused on subsequent calls.
276+
pub fn before_send<F>(&mut self, hook: F) -> &mut Self
277+
where
278+
F: FnMut(Event) -> Option<Event> + Send + 'static,
279+
{
280+
self.before_send
281+
.get_or_insert_with(Vec::new)
282+
.push(BeforeSendHook::new(hook));
283+
self
284+
}
285+
232286
/// Build sanitized [`ClientOptions`].
233287
///
234288
/// Missing or whitespace-only API keys are allowed and disable the client so

0 commit comments

Comments
 (0)