Skip to content

Conversation

@fernandodeluret
Copy link
Contributor

No description provided.

Copy link
Collaborator

@kespinola kespinola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like where this is going. Good use of serde. We do loose the early typing but it would be a mess to make sources extensible and dynamic with static config.

We still cast it within the new. Ideally we would call new on all the sources and then connect. This allows the config to be validated without starting any connections to sources.

Comment on lines 63 to 100
pub fn new() -> Self { Default::default() }

#[cfg(feature = "prometheus")]
fn register_metrics(&self, config: &FumaroleConfig) -> JoinHandle<()> {
use std::time::Duration;

let prometheus_registry = prometheus::Registry::new();
yellowstone_fumarole_client::metrics::register_metrics(&prometheus_registry);

// Use the conditional config fields
let export_interval = Duration::from_secs(config.metrics_interval);
let job_name = config.metrics_job_name.clone();
let metrics_endpoint = config.metrics_endpoint.clone();

// Spawn metrics pusher task
let registry_clone = prometheus_registry.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(export_interval);
loop {
interval.tick().await;

let metrics = registry_clone.gather();
let job_name = job_name.clone();
let metrics_endpoint = metrics_endpoint.clone();
let _ = tokio::task::spawn_blocking(move || {
if let Err(e) = prometheus::push_metrics(
&job_name,
prometheus::labels! {},
&metrics_endpoint,
metrics,
None,
) {
tracing::error!("Failed to push Fumarole metrics: {e:?}");
}
})
.await;
}
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is designed where if the binary using vixen when it has prometheus feature active on fumarole will export its metrics to the endpoint of the app. We don't need to expose our own.

This is how I'd like our metrics to work too. We just support prometheus as a feature with no push gateway. The user has to setup their own server.

Comment on lines 27 to 41
impl Source for YellowstoneGrpcSource {
fn name(&self) -> String { "yellowstone-grpc".to_string() }

async fn connect(&self, tx: Sender<Result<SubscribeUpdate, Status>>) -> Result<(), VixenError> {
async fn connect(
&self,
tx: Sender<Result<SubscribeUpdate, Status>>,
raw_config: toml::Value,
) -> Result<(), VixenError> {
// We require that config and filters are set before connecting to the `Source`
let filters = self.filters.clone().ok_or(VixenError::ConfigError)?;
let config = self.config.clone().ok_or(VixenError::ConfigError)?;

let config: YellowstoneConfig = serde::Deserialize::deserialize(raw_config)
.expect("Failed to deserialize YellowstoneConfig");

let timeout = Duration::from_secs(config.timeout);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can Source::new(config: toml::Value, filters:: Filters)

Then the Source trait can just be connect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main problem is that we can't have a new() method in the Source trait (because it's not dyn compatible). I have been trying some workarounds for this, but very quickly it scalates the complexity and none looks clean at all.

Regarding filters, we could receive them also in connect() like we are doing with config, but we would loose the ability to override filters for a particular source

Comment on lines 86 to 87
let result = vixen::stream::Server::builder()
.source(YellowstoneGrpcSource::new())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let result = vixen::stream::Server::builder()
.source(YellowstoneGrpcSource::new())
let result = vixen::stream::Server::builder()
.source::<YellowstoneGrpcSource>()

To get where new is invoked internally but call new once the runtime has gathered the filters and we read the connection config we call T::new(config, filters)

kespinola and others added 2 commits August 12, 2025 18:37
The SourceTrait now defines its config type as part of the trait definition.
Add DynSource which keeps consistent interface for starting a source.
Refactor builder to take .source<SourceTrait>::().
@fernandodeluret fernandodeluret merged commit 6952665 into fernandod/vix-7-fumarole-datasource Aug 12, 2025
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants