
Commit f140674

fix: gate kafka streams behind kafka feature (#333)

Authored by zannis
Co-authored-by: zannis <[email protected]>
1 parent 044b72b

This commit turns Kafka streams support into an opt-in Cargo feature: the rdkafka dependency becomes optional in the core crate, all Kafka code paths are gated behind `#[cfg(feature = "kafka")]`, and the CLI enables the feature by default so prebuilt binaries behave as before.

9 files changed, +73 −17 lines

.github/workflows/ci-cd.yml

Lines changed: 3 additions & 3 deletions

The removed and added lines in this file appear identical, so the changes here are whitespace-only.

```diff
@@ -278,7 +278,7 @@ jobs:
           set -eo pipefail
           target="${{ matrix.target }}"
           flags=()
-
+
           if [[ "$target" != *msvc* && "$target" != "aarch64-unknown-linux-gnu" ]]; then
             flags+=(--features jemalloc,reth)
           else
@@ -649,9 +649,9 @@ jobs:
           set -eo pipefail
           target="${{ matrix.target }}"
           flags=()
-
+
           if [[ "$target" != *msvc* && "$target" != "aarch64-unknown-linux-gnu" ]]; then
-            flags+=(--features jemalloc,reth)
+            flags+=(--features jemalloc,reth)
           else
             flags+=(--features reth)
           fi
```

cli/Cargo.toml

Lines changed: 2 additions & 0 deletions

```diff
@@ -33,5 +33,7 @@ codegen-units = 1
 incremental = false
 
 [features]
+default = ["kafka"]
 jemalloc = ["dep:jemallocator", "dep:jemalloc-ctl"]
 reth = ["rindexer/reth"]
+kafka = ["rindexer/kafka"]
```

core/Cargo.toml

Lines changed: 3 additions & 2 deletions

```diff
@@ -95,10 +95,10 @@ reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.
 reth-tracing = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2", optional = true }
 
 [target.'cfg(not(windows))'.dependencies]
-rdkafka = { version = "0.38.0", features = ["tokio"] }
+rdkafka = { version = "0.38.0", features = ["tokio"], optional = true }
 
 [target.'cfg(windows)'.dependencies]
-rdkafka = { version = "0.38.0", features = ["tokio", "cmake-build"] }
+rdkafka = { version = "0.38.0", features = ["tokio", "cmake-build"], optional = true }
 
 [profile.release]
 lto = "fat"
@@ -108,6 +108,7 @@ incremental = false
 [features]
 jemalloc = ["dep:jemallocator", "dep:jemalloc-ctl"]
 debug-json = []
+kafka = ["dep:rdkafka"]
 reth = [
     "dep:reth",
     "dep:reth-exex",
```

core/src/manifest/stream.rs

Lines changed: 14 additions & 1 deletion

```diff
@@ -148,6 +148,7 @@ impl RabbitMQStreamConfig {
     }
 }
 
+#[cfg(feature = "kafka")]
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct KafkaStreamQueueConfig {
     pub topic: String,
@@ -159,6 +160,7 @@ pub struct KafkaStreamQueueConfig {
     pub events: Vec<StreamEvent>,
 }
 
+#[cfg(feature = "kafka")]
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct KafkaStreamConfig {
     pub brokers: Vec<String>,
@@ -203,6 +205,7 @@ pub struct StreamsConfig {
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub rabbitmq: Option<RabbitMQStreamConfig>,
 
+    #[cfg(feature = "kafka")]
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub kafka: Option<KafkaStreamConfig>,
 
@@ -224,13 +227,23 @@ impl StreamsConfig {
 
     pub fn get_streams_last_synced_block_path(&self) -> String {
         let mut path = ".rindexer/".to_string();
+        #[allow(clippy::blocks_in_conditions)]
         if self.rabbitmq.is_some() {
             path.push_str("rabbitmq_");
         } else if self.sns.is_some() {
             path.push_str("sns_");
         } else if self.webhooks.is_some() {
             path.push_str("webhooks_");
-        } else if self.kafka.is_some() {
+        } else if {
+            #[cfg(feature = "kafka")]
+            {
+                self.kafka.is_some()
+            }
+            #[cfg(not(feature = "kafka"))]
+            {
+                false
+            }
+        } {
             path.push_str("kafka_");
         } else if self.redis.is_some() {
             path.push_str("redis_");
```

core/src/streams/clients.rs

Lines changed: 34 additions & 7 deletions

```diff
@@ -14,18 +14,22 @@ use crate::{
     event::{filter_event_data_by_conditions, EventMessage},
     indexer::native_transfer::EVENT_NAME,
     manifest::stream::{
-        CloudflareQueuesStreamConfig, CloudflareQueuesStreamQueueConfig, KafkaStreamConfig,
-        KafkaStreamQueueConfig, RabbitMQStreamConfig, RabbitMQStreamQueueConfig, RedisStreamConfig,
-        RedisStreamStreamConfig, SNSStreamTopicConfig, StreamEvent, StreamsConfig,
-        WebhookStreamConfig,
+        CloudflareQueuesStreamConfig, CloudflareQueuesStreamQueueConfig, RabbitMQStreamConfig,
+        RabbitMQStreamQueueConfig, RedisStreamConfig, RedisStreamStreamConfig,
+        SNSStreamTopicConfig, StreamEvent, StreamsConfig, WebhookStreamConfig,
     },
     streams::{
-        kafka::{Kafka, KafkaError},
         CloudflareQueues, CloudflareQueuesError, RabbitMQ, RabbitMQError, Redis, RedisError,
         Webhook, WebhookError, SNS,
     },
 };
 
+#[cfg(feature = "kafka")]
+use crate::{
+    manifest::stream::{KafkaStreamConfig, KafkaStreamQueueConfig},
+    streams::kafka::{Kafka, KafkaError},
+};
+
 // we should limit the max chunk size we send over when streaming to 70KB - 100KB is most limits
 // we can add this to yaml if people need it
 const MAX_CHUNK_SIZE: usize = 75 * 1024; // 75 KB
@@ -50,6 +54,7 @@ pub enum StreamError {
     #[error("RabbitMQ could not publish: {0}")]
     RabbitMQCouldNotPublish(#[from] RabbitMQError),
 
+    #[cfg(feature = "kafka")]
     #[error("Kafka could not publish: {0}")]
     KafkaCouldNotPublish(#[from] KafkaError),
 
@@ -75,6 +80,7 @@ pub struct RabbitMQStream {
     client: Arc<RabbitMQ>,
 }
 
+#[cfg(feature = "kafka")]
 #[derive(Debug)]
 pub struct KafkaStream {
     config: KafkaStreamConfig,
@@ -98,6 +104,7 @@ pub struct StreamsClients {
     sns: Option<SNSStream>,
     webhook: Option<WebhookStream>,
     rabbitmq: Option<RabbitMQStream>,
+    #[cfg(feature = "kafka")]
     kafka: Option<KafkaStream>,
     redis: Option<RedisStream>,
     cloudflare_queues: Option<CloudflareQueuesStream>,
@@ -131,6 +138,7 @@ impl StreamsClients {
             None
         };
 
+        #[cfg(feature = "kafka")]
         #[allow(clippy::manual_map)]
         let kafka = if let Some(config) = stream_config.kafka.as_ref() {
             Some(KafkaStream {
@@ -172,14 +180,31 @@ impl StreamsClients {
             None
         };
 
-        Self { sns, webhook, rabbitmq, kafka, redis, cloudflare_queues }
+        Self {
+            sns,
+            webhook,
+            rabbitmq,
+            #[cfg(feature = "kafka")]
+            kafka,
+            redis,
+            cloudflare_queues,
+        }
     }
 
     fn has_any_streams(&self) -> bool {
         self.sns.is_some()
             || self.webhook.is_some()
             || self.rabbitmq.is_some()
-            || self.kafka.is_some()
+            || {
+                #[cfg(feature = "kafka")]
+                {
+                    self.kafka.is_some()
+                }
+                #[cfg(not(feature = "kafka"))]
+                {
+                    false
+                }
+            }
             || self.redis.is_some()
             || self.cloudflare_queues.is_some()
     }
@@ -413,6 +438,7 @@ impl StreamsClients {
         tasks
     }
 
+    #[cfg(feature = "kafka")]
     fn kafka_stream_tasks(
         &self,
         config: &KafkaStreamQueueConfig,
@@ -581,6 +607,7 @@ impl StreamsClients {
             }
         }
 
+        #[cfg(feature = "kafka")]
         if let Some(kafka) = &self.kafka {
             for config in &kafka.config.topics {
                 if config.events.iter().any(|e| e.event_name == event_message.event_name)
```
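
One detail worth calling out: `#[cfg]` is permitted on individual fields of a struct literal, which is why the one-line `Self { sns, webhook, rabbitmq, kafka, ... }` constructor had to be expanded — the gated field needs its own syntactic slot to carry the attribute. A condensed sketch of that part (hypothetical fields, not rindexer's):

```rust
struct StreamsClients {
    rabbitmq: Option<()>,
    #[cfg(feature = "kafka")]
    kafka: Option<()>,
}

fn build() -> StreamsClients {
    let rabbitmq = None;
    // Both the binding and the field initializer carry the cfg, so the
    // feature-off build never names `kafka` anywhere.
    #[cfg(feature = "kafka")]
    let kafka = None;

    StreamsClients {
        rabbitmq,
        #[cfg(feature = "kafka")]
        kafka,
    }
}
```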

core/src/streams/kafka.rs

Lines changed: 4 additions & 4 deletions

```diff
@@ -1,7 +1,7 @@
-use std::{
-    fmt::{Debug, Formatter},
-    time::Duration,
-};
+use std::fmt::{Debug, Formatter};
+
+#[cfg(not(windows))]
+use std::time::Duration;
 
 #[cfg(not(windows))]
 use rdkafka::{
```
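
Splitting the grouped `use std::{...}` matters because `Duration` is referenced only from `cfg(not(windows))` code; an unconditional import would trip `unused_imports` on Windows builds once the surrounding items are gated. The rule in isolation — `flush_timeout` is a hypothetical function, not rindexer's:

```rust
// The import carries the same cfg as its only users.
#[cfg(not(windows))]
use std::time::Duration;

#[cfg(not(windows))]
fn flush_timeout() -> Duration {
    Duration::from_secs(5) // hypothetical value, for illustration only
}

fn main() {
    #[cfg(not(windows))]
    println!("flush timeout: {:?}", flush_timeout());
    #[cfg(windows)]
    println!("no kafka flush timeout on windows");
}
```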

core/src/streams/mod.rs

Lines changed: 1 addition & 0 deletions

```diff
@@ -7,6 +7,7 @@ pub use webhook::{Webhook, WebhookError};
 mod rabbitmq;
 pub use rabbitmq::{RabbitMQ, RabbitMQError};
 
+#[cfg(feature = "kafka")]
 mod kafka;
 
 mod clients;
```
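
This one-line gate is what actually drops kafka.rs from the build: with the feature off, the compiler never parses the file, which is in turn why the optional rdkafka dependency never needs to exist. The same idea in miniature (an inline module is used so the sketch is self-contained):

```rust
#[cfg(feature = "kafka")]
mod kafka {
    // Nothing in here is compiled, or even name-resolved, when the
    // feature is disabled.
    pub fn describe() -> &'static str {
        "kafka module compiled in"
    }
}

fn main() {
    #[cfg(feature = "kafka")]
    println!("{}", kafka::describe());
    #[cfg(not(feature = "kafka"))]
    println!("kafka module compiled out");
}
```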

documentation/docs/pages/docs/changelog.mdx

Lines changed: 1 addition & 0 deletions

```diff
@@ -11,6 +11,7 @@
 
 ### Breaking changes
 -------------------------------------------------
+- fix: gate kafka streams support behind `kafka` feature
 
 ## Releases
 -------------------------------------------------
```

documentation/docs/pages/docs/start-building/streams/kafka.mdx

Lines changed: 11 additions & 0 deletions

````diff
@@ -5,6 +5,17 @@ Kafka streams do not work with windows from the CLI installation, it will panic
 If you are on windows and want to use kafka streams you should use the docker image.
 :::
 
+:::info
+**Feature Gate:** If you're including the rindexer crate in your Rust project, Kafka support is gated behind the `kafka` feature flag. Add it to your `Cargo.toml`:
+
+```toml
+[dependencies]
+rindexer = { version = "*", features = ["kafka"] }
+```
+
+**Docker Images:** Kafka support is enabled by default in all official rindexer Docker images - no additional configuration needed.
+:::
+
 :::info
 rindexer streams can be used without any other storage providers. It can also be used with storage providers.
 :::
````
