
feat: Use parquet files for data export #18

Open
schwepmo wants to merge 11 commits into main from tse-to-parquet

@schwepmo
Contributor

Description

This PR introduces comprehensive support for storing Traffic State Estimation (TSE) data using Apache Parquet and adds new aggregation capabilities for traffic metrics.

Key Changes

  1. Parquet Storage Support (FcdParquetStorage):
    • Implemented a new storage backend using Apache Parquet for efficient, columnar data storage.
    • Added support for writing:
      • Raw FCD records (fcd_records.parquet) as GeoParquet.
      • Traversal metrics (traversal_metrics.parquet).
      • Aggregated metrics (aggregated_metrics.parquet).
      • System thresholds (thresholds.parquet).
      • Static connection data (connection_data.parquet).
    • Added a flexible ParquetSink API to allow for custom data extensions.
  2. Aggregated Spatio-Temporal Processor:
    • Added AggregatedSpatioTemporalProcessor to compute and store metrics over configurable time intervals (e.g., 15-minute windows).
    • This significantly reduces data volume for long-running simulations (for example, 865 traversals collapse into roughly 50 intervals) while preserving analytical value.
    • Includes a configurable processingDelay to handle late-arriving data in distributed simulations.
  3. New Traffic Metrics:
    • Naive Mean Speed: A simple arithmetic mean of vehicle speeds.
    • Speed Performance Index (SPI): A ratio of actual speed to maximum allowed speed (clamped to 1.0).
    • These metrics are now calculated and stored alongside existing spatial and temporal mean speeds.
  4. Refactoring:
    • Extracted shared logic into AbstractSpatioTemporalProcessor to reduce code duplication between the standard and aggregated processors.
    • Updated FcdDataStorage interface to support aggregated metric insertion.
  5. Documentation:
    • Added PARQUET_SUPPORT.md (merged into README.md) detailing configuration and usage of the new Parquet features.
    • Updated AGENTS.md and README.md to reflect the new architecture and build options.
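
To make the two new metrics from item 3 concrete, here is a minimal Java sketch. The class and method names (`TrafficMetricsSketch`, `naiveMeanSpeed`, `speedPerformanceIndex`) are hypothetical and not taken from this PR. The handling of an empty input and of a non-positive speed limit is also an assumption.

```java
import java.util.List;

// Hypothetical illustration; not the ComputedMetrics code from this PR.
final class TrafficMetricsSketch {

    /** Arithmetic mean of the given speeds; NaN for an empty list (assumed behaviour). */
    static double naiveMeanSpeed(List<Double> speeds) {
        if (speeds.isEmpty()) {
            return Double.NaN;
        }
        double sum = 0.0;
        for (double speed : speeds) {
            sum += speed;
        }
        return sum / speeds.size();
    }

    /**
     * Ratio of actual speed to maximum allowed speed, limited to 1.0 so that
     * vehicles exceeding the limit do not report more than 100% performance.
     * Both speeds must use the same unit.
     */
    static double speedPerformanceIndex(double actualSpeed, double maxAllowedSpeed) {
        if (maxAllowedSpeed <= 0.0) {
            // Assumption: a missing or zero speed limit is treated as invalid input.
            throw new IllegalArgumentException("maxAllowedSpeed must be positive");
        }
        return Math.min(1.0, actualSpeed / maxAllowedSpeed);
    }
}
```

With a limit of 50, a speed of 25 gives an SPI of 0.5, while a speed of 60 is clamped to 1.0.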

Affected parts of the online documentation

  • The Eclipse MOSAIC documentation regarding the Traffic State Estimation app will need to be updated to include:
    • Configuration options for FcdParquetStorage.
    • Configuration options for AggregatedSpatioTemporalProcessor.
    • Details on the new Parquet output files and their schemas.
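
When documenting the processor options, it may help to show how an aggregation interval and a processing delay interact. The following is a simplified sketch under stated assumptions, not the PR's AggregatedSpatioTemporalProcessor: the class `IntervalAggregationSketch`, its methods, and the use of seconds as the time unit are all hypothetical, and it aggregates only a plain mean speed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of windowed aggregation with a processing delay.
final class IntervalAggregationSketch {
    private final long aggregationInterval; // window length in seconds (assumed unit)
    private final long processingDelay;     // extra wait before a window is closed, in seconds
    // Window start time -> speeds observed in that window, ordered by start time.
    private final TreeMap<Long, List<Double>> openWindows = new TreeMap<>();

    IntervalAggregationSketch(long aggregationInterval, long processingDelay) {
        if (aggregationInterval <= 0 || processingDelay < 0) {
            throw new IllegalArgumentException("interval must be > 0 and delay >= 0");
        }
        this.aggregationInterval = aggregationInterval;
        this.processingDelay = processingDelay;
    }

    /** Assigns a sample to the window that contains its timestamp. */
    void add(long timestamp, double speed) {
        long windowStart = Math.floorDiv(timestamp, aggregationInterval) * aggregationInterval;
        openWindows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(speed);
    }

    /** Closes every window whose end plus the delay has passed; returns window start -> mean speed. */
    Map<Long, Double> flush(long now) {
        Map<Long, Double> closed = new TreeMap<>();
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + aggregationInterval + processingDelay <= now) {
            Map.Entry<Long, List<Double>> window = openWindows.pollFirstEntry();
            double sum = 0.0;
            for (double speed : window.getValue()) {
                sum += speed;
            }
            closed.put(window.getKey(), sum / window.getValue().size());
        }
        return closed;
    }
}
```

In this sketch, a sample that arrives after its window's end but before the delay expires is still counted. A sample that arrives after the window has been flushed would open a new window with the same start time, which a real implementation would need to handle explicitly.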

Definition of Done

Prerequisites

  • You have read CONTRIBUTING.md carefully.

Required

  • The title of this merge request follows the scheme type(scope): description
  • origin/main has been merged into your fork.
  • Coding guidelines have been followed (see CONTRIBUTING.md).
  • All checks on GitHub pass.

Special notes to reviewer

  • The FcdParquetStorage implementation rewrites the traversal_metrics.parquet file whenever Relative Traffic Status Metrics (RTSM) are updated, because Parquet files cannot be modified in place. To do so, it caches the relevant records in memory and writes the whole file again on each update.
  • A new Maven profile fcd-shaded-jar has been added to build a standalone JAR with all dependencies, which is useful for deployment in environments where dependencies are not provided by the container.
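
For reviewers unfamiliar with the cache-and-rewrite pattern mentioned in the first note, a minimal sketch follows. It is an assumption-laden stand-in, not the FcdParquetStorage code: it writes plain text lines instead of Parquet, and the class `RewriteOnUpdateSketch`, its `upsert` method, and the key/value layout are hypothetical. Only the general idea (keep records in memory, rewrite the whole file on update) comes from the note above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: a text file stands in for an immutable Parquet file.
final class RewriteOnUpdateSketch {
    private final Path target;
    // In-memory cache of all records, keyed by record id, in insertion order.
    private final Map<String, String> cache = new LinkedHashMap<>();

    RewriteOnUpdateSketch(Path target) {
        this.target = target;
    }

    /** Inserts or updates a record, then rewrites the complete file from the cache. */
    void upsert(String id, String value) throws IOException {
        cache.put(id, value);
        rewrite();
    }

    private void rewrite() throws IOException {
        List<String> lines = new ArrayList<>();
        cache.forEach((id, value) -> lines.add(id + "," + value));
        // Write to a temporary sibling first so readers never see a half-written file.
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(tmp, lines);
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

The trade-off of this pattern is that memory use and rewrite cost grow with the number of cached records, so it is worth checking how many records the real implementation keeps in memory.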

Signed-off-by: Moritz Schweppenhaeuser <moritz.schweppenhaeuser@fokus.fraunhofer.de>
Add AggregatedSpatioTemporalProcessor to aggregate traffic metrics over
configurable time intervals, significantly reducing data volume while
preserving analytical value.

Refactoring:
- Extract AbstractSpatioTemporalProcessor base class with shared logic
- Refactor SpatioTemporalProcessor to extend base (273→92 lines, 66% reduction)
- Replace custom aggregation with MOSAIC Aggregator class
- Eliminate 265+ lines of duplicate code across processors
- Introduce ComputedMetrics record for consistent data transfer

Storage Layer:
- Add insertAggregatedTraversalMetrics() to FcdDataStorage interface
- Implement SQLite storage with aggregated_traversal_metrics table
- Implement Parquet storage with aggregated_metrics.parquet output
- Add AggregatedMetricRecord for Parquet serialization

Configuration:
- Add example config to BeST scenario TseServerApp.json
- Add AGGREGATED_PROCESSOR_EXAMPLE.md documentation
- Support configurable aggregationInterval and processingDelay

Build:
- Add fcd-shaded-jar Maven profile for standalone JAR creation

Benefits:
- Reduces storage requirements (865 traversals → ~50 intervals)
- Maintains temporal and spatial mean speed accuracy
- Enables min/max/variance analysis via MOSAIC Aggregator
- Single source of truth for metric computation logic
- Handles late-arriving data with configurable processing delay
- Add naiveMeanSpeed and speedPerformanceIndex to ComputedMetrics
- Store speedPerformanceIndex in traversal_metrics (Parquet)
- Aggregate and store naiveMeanSpeed and speedPerformanceIndex in aggregated_traversal_metrics (SQLite and Parquet)
- Update FcdDataStorage interface and implementations
- Calculate average Speed Performance Index in getAveragesForInterval
- Pass calculated value instead of null to TraversalStatistics constructor
- Limit SPI values to 1.0 even if vehicles exceed max speed
- Prevents misleading metrics > 100% performance
@schwepmo schwepmo self-assigned this Feb 13, 2026