6 changes: 3 additions & 3 deletions .github/workflows/ci-cd.yml
@@ -278,7 +278,7 @@ jobs:
          set -eo pipefail
          target="${{ matrix.target }}"
          flags=()

          if [[ "$target" != *msvc* && "$target" != "aarch64-unknown-linux-gnu" ]]; then
            flags+=(--features jemalloc,reth)
          else
@@ -649,9 +649,9 @@ jobs:
          set -eo pipefail
          target="${{ matrix.target }}"
          flags=()

          if [[ "$target" != *msvc* && "$target" != "aarch64-unknown-linux-gnu" ]]; then
            flags+=(--features jemalloc,reth)
          else
            flags+=(--features reth)
          fi
2 changes: 2 additions & 0 deletions cli/Cargo.toml
@@ -33,5 +33,7 @@ codegen-units = 1
incremental = false

[features]
+default = ["kafka"]
jemalloc = ["dep:jemallocator", "dep:jemalloc-ctl"]
reth = ["rindexer/reth"]
+kafka = ["rindexer/kafka"]
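These two additions keep the CLI's behaviour unchanged by default: `default = ["kafka"]` turns Kafka on for stock builds, and `kafka = ["rindexer/kafka"]` forwards the flag to the core crate. A sketch of how a build toggles it, assuming nothing else in the CLI re-enables the feature:

```bash
# Stock build: the default feature set includes kafka.
cargo build --release

# Kafka-free build (e.g. on hosts without the rdkafka toolchain).
# CI still adds --features jemalloc,reth on targets that support them.
cargo build --release --no-default-features
```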
5 changes: 3 additions & 2 deletions core/Cargo.toml
@@ -95,10 +95,10 @@ reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.
reth-tracing = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2", optional = true }

[target.'cfg(not(windows))'.dependencies]
-rdkafka = { version = "0.38.0", features = ["tokio"] }
+rdkafka = { version = "0.38.0", features = ["tokio"], optional = true }

[target.'cfg(windows)'.dependencies]
-rdkafka = { version = "0.38.0", features = ["tokio", "cmake-build"] }
+rdkafka = { version = "0.38.0", features = ["tokio", "cmake-build"], optional = true }

[profile.release]
lto = "fat"
@@ -108,6 +108,7 @@ incremental = false
[features]
jemalloc = ["dep:jemallocator", "dep:jemalloc-ctl"]
debug-json = []
+kafka = ["dep:rdkafka"]
reth = [
    "dep:reth",
    "dep:reth-exex",
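Marking both target-specific `rdkafka` dependencies `optional = true` removes them from the default build graph; the new `kafka = ["dep:rdkafka"]` feature is now the only thing that pulls them (and, on Windows, the `cmake-build` toolchain) back in. A quick sanity check of both configurations, sketched under the assumption it is run from the core crate's directory:

```bash
# Without the feature, rdkafka is not compiled at all.
cargo check

# With it, rdkafka and all #[cfg(feature = "kafka")] code return.
cargo check --features kafka
```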
15 changes: 14 additions & 1 deletion core/src/manifest/stream.rs
@@ -148,6 +148,7 @@ impl RabbitMQStreamConfig {
    }
}

+#[cfg(feature = "kafka")]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct KafkaStreamQueueConfig {
    pub topic: String,
@@ -159,6 +160,7 @@ pub struct KafkaStreamQueueConfig {
    pub events: Vec<StreamEvent>,
}

+#[cfg(feature = "kafka")]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct KafkaStreamConfig {
    pub brokers: Vec<String>,
@@ -203,6 +205,7 @@ pub struct StreamsConfig {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub rabbitmq: Option<RabbitMQStreamConfig>,

+    #[cfg(feature = "kafka")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub kafka: Option<KafkaStreamConfig>,

@@ -224,13 +227,23 @@ impl StreamsConfig {

    pub fn get_streams_last_synced_block_path(&self) -> String {
        let mut path = ".rindexer/".to_string();
+        #[allow(clippy::blocks_in_conditions)]
        if self.rabbitmq.is_some() {
            path.push_str("rabbitmq_");
        } else if self.sns.is_some() {
            path.push_str("sns_");
        } else if self.webhooks.is_some() {
            path.push_str("webhooks_");
-        } else if self.kafka.is_some() {
+        } else if {
+            #[cfg(feature = "kafka")]
+            {
+                self.kafka.is_some()
+            }
+            #[cfg(not(feature = "kafka"))]
+            {
+                false
+            }
+        } {
            path.push_str("kafka_");
        } else if self.redis.is_some() {
            path.push_str("redis_");
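Because the `kafka` field now only exists when the feature is enabled, the `else if` arm cannot name it directly in a Kafka-less build; instead the condition becomes a block whose cfg-gated tail expression collapses to `false`, which is why the `clippy::blocks_in_conditions` allow is added. A self-contained sketch of the same pattern with toy names (not the rindexer types):

```rust
struct Config {
    // This field disappears entirely when built without `--features kafka`.
    #[cfg(feature = "kafka")]
    kafka: Option<String>,
}

impl Config {
    #[allow(clippy::blocks_in_conditions)]
    fn stream_prefix(&self) -> &'static str {
        if {
            // Exactly one of these inner blocks survives cfg expansion,
            // so the outer block always has a single bool tail expression.
            #[cfg(feature = "kafka")]
            {
                self.kafka.is_some()
            }
            #[cfg(not(feature = "kafka"))]
            {
                false
            }
        } {
            "kafka_"
        } else {
            "other_"
        }
    }
}

fn main() {
    let cfg = Config {
        #[cfg(feature = "kafka")]
        kafka: None,
    };
    println!("{}", cfg.stream_prefix());
}
```

Either way the function type-checks, and builds without the feature never reference the missing field.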
41 changes: 34 additions & 7 deletions core/src/streams/clients.rs
@@ -14,18 +14,22 @@ use crate::{
    event::{filter_event_data_by_conditions, EventMessage},
    indexer::native_transfer::EVENT_NAME,
    manifest::stream::{
-        CloudflareQueuesStreamConfig, CloudflareQueuesStreamQueueConfig, KafkaStreamConfig,
-        KafkaStreamQueueConfig, RabbitMQStreamConfig, RabbitMQStreamQueueConfig, RedisStreamConfig,
-        RedisStreamStreamConfig, SNSStreamTopicConfig, StreamEvent, StreamsConfig,
-        WebhookStreamConfig,
+        CloudflareQueuesStreamConfig, CloudflareQueuesStreamQueueConfig, RabbitMQStreamConfig,
+        RabbitMQStreamQueueConfig, RedisStreamConfig, RedisStreamStreamConfig,
+        SNSStreamTopicConfig, StreamEvent, StreamsConfig, WebhookStreamConfig,
    },
    streams::{
-        kafka::{Kafka, KafkaError},
        CloudflareQueues, CloudflareQueuesError, RabbitMQ, RabbitMQError, Redis, RedisError,
        Webhook, WebhookError, SNS,
    },
};

+#[cfg(feature = "kafka")]
+use crate::{
+    manifest::stream::{KafkaStreamConfig, KafkaStreamQueueConfig},
+    streams::kafka::{Kafka, KafkaError},
+};
+
// we should limit the max chunk size we send over when streaming to 70KB - 100KB is most limits
// we can add this to yaml if people need it
const MAX_CHUNK_SIZE: usize = 75 * 1024; // 75 KB
@@ -50,6 +54,7 @@ pub enum StreamError {
    #[error("RabbitMQ could not publish: {0}")]
    RabbitMQCouldNotPublish(#[from] RabbitMQError),

+    #[cfg(feature = "kafka")]
    #[error("Kafka could not publish: {0}")]
    KafkaCouldNotPublish(#[from] KafkaError),

@@ -75,6 +80,7 @@ pub struct RabbitMQStream {
    client: Arc<RabbitMQ>,
}

+#[cfg(feature = "kafka")]
#[derive(Debug)]
pub struct KafkaStream {
    config: KafkaStreamConfig,
@@ -98,6 +104,7 @@ pub struct StreamsClients {
    sns: Option<SNSStream>,
    webhook: Option<WebhookStream>,
    rabbitmq: Option<RabbitMQStream>,
+    #[cfg(feature = "kafka")]
    kafka: Option<KafkaStream>,
    redis: Option<RedisStream>,
    cloudflare_queues: Option<CloudflareQueuesStream>,
@@ -131,6 +138,7 @@ impl StreamsClients {
            None
        };

+        #[cfg(feature = "kafka")]
        #[allow(clippy::manual_map)]
        let kafka = if let Some(config) = stream_config.kafka.as_ref() {
            Some(KafkaStream {
@@ -172,14 +180,31 @@ impl StreamsClients {
            None
        };

-        Self { sns, webhook, rabbitmq, kafka, redis, cloudflare_queues }
+        Self {
+            sns,
+            webhook,
+            rabbitmq,
+            #[cfg(feature = "kafka")]
+            kafka,
+            redis,
+            cloudflare_queues,
+        }
    }

    fn has_any_streams(&self) -> bool {
        self.sns.is_some()
            || self.webhook.is_some()
            || self.rabbitmq.is_some()
-            || self.kafka.is_some()
+            || {
+                #[cfg(feature = "kafka")]
+                {
+                    self.kafka.is_some()
+                }
+                #[cfg(not(feature = "kafka"))]
+                {
+                    false
+                }
+            }
            || self.redis.is_some()
            || self.cloudflare_queues.is_some()
    }
@@ -413,6 +438,7 @@ impl StreamsClients {
        tasks
    }

+    #[cfg(feature = "kafka")]
    fn kafka_stream_tasks(
        &self,
        config: &KafkaStreamQueueConfig,
@@ -581,6 +607,7 @@ impl StreamsClients {
            }
        }

+        #[cfg(feature = "kafka")]
        if let Some(kafka) = &self.kafka {
            for config in &kafka.config.topics {
                if config.events.iter().any(|e| e.event_name == event_message.event_name)
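The same `#[cfg(feature = "kafka")]` attribute is applied uniformly in this file: to the error variant, the `KafkaStream` struct, the client field, its initialiser, and the publish path. As a compact illustration of the error-enum half, a toy sketch (using `thiserror`, which the `#[error]`/`#[from]` attributes above suggest; the `String` payload stands in for the real `KafkaError`):

```rust
use thiserror::Error;

#[derive(Debug, Error)]
pub enum StreamError {
    #[error("webhook could not publish: {0}")]
    WebhookCouldNotPublish(String),

    // Compiled only with the `kafka` feature; without it, neither this
    // variant nor the Kafka error type is ever referenced.
    #[cfg(feature = "kafka")]
    #[error("Kafka could not publish: {0}")]
    KafkaCouldNotPublish(String),
}
```

Any `match` over such an enum needs the same gating on its Kafka arm (or a catch-all) so exhaustiveness holds in both builds.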
8 changes: 4 additions & 4 deletions core/src/streams/kafka.rs
@@ -1,7 +1,7 @@
-use std::{
-    fmt::{Debug, Formatter},
-    time::Duration,
-};
+use std::fmt::{Debug, Formatter};
+
+#[cfg(not(windows))]
+use std::time::Duration;

#[cfg(not(windows))]
use rdkafka::{
1 change: 1 addition & 0 deletions core/src/streams/mod.rs
@@ -7,6 +7,7 @@ pub use webhook::{Webhook, WebhookError};
mod rabbitmq;
pub use rabbitmq::{RabbitMQ, RabbitMQError};

+#[cfg(feature = "kafka")]
mod kafka;

mod clients;
1 change: 1 addition & 0 deletions documentation/docs/pages/docs/changelog.mdx
@@ -13,6 +13,7 @@

### Breaking changes
-------------------------------------------------
+- fix: gate kafka streams support behind `kafka` feature

## Releases
-------------------------------------------------
11 changes: 11 additions & 0 deletions documentation/docs/pages/docs/start-building/streams/kafka.mdx
@@ -5,6 +5,17 @@ Kafka streams do not work with windows from the CLI installation, it will panic
If you are on windows and want to use kafka streams you should use the docker image.
:::

+:::info
+**Feature Gate:** If you're including the rindexer crate in your Rust project, Kafka support is gated behind the `kafka` feature flag. Add it to your `Cargo.toml`:
+
+```toml
+[dependencies]
+rindexer = { version = "*", features = ["kafka"] }
+```
+
+**Docker Images:** Kafka support is enabled by default in all official rindexer Docker images - no additional configuration needed.
+:::
+
:::info
rindexer streams can be used without any other storage providers. It can also be used with storage providers.
:::
Expand Down