-
Notifications
You must be signed in to change notification settings - Fork 1
35 - Add configuration bucket and Auto-Sync for Agent #42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
@AnthonyCvn what would be the best way to restart a recorder ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 12 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
reductstore_agent/recorder.py
Outdated
| self.bucket = await self.client.create_bucket( | ||
| "configuration", settings, exist_ok=True | ||
| ) |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This overwrites self.bucket which was meant to store the data bucket initialized in init_reduct_bucket(). This will cause data writes to fail as they'll attempt to write to the configuration bucket instead. Consider using a separate variable like self.config_bucket or config_bucket as a local variable, and updating the read_configuration_bucket method accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 14 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if self.remote_config: | ||
| self.pull_timer = self.create_timer( | ||
| self.remote_config.pull_frequency_s, | ||
| lambda: self.loop.create_task(self.check_remote_updates()), | ||
| ) |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating an async task within a timer callback (line 98) can lead to task lifecycle issues. If the timer fires while a previous check is still running, multiple concurrent calls to check_remote_updates could occur, potentially causing race conditions when updating self.pipeline_configs and related state. Consider adding a flag to prevent concurrent configuration updates or use a different scheduling mechanism.
| async def read_remote_bucket(self) -> str: | ||
| """Read configuration bucket from ReductStore.""" | ||
| config_bucket = await self.client.get_bucket("configuration") | ||
| entry_name = self.remote_config.entry | ||
| entry = await config_bucket.get_entry(entry_name) | ||
| async with entry.read() as record: | ||
| data = await record.read_all() | ||
| yaml_str = data.decode("utf-8") | ||
| return yaml_str | ||
|
|
||
| async def check_remote_updates(self): | ||
| """Periodically check for configuration updates.""" | ||
| try: | ||
| yaml_str = await self.read_remote_bucket() | ||
| self.reload_configuration(yaml_str) | ||
| except Exception as exc: | ||
| self.log_warn(f"Failed to fetch configuration: {exc}") | ||
|
|
||
| def reload_pipeline_configuration(self, yaml_str: str): | ||
| """Reload pipeline configuration.""" | ||
| new_config = self.validate_config(yaml_str) | ||
| if new_config is None: | ||
| self.log_warn( | ||
| lambda: "Failed to validate new configuration. " | ||
| "Keeping existing configuration." | ||
| ) | ||
| elif new_config.pipeline_configs == self.pipeline_configs: | ||
| self.log_info(lambda: "No changes in pipeline configuration.") | ||
| else: | ||
| self.check_diff_pipelines(new_config.pipeline_configs) | ||
| self.pipeline_configs = new_config.pipeline_configs | ||
| self.log_info(lambda: "Pipeline configuration updated.") | ||
|
|
||
| def validate_config(self, yaml_str: str): | ||
| """Validate fetched config, if not valid use past valid config.""" | ||
| try: | ||
| loaded_data = yaml.safe_load(yaml_str) | ||
| pipeline_cfgs = { | ||
| name: PipelineConfig(**cfg) | ||
| for name, cfg in loaded_data.get("pipelines", {}).items() | ||
| } | ||
| self.log_info(lambda: "Pipeline Configuration validated.") | ||
| return RemoteConfig(pipeline_configs=pipeline_cfgs) | ||
| except Exception as exc: | ||
| self.log_warn( | ||
| f"Configuration validation failed: {exc}. " | ||
| "Using previous valid configuration." | ||
| ) | ||
| return None |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The remote configuration functionality (reading from remote bucket, validating configs, and managing pipeline changes) lacks test coverage. Given the comprehensive test suite for other recorder features, tests should be added for:
- Reading configuration from remote bucket
- Handling missing or invalid remote configurations
- Validating YAML configuration format
- Pipeline addition, removal, and modification during runtime
- Concurrent configuration updates
- Error handling when remote bucket is unavailable
reductstore_agent/recorder.py
Outdated
| entry = await config_bucket.get_entry(entry_name) | ||
| async with entry.read() as record: | ||
| data = await record.read_all() | ||
| yaml_str = data.decode("utf-8") |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code attempts to read the latest record from the entry without specifying a timestamp. If the entry exists but is empty (no records), this will raise an exception. Consider using entry.query() to check if records exist before attempting to read, or handle the specific exception that occurs when no records are found.
Closes #
Please check if the PR fulfills these requirements
What kind of change does this PR introduce?
Introducing Auto-Sync and remote configuration
What was changed?
(Describe the changes)
Related issues
Closes #35
Does this PR introduce a breaking change?
(What changes might users need to make in their application due to this PR?)
Other information: