🚀 Cortex Data Framework v3.1.2
Release Date: 10 February 2026
Version: 3.1.2
We're thrilled to announce the release of Cortex Data Framework v3.1 – the most feature-packed, performant, and developer-friendly version yet! This release brings major enhancements to stream processing, state management, mediator patterns, and introduces powerful new type utilities.
What's Changed
- v3/bug/209 : Improve error handling and diagnostics in Streams/Kafka by @eneshoxha in #210
Full Changelog: v3.1.1...v3.1.2
🎯 Highlights
| Feature | Description |
|---|---|
| 🪟 Unified Windowing API | Tumbling, Sliding, and Session windows with custom triggers |
| 🔗 Stream-Stream Joins | Windowed joins between two streams |
| 🎛️ Simplified APIs | Single type parameter for StreamBuilder |
| 🦆 DuckDB State Store | High-performance analytical state storage |
| 📡 Async Buffering | Backpressure-aware stream processing |
| 🔀 FanOut Operator | Multi-sink stream support with filters |
| 🎭 Type-Inferred Mediator | No more explicit generic parameters |
🌊 Stream Processing Enhancements
🪟 Unified Windowing API
Create tumbling, sliding, and session windows with a clean, unified API:
// Tumbling Window - Non-overlapping fixed-size windows
// Returns WindowResult<string, OrderEvent> containing the key and all items in the window
var stream = StreamBuilder<OrderEvent>
.CreateNewStream("OrderAggregation")
.Stream()
.TumblingWindow(
keySelector: e => e.CustomerId,
timestampSelector: e => e.OrderTime,
windowSize: TimeSpan.FromMinutes(5))
.Map(windowResult => new OrderSummary
{
CustomerId = windowResult.Key,
TotalAmount = windowResult.Items.Sum(o => o.Amount),
Count = windowResult.Items.Count,
WindowStart = windowResult.WindowStart,
WindowEnd = windowResult.WindowEnd
})
.Sink(summary => Console.WriteLine($"Customer {summary.CustomerId}: {summary.TotalAmount}"))
.Build();
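To make the tumbling behaviour concrete, here is a minimal sketch of how a timestamp could be mapped to its fixed-size window. It is an illustration only: `TumblingWindowStart` is a hypothetical helper, not a Cortex API, and it assumes windows are aligned to multiples of the window size counted from `DateTime.MinValue`. Cortex may align windows differently.

```csharp
using System;

// Hypothetical helper (not part of Cortex): align a timestamp down to the
// start of the fixed-size window that contains it.
static DateTime TumblingWindowStart(DateTime timestamp, TimeSpan windowSize)
{
    long offset = timestamp.Ticks % windowSize.Ticks;
    return new DateTime(timestamp.Ticks - offset, timestamp.Kind);
}

var size = TimeSpan.FromMinutes(5);
var orderTime = new DateTime(2026, 2, 10, 14, 23, 45, DateTimeKind.Utc);
var start = TumblingWindowStart(orderTime, size);
var end = start + size;

// An order at 14:23:45 falls in the window [14:20:00, 14:25:00).
Console.WriteLine($"{start:HH:mm:ss} - {end:HH:mm:ss}");
```

Because the windows do not overlap, each event belongs to exactly one window per key under this assumption.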
// Sliding Window - Overlapping windows with slide interval
var slidingStream = StreamBuilder<SensorReading>
.CreateNewStream("SensorAverage")
.Stream()
.SlidingWindow(
keySelector: e => e.SensorId,
timestampSelector: e => e.Timestamp,
windowSize: TimeSpan.FromMinutes(10),
slideInterval: TimeSpan.FromMinutes(2))
.Map(window => new { window.Key, Average = window.Items.Average(r => r.Value) })
.Sink(avg => Console.WriteLine($"Sensor {avg.Key} avg: {avg.Average}"))
.Build();
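With a 10-minute window and a 2-minute slide, one reading can fall into several overlapping windows. The sketch below enumerates them. `SlidingWindowStarts` is a hypothetical helper, not a Cortex API, and it assumes window starts are aligned to multiples of the slide interval, which may not match the framework's actual alignment.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Cortex): list the start of every sliding
// window [start, start + windowSize) that contains the given timestamp.
static List<DateTime> SlidingWindowStarts(DateTime timestamp, TimeSpan windowSize, TimeSpan slideInterval)
{
    var starts = new List<DateTime>();
    // Latest window start that is not after the timestamp.
    long lastStart = timestamp.Ticks - (timestamp.Ticks % slideInterval.Ticks);
    // Walk backwards while the window still covers the timestamp.
    for (long s = lastStart; s > timestamp.Ticks - windowSize.Ticks; s -= slideInterval.Ticks)
    {
        starts.Add(new DateTime(s, timestamp.Kind));
    }
    starts.Reverse(); // oldest window first
    return starts;
}

var reading = new DateTime(2026, 2, 10, 14, 23, 45, DateTimeKind.Utc);
var starts = SlidingWindowStarts(reading, TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(2));

// Prints "5 windows, first starts at 14:14"
Console.WriteLine($"{starts.Count} windows, first starts at {starts[0]:HH:mm}");
```

In general an event lands in roughly windowSize / slideInterval windows, so a smaller slide interval means more windows are updated per event.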
// Session Window - Activity-based windows with inactivity gaps
var sessionStream = StreamBuilder<UserActivity>
.CreateNewStream("UserSessions")
.Stream()
.SessionWindow(
keySelector: e => e.UserId,
timestampSelector: e => e.ActivityTime,
inactivityGap: TimeSpan.FromMinutes(30))
.Map(session => new UserSession
{
UserId = session.Key,
Events = session.Items.ToList(),
Duration = session.WindowEnd - session.WindowStart
})
.Sink(s => Console.WriteLine($"Session ended for {s.UserId}: {s.Events.Count} events"))
.Build();
🔗 Stream-Stream Joins
Join two streams together within time windows:
// Create the join operator with configuration
var joinOperator = new StreamStreamJoinOperator<Order, Payment, string, OrderWithPayment>(
leftKeySelector: order => order.OrderId,
rightKeySelector: payment => payment.OrderId,
leftTimestampSelector: order => order.CreatedAt,
rightTimestampSelector: payment => payment.ProcessedAt,
joinFunction: (order, payment) => new OrderWithPayment
{
Order = order,
Payment = payment
},
configuration: new StreamJoinConfiguration
{
WindowSize = TimeSpan.FromMinutes(5),
JoinType = StreamJoinType.Inner
});
// Build the stream with the join
var orderStream = StreamBuilder<Order>
.CreateNewStream("OrderPaymentJoin")
.Stream()
.JoinStream(joinOperator)
.Sink(result => Console.WriteLine($"Matched: {result.Order.OrderId}"))
.Build();
// Feed payments to the right side of the join
paymentStream.ForEach(payment => joinOperator.ProcessRight(payment));
📊 Stream-Table Left Joins
Enrich stream data with table lookups:
// Create a state store for the "table" side
var customerStore = new InMemoryStateStore<string, Customer>("CustomerStore");
// Build stream with left join - emits result even when no match found
var enrichedStream = StreamBuilder<OrderEvent>
.CreateNewStream("EnrichedOrders")
.Stream()
.LeftJoin<Customer, string, EnrichedEvent>(
rightStateStore: customerStore,
keySelector: e => e.CustomerId,
joinFunction: (order, customer) => new EnrichedEvent
{
Order = order,
CustomerName = customer?.Name ?? "Unknown",
CustomerTier = customer?.Tier ?? "Standard"
})
.Sink(e => Console.WriteLine($"Order for {e.CustomerName}"))
.Build();
🔀 FanOut: Multi-Sink Stream Support
Route events to multiple sinks with optional filters:
var stream = StreamBuilder<LogEvent>
.CreateNewStream("LogRouter")
.Stream()
.FanOut(fanOut => fanOut
.To("alerts", log => log.Level == "ERROR", log => alertService.SendAlert(log))
.To("metrics", log => log.Level == "INFO", log => metricsService.RecordMetric(log))
.To("archive", log => archiveService.Store(log))) // No filter = all events
.Build();
// With transformations before sinking
.FanOut(fanOut => fanOut
.To("console", log => Console.WriteLine(log.Message))
.ToWithTransform("json",
log => JsonSerializer.Serialize(log),
json => File.AppendAllText("logs.json", json)))
📡 Async Buffering & Backpressure
Handle high-throughput scenarios with built-in backpressure:
// Option 1: Use predefined high-throughput settings
var stream = StreamBuilder<Event>
.CreateNewStream("HighThroughputStream")
.WithPerformanceOptions(StreamPerformanceOptions.HighThroughput(
bufferCapacity: 100_000,
concurrencyLevel: Environment.ProcessorCount))
.Stream()
.Map(e => ProcessEvent(e))
.Sink(e => SaveToDatabase(e))
.Build();
// Option 2: Custom configuration
var streamCustom = StreamBuilder<Event>
.CreateNewStream("CustomBufferedStream")
.WithPerformanceOptions(new StreamPerformanceOptions
{
EnableBufferedProcessing = true,
BufferCapacity = 10_000,
BackpressureStrategy = BackpressureStrategy.Block,
BatchSize = 100,
BatchTimeout = TimeSpan.FromMilliseconds(50),
ConcurrencyLevel = 4,
BlockingTimeout = TimeSpan.FromSeconds(30),
OnItemDropped = (item, reason) => _logger.LogWarning("Dropped: {Reason}", reason)
})
.Stream()
.Map(e => ProcessEvent(e))
.Sink(e => SaveToDatabase(e))
.Build();
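For readers unfamiliar with a blocking backpressure strategy such as `BackpressureStrategy.Block`, the idea can be sketched with the standard `System.Threading.Channels` library. This is an illustration of the general technique only and says nothing about how Cortex implements its buffer; the capacity and delay values are arbitrary.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Illustration only, built on System.Threading.Channels rather than Cortex.
// A bounded channel with FullMode.Wait makes the producer wait once the
// buffer is full, so a slow consumer throttles a fast producer.
var buffer = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 4)
{
    FullMode = BoundedChannelFullMode.Wait
});

var producer = Task.Run(async () =>
{
    for (int i = 0; i < 20; i++)
    {
        // Completes immediately while there is room; otherwise waits
        // until the consumer has read an item.
        await buffer.Writer.WriteAsync(i);
    }
    buffer.Writer.Complete();
});

int consumed = 0;
await foreach (var item in buffer.Reader.ReadAllAsync())
{
    await Task.Delay(1); // simulate a slow sink
    consumed++;
}
await producer;

Console.WriteLine($"Consumed {consumed} items");
```

No items are dropped in this mode; the trade-off is that the producer's throughput is capped by the consumer.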
// Option 3: Low-latency settings (immediate processing)
var lowLatencyStream = StreamBuilder<Event>
.CreateNewStream("LowLatencyStream")
.WithPerformanceOptions(StreamPerformanceOptions.LowLatency())
.Stream()
.Sink(e => ProcessImmediately(e))
.Build();
🎛️ Simplified StreamBuilder API
The StreamBuilder now uses a single type parameter for stream creation:
// Clean, simple API - just specify the input type
var stream = StreamBuilder<OrderEvent>
.CreateNewStream("MyStream")
.Stream()
.Filter(e => e.IsValid)
.Map(e => Transform(e))
.Sink(e => Save(e))
.Build();
// With external source operator
var kafkaStream = StreamBuilder<OrderEvent>
.CreateNewStream("KafkaOrders")
.Stream(new KafkaSourceOperator<OrderEvent>(kafkaConfig))
.Filter(e => e.Amount > 100)
.Sink(e => ProcessLargeOrder(e))
.Build();
📝 Structured Logging for Operators
All source and sink operators now include structured logging:
// Automatic structured logging for all operators
[14:23:45 INF] KafkaSourceOperator started consuming from topic "orders"
[14:23:46 INF] Processed 1000 messages in 1.2s (833 msg/s)
[14:23:47 WRN] Backpressure detected, slowing consumption
🧠 Mediator Pattern Improvements
🎭 Type-Inferred Commands & Queries
No more explicit type parameters – the compiler infers everything:
// Before (v2.x) - Explicit type parameters required
var result = await mediator.SendCommandAsync<CreateOrderCommand, OrderId>(command);
var data = await mediator.SendQueryAsync<GetOrderQuery, OrderDto>(query);
// After (v3.0) - Type inference with extension methods!
var result = await mediator.SendAsync(command); // Returns OrderId
var data = await mediator.QueryAsync(query); // Returns OrderDto
// Void commands work too!
await mediator.SendAsync(new DeleteOrderCommand { OrderId = id });
// Publish notifications
await mediator.PublishAsync(new OrderCreatedNotification { OrderId = id });
🔔 Notification Pipeline Behaviors
Add cross-cutting concerns to notifications:
public class LoggingNotificationBehavior<TNotification>
: INotificationBehavior<TNotification>
where TNotification : INotification
{
public async Task Handle(
TNotification notification,
NotificationHandlerDelegate next,
CancellationToken cancellationToken)
{
_logger.LogInformation("Publishing {Notification}", typeof(TNotification).Name);
await next();
_logger.LogInformation("Published {Notification}", typeof(TNotification).Name);
}
}
🚨 Exception Handling Pipeline Behaviors
Centralized exception handling for your CQRS pipeline:
// Register exception handling behaviors
services.AddCortexMediator(
new[] { typeof(Program).Assembly },
options => options.AddExceptionHandlingBehaviors());
// Implement a custom exception handler
public class MyExceptionHandler : IExceptionHandler
{
private readonly ILogger<MyExceptionHandler> _logger;
public MyExceptionHandler(ILogger<MyExceptionHandler> logger)
{
_logger = logger;
}
public Task<bool> HandleAsync<TResult>(Exception exception, out TResult result)
{
_logger.LogError(exception, "An error occurred");
// Optionally provide a fallback result
if (exception is ValidationException validationEx)
{
result = default;
return Task.FromResult(true); // Exception handled
}
result = default;
return Task.FromResult(false); // Re-throw exception
}
}
// Register the handler
services.AddSingleton<IExceptionHandler, MyExceptionHandler>();
💾 Query Caching Behavior
Cache query results with built-in invalidation:
public class GetProductQuery : IQuery<ProductDto>, ICacheable
{
public string ProductId { get; set; }
public string CacheKey => $"product:{ProductId}";
public TimeSpan CacheDuration => TimeSpan.FromMinutes(10);
}
// Invalidate cache when needed
await mediator.SendAsync(new InvalidateCacheCommand { Key = "product:123" });
📡 Streaming Query Support with IAsyncEnumerable
Stream large result sets efficiently:
// Requires: using System.Runtime.CompilerServices; (for [EnumeratorCancellation])
public class GetAllOrdersQuery : IStreamingQuery<OrderDto> { }
public class GetAllOrdersQueryHandler
: IStreamingQueryHandler<GetAllOrdersQuery, OrderDto>
{
public async IAsyncEnumerable<OrderDto> Handle(
GetAllOrdersQuery query,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var order in _repository.GetAllAsync(cancellationToken))
{
yield return order.ToDto();
}
}
}
// Usage
await foreach (var order in mediator.StreamAsync(new GetAllOrdersQuery()))
{
Console.WriteLine(order.Id);
}
🔄 Request Pre/Post Processors
Execute logic before and after request handling:
public class AuditPreProcessor<TRequest> : IRequestPreProcessor<TRequest>
{
public Task Process(TRequest request, CancellationToken cancellationToken)
{
_auditLog.LogRequest(request);
return Task.CompletedTask;
}
}
public class MetricsPostProcessor<TRequest, TResponse>
: IRequestPostProcessor<TRequest, TResponse>
{
public Task Process(TRequest request, TResponse response, CancellationToken cancellationToken)
{
_metrics.RecordRequestCompleted(typeof(TRequest).Name);
return Task.CompletedTask;
}
}
💳 Transactional Behaviors
Wrap commands in database transactions automatically:
// Install: dotnet add package Cortex.Mediator.Behaviors.Transactional
// Register transactional behaviors
services.AddCortexMediator(
new[] { typeof(Program).Assembly },
options => options.AddTransactionalBehaviors());
// Commands are automatically wrapped in transactions using TransactionScope
public class TransferFundsCommand : ICommand<TransferResult>
{
public string FromAccount { get; set; }
public string ToAccount { get; set; }
public decimal Amount { get; set; }
}
// The handler runs within a TransactionScope
public class TransferFundsHandler : ICommandHandler<TransferFundsCommand, TransferResult>
{
public async Task<TransferResult> Handle(TransferFundsCommand command, CancellationToken ct)
{
// All operations here are transactional
await _accountRepository.DebitAsync(command.FromAccount, command.Amount, ct);
await _accountRepository.CreditAsync(command.ToAccount, command.Amount, ct);
return new TransferResult { Success = true };
// Transaction commits automatically on success
// Transaction rolls back on exception
}
}
📦 New Packages
🦆 Cortex.States.DuckDb
High-performance analytical state storage powered by DuckDB:
// Install
dotnet add package Cortex.States.DuckDb
// Usage
var stateStore = new DuckDbKeyValueStateStore<string, OrderAggregate>(
name: "OrderAggregates",
databasePath: "analytics.db", // Use ":memory:" for in-memory
tableName: "order_aggregates");
// Use with stream aggregation
var stream = StreamBuilder<OrderEvent>
.CreateNewStream("OrderAnalytics")
.Stream()
.Aggregate(
keySelector: e => e.CustomerId,
aggregateFunction: (agg, e) => new OrderAggregate
{
TotalSpent = (agg?.TotalSpent ?? 0) + e.Amount,
OrderCount = (agg?.OrderCount ?? 0) + 1
},
stateStore: stateStore)
.Sink(kv => Console.WriteLine($"Customer {kv.Key}: {kv.Value.TotalSpent}"))
.Build();
🔗 Cortex.Streams.Mediator
Bridge between Cortex.Streams and Cortex.Mediator for CQRS integration:
// Install
dotnet add package Cortex.Streams.Mediator
// Sink stream events to commands
var stream = StreamBuilder<OrderEvent>
.CreateNewStream("OrderProcessing")
.Stream()
.SinkToCommand<OrderEvent, OrderEvent, ProcessOrderCommand, OrderResult>(
mediator,
orderEvent => new ProcessOrderCommand { OrderId = orderEvent.Id },
resultHandler: (e, result) => _logger.LogInformation("Processed: {Result}", result),
errorHandler: (e, ex) => _logger.LogError(ex, "Failed: {OrderId}", e.Id))
.Build();
// Publish stream events as notifications
var notificationStream = StreamBuilder<OrderEvent>
.CreateNewStream("OrderNotifications")
.Stream()
.Filter(e => e.Status == "Completed")
.SinkToNotification<OrderEvent, OrderEvent, OrderProcessedNotification>(
mediator,
orderEvent => new OrderProcessedNotification
{
OrderId = orderEvent.Id,
ProcessedAt = DateTime.UtcNow
})
.Build();
// Source streams from streaming queries
var queryStream = StreamBuilder<OrderEvent>
.CreateNewStream("LiveOrders")
.StreamFromQuery<OrderEvent, OrderEvent, GetLiveOrdersQuery>(
mediator,
new GetLiveOrdersQuery { Region = "US" },
errorHandler: ex => _logger.LogError(ex, "Query error"))
.Filter(e => e.Amount > 100)
.Sink(e => Console.WriteLine($"High-value order: {e.Id}"))
.Build();
📝 Cortex.Serialization.Yaml
High-performance YAML serialization with advanced features:
// Install
dotnet add package Cortex.Serialization.Yaml
// Basic usage
var yaml = YamlSerializer.Serialize(config);
var restored = YamlSerializer.Deserialize<AppConfig>(yaml); // round-trip back to an object
// Advanced features: anchors, tags, flow style, comments
var yamlWithComments = """
# Application configuration
database:
  host: localhost # Primary database
  port: 5432
# Feature flags
features: &default-features
  enableCache: true
  maxConnections: 100
production:
  <<: *default-features # Anchor reference
  maxConnections: 500
""";
💳 Cortex.Mediator.Behaviors.Transactional
Automatic transaction management for commands:
// Install
dotnet add package Cortex.Mediator.Behaviors.Transactional
// Register in your DI configuration
services.AddCortexMediator(
new[] { typeof(Program).Assembly },
options => options.AddTransactionalBehaviors());
// All commands are now automatically wrapped in TransactionScope
🔧 Type System & Utilities
🎯 OneOf / AnyOf Types (Up to 8 Types)
Discriminated unions for type-safe polymorphism:
// OneOf - Exactly one of the types
public OneOf<Success, ValidationError, NotFoundError> ProcessOrder(Order order)
{
if (!order.IsValid)
return new ValidationError("Invalid order");
if (!_repository.Exists(order.Id))
return new NotFoundError(order.Id);
return new Success(order.Id);
}
// Pattern matching
var result = ProcessOrder(order);
var message = result.Match(
success => $"Order {success.Id} processed",
validation => $"Validation failed: {validation.Message}",
notFound => $"Order {notFound.Id} not found");
// AnyOf - Can be multiple types simultaneously
AnyOf<ILoggable, ISerializable> item = GetItem();
if (item.Is<ILoggable>())
item.As<ILoggable>().Log();
✅ Result Type
Railway-oriented programming for error handling:
public Result<Order> GetOrder(string orderId)
{
var order = _repository.Find(orderId);
if (order == null)
return Result<Order>.Failure($"Order {orderId} not found");
return Result<Order>.Success(order);
}
// Usage with pattern matching
var result = GetOrder("123");
if (result.IsSuccess)
{
ProcessOrder(result.Value);
}
else
{
_logger.LogError(result.Error.Message);
}
// Fluent chaining
var finalResult = GetOrder("123")
.Map(order => EnrichOrder(order))
.Bind(order => ValidateOrder(order))
.Match(
success => $"Processed: {success.Id}",
failure => $"Failed: {failure.Message}");
⚡ Performance Optimizations
🚀 Operator Allocation Optimizations
Reduced memory allocations across all stream operators:
- MapOperator: Zero-allocation for value types
- FilterOperator: Optimized predicate caching
- Windowing: Pooled buffer management
- State Stores: Reduced serialization overhead
📈 Mediator Pipeline Optimizations
- Handler resolution caching
- Reduced reflection overhead
- Optimized behavior chain execution
- Lazy initialization of optional components
Benchmark Results:
| Operation | v2.0 | v3.0 | Improvement |
|---|---|---|---|
| Command dispatch | 45μs | 12μs | 73% faster |
| Query dispatch | 38μs | 9μs | 76% faster |
| Notification publish | 52μs | 15μs | 71% faster |
| Memory per request | 2.4KB | 0.8KB | 67% less |
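The Improvement column is consistent with reading each figure as the relative reduction from the earlier value to the later one, so "73% faster" here means roughly 73% less time per operation. A short sketch of that arithmetic, using only the numbers from the table (`ReductionPercent` is a helper introduced for illustration):

```csharp
using System;

// Percentage reduction from a "before" value to an "after" value.
static double ReductionPercent(double before, double after) =>
    (before - after) / before * 100.0;

Console.WriteLine($"{ReductionPercent(45, 12):F0}%");   // command dispatch: 73%
Console.WriteLine($"{ReductionPercent(38, 9):F0}%");    // query dispatch: 76%
Console.WriteLine($"{ReductionPercent(52, 15):F0}%");   // notification publish: 71%
Console.WriteLine($"{ReductionPercent(2.4, 0.8):F0}%"); // memory per request: 67%
```

Expressed as a speed-up factor instead, command dispatch going from 45μs to 12μs is about 3.75 times faster.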
🛡️ Error Handling & Resilience
🔄 Unified Error Handling Across Sinks
All sink integrations now have consistent error handling:
var stream = StreamBuilder<Event>
.CreateNewStream("ResilientStream")
.WithErrorHandling(new StreamExecutionOptions
{
ErrorHandlingStrategy = ErrorHandlingStrategy.Retry,
MaxRetries = 3,
RetryDelay = TimeSpan.FromSeconds(1),
OnError = (context) =>
{
_logger.LogError(context.Exception, "Failed: {Op}", context.OperatorName);
_deadLetterQueue.Enqueue(context.Input);
return ErrorAction.Skip; // or ErrorAction.Retry, ErrorAction.Stop
}
})
.Stream()
.Map(e => ProcessEvent(e))
.Sink(e => _database.Save(e))
.Build();
🛡️ Robust Resource Management
- Automatic connection recovery for Kafka, Pulsar, RabbitMQ
- Graceful shutdown with in-flight message completion
- Memory-bounded buffers with overflow strategies
- Circuit breaker integration
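As a rough mental model for a retry strategy with a maximum retry count and a fixed delay, like the `MaxRetries` and `RetryDelay` settings in this section, consider the sketch below. `RunWithRetryAsync` is a hypothetical helper written for illustration; it is not the Cortex implementation, and the framework's actual retry semantics (for example, whether the first attempt counts toward the limit) may differ.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not the Cortex implementation): run an action and
// retry up to maxRetries times with a fixed delay between attempts.
// Returns the number of attempts that were needed.
static async Task<int> RunWithRetryAsync(Func<Task> action, int maxRetries, TimeSpan retryDelay)
{
    int attempts = 0;
    while (true)
    {
        attempts++;
        try
        {
            await action();
            return attempts;
        }
        catch (Exception) when (attempts <= maxRetries)
        {
            // Retries remain: wait, then loop. Once they are used up the
            // filter is false and the exception propagates to the caller.
            await Task.Delay(retryDelay);
        }
    }
}

int calls = 0;
int attemptsUsed = await RunWithRetryAsync(
    () =>
    {
        calls++;
        if (calls < 3) throw new InvalidOperationException("transient failure");
        return Task.CompletedTask;
    },
    maxRetries: 3,
    retryDelay: TimeSpan.FromMilliseconds(10));

// Prints "Succeeded after 3 attempts"
Console.WriteLine($"Succeeded after {attemptsUsed} attempts");
```

A fixed delay keeps the example short; production systems often prefer exponential backoff with jitter to avoid synchronized retries.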
📊 Telemetry & Observability
📈 Stream Operator Telemetry
All operators now emit telemetry metrics:
var stream = StreamBuilder<Event>
.CreateNewStream("ObservableStream")
.WithTelemetry(new OpenTelemetryProvider(meterProvider, tracerProvider))
.Stream()
.Filter(e => e.IsValid) // Emits: cortex_filter_processed_total, cortex_filter_passed_total
.Map(e => Transform(e)) // Emits: cortex_map_processed_total, cortex_map_duration_seconds
.Sink(e => Save(e)) // Emits: cortex_sink_processed_total, cortex_sink_errors_total
.Build();
Available Metrics:
- cortex_stream_messages_processed_total
- cortex_stream_processing_duration_seconds
- cortex_stream_buffer_size
- cortex_stream_errors_total
- cortex_window_items_count
- cortex_state_store_operations_total
📝 Serialization
📄 YAML Enhancements
Advanced YAML features now supported:
// Choose between flow (compact) and block style, and set the indent width
var flowStyle = YamlSerializer.Serialize(data, new YamlOptions
{
FlowStyle = FlowStyle.Block,
IndentSize = 2
});
// Preserve comments during round-trip
var doc = YamlDocument.Parse(yamlWithComments);
doc.SetValue("database.port", 5433);
var updated = doc.ToString(); // Comments preserved!
// Custom type tags
var tagged = """
!order
id: 12345
items:
- !product { sku: ABC123, quantity: 2 }
""";
var order = YamlSerializer.Deserialize<Order>(tagged);
🔄 Breaking Changes
⚠️ API Changes
- StreamBuilder API Simplified
// Before (v2.x) - Two separate type parameters required
var stream = new StreamBuilder<OrderEvent, ProcessedOrder>("MyStream")
.AddSource(source)
.AddProcessor(processor)
.Build();
// After (v3.0) - Clean fluent API with single entry point
var stream = StreamBuilder<OrderEvent>
.CreateNewStream("MyStream")
.Stream() // or .Stream(sourceOperator)
.Map(e => Process(e))
.Sink(e => Save(e))
.Build();
- AddCortexMediator Signature Refactored
// Before (v2.x)
services.AddCortexMediator(typeof(Program).Assembly);
// After (v3.0) - Now requires assemblies array and options
services.AddCortexMediator(new[] { typeof(Program).Assembly }, options =>
{
options.AddDefaultBehaviors();
});
- Unit of Work Removed
  - The Unit of Work infrastructure has been removed
  - Use Cortex.Mediator.Behaviors.Transactional for transaction management
- Windowing API Returns WindowResult
// Before (v2.x) - Direct aggregate function
.TumblingWindow(..., aggregateFunction: items => Aggregate(items))
// After (v3.0) - Returns WindowResult, use Map for aggregation
.TumblingWindow(keySelector, timestampSelector, windowSize)
.Map(windowResult => new Aggregate
{
Key = windowResult.Key,
Items = windowResult.Items,
Start = windowResult.WindowStart
})
📦 Package Changes
| Old Package | New Package / Status |
|---|---|
| Internal UoW | Use Cortex.Mediator.Behaviors.Transactional |
| - | ✨ New: Cortex.States.DuckDb |
| - | ✨ New: Cortex.Streams.Mediator |
| - | ✨ New: Cortex.Mediator.Behaviors.Transactional |
| - | ✨ New: Cortex.Serialization.Yaml |
| - | ✨ New: Cortex.Types |
🙏 Thank You
A huge thank you to everyone who contributed to this release! 🎉
👥 Contributors
We deeply appreciate all the developers, testers, and community members who helped make v3 possible through:
- 💻 Code contributions and pull requests
- 🐛 Bug reports and issue tracking
- 📖 Documentation improvements
- 💬 Community discussions and feedback
- ⭐ Starring the repository and spreading the word
🌟 Special Thanks
To our community on Discord for the valuable feedback, feature requests, and helping other developers get started with Cortex!
📚 Resources
📖 Documentation
📦 NuGet Packages
# Core packages
dotnet add package Cortex.Streams
dotnet add package Cortex.States
dotnet add package Cortex.Mediator
dotnet add package Cortex.Types
# New in v3.0
dotnet add package Cortex.States.DuckDb
dotnet add package Cortex.Streams.Mediator
dotnet add package Cortex.Mediator.Behaviors.Transactional
dotnet add package Cortex.Serialization.Yaml
🔗 Links
🔮 What's Next?
We're already working on exciting features for future releases:
- 🌐 gRPC stream sources and sinks
- 📊 Real-time analytics dashboards
- 🔄 Change Data Capture (CDC) enhancements
- 🤖 AI/ML pipeline integration
- ☸️ Kubernetes operator for Cortex deployments
Stay tuned and join our Discord to influence the roadmap!