Catalog system enhancements #88
Conversation
- Add incremental updates (only refresh changed collections)
- Replace polling with event-driven enrichment via ThreadToAsyncBridge
- Add parallel schema inference with asyncio.Semaphore(5)
- Add job queue with priority and retry support
- Add 4 new MCP catalog tools
- Add documentation
Summary of Changes

Hello @teetangh, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly overhauls the catalog system by introducing several key enhancements aimed at improving efficiency, responsiveness, and manageability. The changes include optimizing schema refreshes through incremental updates and parallel processing, transitioning to an event-driven model for enrichment, and implementing a robust job queue for inference tasks. Additionally, new administrative tools are provided to interact with and monitor the enhanced catalog system.
Code Review
This pull request significantly enhances the catalog system by introducing incremental updates for schema refresh, an event-driven mechanism for enrichment sampling, parallel schema inference using asyncio.Semaphore, and a priority-based job queue for managing inference tasks. It also adds new MCP tools for interacting with these catalog features. However, the review highlights several issues:

- The refresh_collection_schema tool's use of asyncio.run() is problematic for cross-thread communication with the job queue.
- A SQL injection vulnerability exists in _get_index_definitions due to f-string query construction.
- store.set_collection_metadata() causes inefficient disk I/O when called repeatedly within a loop.
- The InferenceJobQueue.enqueue method's behavior regarding duplicate jobs contradicts its docstring.
- The _get_document_count function is duplicated across modules, requiring refactoring for maintainability.
```python
# Use asyncio to run the enqueue
try:
    loop = asyncio.get_running_loop()
    # If we're in an async context, create a task
    asyncio.create_task(queue.enqueue(job))
    queued = True
except RuntimeError:
    # Not in async context - run in new loop
    queued = asyncio.run(queue.enqueue(job))
```
Using asyncio.run() here to call the async queue.enqueue(job) method is problematic. asyncio.run() creates a new event loop, runs the coroutine, and closes it. The InferenceJobQueue and its internal asyncio.Lock are tied to the event loop they are created in. This approach can lead to race conditions, deadlocks, or RuntimeError: Event loop is closed if the queue is accessed from different contexts (e.g., the worker thread's loop).
A tool, which is synchronous, should not directly interact with async components in this way. A better approach would be to use a thread-safe mechanism to pass the job to the worker's event loop, for example by having a thread-safe enqueue method on the queue that uses loop.call_soon_threadsafe internally.
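One way to realize this is to expose a synchronous, thread-safe wrapper that schedules the coroutine on the worker's own event loop via `asyncio.run_coroutine_threadsafe`. The sketch below is illustrative only: the `InferenceJobQueue` shown here is a minimal stand-in for the real class, and the assumption is that the worker thread's loop is available when the queue is constructed.

```python
import asyncio
import threading

class InferenceJobQueue:
    """Minimal stand-in: async state is bound to a single (worker) event loop."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop                       # the worker thread's loop
        self._queue: asyncio.Queue = asyncio.Queue()

    async def enqueue(self, job: str) -> bool:
        # Runs on the worker loop, so the queue is only ever touched there.
        await self._queue.put(job)
        return True

    def enqueue_threadsafe(self, job: str) -> bool:
        # Callable from any thread (e.g. a synchronous MCP tool): schedule
        # the coroutine on the worker's loop and block until it completes.
        future = asyncio.run_coroutine_threadsafe(self.enqueue(job), self._loop)
        return future.result(timeout=5)

# Usage: a worker thread owns the loop; the synchronous tool enqueues safely.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()
queue = InferenceJobQueue(loop)
queued = queue.enqueue_threadsafe("travel-sample.inventory.hotel")
```

This keeps every mutation of the queue's async internals on one loop, avoiding the cross-loop lock issues described above.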
```python
query = (
    f"SELECT meta().id, i.name, i.index_key, i.metadata.definition "
    f"FROM system:indexes as i "
    f"WHERE i.bucket_id = '{bucket_name}' "
    f"AND i.scope_id = '{scope_name}' "
    f"AND i.keyspace_id = '{collection_name}'"
)
result = await self._cluster.query(query)
```
This query is constructed using an f-string, which makes it vulnerable to SQL injection. The bucket_name, scope_name, and collection_name can originate from user input via the new refresh_collection_schema tool. Please use parameterized queries to prevent this vulnerability.
```python
query = (
    "SELECT meta().id, i.name, i.index_key, i.metadata.definition "
    "FROM system:indexes as i "
    "WHERE i.bucket_id = $bucket_name "
    "AND i.scope_id = $scope_name "
    "AND i.keyspace_id = $collection_name"
)
result = await self._cluster.query(
    query, bucket_name=bucket_name, scope_name=scope_name, collection_name=collection_name
)
```

```python
    last_infer_time=datetime.utcnow().isoformat(),
    document_count=result.document_count,
)
store.set_collection_metadata(new_metadata)
```
Calling store.set_collection_metadata() inside this loop is inefficient. This method saves the entire catalog state to disk on every call. For a large number of collections needing refresh, this will cause significant and unnecessary disk I/O. It would be better to collect all new metadata and then update the store once outside the loop.
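The batching pattern can be sketched as follows. The `CatalogStore` below is a hypothetical stand-in for the real store; the only assumption carried over from the review is that `set_collection_metadata()` persists the full state to disk on every call, so a deferred-flush flag (or equivalent) avoids the per-iteration write.

```python
# Sketch: accumulate metadata updates in memory and persist once.
import json
import tempfile
from pathlib import Path

class CatalogStore:
    """Hypothetical store: one JSON file holding all collection metadata."""

    def __init__(self, path: Path) -> None:
        self._path = path
        self._metadata: dict[str, dict] = {}

    def set_collection_metadata(self, key: str, meta: dict, *, flush: bool = True) -> None:
        self._metadata[key] = meta
        if flush:
            self.save()  # full-state disk write, as described in the review

    def save(self) -> None:
        self._path.write_text(json.dumps(self._metadata))

store = CatalogStore(Path(tempfile.mkdtemp()) / "catalog.json")

# Inside the refresh loop: update in memory only ...
for name in ["users", "orders", "events"]:
    store.set_collection_metadata(name, {"document_count": 0}, flush=False)

# ... then persist the whole batch with a single write after the loop.
store.save()
```

With N collections this turns N disk writes into one, at the cost of losing in-flight updates if the process dies mid-loop.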
```
Prevents duplicate jobs for the same collection path.
If a job with the same path exists and the new job has higher priority,
the new job will be added (old one will be skipped when dequeued).
```
The docstring for enqueue states that an existing job can be replaced by a new one with higher priority. However, the implementation simply skips adding the job if one is already pending, regardless of priority. This discrepancy should be resolved. Given that updating items in a priority queue is complex, I recommend updating the docstring to reflect the actual behavior of skipping duplicates.
```python
async def _get_document_count(bucket: AsyncBucket, scope_name: str, collection_name: str) -> int:
    """Get the document count for a collection."""
    try:
        scope = bucket.scope(name=scope_name)
        count_query = f"SELECT RAW COUNT(*) FROM `{collection_name}`"
        count_result = scope.query(count_query)
        async for row in count_result:
            return row
        return 0
    except Exception as e:
        logger.warning(f"Error getting document count for {scope_name}.{collection_name}: {e}")
        return 0
```
…log system enhancements cleanup.
Summary

This PR implements 4 enhancements to the catalog system as documented in CATALOG.md. Additionally adds 4 new MCP tools for catalog interaction.

Changes

New Files

- `src/catalog/events/bridge.py`
- `src/catalog/jobs/executor.py`
- `src/catalog/jobs/queue.py`
- `src/tools/catalog.py`
- `docs/CATALOG_ENHANCEMENTS.md`

Modified Files

- `src/catalog/store/store.py`
- `src/catalog/worker.py`
- `src/catalog/enrichment/catalog_enrichment.py`
- `src/tools/__init__.py`

New MCP Tools

- `get_catalog_status` - Returns catalog system status
- `get_collection_schema_from_catalog` - Get cached schema without running INFER
- `refresh_collection_schema` - Queue high-priority refresh for a collection
- `get_enriched_database_context` - Get LLM-enriched database context

Test plan