Commit 849e429

feature(spill): Add resource spilling support
1 parent bcce9b6 commit 849e429

7 files changed: +1544 −15 lines


Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -7,7 +7,7 @@ description = "A batch processing clone of Apache Beam in Rust."
 repository = "https://github.com/nhubbard/ironbeam"
 
 [features]
-default = ["io-jsonl", "io-csv", "io-parquet", "parallel-io", "compression-gzip", "compression-zstd", "compression-bzip2", "compression-xz", "metrics", "checkpointing"]
+default = ["io-jsonl", "io-csv", "io-parquet", "parallel-io", "compression-gzip", "compression-zstd", "compression-bzip2", "compression-xz", "metrics", "checkpointing", "spilling"]
 
 # IO backends
 io-jsonl = []
@@ -24,6 +24,7 @@ compression-xz = ["dep:xz2"]
 parallel-io = []
 metrics = []
 checkpointing = ["dep:bincode", "dep:sha2"]
+spilling = ["dep:bincode"]
 
 [dependencies]
 anyhow = "1"
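Editor's note: because this commit adds `spilling` to ironbeam's *default* feature set, downstream crates get it automatically. A consumer that wants to opt out would disable default features and re-list only what it needs — a hypothetical downstream manifest (version and feature choices are placeholders):

```toml
# Hypothetical consumer Cargo.toml fragment, not part of this commit.
# Disabling default features drops "spilling" (and everything else in the
# default set), so the features actually wanted must be listed explicitly.
[dependencies]
ironbeam = { version = "*", default-features = false, features = ["io-jsonl", "metrics"] }
```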

README.md

Lines changed: 158 additions & 14 deletions
@@ -4,17 +4,21 @@ A data processing framework for Rust inspired by Apache Beam and Google Cloud Da
 
 ## Features
 
-- Declarative pipeline API with fluent interface
-- Stateless operations: `map`, `filter`, `flat_map`, `map_batches`
-- Stateful operations: `group_by_key`, `combine_values`, keyed aggregations
-- Built-in combiners: Sum, Min, Max, Average, DistinctCount, TopK
-- Join support: inner, left, right, and full outer joins
-- Side inputs for enriching streams with auxiliary data
-- Sequential and parallel execution modes
-- Type-safe with compile-time correctness
-- Optional I/O backends: JSON Lines, CSV, Parquet
-- Optional compression: gzip, zstd, bzip2, xz
-- Metrics collection and checkpointing support
+- **Declarative pipeline API** with fluent interface
+- **Stateless operations**: `map`, `filter`, `flat_map`, `map_batches`
+- **Stateful operations**: `group_by_key`, `combine_values`, keyed aggregations
+- **Built-in combiners**: Sum, Min, Max, Average, DistinctCount, TopK
+- **Join support**: inner, left, right, and full outer joins
+- **Side inputs** for enriching streams with auxiliary data
+- **Sequential and parallel execution** modes
+- **Type-safe** with compile-time correctness
+- **Optional I/O backends**: JSON Lines, CSV, Parquet
+- **Optional compression**: gzip, zstd, bzip2, xz
+- **Metrics collection** and **checkpointing** for fault tolerance
+- **Automatic memory spilling** to disk for memory-constrained environments
+- **Cloud I/O abstractions** for provider-agnostic cloud integrations
+- **Data validation utilities** for production-grade error handling
+- **Comprehensive testing framework** with fixtures and assertions
 
 ## Installation
 
@@ -44,6 +48,7 @@ Available feature flags:
 - `parallel-io` - parallel I/O operations
 - `metrics` - pipeline metrics collection
 - `checkpointing` - checkpoint and recovery support
+- `spilling` - automatic memory spilling to disk
 
 ## Quick Start
 
@@ -224,7 +229,7 @@ let windowed = data
 
 ### Checkpointing
 
-Save and restore the pipeline's state:
+Save and restore the pipeline's state for fault tolerance:
 
 ```rust
 data.checkpoint("checkpoints/step1")?;
@@ -243,6 +248,87 @@ let metrics = p.get_metrics();
 println!("Elements processed: {}", metrics.elements_processed);
 ```
 
+### Automatic Memory Spilling
+
+For memory-constrained environments, Ironbeam can automatically spill data to disk when memory limits are exceeded:
+
+```rust
+use ironbeam::spill::SpillConfig;
+use ironbeam::spill_integration::init_spilling;
+
+// Configure automatic spilling with a 100MB memory limit
+init_spilling(SpillConfig::new()
+    .with_memory_limit(100 * 1024 * 1024)
+    .with_spill_directory("/tmp/ironbeam-spill")
+    .with_compression(true));
+
+// Now run your pipeline - spilling happens automatically
+let results = large_dataset
+    .map(|x| heavy_computation(x))
+    .collect_seq()?;
+```
+
+Data is transparently spilled to disk when memory pressure is detected and automatically restored when needed. This allows processing datasets larger than available RAM without manual intervention.
+
+[Learn more about spilling →](https://github.com/nhubbard/ironbeam/blob/main/src/spill.rs)
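Editor's note: the spill-to-disk pattern this section describes can be sketched library-agnostically — a bounded in-memory buffer that serializes overflow to a backing file and streams it back on read. Everything below (`SpillBuffer`, its methods, the `u64` record type) is illustrative only, not ironbeam's actual implementation:

```rust
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::Path;

/// Illustrative spill buffer: keeps at most `limit` records in memory and
/// writes the overflow to a backing file. Not ironbeam's real API.
struct SpillBuffer {
    limit: usize,
    in_memory: Vec<u64>,
    spill: BufWriter<File>,
    spilled: usize,
}

impl SpillBuffer {
    fn new(limit: usize, path: &Path) -> std::io::Result<Self> {
        // Open read+write so the spilled records can be streamed back later.
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)?;
        Ok(Self { limit, in_memory: Vec::new(), spill: BufWriter::new(file), spilled: 0 })
    }

    /// Push a record; once the in-memory limit is hit, overflow goes to disk.
    fn push(&mut self, value: u64) -> std::io::Result<()> {
        if self.in_memory.len() < self.limit {
            self.in_memory.push(value);
        } else {
            self.spill.write_all(&value.to_le_bytes())?;
            self.spilled += 1;
        }
        Ok(())
    }

    /// Drain everything back: in-memory records first, then the spilled ones.
    fn into_vec(self) -> std::io::Result<Vec<u64>> {
        // into_inner flushes the BufWriter and hands back the File.
        let mut file = self.spill.into_inner().map_err(|e| e.into_error())?;
        file.seek(SeekFrom::Start(0))?;
        let mut out = self.in_memory;
        let mut reader = BufReader::new(file);
        let mut buf = [0u8; 8];
        for _ in 0..self.spilled {
            reader.read_exact(&mut buf)?;
            out.push(u64::from_le_bytes(buf));
        }
        Ok(out)
    }
}
```

A real implementation would additionally track global memory pressure and restore records lazily rather than all at once, but the buffer-then-stream-back shape is the core of the technique.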
+
+### Cloud I/O Abstractions
+
+Write provider-agnostic code that works with any cloud service:
+
+```rust
+use ironbeam::io::cloud::*;
+
+// Use fake implementations for testing
+let storage = FakeObjectIO::new();
+storage.put_object("bucket", "key", b"data")?;
+
+// In production, swap in real implementations (AWS S3, GCS, Azure Blob, etc.)
+// without changing your business logic
+```
+
+Supported cloud service categories:
+- **Storage**: Object storage (S3/GCS/Azure), warehouses (BigQuery/Redshift), databases
+- **Messaging**: Pub/sub (Kafka/Kinesis), queues (SQS), notifications (SNS)
+- **Compute**: Serverless functions (Lambda/Cloud Functions)
+- **ML**: Model inference (SageMaker/Vertex AI)
+- **Observability**: Metrics, logging, configuration, caching
+
+All traits are synchronous by design and include fake implementations for unit testing.
+
+[Learn more about cloud I/O →](https://github.com/nhubbard/ironbeam/blob/main/src/io/cloud/mod.rs)
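Editor's note: the trait-plus-fake pattern this section describes can be sketched in a few lines. The trait and struct shapes below are inferred from the snippet and are assumptions, not the crate's actual definitions:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Provider-agnostic object-storage trait (assumed shape; synchronous by design).
trait ObjectIO {
    fn put_object(&self, bucket: &str, key: &str, data: &[u8]) -> Result<(), String>;
    fn get_object(&self, bucket: &str, key: &str) -> Result<Vec<u8>, String>;
}

/// In-memory fake for unit tests; a real backend (S3, GCS, Azure Blob)
/// would implement the same trait against its SDK.
struct FakeObjectIO {
    objects: Mutex<HashMap<(String, String), Vec<u8>>>,
}

impl FakeObjectIO {
    fn new() -> Self {
        Self { objects: Mutex::new(HashMap::new()) }
    }
}

impl ObjectIO for FakeObjectIO {
    fn put_object(&self, bucket: &str, key: &str, data: &[u8]) -> Result<(), String> {
        self.objects
            .lock()
            .unwrap()
            .insert((bucket.to_string(), key.to_string()), data.to_vec());
        Ok(())
    }

    fn get_object(&self, bucket: &str, key: &str) -> Result<Vec<u8>, String> {
        self.objects
            .lock()
            .unwrap()
            .get(&(bucket.to_string(), key.to_string()))
            .cloned()
            .ok_or_else(|| format!("no such object: {bucket}/{key}"))
    }
}

/// Business logic depends only on the trait, so backends are swappable.
fn archive(io: &dyn ObjectIO, payload: &[u8]) -> Result<(), String> {
    io.put_object("archive", "latest", payload)
}
```

Because `archive` takes `&dyn ObjectIO`, swapping the fake for a production implementation requires no change to the business logic — which is the point of the abstraction.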
+
+### Data Validation
+
+Handle bad data gracefully in production pipelines:
+
+```rust
+use ironbeam::validation::*;
+
+impl Validate for MyRecord {
+    fn validate(&self) -> ValidationResult {
+        let mut errors = Vec::new();
+
+        if self.email.is_empty() {
+            errors.push(ValidationError::field("email", "Email required"));
+        }
+
+        if self.age < 0 || self.age > 150 {
+            errors.push(ValidationError::field("age", "Invalid age"));
+        }
+
+        if errors.is_empty() { Ok(()) } else { Err(errors) }
+    }
+}
+
+// Apply validation with configurable error handling
+let validated = data
+    .try_map(|record: &MyRecord| record.validate())
+    .collect_fail_fast()?; // Or use .collect_skip_errors()
+```
+
+[Learn more about validation →](https://github.com/nhubbard/ironbeam/blob/main/src/validation.rs)
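Editor's note: the fail-fast versus skip-errors distinction the snippet mentions can be sketched without the ironbeam API. The two free functions below are illustrative stand-ins for the collection modes, not the library's actual signatures:

```rust
/// Illustrative validation error; the real type lives in ironbeam::validation.
#[derive(Debug, PartialEq)]
struct ValidationError {
    field: &'static str,
    message: &'static str,
}

/// Fail-fast: stop at the first invalid record and surface its errors.
fn collect_fail_fast<T>(
    records: Vec<T>,
    validate: impl Fn(&T) -> Result<(), Vec<ValidationError>>,
) -> Result<Vec<T>, Vec<ValidationError>> {
    for r in &records {
        validate(r)?;
    }
    Ok(records)
}

/// Skip-errors: silently drop invalid records and keep the rest.
fn collect_skip_errors<T>(
    records: Vec<T>,
    validate: impl Fn(&T) -> Result<(), Vec<ValidationError>>,
) -> Vec<T> {
    records.into_iter().filter(|r| validate(r).is_ok()).collect()
}
```

Fail-fast suits batch jobs where bad input should abort the run; skip-errors suits best-effort pipelines where partial output is acceptable.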
+
 ## Examples
 
 The `examples/` directory contains complete demonstrations:
@@ -254,22 +340,80 @@ The `examples/` directory contains complete demonstrations:
 - `compressed_io.rs` - Working with compressed files
 - `checkpointing_demo.rs` - Checkpoint and recovery
 - `metrics_example.rs` - Collecting metrics
+- `cloud_io_demo.rs` - Cloud service integrations
+- `data_quality_validation.rs` - Production data validation
 - `testing_pipeline.rs` - Testing patterns
 
 Run examples with:
 
 ```bash
 cargo run --example etl_pipeline --features io-jsonl,io-csv
+cargo run --example cloud_io_demo
+```
+
+## Testing Your Pipelines
+
+Ironbeam includes a comprehensive testing framework with utilities specifically designed for pipeline testing:
+
+### Test Utilities
+
+```rust
+use ironbeam::testing::*;
+
+#[test]
+fn test_my_pipeline() -> anyhow::Result<()> {
+    let p = TestPipeline::new();
+
+    let result = from_vec(&p, vec![1, 2, 3])
+        .map(|x: &i32| x * 2)
+        .collect_seq()?;
+
+    // Specialized assertions for collections
+    assert_collections_equal(result, vec![2, 4, 6]);
+    assert_all(&result, |x| x % 2 == 0);
+
+    Ok(())
+}
 ```
 
-## Testing
+### Pre-built Test Fixtures
 
-Run tests with:
+```rust
+use ironbeam::testing::fixtures::*;
+
+// Ready-to-use test datasets
+let logs = sample_log_entries();         // Web server logs
+let users = user_product_interactions(); // Relational data
+let series = time_series_data();         // Sensor readings
+```
+
+### Debug Utilities
+
+```rust
+// Inspect data during test execution
+let result = data
+    .debug_inspect("after filter")
+    .filter(|x: &i32| x > 10)
+    .debug_count("filtered count")
+    .collect_seq()?;
+```
+
+[Learn more about testing →](https://github.com/nhubbard/ironbeam/blob/main/src/testing.rs)
+
+### Running Tests
+
+Run the full test suite:
 
 ```bash
 cargo test
 ```
 
+Run tests with specific features:
+
+```bash
+cargo test --features spilling
+```
+
 For coverage:
 
 ```bash

src/lib.rs

Lines changed: 6 additions & 0 deletions
@@ -188,6 +188,7 @@
 //! - `parallel-io` - Enable parallel I/O operations (`write_*_par` methods)
 //! - `metrics` - Enable metrics collection and reporting (enabled by default)
 //! - `checkpointing` - Enable automatic checkpointing for fault tolerance (enabled by default)
+//! - `spilling` - Enable automatic memory spilling to disk (enabled by default)
 //!
 //! ## Examples
 //!
@@ -534,6 +535,11 @@ pub mod metrics;
 #[cfg(feature = "checkpointing")]
 pub mod checkpoint;
 
+#[cfg(feature = "spilling")]
+pub mod spill;
+
+pub mod spill_integration;
+
 // General re-exports
 pub use collection::{CombineFn, Count, PCollection, RFBound};
 pub use combiners::{AverageF64, DistinctCount, Max, Min, Sum, TopK};
