Skip to content

Commit 28a4058

Browse files
Add Raw Event To Handler Callback (#147)
1 parent 095c27d commit 28a4058

File tree

7 files changed

+29
-24
lines changed

7 files changed

+29
-24
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ pub struct Opts {
5656
#[derive(Debug)]
5757
pub struct Logger;
5858

59-
impl<V: std::fmt::Debug + Sync> yellowstone_vixen::Handler<V> for Logger {
60-
async fn handle(&self, value: &V) -> yellowstone_vixen::HandlerResult<()> {
59+
impl<V: std::fmt::Debug + Sync, R: Sync> vixen::Handler<V, R> for Logger {
60+
async fn handle(&self, _value: &V, _raw: &R) -> vixen::HandlerResult<()> {
6161
tracing::info!(?value);
6262
Ok(())
6363
}
@@ -73,7 +73,7 @@ fn main() {
7373
let config = std::fs::read_to_string(config).expect("Error reading config file");
7474
let config = toml::from_str(&config).expect("Error parsing config");
7575

76-
yellowstone_vixen::Runtime<YellowstoneGrpcSourc>::builder()
76+
yellowstone_vixen::Runtime<YellowstoneGrpcSource>::builder()
7777
.account(Pipeline::new(AccountParser, [Logger]))
7878
.instruction(Pipeline::new(InstructionParser, [Logger]))
7979
.build(config)

crates/runtime/src/filter_pipeline.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ where
7575
P::Input: Send + Sync,
7676
H: Debug + Sync,
7777
for<'i> &'i H: IntoIterator,
78-
for<'i> <&'i H as IntoIterator>::Item: Handler<P::Output>,
78+
for<'i> <&'i H as IntoIterator>::Item: Handler<P::Output, P::Input>,
7979
{
8080
/// Handle fn for `FilterPipeline`
8181
///
@@ -92,7 +92,7 @@ where
9292
let errs = self
9393
.handlers
9494
.into_iter()
95-
.map(|h| async move { h.handle(parsed).await })
95+
.map(|h| async move { h.handle(parsed, value).await })
9696
.collect::<futures_util::stream::FuturesUnordered<_>>()
9797
.filter_map(|r| async move { r.err() })
9898
.collect::<SmallVec<[_; 1]>>()
@@ -113,7 +113,7 @@ where
113113
P::Output: Send + Sync,
114114
H: Debug + Sync,
115115
for<'i> &'i H: IntoIterator,
116-
for<'i> <&'i H as IntoIterator>::Item: Handler<P::Output> + Send,
116+
for<'i> <&'i H as IntoIterator>::Item: Handler<P::Output, P::Input> + Send,
117117
{
118118
fn handle<'h>(
119119
&'h self,

crates/runtime/src/handler.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@ type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
1919
/// The result returned by a handler.
2020
pub type HandlerResult<T> = Result<T, BoxedError>;
2121

22-
/// A handler callback for a parsed value.
23-
pub trait Handler<T> {
24-
/// Consume the parsed value.
25-
fn handle(&self, value: &T) -> impl Future<Output = HandlerResult<()>> + Send;
22+
/// A handler callback for a parsed value and its corresponding raw event.
23+
pub trait Handler<T, R>
24+
where R: Sync
25+
{
26+
/// Consume the parsed value together with the raw event.
27+
fn handle(&self, value: &T, raw_event: &R) -> impl Future<Output = HandlerResult<()>> + Send;
2628
}
2729

28-
impl<T: Handler<U>, U> Handler<U> for &T {
30+
impl<T: Handler<U, R>, U, R> Handler<U, R> for &T
31+
where R: Sync
32+
{
2933
#[inline]
30-
fn handle(&self, value: &U) -> impl Future<Output = HandlerResult<()>> + Send {
31-
<T as Handler<U>>::handle(self, value)
34+
fn handle(&self, value: &U, raw_event: &R) -> impl Future<Output = HandlerResult<()>> + Send {
35+
<T as Handler<U, R>>::handle(self, value, raw_event)
3236
}
3337
}
3438

@@ -145,7 +149,8 @@ impl<P, I> Pipeline<P, I>
145149
where
146150
for<'i> &'i I: IntoIterator,
147151
P: Parser,
148-
for<'i> <&'i I as IntoIterator>::Item: Handler<P::Output>,
152+
P::Input: Sync,
153+
for<'i> <&'i I as IntoIterator>::Item: Handler<P::Output, P::Input>,
149154
{
150155
/// Handle fn for `Pipeline`
151156
///
@@ -167,7 +172,7 @@ where
167172
let errs = (&self.1)
168173
.into_iter()
169174
.map(|h| async move {
170-
h.handle(parsed)
175+
h.handle(parsed, value)
171176
.instrument(tracing::info_span!("vixen.handle",))
172177
.await
173178
})
@@ -209,7 +214,7 @@ where
209214
for<'i> &'i I: IntoIterator,
210215
P::Input: Sync,
211216
P::Output: Send + Sync,
212-
for<'i> <&'i I as IntoIterator>::Item: Handler<P::Output> + Send,
217+
for<'i> <&'i I as IntoIterator>::Item: Handler<P::Output, P::Input> + Send,
213218
{
214219
fn handle<'h>(
215220
&'h self,

crates/stream/src/grpc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ pub enum Error {
3232
#[derive(Debug)]
3333
pub struct GrpcHandler(pub(super) broadcast::Sender<Any>);
3434

35-
impl<T: Message + Name + Sync> Handler<T> for GrpcHandler {
36-
async fn handle(&self, value: &T) -> HandlerResult<()> {
35+
impl<T: Message + Name + Sync, R: Sync> Handler<T, R> for GrpcHandler {
36+
async fn handle(&self, value: &T, _raw: &R) -> HandlerResult<()> {
3737
self.0.send(Any::from_msg(value)?).ok();
3838
Ok(())
3939
}

examples/filtered-pipeline/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ pub struct Opts {
3232
#[derive(Debug)]
3333
pub struct Logger;
3434

35-
impl<V: std::fmt::Debug + Sync> vixen::Handler<V> for Logger {
36-
async fn handle(&self, _value: &V) -> vixen::HandlerResult<()> { Ok(()) }
35+
impl<V: std::fmt::Debug + Sync, R: Sync> vixen::Handler<V, R> for Logger {
36+
async fn handle(&self, _value: &V, _raw: &R) -> vixen::HandlerResult<()> { Ok(()) }
3737
}
3838
fn main() {
3939
let Opts { config } = Opts::parse();

examples/prometheus/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ pub struct Opts {
2424
#[derive(Debug)]
2525
pub struct Logger;
2626

27-
impl<V: std::fmt::Debug + Sync> yellowstone_vixen::Handler<V> for Logger {
28-
async fn handle(&self, _value: &V) -> yellowstone_vixen::HandlerResult<()> { Ok(()) }
27+
impl<V: std::fmt::Debug + Sync, R: Sync> yellowstone_vixen::Handler<V, R> for Logger {
28+
async fn handle(&self, _value: &V, _raw: &R) -> yellowstone_vixen::HandlerResult<()> { Ok(()) }
2929
}
3030

3131
#[tokio::main]

examples/tracing/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ pub struct Opts {
2626
#[derive(Debug)]
2727
pub struct Logger;
2828

29-
impl<V: std::fmt::Debug + Sync> Handler<V> for Logger {
30-
async fn handle(&self, _value: &V) -> HandlerResult<()> { Ok(()) }
29+
impl<V: std::fmt::Debug + Sync, R: Sync> Handler<V, R> for Logger {
30+
async fn handle(&self, _value: &V, _raw: &R) -> HandlerResult<()> { Ok(()) }
3131
}
3232

3333
#[rustfmt::skip]

0 commit comments

Comments
 (0)