Skip to content

Commit 3c61d58

Browse files
feat: add cloudflare queues to the streams (#314)
1 parent e8ad16e commit 3c61d58

File tree

10 files changed

+757
-35
lines changed

10 files changed

+757
-35
lines changed

cli/Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,5 @@ hyperwarp_codegen_typings:
7373
RUSTFLAGS='-C target-cpu=native' cargo run --release --features jemalloc -- codegen --path $(CURDIR)/../examples/hyperevm typings
7474
hyperwarp_codegen_indexer:
7575
RUSTFLAGS='-C target-cpu=native' cargo run --release --features jemalloc -- codegen --path $(CURDIR)/../examples/hyperevm indexer
76+
start_cloudflare_indexer:
77+
RUSTFLAGS='-C target-cpu=native' RUST_BACKTRACE='full' cargo run --release --features jemalloc -- start --path $(CURDIR)/../examples/rindexer_cloudflare_queues indexer

core/src/manifest/stream.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,21 @@ pub struct KafkaStreamConfig {
160160
pub topics: Vec<KafkaStreamQueueConfig>,
161161
}
162162

163+
#[derive(Debug, Serialize, Deserialize, Clone)]
164+
pub struct CloudflareQueuesStreamQueueConfig {
165+
pub queue_id: String,
166+
pub networks: Vec<String>,
167+
#[serde(default)]
168+
pub events: Vec<StreamEvent>,
169+
}
170+
171+
#[derive(Debug, Serialize, Deserialize, Clone)]
172+
pub struct CloudflareQueuesStreamConfig {
173+
pub api_token: String,
174+
pub account_id: String,
175+
pub queues: Vec<CloudflareQueuesStreamQueueConfig>,
176+
}
177+
163178
#[derive(Debug, Serialize, Deserialize, Clone)]
164179
pub struct StreamsConfig {
165180
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -176,6 +191,9 @@ pub struct StreamsConfig {
176191

177192
#[serde(default, skip_serializing_if = "Option::is_none")]
178193
pub redis: Option<RedisStreamConfig>,
194+
195+
#[serde(default, skip_serializing_if = "Option::is_none")]
196+
pub cloudflare_queues: Option<CloudflareQueuesStreamConfig>,
179197
}
180198

181199
impl StreamsConfig {
@@ -199,6 +217,8 @@ impl StreamsConfig {
199217
path.push_str("kafka_");
200218
} else if self.redis.is_some() {
201219
path.push_str("redis_");
220+
} else if self.cloudflare_queues.is_some() {
221+
path.push_str("cloudflare_queues_");
202222
}
203223

204224
path.trim_end_matches('_').to_string()

core/src/streams/clients.rs

Lines changed: 85 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ use crate::{
1414
event::{filter_event_data_by_conditions, EventMessage},
1515
indexer::native_transfer::EVENT_NAME,
1616
manifest::stream::{
17-
KafkaStreamConfig, KafkaStreamQueueConfig, RabbitMQStreamConfig, RabbitMQStreamQueueConfig,
18-
RedisStreamConfig, RedisStreamStreamConfig, SNSStreamTopicConfig, StreamEvent,
19-
StreamsConfig, WebhookStreamConfig,
17+
CloudflareQueuesStreamConfig, CloudflareQueuesStreamQueueConfig, KafkaStreamConfig,
18+
KafkaStreamQueueConfig, RabbitMQStreamConfig, RabbitMQStreamQueueConfig, RedisStreamConfig,
19+
RedisStreamStreamConfig, SNSStreamTopicConfig, StreamEvent, StreamsConfig,
20+
WebhookStreamConfig,
2021
},
2122
streams::{
2223
kafka::{Kafka, KafkaError},
23-
RabbitMQ, RabbitMQError, Redis, RedisError, Webhook, WebhookError, SNS,
24+
CloudflareQueues, CloudflareQueuesError, RabbitMQ, RabbitMQError, Redis, RedisError,
25+
Webhook, WebhookError, SNS,
2426
},
2527
};
2628

@@ -54,6 +56,9 @@ pub enum StreamError {
5456
#[error("Redis could not publish: {0}")]
5557
RedisCouldNotPublish(#[from] RedisError),
5658

59+
#[error("Cloudflare Queues could not publish: {0}")]
60+
CloudflareQueuesCouldNotPublish(#[from] CloudflareQueuesError),
61+
5762
#[error("Task failed: {0}")]
5863
JoinError(JoinError),
5964
}
@@ -82,17 +87,25 @@ pub struct RedisStream {
8287
client: Arc<Redis>,
8388
}
8489

90+
#[derive(Debug)]
91+
pub struct CloudflareQueuesStream {
92+
config: CloudflareQueuesStreamConfig,
93+
client: Arc<CloudflareQueues>,
94+
}
95+
8596
#[derive(Debug)]
8697
pub struct StreamsClients {
8798
sns: Option<SNSStream>,
8899
webhook: Option<WebhookStream>,
89100
rabbitmq: Option<RabbitMQStream>,
90101
kafka: Option<KafkaStream>,
91102
redis: Option<RedisStream>,
103+
cloudflare_queues: Option<CloudflareQueuesStream>,
92104
}
93105

94106
impl StreamsClients {
95107
pub async fn new(stream_config: StreamsConfig) -> Self {
108+
#[allow(clippy::manual_map)]
96109
let sns = if let Some(config) = &stream_config.sns {
97110
Some(SNSStream {
98111
config: config.topics.clone(),
@@ -102,11 +115,13 @@ impl StreamsClients {
102115
None
103116
};
104117

118+
#[allow(clippy::manual_map)]
105119
let webhook = stream_config.webhooks.as_ref().map(|config| WebhookStream {
106120
config: config.clone(),
107121
client: Arc::new(Webhook::new()),
108122
});
109123

124+
#[allow(clippy::manual_map)]
110125
let rabbitmq = if let Some(config) = stream_config.rabbitmq.as_ref() {
111126
Some(RabbitMQStream {
112127
config: config.clone(),
@@ -116,6 +131,7 @@ impl StreamsClients {
116131
None
117132
};
118133

134+
#[allow(clippy::manual_map)]
119135
let kafka = if let Some(config) = stream_config.kafka.as_ref() {
120136
Some(KafkaStream {
121137
config: config.clone(),
@@ -129,6 +145,7 @@ impl StreamsClients {
129145
None
130146
};
131147

148+
#[allow(clippy::manual_map)]
132149
let redis = if let Some(config) = stream_config.redis.as_ref() {
133150
Some(RedisStream {
134151
config: config.clone(),
@@ -142,7 +159,20 @@ impl StreamsClients {
142159
None
143160
};
144161

145-
Self { sns, webhook, rabbitmq, kafka, redis }
162+
#[allow(clippy::manual_map)]
163+
let cloudflare_queues = if let Some(config) = stream_config.cloudflare_queues.as_ref() {
164+
Some(CloudflareQueuesStream {
165+
config: config.clone(),
166+
client: Arc::new(CloudflareQueues::new(
167+
config.api_token.clone(),
168+
config.account_id.clone(),
169+
)),
170+
})
171+
} else {
172+
None
173+
};
174+
175+
Self { sns, webhook, rabbitmq, kafka, redis, cloudflare_queues }
146176
}
147177

148178
fn has_any_streams(&self) -> bool {
@@ -151,6 +181,7 @@ impl StreamsClients {
151181
|| self.rabbitmq.is_some()
152182
|| self.kafka.is_some()
153183
|| self.redis.is_some()
184+
|| self.cloudflare_queues.is_some()
154185
}
155186

156187
fn chunk_data(&self, data_array: &Vec<Value>) -> Vec<Vec<Value>> {
@@ -450,6 +481,39 @@ impl StreamsClients {
450481
tasks
451482
}
452483

484+
fn cloudflare_queues_stream_tasks(
485+
&self,
486+
config: &CloudflareQueuesStreamQueueConfig,
487+
client: Arc<CloudflareQueues>,
488+
id: &str,
489+
event_message: &EventMessage,
490+
chunks: Arc<Vec<Vec<Value>>>,
491+
) -> StreamPublishes {
492+
let tasks: Vec<_> = chunks
493+
.iter()
494+
.enumerate()
495+
.map(|(index, chunk)| {
496+
let filtered_chunk: Vec<Value> = self.filter_chunk_event_data_by_conditions(
497+
&config.events,
498+
event_message,
499+
chunk,
500+
);
501+
502+
let publish_message_id = self.generate_publish_message_id(id, index, &None);
503+
let client = Arc::clone(&client);
504+
let queue_id = config.queue_id.clone();
505+
let publish_message =
506+
self.create_chunk_message_json(&config.events, event_message, &filtered_chunk);
507+
508+
task::spawn(async move {
509+
client.publish(&publish_message_id, &queue_id, &publish_message).await?;
510+
Ok(filtered_chunk.len())
511+
})
512+
})
513+
.collect();
514+
tasks
515+
}
516+
453517
pub async fn stream(
454518
&self,
455519
id: String,
@@ -549,6 +613,22 @@ impl StreamsClients {
549613
}
550614
}
551615

616+
if let Some(cloudflare_queues) = &self.cloudflare_queues {
617+
for config in &cloudflare_queues.config.queues {
618+
if config.events.iter().any(|e| e.event_name == event_message.event_name)
619+
&& config.networks.contains(&event_message.network)
620+
{
621+
streams.push(self.cloudflare_queues_stream_tasks(
622+
config,
623+
Arc::clone(&cloudflare_queues.client),
624+
&id,
625+
event_message,
626+
Arc::clone(&chunks),
627+
));
628+
}
629+
}
630+
}
631+
552632
let mut streamed_total = 0;
553633

554634
if index_event_in_order {
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
use reqwest::Client;
2+
use serde_json::Value;
3+
use thiserror::Error;
4+
5+
use crate::streams::STREAM_MESSAGE_ID_KEY;
6+
7+
#[derive(Error, Debug)]
8+
#[allow(clippy::enum_variant_names)]
9+
pub enum CloudflareQueuesError {
10+
#[error("Request error: {0}")]
11+
RequestError(#[from] reqwest::Error),
12+
13+
#[error("Cloudflare API error: {status} - {message}")]
14+
ApiError { status: u16, message: String },
15+
16+
#[error("Serialization error: {0}")]
17+
SerializationError(#[from] serde_json::Error),
18+
}
19+
20+
#[derive(Debug, Clone)]
21+
pub struct CloudflareQueues {
22+
client: Client,
23+
api_token: String,
24+
account_id: String,
25+
}
26+
27+
impl CloudflareQueues {
28+
pub fn new(api_token: String, account_id: String) -> Self {
29+
Self { client: Client::new(), api_token, account_id }
30+
}
31+
32+
pub async fn publish(
33+
&self,
34+
id: &str,
35+
queue_id: &str,
36+
message: &Value,
37+
) -> Result<(), CloudflareQueuesError> {
38+
let url = format!(
39+
"https://api.cloudflare.com/client/v4/accounts/{}/queues/{}/messages",
40+
self.account_id, queue_id
41+
);
42+
43+
let mut message_with_metadata = message.clone();
44+
if let Value::Object(ref mut map) = message_with_metadata {
45+
map.insert("message_id".to_string(), Value::String(id.to_string()));
46+
}
47+
48+
let payload = serde_json::json!({
49+
"body": message_with_metadata
50+
});
51+
52+
let response = self
53+
.client
54+
.post(&url)
55+
.header("Authorization", format!("Bearer {}", self.api_token))
56+
.header("Content-Type", "application/json")
57+
.header(STREAM_MESSAGE_ID_KEY, id)
58+
.json(&payload)
59+
.send()
60+
.await?;
61+
62+
if response.status().is_success() {
63+
Ok(())
64+
} else {
65+
let status = response.status().as_u16();
66+
let error_text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
67+
Err(CloudflareQueuesError::ApiError { status, message: error_text })
68+
}
69+
}
70+
71+
#[allow(dead_code)]
72+
pub async fn publish_batch(
73+
&self,
74+
messages: Vec<(String, Value)>,
75+
queue_id: &str,
76+
) -> Result<(), CloudflareQueuesError> {
77+
if messages.is_empty() {
78+
return Ok(());
79+
}
80+
81+
// Cloudflare Queues supports up to 100 messages per batch
82+
const MAX_BATCH_SIZE: usize = 100;
83+
84+
for chunk in messages.chunks(MAX_BATCH_SIZE) {
85+
let url = format!(
86+
"https://api.cloudflare.com/client/v4/accounts/{}/queues/{}/messages/batch",
87+
self.account_id, queue_id
88+
);
89+
90+
let batch_messages: Vec<Value> = chunk
91+
.iter()
92+
.map(|(id, message)| {
93+
let mut message_with_metadata = message.clone();
94+
if let Value::Object(ref mut map) = message_with_metadata {
95+
map.insert("message_id".to_string(), Value::String(id.to_string()));
96+
}
97+
serde_json::json!({
98+
"body": message_with_metadata
99+
})
100+
})
101+
.collect();
102+
103+
let response = self
104+
.client
105+
.post(&url)
106+
.header("Authorization", format!("Bearer {}", self.api_token))
107+
.header("Content-Type", "application/json")
108+
.json(&batch_messages)
109+
.send()
110+
.await?;
111+
112+
if !response.status().is_success() {
113+
let status = response.status().as_u16();
114+
let error_text =
115+
response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
116+
return Err(CloudflareQueuesError::ApiError { status, message: error_text });
117+
}
118+
}
119+
120+
Ok(())
121+
}
122+
}

core/src/streams/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,7 @@ mod redis;
1515
pub use clients::StreamsClients;
1616
pub use redis::{Redis, RedisError};
1717

18+
mod cloudflare_queues;
19+
pub use cloudflare_queues::{CloudflareQueues, CloudflareQueuesError};
20+
1821
pub const STREAM_MESSAGE_ID_KEY: &str = "x-rindexer-id";

core/src/types/core.rs

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,5 @@
1+
use alloy::dyn_abi::DynSolValue;
12
use alloy::json_abi::Param;
2-
use alloy::{
3-
dyn_abi::DynSolValue,
4-
primitives::{Address, BlockHash, TxHash, U256, U64},
5-
};
6-
use serde::{Deserialize, Serialize};
7-
8-
/// Metadata inside a log
9-
#[allow(dead_code)]
10-
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
11-
pub struct LogMeta {
12-
/// Address from which this log originated
13-
pub address: Address,
14-
15-
/// The block in which the log was emitted
16-
pub block_number: U64,
17-
18-
/// The block hash in which the log was emitted
19-
pub block_hash: BlockHash,
20-
21-
/// The transaction hash in which the log was emitted
22-
pub transaction_hash: TxHash,
23-
24-
/// Transactions index position log was created from
25-
pub transaction_index: U64,
26-
27-
/// Log index position in the block
28-
pub log_index: U256,
29-
}
303

314
/// Decoded log param.
325
#[derive(Debug, PartialEq, Clone)]

documentation/docs/pages/docs/changelog.mdx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
### Features
77
-------------------------------------------------
88
- feat: add health endpoint with comprehensive system status monitoring
9+
- feat: add cloudflare queues to the streams
910

1011
### Bug fixes
1112
-------------------------------------------------

0 commit comments

Comments
 (0)