The feature, motivation and pitch
Problem Description
Currently, Conductor relies on event-driven indexing. However, in several production scenarios, the search index can fall out of sync with the primary database (Postgres):
Data Loss/Corruption: Partial or total data loss in Elasticsearch/OpenSearch.
Schema/Configuration Changes: When changing index mappings or upgrading search engine versions.
Missing Events: Occasional failures in the indexing pipeline that leave gaps in the search results.
There is currently no native mechanism to "replay" or bulk-sync historical data from the primary ExecutionDAO (Postgres) to the IndexDAO.
Proposed Feature
I propose adding an administrative tool or API that can:
Scan & Repair: Iterate through workflow and task records in Postgres.
Bulk Re-index: Push missing or outdated records to the indexing layer (ES/OpenSearch).
Filtering: Support re-indexing by specific time ranges or workflow types to avoid full-cluster load.
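To make the filtering surface concrete, a minimal sketch of what the admin request payload might look like (all names here are hypothetical and do not exist in Conductor today; the real endpoint shape would be decided during design review):

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical request body for a proposed admin endpoint such as
// POST /api/admin/reindex. Carries the time-range and workflow-type
// filters so an operator never has to trigger a full-cluster re-index.
record ReindexRequest(
        Instant startTime,             // re-index workflows created after this
        Instant endTime,               // ...and before this
        Optional<String> workflowType, // limit to one workflow definition, if set
        int batchSize) {               // page size for the Postgres scan

    // Fail fast on obviously bad requests.
    ReindexRequest {
        if (endTime.isBefore(startTime)) {
            throw new IllegalArgumentException("endTime must be after startTime");
        }
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
    }
}
```

An unset `workflowType` would mean "all workflow definitions in the time range", which keeps the common disaster-recovery case (re-index everything since the outage) a one-parameter call.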
Implementation Plan (Draft)
Extend the ExecutionDAO to support a paginated stream of historical records.
Implement a service/task (perhaps a specialized System Task or an Admin API endpoint) that calls the IndexDAO for each record.
Ensure the process is throttled to prevent performance degradation of the live system.
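The three steps above could be sketched roughly as follows. The `HistoricalSource` and `Indexer` interfaces are simplified stand-ins for the real `ExecutionDAO`/`IndexDAO` (whose actual signatures differ), and the throttle is just a fixed pause between batches; a production version would likely use a rate limiter and checkpointing so the job can resume after interruption.

```java
import java.util.List;

// Stand-in for step 1: a paginated stream of historical record IDs
// out of Postgres. Hypothetical interface, not the real ExecutionDAO.
interface HistoricalSource {
    List<String> fetchPage(int offset, int limit);
}

// Stand-in for step 2: push one record to ES/OpenSearch.
interface Indexer {
    void index(String workflowId);
}

class ReindexJob {
    private final HistoricalSource source;
    private final Indexer indexer;
    private final int batchSize;
    private final long pauseMillis; // step 3: throttle between batches

    ReindexJob(HistoricalSource source, Indexer indexer,
               int batchSize, long pauseMillis) {
        this.source = source;
        this.indexer = indexer;
        this.batchSize = batchSize;
        this.pauseMillis = pauseMillis;
    }

    /** Scans all pages, re-indexes each record, and returns the count indexed. */
    int run() throws InterruptedException {
        int total = 0;
        int offset = 0;
        while (true) {
            List<String> page = source.fetchPage(offset, batchSize);
            if (page.isEmpty()) {
                break; // no more historical records
            }
            for (String id : page) {
                indexer.index(id);
                total++;
            }
            offset += page.size();
            Thread.sleep(pauseMillis); // crude throttle to protect the live system
        }
        return total;
    }
}
```

Keeping the loop ignorant of the storage and index implementations (it only sees the two small interfaces) is what would let the same job run against Postgres + Elasticsearch or Postgres + OpenSearch unchanged.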
Contribution
I am very interested in this feature and would like to participate in the development. I can help with the design, implementation, and testing of the synchronization logic. Please let me know if this aligns with the project roadmap, and I’ll be happy to start a PR!
Alternatives
No response
Additional context
No response