- Add application_name check to preserve_local_filepath trigger so banzai_download_worker can clear filepath when needed
- Add reconcile_orphaned_filepaths() to clear stale filepath entries
- Set application_name in delete_calibration() for trigger bypass
- Change version_rank to keep top 2 calibrations per config
- Add smart logging: startup summary, 5-min heartbeat, state-change detection
- Silent during idle polls when nothing changes
- Change routine "Found X calibrations" to DEBUG level
- Add _get_type_summary() for compact heartbeat messages
Convert CalibrationImage ORM objects to CachedCalibrationInfo dataclass instances while the session is open. This prevents DetachedInstanceError when calibration data is accessed after get_calibrations_to_cache() returns.
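The eager-conversion pattern can be sketched as follows (field names mirror the dataclass construction shown later in this review; the helper name `to_cached_info` is illustrative, not from the PR):

```python
from dataclasses import dataclass, field

@dataclass
class CachedCalibrationInfo:
    # Plain-data snapshot of a CalibrationImage row; safe to read after the
    # SQLAlchemy session that loaded it has closed (no lazy attribute access).
    id: int
    filename: str
    type: str
    frameid: int
    instrument_id: int
    attributes: dict = field(default_factory=dict)

def to_cached_info(cal):
    """Copy ORM column values into a detached dataclass while the session is open."""
    return CachedCalibrationInfo(
        id=cal.id,
        filename=cal.filename,
        type=cal.type,
        frameid=cal.frameid,
        instrument_id=cal.instrument_id,
        attributes=dict(cal.attributes) if cal.attributes else {},
    )
```

Because every value is copied eagerly, nothing triggers a database round-trip after the session context exits, which is what avoids the DetachedInstanceError.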
Tests the full site deployment flow including PostgreSQL logical replication, calibration file caching, and frame reduction using cached calibrations.
Add null check to return False (safe default) when calibration attributes are None, preventing AttributeError during cache cleanup.
- Fix test_06: Use proper two-step archive API download (get metadata with auth header, then download from S3 URL)
- Fix test_07: Read FITS headers to determine output path dynamically, use DAY-OBS instead of DATE-OBS to match banzai's directory structure
Site E2E tests require Docker and AUTH_TOKEN, so they should not run with the regular 'pytest -m e2e' command. Use '-m e2e_site' instead.
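Illustratively, the marker split might look like this in pytest.ini (a sketch; the PR's actual marker definitions may differ):

```ini
[pytest]
markers =
    e2e: end-to-end tests (main pipeline)
    e2e_site: site deployment E2E tests (require Docker and an AUTH_TOKEN)

# Run the site tests explicitly:
#   pytest -m e2e_site
```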
Pull request overview
This PR implements a calibration file caching system for site deployments using PostgreSQL logical replication. The system enables observatory sites to maintain local copies of calibration files, allowing continued image processing during network outages.
Changes:
- Implements PostgreSQL logical replication setup to sync calibration metadata from AWS to local databases
- Adds a download worker daemon that polls the local database and fetches calibration files from the archive
- Implements version management that keeps only the top 2 calibrations per instrument/type/configuration
- Adds comprehensive unit tests and site-specific E2E tests
- Updates Docker Compose configurations for site deployments
Reviewed changes
Copilot reviewed 26 out of 27 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| sql/triggers.sql | PostgreSQL trigger to preserve local file paths during replication |
| site-banzai-env.default | Updated environment configuration template with cache settings |
| pytest.ini | Added site E2E test markers and excluded site_e2e directory |
| pyproject.toml | Added console entry points for download worker and cache init |
| docker-compose-site.yml | Added PostgreSQL, cache-init, and download-worker services |
| docker-compose-local.yml | New local-only deployment configuration without replication |
| banzai/dbs.py | Added CacheConfig model and related database functions |
| banzai/cache/replication.py | PostgreSQL replication management functions |
| banzai/cache/init.py | One-time initialization for site cache setup |
| banzai/cache/download_worker.py | Worker daemon for downloading and managing cached calibration files |
| banzai/tests/utils.py | Added FakeCacheConfig test utility |
| banzai/tests/test_replication.py | Unit tests for replication functions |
| banzai/tests/test_download_worker.py | Comprehensive unit tests for download worker |
| banzai/tests/test_cache_init.py | Unit tests for cache initialization |
| banzai/tests/site_e2e/* | Complete E2E test infrastructure for site deployments |
| README.rst | Updated documentation with site E2E test instructions |
| .gitignore | Added site E2E test data directories |
Comments suppressed due to low confidence (1)
site-banzai-env.default:21
- The AUTH_TOKEN is left empty in the default configuration file. While this is appropriate for a template, consider adding a comment warning users that this token is required for the download worker to function and should be kept secret. Also consider documenting where users can obtain this token.
```
# API Configuration
API_ROOT=https://archive-api.lco.global/
AUTH_TOKEN=
```
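Following that suggestion, the template could be annotated like so (wording is illustrative; where exactly users obtain a token is deployment-specific):

```
# API Configuration
API_ROOT=https://archive-api.lco.global/
# AUTH_TOKEN is required for the download worker to fetch files from the archive.
# Keep this value secret; obtain a token from your archive account.
AUTH_TOKEN=
```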
banzai/cache/download_worker.py

```python
# Update database with filepath
with dbs.get_session(self.db_address) as session:
    cal_record = session.query(dbs.CalibrationImage).get(cal_info.id)
    if cal_record:
        cal_record.filepath = self.cache_root
        session.commit()
```
The download_calibration method doesn't set the application_name when updating the filepath in the database. This means the trigger's special handling for 'banzai_download_worker' won't apply to these updates. While this may be intentional (since we're setting filepath, not clearing it), it creates an asymmetry with delete_calibration and reconcile_orphaned_filepaths which do set the application_name. Consider whether application_name should also be set here for consistency, or add a comment explaining why it's not needed.
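If consistency is desired, one option is a small helper that tags the session for the trigger. This is a sketch against a SQLAlchemy-style `session.execute`, not code from the PR (in SQLAlchemy 2.x the SQL strings would need to be wrapped in `text()`):

```python
from contextlib import contextmanager

@contextmanager
def as_download_worker(session):
    """Temporarily set application_name so the preserve_local_filepath trigger
    recognizes this session's writes as coming from the download worker."""
    session.execute("SET application_name = 'banzai_download_worker'")
    try:
        yield session
    finally:
        # Restore a neutral name so later statements get default trigger handling.
        session.execute("SET application_name = 'banzai'")
```

The filepath update in download_calibration could then run inside `with as_download_worker(session): ...`, making the symmetry with delete_calibration explicit.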
banzai/cache/download_worker.py

```python
with open(temp_path, 'wb') as f:
    f.write(buffer.read())
buffer.close()

# Validate FITS file
if not self.validate_fits(temp_path):
    os.remove(temp_path)
    raise ValueError(f"Invalid FITS file: {cal_info.filename}")

# Atomic rename to final location
os.rename(temp_path, local_path)

# Update database with filepath
with dbs.get_session(self.db_address) as session:
    cal_record = session.query(dbs.CalibrationImage).get(cal_info.id)
    if cal_record:
        cal_record.filepath = self.cache_root
        session.commit()

logger.info(f"Successfully downloaded {cal_info.filename}")

except Exception as e:
```
The buffer is closed after writing to the temp file, but if an exception occurs during the write (line 479), the buffer won't be closed. Consider using a try-finally block or a context manager to ensure the buffer is always closed, preventing potential resource leaks.
Suggested change:

```python
try:
    with open(temp_path, 'wb') as f:
        f.write(buffer.read())
    # Validate FITS file
    if not self.validate_fits(temp_path):
        os.remove(temp_path)
        raise ValueError(f"Invalid FITS file: {cal_info.filename}")
    # Atomic rename to final location
    os.rename(temp_path, local_path)
    # Update database with filepath
    with dbs.get_session(self.db_address) as session:
        cal_record = session.query(dbs.CalibrationImage).get(cal_info.id)
        if cal_record:
            cal_record.filepath = self.cache_root
            session.commit()
    logger.info(f"Successfully downloaded {cal_info.filename}")
finally:
    buffer.close()
```
```python
subscription_sql = f"""
    CREATE SUBSCRIPTION {subscription_name}
    CONNECTION '{aws_connection_string}'
    PUBLICATION {publication_name}
    WITH (
        copy_data = true,
        create_slot = true,
        slot_name = '{slot_name}',
        synchronous_commit = off
    );
"""
```
The subscription SQL uses f-string formatting to insert subscription_name, publication_name, and slot_name directly into the SQL. While these values come from function parameters and are not user input in the typical sense, this could be a SQL injection risk if site_id or other parameters are ever derived from untrusted sources. Consider using parameterized queries or at least validating/sanitizing the subscription_name, publication_name, and slot_name to ensure they contain only alphanumeric characters and underscores.
banzai/cache/replication.py

```python
drop_sql = f"DROP SUBSCRIPTION IF EXISTS {subscription_name}"
if drop_slot:
    drop_sql += " CASCADE"
drop_sql += ";"
```
Similar SQL injection concern as setup_subscription - the subscription_name is inserted directly into the SQL string using f-string formatting. Consider adding validation or using a safer approach to ensure subscription_name contains only valid identifier characters.
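A lightweight mitigation for both of these comments is to validate each name before interpolating it into the SQL (a sketch; the helper name and the example subscription name are illustrative, not from the PR):

```python
import re

# PostgreSQL unquoted identifiers: a letter or underscore, then letters,
# digits, or underscores. Anything else is rejected before interpolation.
_IDENTIFIER_RE = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')

def validate_identifier(name):
    """Return name unchanged if it is safe to embed in SQL; raise otherwise."""
    if not _IDENTIFIER_RE.match(name):
        raise ValueError(f"Unsafe SQL identifier: {name!r}")
    return name

# Usage sketch (subscription name is illustrative):
drop_sql = f"DROP SUBSCRIPTION IF EXISTS {validate_identifier('banzai_site_sub')};"
```

Since CREATE SUBSCRIPTION and DROP SUBSCRIPTION cannot take identifiers as bind parameters, validation before f-string interpolation is a pragmatic middle ground.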
| - "5442:5432" | ||
| environment: | ||
| - POSTGRES_DB=banzai_local | ||
| - POSTGRES_USER=banzai |
The PostgreSQL container uses POSTGRES_HOST_AUTH_METHOD=trust which allows connections without password authentication. While this may be acceptable for local development or closed network deployments, it's a security risk if the PostgreSQL port (5442) is exposed to untrusted networks. Consider documenting this security implication or using password-based authentication even for local deployments to follow security best practices.
Suggested change:

```yaml
      - POSTGRES_USER=banzai
      # WARNING: Using POSTGRES_HOST_AUTH_METHOD=trust disables password authentication.
      # This setting is intended only for local development or closed networks.
      # Do NOT expose port 5442 to untrusted networks; use password-based auth instead.
```
banzai/cache/download_worker.py

```python
def safe_to_delete(self, filename, needed_cals, files_on_disk):
    """
    Check if it's safe to delete this calibration file.

    A file is only safe to delete if we have 2+ OTHER files for the same
    configuration already on disk. This prevents deleting the only working
    calibration if a download fails.

    Args:
        filename: Filename of the calibration to potentially delete
        needed_cals: Dict of {filename: CachedCalibrationInfo} for all needed calibrations
        files_on_disk: Set of filenames currently cached

    Returns:
        bool: True if safe to delete, False otherwise
    """
    # Get the calibration info for the file we want to delete
    # It's not in needed_cals (that's why we want to delete it), so we need to query it
    with dbs.get_session(self.db_address) as session:
        cal_to_delete = session.query(dbs.CalibrationImage)\
            .filter_by(filename=filename).first()

        if not cal_to_delete:
            logger.warning(f"Cannot find calibration record for {filename}, skipping deletion")
            return False

        # If we can't determine attributes, don't delete (safe default)
        if cal_to_delete.attributes is None:
            logger.warning(f"Calibration {filename} has no attributes, skipping deletion")
            return False

        # Find all calibrations for the same configuration in needed_cals
        same_config = []
        for needed_cal in needed_cals.values():
            # Check if this calibration is the same configuration
            if (needed_cal.instrument_id == cal_to_delete.instrument_id and
                    needed_cal.type == cal_to_delete.type and
                    needed_cal.attributes.get('configuration_mode') == cal_to_delete.attributes.get('configuration_mode') and
                    needed_cal.attributes.get('binning') == cal_to_delete.attributes.get('binning')):

                # For SKYFLAT/FLAT types, also check filter
                if cal_to_delete.type in ('SKYFLAT', 'FLAT'):
                    if needed_cal.attributes.get('filter') == cal_to_delete.attributes.get('filter'):
                        same_config.append(needed_cal)
                else:
                    same_config.append(needed_cal)

        # Count how many of these are actually on disk
        files_for_config_on_disk = [
            cal for cal in same_config
            if cal.filename in files_on_disk
        ]

        # Safe to delete if we have 2+ other files on disk
        return len(files_for_config_on_disk) >= 2
```
In the safe_to_delete method, there's a potential issue with the safety check logic. The method checks if we have 2+ files for the same configuration on disk, but it doesn't account for the file being deleted itself. If the file to delete is the only one on disk (and there are 0 other files), it will return False (safe). However, if there's 1 other file on disk plus the file to delete (total of 2), it will still return False because len(files_for_config_on_disk) will be 1, not 2. This seems correct, but the comment on line 280 says "2+ OTHER files" which could be misleading since files_on_disk includes all files. Consider clarifying whether the logic is checking for 2+ files total (including the one to delete) or 2+ files excluding the one to delete.
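To make the intent unambiguous, the check can be expressed directly over "other files" by excluding the deletion candidate itself (an illustrative standalone helper, not the PR's code):

```python
def safe_to_delete(filename, same_config_filenames, files_on_disk):
    """True only if at least 2 files for this configuration, OTHER than
    `filename` itself, are already on disk."""
    others_on_disk = {
        name for name in same_config_filenames
        if name in files_on_disk and name != filename
    }
    return len(others_on_disk) >= 2

# The candidate plus only one other file is NOT enough to delete safely:
assert not safe_to_delete('old.fits', ['old.fits', 'a.fits'],
                          {'old.fits', 'a.fits'})
# Two other files on disk for the same configuration: safe to delete.
assert safe_to_delete('old.fits', ['old.fits', 'a.fits', 'b.fits'],
                      {'old.fits', 'a.fits', 'b.fits'})
```

The explicit `!= filename` filter makes the docstring's "2+ OTHER files" claim true by construction, regardless of whether the candidate appears in the input collections.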
sql/triggers.sql

```sql
BEGIN
    -- Allow download worker to clear filepath by checking application_name
    -- This enables the worker to clear filepath when deleting files or reconciling orphaned records
    IF current_setting('application_name', true) = 'banzai_download_worker' THEN
```
The SQL trigger uses current_setting('application_name', true) which returns NULL when the setting doesn't exist (the true parameter makes it return NULL instead of raising an error). However, the comparison current_setting('application_name', true) = 'banzai_download_worker' will be NULL = 'string' which evaluates to NULL (not TRUE), so this should work correctly. However, for clarity and defensive programming, consider using current_setting('application_name', true) IS NOT DISTINCT FROM 'banzai_download_worker' to make the NULL-handling behavior more explicit.
Suggested change:

```sql
IF current_setting('application_name', true) IS NOT DISTINCT FROM 'banzai_download_worker' THEN
```
banzai/cache/download_worker.py

```python
def get_calibrations_to_cache(self):
    """
    Query for top 2 versions of each calibration configuration.

    Executes a ranking query to find the 2 most recent calibrations for each
    unique combination of (instrument, type, config_mode, binning, filter).
    Only returns calibrations matching the site's filter configuration.

    Returns:
        dict: {filename: CachedCalibrationInfo} mapping filenames to calibration data.
    """
    config = self.get_cache_config()
    if not config:
        logger.warning("No cache configuration found - cache_config table may not be initialized")
        return {}

    try:
        with dbs.get_session(self.db_address) as session:
            # Build ORM query for calibrations at this site
            query = session.query(dbs.CalibrationImage)\
                .join(dbs.Instrument)\
                .filter(
                    dbs.CalibrationImage.is_master == True,
                    dbs.CalibrationImage.is_bad == False,
                    dbs.Instrument.site == config.site_id
                )

            # Handle wildcard vs specific instrument types
            if config.instrument_types != ['*']:
                query = query.filter(dbs.Instrument.type.in_(config.instrument_types))

            all_cals = query.all()

            # Group calibrations by config key
            grouped = defaultdict(list)
            for cal in all_cals:
                key = self._get_config_key(cal)
                grouped[key].append(cal)

            # Rank each group and take top 2
            needed = {}
            for key, cals in grouped.items():
                # Sort by (dateobs, id) descending and take top 2
                sorted_cals = sorted(cals, key=lambda c: (c.dateobs, c.id), reverse=True)
                for cal in sorted_cals[:2]:
                    info = CachedCalibrationInfo(
                        id=cal.id,
                        filename=cal.filename,
                        type=cal.type,
                        frameid=cal.frameid,
                        instrument_id=cal.instrument_id,
                        attributes=dict(cal.attributes) if cal.attributes else {}
                    )
                    needed[info.filename] = info

            logger.debug(f"Found {len(needed)} calibrations that should be cached")
            return needed
```
The get_calibrations_to_cache method loads all calibrations into memory and then sorts them in Python. For sites with many instruments or a large calibration history, this could consume significant memory. Consider whether this approach will scale as the database grows. A more efficient approach might be to use a window function or SQL-based ranking to let the database handle the sorting and limiting, especially since PostgreSQL supports window functions like ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...). However, this may be acceptable for the expected scale.
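The suggested SQL-side ranking can be sketched with ROW_NUMBER(). The demo below uses an in-memory SQLite database (which also supports window functions, in SQLite 3.25+) purely to illustrate the query shape; the real query would run against PostgreSQL with the full (instrument, type, config_mode, binning, filter) partition key, and the table/column names here are simplified:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE calimages (
        id INTEGER PRIMARY KEY,
        instrument_id INTEGER, type TEXT, dateobs TEXT, filename TEXT
    );
    INSERT INTO calimages VALUES
        (1, 10, 'BIAS', '2024-01-01', 'bias_old.fits'),
        (2, 10, 'BIAS', '2024-02-01', 'bias_mid.fits'),
        (3, 10, 'BIAS', '2024-03-01', 'bias_new.fits'),
        (4, 10, 'DARK', '2024-01-15', 'dark_only.fits');
""")

# Rank calibrations within each (instrument, type) group by recency,
# then keep only the two newest per group -- all inside the database.
rows = conn.execute("""
    SELECT filename FROM (
        SELECT filename,
               ROW_NUMBER() OVER (
                   PARTITION BY instrument_id, type
                   ORDER BY dateobs DESC, id DESC
               ) AS rank
        FROM calimages
    ) WHERE rank <= 2
""").fetchall()

print(sorted(r[0] for r in rows))
# → ['bias_mid.fits', 'bias_new.fits', 'dark_only.fits']
```

This pushes the sorting and limiting into the database, so memory use stays bounded by the number of kept calibrations rather than the full calibration history.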
This PR implements the calibration file caching system for site deployments. This allows sites to maintain local
copies of calibration files, enabling image processing during network outages.
```mermaid
sequenceDiagram
    participant AWS as AWS PostgreSQL
    participant Local as Site PostgreSQL
    participant Worker as Download Worker
    participant Archive as Archive API
    participant Cache as Cache Directory
    AWS->>Local: Replicate calibration metadata
    Note over Local: filepath = NULL (from AWS)
    loop Every poll interval
        Worker->>Local: Query needed calibrations (top 2 per config)
        Local-->>Worker: Return calibrations without local files
        Worker->>Archive: Download calibration file
        Archive-->>Worker: FITS file data
        Worker->>Cache: Write file to disk
        Worker->>Local: UPDATE db filepath = '/data/cal/file.fits'
        Worker->>Cache: Delete old versions (keep top 2)
        Worker->>Local: Clear db filepath for deleted files
    end
    AWS->>Local: Replication update (filepath=NULL)
    Note over Local: Trigger preserves<br/>local db filepath
```

Key components:
Changes
Tests
Site e2e tests are separate from the main e2e tests because they require a different configuration.
For the site e2e tests, we mock the AWS banzai db with a local postgres instance that includes a publication. We also run the docker-compose-site.yml setup and subscribe to that publication. Then we populate the publication db with some calibration frames and test:
Unit tests are also included for the files in banzai/cache/.

Note: this PR is based on banzai-local and should be merged after that branch lands.