Skip to content

Conversation

@Anvayt24
Copy link

@Anvayt24 Anvayt24 commented Jan 4, 2026

Description

This PR implements a "sliding window" pipeline to address Issue #73, replacing the previous batch-based approach with an asynchronous producer-consumer model. This allows data processing (CPU-bound) to occur concurrently with data downloading (I/O-bound), significantly improving throughput.

Key Changes

  • src/satellite_consumer/download_eumetsat.py:

    • Introduced buffered_download_stream: An async generator that manages a sliding window of concurrent download tasks.
    • Implemented a deque-based task buffer to ensuring that while downloads happen in parallel, results are yielded in strict chronological order to the consumer.
  • src/satellite_consumer/consume.py:

    • Refactored consume_to_store to be async.
    • Replaced the synchronous loop with an async for loop over the buffered stream.
    • Wrapped the CPU-intensive process_raw function in asyncio.to_thread to prevent blocking the event loop (and thus the background downloads).
    • Integrated with the new upstream structure (using consume.py instead of run.py).
  • src/satellite_consumer/cmd/main.py:

    • Updated the entry point to execute the consumer using asyncio.run().
    • Exposed concurrent_downloads configuration (defaulting to 5 workers).

Verification

  • Unit Tests: Passed (uv run python -m unittest discover ...)
  • Type Safety: Passed mypy checks on all modified files.
  • Linting: Passed ruff check.
  • Manual Verification: Validated that logs show processing starting before downloads complete (e.g., . . . . [Processing] num_files=1).

@Anvayt24
Copy link
Author

Anvayt24 commented Jan 4, 2026

@peterdudfield check this out!

@dfulu dfulu closed this Jan 5, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants