|
1 | | -use std::{collections::HashMap, sync::Arc, time::Duration}; |
| 1 | +use std::{sync::Arc, time::Duration}; |
2 | 2 |
|
3 | 3 | use async_trait::async_trait; |
4 | 4 | use clap::ValueEnum; |
@@ -73,106 +73,100 @@ pub struct YellowstoneGrpcSource { |
73 | 73 | impl SourceTrait for YellowstoneGrpcSource { |
74 | 74 | type Config = YellowstoneGrpcConfig; |
75 | 75 |
|
76 | | - fn new(config: Self::Config, filters: Filters) -> Self { |
77 | | - Self { config, filters } |
78 | | - } |
| 76 | + fn new(config: Self::Config, filters: Filters) -> Self { Self { config, filters } } |
79 | 77 |
|
80 | 78 | async fn connect(&self, tx: Sender<Result<SubscribeUpdate, Status>>) -> Result<(), VixenError> { |
81 | 79 | let filters = self.filters.clone(); |
82 | 80 | let config = self.config.clone(); |
83 | 81 |
|
84 | 82 | let timeout = Duration::from_secs(config.timeout); |
85 | 83 |
|
86 | | - let mut tasks_set = JoinSet::new(); |
87 | | - |
88 | | - for (filter_id, prefilter) in filters.parsers_filters { |
89 | | - let filter = Filters::new(HashMap::from([(filter_id, prefilter)])); |
| 84 | + // Create a single gRPC client connection |
| 85 | + let mut client = GeyserGrpcClient::build_from_shared(config.endpoint.clone())? |
| 86 | + .x_token(config.x_token.clone())? |
| 87 | + .max_decoding_message_size(config.max_decoding_message_size.unwrap_or(usize::MAX)) |
| 88 | + .accept_compressed(config.accept_compression.unwrap_or_default().into()) |
| 89 | + .connect_timeout(timeout) |
| 90 | + .timeout(timeout) |
| 91 | + .tls_config(ClientTlsConfig::new().with_native_roots())? |
| 92 | + .connect() |
| 93 | + .await?; |
| 94 | + |
| 95 | + // Build a single subscribe request with all filters combined |
| 96 | + let mut subscribe_request: SubscribeRequest = filters.into(); |
| 97 | + if let Some(from_slot) = config.from_slot { |
| 98 | + subscribe_request.from_slot = Some(from_slot); |
| 99 | + } |
| 100 | + if let Some(commitment_level) = config.commitment_level { |
| 101 | + subscribe_request.commitment = Some(commitment_level as i32); |
| 102 | + } |
90 | 103 |
|
91 | | - let tx = tx.clone(); |
| 104 | + // Log the raw GRPC SubscribeRequest for debugging with 3rd party provider |
| 105 | + tracing::debug!( |
| 106 | + "=== GRPC SubscribeRequest to {} ===\n{:#?}\n=== END ===", |
| 107 | + config.endpoint, |
| 108 | + subscribe_request |
| 109 | + ); |
92 | 110 |
|
93 | | - let mut client = GeyserGrpcClient::build_from_shared(config.endpoint.clone())? |
94 | | - .x_token(config.x_token.clone())? |
95 | | - .max_decoding_message_size(config.max_decoding_message_size.unwrap_or(usize::MAX)) |
96 | | - .accept_compressed(config.accept_compression.unwrap_or_default().into()) |
97 | | - .connect_timeout(timeout) |
98 | | - .timeout(timeout) |
99 | | - .tls_config(ClientTlsConfig::new().with_native_roots())? |
100 | | - .connect() |
101 | | - .await?; |
| 111 | + let (sub_tx, stream) = client |
| 112 | + .subscribe_with_request(Some(subscribe_request)) |
| 113 | + .await?; |
102 | 114 |
|
103 | | - let mut subscribe_request: SubscribeRequest = filter.into(); |
104 | | - if let Some(from_slot) = config.from_slot { |
105 | | - subscribe_request.from_slot = Some(from_slot); |
106 | | - } |
107 | | - if let Some(commitment_level) = config.commitment_level { |
108 | | - subscribe_request.commitment = Some(commitment_level as i32); |
109 | | - } |
| 115 | + // Wrap the subscription sender in Arc<Mutex<>> to share between tasks |
| 116 | + let sub_tx = Arc::new(Mutex::new(sub_tx)); |
| 117 | + let ping_sub_tx = Arc::clone(&sub_tx); |
110 | 118 |
|
111 | | - // Log the raw GRPC SubscribeRequest for debugging with 3rd party provider |
112 | | - tracing::debug!( |
113 | | - "=== GRPC SubscribeRequest to {} ===\n{:#?}\n=== END ===", |
114 | | - config.endpoint, |
115 | | - subscribe_request |
116 | | - ); |
117 | | - |
118 | | - let (sub_tx, stream) = client |
119 | | - .subscribe_with_request(Some(subscribe_request)) |
120 | | - .await?; |
121 | | - |
122 | | - // Wrap the subscription sender in Arc<Mutex<>> to share between tasks |
123 | | - let sub_tx = Arc::new(Mutex::new(sub_tx)); |
124 | | - let ping_sub_tx = Arc::clone(&sub_tx); |
125 | | - |
126 | | - // Spawn a task to receive updates and respond to server pings |
127 | | - tasks_set.spawn(async move { |
128 | | - let mut stream = std::pin::pin!(stream); |
129 | | - |
130 | | - while let Some(update_result) = stream.next().await { |
131 | | - // Handle server pings by responding with a ping |
132 | | - if let Ok(update) = &update_result |
133 | | - && let Some(UpdateOneof::Ping(_)) = update.update_oneof |
134 | | - { |
135 | | - tracing::debug!("Received ping from server, responding..."); |
136 | | - let ping_response = SubscribeRequest { |
137 | | - ping: Some(SubscribeRequestPing { id: 1 }), |
138 | | - ..Default::default() |
139 | | - }; |
140 | | - if let Err(e) = sub_tx.lock().await.send(ping_response).await { |
141 | | - tracing::warn!("Failed to send ping response to server: {}", e); |
142 | | - break; |
143 | | - } |
144 | | - } |
| 119 | + let mut tasks_set = JoinSet::new(); |
145 | 120 |
|
146 | | - // Forward all updates to the buffer |
147 | | - if let Err(_) = tx.send(update_result).await { |
148 | | - // Channel closed, likely due to shutdown - exit gracefully |
149 | | - tracing::debug!("Update channel closed, shutting down receiver task"); |
| 121 | + // Spawn a task to receive updates and respond to server pings |
| 122 | + tasks_set.spawn(async move { |
| 123 | + let mut stream = std::pin::pin!(stream); |
| 124 | + |
| 125 | + while let Some(update_result) = stream.next().await { |
| 126 | + // Handle server pings by responding with a ping |
| 127 | + if let Ok(update) = &update_result |
| 128 | + && let Some(UpdateOneof::Ping(_)) = update.update_oneof |
| 129 | + { |
| 130 | + tracing::debug!("Received ping from server, responding..."); |
| 131 | + let ping_response = SubscribeRequest { |
| 132 | + ping: Some(SubscribeRequestPing { id: 1 }), |
| 133 | + ..Default::default() |
| 134 | + }; |
| 135 | + if let Err(e) = sub_tx.lock().await.send(ping_response).await { |
| 136 | + tracing::warn!("Failed to send ping response to server: {}", e); |
150 | 137 | break; |
151 | 138 | } |
152 | 139 | } |
153 | | - }); |
154 | 140 |
|
155 | | - // Spawn a task to send periodic pings every 10 seconds |
156 | | - tasks_set.spawn(async move { |
157 | | - let mut ping_timer = interval(Duration::from_secs(10)); |
158 | | - let mut ping_id = 0i32; |
| 141 | + // Forward all updates to the buffer |
| 142 | + if tx.send(update_result).await.is_err() { |
| 143 | + // Channel closed, likely due to shutdown - exit gracefully |
| 144 | + tracing::debug!("Update channel closed, shutting down receiver task"); |
| 145 | + break; |
| 146 | + } |
| 147 | + } |
| 148 | + }); |
159 | 149 |
|
160 | | - loop { |
161 | | - ping_timer.tick().await; |
162 | | - ping_id = ping_id.wrapping_add(1); |
| 150 | + // Spawn a task to send periodic pings every 10 seconds |
| 151 | + tasks_set.spawn(async move { |
| 152 | + let mut ping_timer = interval(Duration::from_secs(10)); |
| 153 | + let mut ping_id = 0i32; |
163 | 154 |
|
164 | | - let ping_request = SubscribeRequest { |
165 | | - ping: Some(SubscribeRequestPing { id: ping_id }), |
166 | | - ..Default::default() |
167 | | - }; |
| 155 | + loop { |
| 156 | + ping_timer.tick().await; |
| 157 | + ping_id = ping_id.wrapping_add(1); |
168 | 158 |
|
169 | | - if let Err(e) = ping_sub_tx.lock().await.send(ping_request).await { |
170 | | - tracing::warn!("Failed to send ping to server: {}", e); |
171 | | - break; |
172 | | - } |
| 159 | + let ping_request = SubscribeRequest { |
| 160 | + ping: Some(SubscribeRequestPing { id: ping_id }), |
| 161 | + ..Default::default() |
| 162 | + }; |
| 163 | + |
| 164 | + if let Err(e) = ping_sub_tx.lock().await.send(ping_request).await { |
| 165 | + tracing::warn!("Failed to send ping to server: {}", e); |
| 166 | + break; |
173 | 167 | } |
174 | | - }); |
175 | | - } |
| 168 | + } |
| 169 | + }); |
176 | 170 |
|
177 | 171 | tasks_set.join_all().await; |
178 | 172 |
|
|
0 commit comments