diff --git a/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java b/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java index b94ffaf593..3b2cf0e4d7 100644 --- a/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java @@ -195,6 +195,41 @@ List getWorkflowsByCorrelationId( */ boolean canSearchAcrossWorkflows(); + /** + * Get all workflow ids with pagination. Used for reindexing. + * + * @param offset start offset + * @param limit max number of ids to return + * @return list of workflow ids + */ + default List getAllWorkflowIds(int offset, int limit) { + throw new UnsupportedOperationException( + getClass().getSimpleName() + " does not support getAllWorkflowIds"); + } + + /** + * Get the total count of workflows. Used for reindex progress tracking. + * + * @return total number of workflows + */ + default long getWorkflowCount() { + throw new UnsupportedOperationException( + getClass().getSimpleName() + " does not support getWorkflowCount"); + } + + /** + * Return workflow ids whose workflow_id is strictly greater than {@code cursor}, ordered by + * workflow_id, up to {@code limit} entries. Pass an empty string as the initial cursor. 
+ * + * @param cursor exclusive lower bound (empty string to start from the beginning) + * @param limit max number of ids to return + * @return list of workflow ids + */ + default List getAllWorkflowIdsAfter(String cursor, int limit) { + throw new UnsupportedOperationException( + getClass().getSimpleName() + " does not support getAllWorkflowIdsAfter"); + } + // Events /** diff --git a/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java b/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java index 18c3d82dd7..5a2c32439c 100644 --- a/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java @@ -247,4 +247,18 @@ CompletableFuture asyncUpdateTask( * @return Number of matches for the query */ long getWorkflowCount(String query, String freeText); + + /** + * Report whether the underlying index backend is healthy enough to accept a bulk rebuild. + * + *

Implementations should surface conditions that would cause bulk ingest to fail or worsen + * (e.g. cluster status not green, disk watermarks exceeded). Non-clustered backends that cannot + * fail in this way should return true. Default returns true so existing implementations remain + * compatible until they explicitly opt in. + * + * @return true if the backend is ready for heavy write load, false otherwise + */ + default boolean isClusterHealthy() { + return true; + } } diff --git a/core/src/main/java/com/netflix/conductor/service/AdminService.java b/core/src/main/java/com/netflix/conductor/service/AdminService.java index 973a3edc86..61d8eef75f 100644 --- a/core/src/main/java/com/netflix/conductor/service/AdminService.java +++ b/core/src/main/java/com/netflix/conductor/service/AdminService.java @@ -69,4 +69,20 @@ boolean verifyAndRepairWorkflowConsistency( * @return map of event queues */ Map getEventQueues(boolean verbose); + + /** + * Start an async reindex job. Returns immediately. Use getReindexStatus() to track progress. + * + * @param force if true, skip the pre-flight cluster health check. Use only when you know what + * you are doing — a non-green cluster can be driven into a red state by bulk writes. + * @return initial status map + */ + Map startReindex(boolean force); + + /** + * Get the current reindex job status and progress. 
+ * + * @return status map with state, processed, errors, total fields + */ + Map getReindexStatus(); } diff --git a/core/src/main/java/com/netflix/conductor/service/AdminServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/AdminServiceImpl.java index 8c11dd94bf..5d2c544bb7 100644 --- a/core/src/main/java/com/netflix/conductor/service/AdminServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/AdminServiceImpl.java @@ -17,41 +17,91 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.info.BuildProperties; import org.springframework.stereotype.Service; import com.netflix.conductor.annotations.Audit; import com.netflix.conductor.annotations.Trace; import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.run.TaskSummary; +import com.netflix.conductor.common.run.WorkflowSummary; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.events.EventQueueManager; import com.netflix.conductor.core.reconciliation.WorkflowRepairService; import com.netflix.conductor.core.utils.Utils; +import com.netflix.conductor.dao.ExecutionDAO; +import com.netflix.conductor.dao.IndexDAO; import com.netflix.conductor.dao.QueueDAO; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.model.WorkflowModel; + +import com.google.common.util.concurrent.RateLimiter; @Audit @Trace @Service public class AdminServiceImpl implements AdminService { + private static final Logger LOGGER = LoggerFactory.getLogger(AdminServiceImpl.class); + + // ---- reindex state ---- + private enum ReindexState { + IDLE, + RUNNING, + 
COMPLETED, + FAILED, + PREFLIGHT_FAILED + } + + private final AtomicReference reindexState = + new AtomicReference<>(ReindexState.IDLE); + private final AtomicLong reindexProcessed = new AtomicLong(0); + private final AtomicLong reindexErrors = new AtomicLong(0); + private final AtomicLong reindexTotal = new AtomicLong(0); + private volatile String reindexMessage = ""; + private volatile RateLimiter reindexRateLimiter = null; + private final ExecutorService reindexExecutor = + Executors.newSingleThreadExecutor( + r -> { + Thread t = new Thread(r, "reindex-worker"); + t.setDaemon(true); + return t; + }); + private final ConductorProperties properties; private final ExecutionService executionService; private final QueueDAO queueDAO; + private final ExecutionDAO executionDAO; + private final IndexDAO indexDAO; private final WorkflowRepairService workflowRepairService; private final EventQueueManager eventQueueManager; private final BuildProperties buildProperties; + @Value("${conductor.reindex.rateLimitPerSecond:0}") + private double reindexRateLimitPerSecond; + public AdminServiceImpl( ConductorProperties properties, ExecutionService executionService, QueueDAO queueDAO, + ExecutionDAO executionDAO, + IndexDAO indexDAO, Optional workflowRepairService, Optional eventQueueManager, Optional buildProperties) { this.properties = properties; this.executionService = executionService; this.queueDAO = queueDAO; + this.executionDAO = executionDAO; + this.indexDAO = indexDAO; this.workflowRepairService = workflowRepairService.orElse(null); this.eventQueueManager = eventQueueManager.orElse(null); this.buildProperties = buildProperties.orElse(null); @@ -135,4 +185,147 @@ public String requeueSweep(String workflowId) { } return (verbose ? 
eventQueueManager.getQueueSizes() : eventQueueManager.getQueues()); } + + private static final String REINDEX_WARNING = + "WARNING: bulk-writing every workflow and task back to the search index can saturate " + + "ingest throughput, breach disk watermarks, and drive the cluster into a red " + + "state. Verify cluster health (GET /_cluster/health) and available disk " + + "space before using this endpoint. This is a voluntary call — the operator " + + "is responsible for knowing it is safe to run."; + + @Override + public Map startReindex(boolean force) { + // Fail fast if the DAO doesn't support reindexing + try { + executionDAO.getWorkflowCount(); + executionDAO.getAllWorkflowIdsAfter("", 1); + } catch (UnsupportedOperationException e) { + Map result = new HashMap<>(); + result.put("state", "UNSUPPORTED"); + result.put( + "message", + "Reindex is not supported by the current persistence backend (" + + executionDAO.getClass().getSimpleName() + + ")"); + return result; + } + + // Pre-flight: refuse to start on an unhealthy index cluster unless explicitly forced. + // Bulk writes to a yellow/red cluster are the fastest way to make the problem worse. + if (!force && !indexDAO.isClusterHealthy()) { + reindexState.set(ReindexState.PREFLIGHT_FAILED); + reindexMessage = + "Pre-flight failed: index cluster is not green. 
Fix the cluster first, or " + + "re-run with ?force=true if you understand the risk."; + Map result = new HashMap<>(); + result.put("state", "PREFLIGHT_FAILED"); + result.put("message", reindexMessage); + result.put("warning", REINDEX_WARNING); + return result; + } + + if (!reindexState.compareAndSet(ReindexState.IDLE, ReindexState.RUNNING) + && !reindexState.compareAndSet(ReindexState.COMPLETED, ReindexState.RUNNING) + && !reindexState.compareAndSet(ReindexState.FAILED, ReindexState.RUNNING) + && !reindexState.compareAndSet( + ReindexState.PREFLIGHT_FAILED, ReindexState.RUNNING)) { + Map result = new HashMap<>(); + result.put("state", "ALREADY_RUNNING"); + result.put("message", "A reindex job is already in progress"); + return result; + } + + // Reset counters + reindexProcessed.set(0); + reindexErrors.set(0); + reindexTotal.set(0); + reindexMessage = "Starting..."; + reindexRateLimiter = + reindexRateLimitPerSecond > 0 + ? RateLimiter.create(reindexRateLimitPerSecond) + : null; + + reindexExecutor.submit(this::doReindex); + + Map result = new HashMap<>(); + result.put("state", "STARTED"); + result.put( + "message", + "Reindex job started. 
Use GET /api/admin/reindex/status to track progress."); + result.put("warning", REINDEX_WARNING); + if (force) { + result.put("forced", true); + } + return result; + } + + @Override + public Map getReindexStatus() { + Map result = new HashMap<>(); + result.put("state", reindexState.get().name()); + result.put("processed", reindexProcessed.get()); + result.put("errors", reindexErrors.get()); + result.put("total", reindexTotal.get()); + result.put("message", reindexMessage); + return result; + } + + private void doReindex() { + LOGGER.info("Reindex job started"); + int batchSize = 100; + String cursor = ""; + + try { + // Count total using lightweight COUNT query (no heap allocation) + long total = executionDAO.getWorkflowCount(); + reindexTotal.set(total); + reindexMessage = "Indexing 0 / " + total; + LOGGER.info("Reindex: {} workflows to process", total); + + while (true) { + List workflowIds = executionDAO.getAllWorkflowIdsAfter(cursor, batchSize); + if (workflowIds.isEmpty()) { + break; + } + cursor = workflowIds.get(workflowIds.size() - 1); + + for (String workflowId : workflowIds) { + if (reindexRateLimiter != null) { + reindexRateLimiter.acquire(); + } + try { + WorkflowModel wfModel = executionDAO.getWorkflow(workflowId, true); + if (wfModel == null) { + LOGGER.warn("Workflow {} not found, skipping", workflowId); + reindexErrors.incrementAndGet(); + continue; + } + indexDAO.indexWorkflow(new WorkflowSummary(wfModel.toWorkflow())); + for (TaskModel task : wfModel.getTasks()) { + indexDAO.indexTask(new TaskSummary(task.toTask())); + } + long done = reindexProcessed.incrementAndGet(); + reindexMessage = "Indexing " + done + " / " + reindexTotal.get(); + } catch (Exception e) { + reindexErrors.incrementAndGet(); + LOGGER.error("Failed to reindex workflow {}", workflowId, e); + } + } + + LOGGER.info("Reindex progress: {}/{}", reindexProcessed.get(), reindexTotal.get()); + } + + reindexMessage = + "Completed. 
processed=" + + reindexProcessed.get() + + ", errors=" + + reindexErrors.get(); + reindexState.set(ReindexState.COMPLETED); + LOGGER.info("Reindex job completed. {}", reindexMessage); + } catch (Exception e) { + reindexMessage = "Failed: " + e.getMessage(); + reindexState.set(ReindexState.FAILED); + LOGGER.error("Reindex job failed", e); + } + } } diff --git a/core/src/test/java/com/netflix/conductor/service/AdminServiceImplReindexTest.java b/core/src/test/java/com/netflix/conductor/service/AdminServiceImplReindexTest.java new file mode 100644 index 0000000000..69d507e0ea --- /dev/null +++ b/core/src/test/java/com/netflix/conductor/service/AdminServiceImplReindexTest.java @@ -0,0 +1,273 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.service; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.test.util.ReflectionTestUtils; + +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.dao.ExecutionDAO; +import com.netflix.conductor.dao.IndexDAO; +import com.netflix.conductor.dao.QueueDAO; +import com.netflix.conductor.model.WorkflowModel; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class AdminServiceImplReindexTest { + + @Mock private ConductorProperties properties; + @Mock private ExecutionService executionService; + @Mock private QueueDAO queueDAO; + @Mock private ExecutionDAO executionDAO; + @Mock private IndexDAO indexDAO; + + private AdminServiceImpl adminService; + + @Before + public void setUp() { + adminService = + new AdminServiceImpl( + properties, + executionService, + queueDAO, + 
executionDAO, + indexDAO, + Optional.empty(), + Optional.empty(), + Optional.empty()); + // default to healthy cluster so existing tests don't need to stub it + lenient().when(indexDAO.isClusterHealthy()).thenReturn(true); + } + + private void waitForReindex() throws InterruptedException { + for (int i = 0; i < 50; i++) { + String state = (String) adminService.getReindexStatus().get("state"); + if ("COMPLETED".equals(state) || "FAILED".equals(state)) return; + Thread.sleep(100); + } + throw new AssertionError("Reindex did not complete within 5 seconds"); + } + + private WorkflowModel stubWorkflow() { + WorkflowModel wf = mock(WorkflowModel.class); + WorkflowDef def = new WorkflowDef(); + def.setName("test-workflow"); + Workflow workflow = new Workflow(); + workflow.setWorkflowDefinition(def); + when(wf.toWorkflow()).thenReturn(workflow); + when(wf.getTasks()).thenReturn(Collections.emptyList()); + return wf; + } + + @Test + public void testIdleToCompleted() throws Exception { + WorkflowModel wf = stubWorkflow(); + when(executionDAO.getWorkflowCount()).thenReturn(2L); + when(executionDAO.getAllWorkflowIdsAfter("", 100)).thenReturn(Arrays.asList("wf1", "wf2")); + when(executionDAO.getAllWorkflowIdsAfter("wf2", 100)).thenReturn(Collections.emptyList()); + when(executionDAO.getWorkflow("wf1", true)).thenReturn(wf); + when(executionDAO.getWorkflow("wf2", true)).thenReturn(wf); + + Map result = adminService.startReindex(false); + assertEquals("STARTED", result.get("state")); + + waitForReindex(); + + Map status = adminService.getReindexStatus(); + assertEquals("COMPLETED", status.get("state")); + assertEquals(2L, status.get("processed")); + assertEquals(0L, status.get("errors")); + verify(indexDAO, times(2)).indexWorkflow(any()); + } + + @Test + public void testCompletedToRunningResetsCounters() throws Exception { + WorkflowModel wf = stubWorkflow(); + when(executionDAO.getWorkflowCount()).thenReturn(1L); + when(executionDAO.getAllWorkflowIdsAfter("", 100)) + 
.thenReturn(Collections.singletonList("wf1")); + when(executionDAO.getAllWorkflowIdsAfter("wf1", 100)).thenReturn(Collections.emptyList()); + when(executionDAO.getWorkflow("wf1", true)).thenReturn(wf); + + adminService.startReindex(false); + waitForReindex(); + assertEquals("COMPLETED", adminService.getReindexStatus().get("state")); + + Map result = adminService.startReindex(false); + assertEquals("STARTED", result.get("state")); + waitForReindex(); + + Map status = adminService.getReindexStatus(); + assertEquals("COMPLETED", status.get("state")); + assertEquals(1L, status.get("processed")); + assertEquals(0L, status.get("errors")); + } + + @Test + public void testDoublePostReturnsAlreadyRunning() { + when(executionDAO.getWorkflowCount()).thenReturn(0L); + // fail-fast probe in startReindex (limit=1) returns immediately; + // the actual paging call (limit=100) blocks long enough to keep state RUNNING + when(executionDAO.getAllWorkflowIdsAfter("", 1)).thenReturn(Collections.emptyList()); + when(executionDAO.getAllWorkflowIdsAfter("", 100)) + .thenAnswer( + inv -> { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return Collections.emptyList(); + }); + + adminService.startReindex(false); + + Map result = adminService.startReindex(false); + assertEquals("ALREADY_RUNNING", result.get("state")); + } + + @Test + public void testUnsupportedBackend() { + when(executionDAO.getWorkflowCount()) + .thenThrow(new UnsupportedOperationException("not supported")); + + Map result = adminService.startReindex(false); + assertEquals("UNSUPPORTED", result.get("state")); + } + + @Test + public void testKeysetCursorAdvancesCorrectly() throws Exception { + // batch1: 100 items wf000..wf099, batch2: empty — verifies cursor = last id of batch1 + List batch1 = new ArrayList<>(); + for (int i = 0; i < 100; i++) batch1.add(String.format("wf%03d", i)); + String lastId = batch1.get(batch1.size() - 1); // "wf099" + WorkflowModel wf = 
stubWorkflow(); + + when(executionDAO.getWorkflowCount()).thenReturn(100L); + when(executionDAO.getAllWorkflowIdsAfter("", 100)).thenReturn(batch1); + when(executionDAO.getAllWorkflowIdsAfter(lastId, 100)).thenReturn(Collections.emptyList()); + when(executionDAO.getWorkflow(anyString(), eq(true))).thenReturn(wf); + + adminService.startReindex(false); + waitForReindex(); + + verify(executionDAO).getAllWorkflowIdsAfter("", 100); + verify(executionDAO).getAllWorkflowIdsAfter(lastId, 100); + verify(indexDAO, times(100)).indexWorkflow(any()); + } + + @Test + public void testRateLimiterNotCreatedWhenRateIsZero() throws Exception { + // default rate is 0 — no limiter, reindexRateLimiter field stays null + WorkflowModel wf = stubWorkflow(); + when(executionDAO.getWorkflowCount()).thenReturn(1L); + when(executionDAO.getAllWorkflowIdsAfter("", 100)) + .thenReturn(Collections.singletonList("wf1")); + when(executionDAO.getAllWorkflowIdsAfter("wf1", 100)).thenReturn(Collections.emptyList()); + when(executionDAO.getWorkflow("wf1", true)).thenReturn(wf); + + adminService.startReindex(false); + waitForReindex(); + + assertEquals(null, ReflectionTestUtils.getField(adminService, "reindexRateLimiter")); + } + + @Test + public void testRateLimiterCreatedWhenRateIsPositive() throws Exception { + ReflectionTestUtils.setField(adminService, "reindexRateLimitPerSecond", 50.0); + WorkflowModel wf = stubWorkflow(); + when(executionDAO.getWorkflowCount()).thenReturn(1L); + when(executionDAO.getAllWorkflowIdsAfter("", 100)) + .thenReturn(Collections.singletonList("wf1")); + when(executionDAO.getAllWorkflowIdsAfter("wf1", 100)).thenReturn(Collections.emptyList()); + when(executionDAO.getWorkflow("wf1", true)).thenReturn(wf); + + adminService.startReindex(false); + waitForReindex(); + + Object limiter = ReflectionTestUtils.getField(adminService, "reindexRateLimiter"); + assertEquals("COMPLETED", adminService.getReindexStatus().get("state")); + // limiter was created (non-null) and job still 
completed successfully + org.junit.Assert.assertNotNull(limiter); + } + + @Test + public void testPreflightFailedWhenClusterUnhealthy() { + when(executionDAO.getWorkflowCount()).thenReturn(5L); + when(executionDAO.getAllWorkflowIdsAfter("", 1)).thenReturn(Collections.emptyList()); + when(indexDAO.isClusterHealthy()).thenReturn(false); + + Map result = adminService.startReindex(false); + assertEquals("PREFLIGHT_FAILED", result.get("state")); + // background thread must not have been submitted + assertEquals("PREFLIGHT_FAILED", adminService.getReindexStatus().get("state")); + org.junit.Assert.assertNotNull(result.get("warning")); + } + + @Test + public void testForceBypassesPreflight() throws Exception { + WorkflowModel wf = stubWorkflow(); + when(executionDAO.getWorkflowCount()).thenReturn(1L); + when(executionDAO.getAllWorkflowIdsAfter("", 1)).thenReturn(Collections.emptyList()); + when(executionDAO.getAllWorkflowIdsAfter("", 100)) + .thenReturn(Collections.singletonList("wf1")); + when(executionDAO.getAllWorkflowIdsAfter("wf1", 100)).thenReturn(Collections.emptyList()); + when(executionDAO.getWorkflow("wf1", true)).thenReturn(wf); + // force=true must bypass the health check entirely; this stub should NOT be invoked + lenient().when(indexDAO.isClusterHealthy()).thenReturn(false); + + Map result = adminService.startReindex(true); + assertEquals("STARTED", result.get("state")); + assertEquals(Boolean.TRUE, result.get("forced")); + waitForReindex(); + assertEquals("COMPLETED", adminService.getReindexStatus().get("state")); + } + + @Test + public void testStartedResponseIncludesWarning() throws Exception { + WorkflowModel wf = stubWorkflow(); + when(executionDAO.getWorkflowCount()).thenReturn(1L); + when(executionDAO.getAllWorkflowIdsAfter("", 1)).thenReturn(Collections.emptyList()); + when(executionDAO.getAllWorkflowIdsAfter("", 100)) + .thenReturn(Collections.singletonList("wf1")); + when(executionDAO.getAllWorkflowIdsAfter("wf1", 
100)).thenReturn(Collections.emptyList()); + when(executionDAO.getWorkflow("wf1", true)).thenReturn(wf); + + Map result = adminService.startReindex(false); + assertEquals("STARTED", result.get("state")); + org.junit.Assert.assertNotNull(result.get("warning")); + waitForReindex(); + } +} diff --git a/docs/documentation/configuration/reindex-en.md b/docs/documentation/configuration/reindex-en.md new file mode 100644 index 0000000000..2ec30a0abd --- /dev/null +++ b/docs/documentation/configuration/reindex-en.md @@ -0,0 +1,141 @@ +# ES Index Rebuild (Reindex) + +When Elasticsearch data is lost but the primary database (Postgres / MySQL) remains intact, this API can be used to re-index all workflows and tasks into ES. + +> ⚠️ **This will blow your arm off if you are not careful.** Bulk-writing every workflow and task back to the search cluster can saturate ingest throughput, breach disk watermarks, and cause the cluster to start rejecting requests or enter a red state on any non-trivial deployment. +> +> **Before you call this endpoint:** +> 1. Verify cluster health — `GET /_cluster/health` must report `status: green`; +> 2. Confirm no node is above the disk high-watermark; +> 3. Make sure the live workload can absorb the extra write pressure. +> +> The fact that this is a voluntary on-demand call is the only thing that makes it safe to offer — the operator needs to know what they are doing before calling it. The server performs a pre-flight health check and refuses to start if the cluster is not green. `?force=true` overrides the check, but **do not use this on production without a very good reason**. + +## Applicable Scenarios + +| Index Backend | Useful? 
| +|---|---| +| `elasticsearch` / `opensearch2` / `opensearch3` | ✅ Most common scenario — ES is an external service and prone to data loss | +| `postgres` | ⚠️ Limited value — index and data share the same database, typically lost or preserved together | +| Disabled (`indexing.enabled=false`) | ❌ No-op | + +--- + +## API + +### Start Reindex + +``` +POST /api/admin/reindex +POST /api/admin/reindex?force=true # bypass the cluster-health pre-flight (use with extreme care) +``` + +**Returns immediately**; the job runs in the background on a single thread. The server runs a `GET /_cluster/health` pre-flight first and refuses to start if the cluster is not `green` and `force=true` was not supplied. + +Response example: +```json +{ + "state": "STARTED", + "message": "Reindex job started. Use GET /api/admin/reindex/status to track progress.", + "warning": "WARNING: bulk-writing every workflow and task back to the search index can saturate ingest throughput, breach disk watermarks, and drive the cluster into a red state. Verify cluster health (GET /_cluster/health) and available disk space before using this endpoint. This is a voluntary call — the operator is responsible for knowing it is safe to run." +} +``` + +Pre-flight failure (cluster not green and `force` was not set): +```json +{ + "state": "PREFLIGHT_FAILED", + "message": "Pre-flight failed: index cluster is not green. Fix the cluster first, or re-run with ?force=true if you understand the risk.", + "warning": "..." 
+} +``` + +If a job is already running: +```json +{ + "state": "ALREADY_RUNNING", + "message": "A reindex job is already in progress" +} +``` + +--- + +### Check Progress + +``` +GET /api/admin/reindex/status +``` + +Response example: +```json +{ + "state": "RUNNING", + "processed": 350, + "errors": 0, + "total": 1240, + "message": "Indexing 350 / 1240" +} +``` + +**`state` values:** + +| state | Description | +|---|---| +| `IDLE` | No reindex has been run since the server started | +| `RUNNING` | Job is currently executing in the background | +| `COMPLETED` | All items have been processed | +| `FAILED` | An uncaught exception occurred and the job was aborted | +| `PREFLIGHT_FAILED` | The last call was rejected because the index cluster was not green | +| `ALREADY_RUNNING` | Only appears in POST responses, indicating a duplicate submission was ignored | + +Completed response example: +```json +{ + "state": "COMPLETED", + "processed": 1240, + "errors": 2, + "total": 1240, + "message": "Completed. processed=1240, errors=2" +} +``` + +--- + +## Procedure + +```bash +# 1. Start reindex +curl -X POST http://localhost:8081/api/admin/reindex + +# 2. Poll progress (every few seconds) +watch -n 5 'curl -s http://localhost:8081/api/admin/reindex/status | jq .' + +# 3. Verify ES index document counts +curl http://localhost:9200/conductor_workflow/_count +curl http://localhost:9200/conductor_task/_count + +# 4. Verify search API is working again +curl "http://localhost:8081/api/workflow/search?query=*" +``` + +--- + +## Notes + +- **Idempotent**: ES uses `workflow_id` as the document ID, so repeated runs overwrite existing documents without creating duplicates. +- **Retriable**: After a `COMPLETED` or `FAILED` state, sending another `POST /reindex` will automatically reset and restart the job. +- **Non-disruptive**: Reindex only reads from the database and writes to ES — it does not modify any business data. 
- **Non-zero errors**: Check the conductor-server logs and search for `Failed to reindex workflow` to identify the root cause. + +--- + +## Implementation Details + +- Uses a single-thread `ExecutorService` (daemon thread) in the background, without blocking HTTP requests. +- Progress counters use `AtomicLong` and state uses `AtomicReference` for thread safety. +- Processes 100 items per batch, logging an INFO message after each batch. +- Related files: + - `core/.../service/AdminService.java` — interface definition + - `core/.../service/AdminServiceImpl.java` — async implementation + - `rest/.../controllers/AdminResource.java` — HTTP endpoints + - `postgres-persistence/.../dao/PostgresExecutionDAO.java` — `getAllWorkflowIdsAfter()` keyset-paginated query diff --git a/docs/documentation/configuration/reindex.md b/docs/documentation/configuration/reindex.md new file mode 100644 index 0000000000..202cf85a12 --- /dev/null +++ b/docs/documentation/configuration/reindex.md @@ -0,0 +1,141 @@ +# ES 索引重建(Reindex) + +当 Elasticsearch 数据丢失但主数据库(Postgres / MySQL)完好时,可通过此接口将所有 workflow 和 task 重新写入 ES 索引。 + +> ⚠️ **高风险操作。** 本接口会将数据库里的每一条 workflow 和 task 批量写回 ES/OS。对规模较大的部署,这足以打爆 ingest 吞吐、触发 disk watermark、让集群开始拒绝写入,甚至进入 red state。 +> +> **调用前请先确认:** +> 1. 集群健康:`GET /_cluster/health` 返回 `status: green`; +> 2. 节点磁盘未超过 high watermark; +> 3. 
业务端可以承受后台批量写入带来的额外负载。 +> +> 这个接口是"自愿主动调用"的 — 这也是它能被安全提供的唯一前提。调用方需要自己清楚自己在做什么。服务端内置了 pre-flight 健康检查,会在集群非 green 时拒绝执行;使用 `?force=true` 可以绕过该检查,但**不建议在生产上这么做**。 + +## 适用场景 + +| 索引后端 | 是否有意义 | +|---|---| +| `elasticsearch` / `opensearch2` / `opensearch3` | ✅ 最常见场景,ES 是独立服务,容易丢数据 | +| `postgres` | ⚠️ 意义不大,索引和数据同库,通常一起丢或一起在 | +| 已禁用(`indexing.enabled=false`)| ❌ 空操作 | + +--- + +## 接口 + +### 启动重建 + +``` +POST /api/admin/reindex +POST /api/admin/reindex?force=true # 集群非 green 时强制执行(请谨慎) +``` + +**立即返回**,任务在后台单线程执行。调用前服务端会先做一次 `GET /_cluster/health` 的 pre-flight,非 green 且未 `force` 则拒绝启动。 + +响应示例: +```json +{ + "state": "STARTED", + "message": "Reindex job started. Use GET /api/admin/reindex/status to track progress.", + "warning": "WARNING: bulk-writing every workflow and task back to the search index can saturate ingest throughput, breach disk watermarks, and drive the cluster into a red state. Verify cluster health (GET /_cluster/health) and available disk space before using this endpoint. This is a voluntary call — the operator is responsible for knowing it is safe to run." +} +``` + +Pre-flight 失败(集群非 green 且未 `force`): +```json +{ + "state": "PREFLIGHT_FAILED", + "message": "Pre-flight failed: index cluster is not green. Fix the cluster first, or re-run with ?force=true if you understand the risk.", + "warning": "..." 
+} +``` + +若已有任务正在执行: +```json +{ + "state": "ALREADY_RUNNING", + "message": "A reindex job is already in progress" +} +``` + +--- + +### 查询进度 + +``` +GET /api/admin/reindex/status +``` + +响应示例: +```json +{ + "state": "RUNNING", + "processed": 350, + "errors": 0, + "total": 1240, + "message": "Indexing 350 / 1240" +} +``` + +**`state` 取值说明:** + +| state | 含义 | +|---|---| +| `IDLE` | 服务启动后从未执行过 reindex | +| `RUNNING` | 正在后台执行 | +| `COMPLETED` | 全部完成 | +| `FAILED` | 发生未捕获异常,任务中止 | +| `PREFLIGHT_FAILED` | 上次调用时集群非 green,已拒绝启动 | +| `ALREADY_RUNNING` | 仅出现在 POST 响应中,表示重复提交被忽略 | + +完成后响应示例: +```json +{ + "state": "COMPLETED", + "processed": 1240, + "errors": 2, + "total": 1240, + "message": "Completed. processed=1240, errors=2" +} +``` + +--- + +## 操作流程 + +```bash +# 1. 启动重建 +curl -X POST http://localhost:8081/api/admin/reindex + +# 2. 轮询进度(每隔几秒查一次) +watch -n 5 'curl -s http://localhost:8081/api/admin/reindex/status | jq .' + +# 3. 验证 ES 索引数据量 +curl http://localhost:9200/conductor_workflow/_count +curl http://localhost:9200/conductor_task/_count + +# 4. 
验证搜索接口是否恢复 +curl "http://localhost:8081/api/workflow/search?query=*" +``` + +--- + +## 注意事项 + +- **幂等**:ES 以 `workflow_id` 作为文档 ID,重复执行会覆盖,不会产生重复数据。 +- **可重试**:`COMPLETED` / `FAILED` 状态后再次 `POST /reindex` 会自动重置并重新开始。 +- **不影响正在运行的 workflow**:reindex 只是读 DB 写 ES,不修改任何业务数据。 +- **errors 不为 0**:检查 conductor-server 日志,搜索 `Failed to reindex workflow` 定位具体原因。 + +--- + +## 实现说明 + +- 后台使用单线程 `ExecutorService`(daemon 线程),不阻塞 HTTP 请求。 +- 进度计数使用 `AtomicLong`,状态使用 `AtomicReference`,线程安全。 +- 每批次 100 条,每批完成后输出一次 INFO 日志。 +- 涉及文件: + - `core/.../service/AdminService.java` — 接口定义 + - `core/.../service/AdminServiceImpl.java` — 异步实现 + - `rest/.../controllers/AdminResource.java` — HTTP 端点 + - `postgres-persistence/.../dao/PostgresExecutionDAO.java` — `getAllWorkflowIdsAfter()` keyset 分页查询 diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7.java index 5979cdcbb5..6826fa26ef 100644 --- a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7.java +++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7.java @@ -32,6 +32,8 @@ import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; @@ -45,6 +47,8 @@ import org.elasticsearch.client.*; import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.client.core.CountResponse; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; 
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -1200,6 +1204,25 @@ public long getWorkflowCount(String query, String freeText) { } } + @Override + public boolean isClusterHealthy() { + try { + ClusterHealthRequest request = new ClusterHealthRequest(); + request.timeout(TimeValue.timeValueSeconds(5)); + ClusterHealthResponse response = + elasticSearchClient.cluster().health(request, RequestOptions.DEFAULT); + ClusterHealthStatus status = response.getStatus(); + if (status != ClusterHealthStatus.GREEN) { + logger.warn("Elasticsearch cluster health is {} (not GREEN)", status); + return false; + } + return true; + } catch (Exception e) { + logger.warn("Failed to query Elasticsearch cluster health", e); + return false; + } + } + private long getObjectCounts(String structuredQuery, String freeTextQuery, String docType) throws ParserException, IOException { QueryBuilder queryBuilder = boolQueryBuilder(structuredQuery, freeTextQuery); diff --git a/es8-persistence/src/main/java/org/conductoross/conductor/es8/dao/index/ElasticSearchRestDAOV8.java b/es8-persistence/src/main/java/org/conductoross/conductor/es8/dao/index/ElasticSearchRestDAOV8.java index f37645e089..280e659eae 100644 --- a/es8-persistence/src/main/java/org/conductoross/conductor/es8/dao/index/ElasticSearchRestDAOV8.java +++ b/es8-persistence/src/main/java/org/conductoross/conductor/es8/dao/index/ElasticSearchRestDAOV8.java @@ -1182,6 +1182,25 @@ public long getWorkflowCount(String query, String freeText) { } } + @Override + public boolean isClusterHealthy() { + try { + Request request = new Request(HttpMethod.GET, "/_cluster/health"); + request.addParameter("timeout", "5s"); + Response response = elasticSearchAdminClient.performRequest(request); + JsonNode body = objectMapper.readTree(response.getEntity().getContent()); + String status = body.path("status").asText("unknown"); + if (!"green".equalsIgnoreCase(status)) { + logger.warn("Elasticsearch 
cluster health is {} (not green)", status); +                return false; +            } +            return true; +        } catch (Exception e) { +            logger.warn("Failed to query Elasticsearch cluster health", e); +            return false; +        } +    } +     private long getObjectCounts(String structuredQuery, String freeTextQuery, String docType)             throws ParserException, IOException {         return searchSupport.getObjectCounts(structuredQuery, freeTextQuery, docType); diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/mysql/dao/MySQLExecutionDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/mysql/dao/MySQLExecutionDAO.java index 04939469d5..0916dd000d 100644 --- a/mysql-persistence/src/main/java/com/netflix/conductor/mysql/dao/MySQLExecutionDAO.java +++ b/mysql-persistence/src/main/java/com/netflix/conductor/mysql/dao/MySQLExecutionDAO.java @@ -338,6 +338,25 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {         return workflow;     } +    @Override +    public List<String> getAllWorkflowIds(int offset, int limit) { +        return queryWithTransaction( +                "SELECT workflow_id FROM workflow ORDER BY workflow_id LIMIT ? OFFSET ?", +                q -> q.addParameter(limit).addParameter(offset).executeScalarList(String.class)); +    } + +    @Override +    public long getWorkflowCount() { +        return queryWithTransaction("SELECT COUNT(*) FROM workflow", q -> q.executeCount()); +    } + +    @Override +    public List<String> getAllWorkflowIdsAfter(String cursor, int limit) { +        return queryWithTransaction( +                "SELECT workflow_id FROM workflow WHERE workflow_id > ? 
ORDER BY workflow_id LIMIT ?", +                q -> q.addParameter(cursor).addParameter(limit).executeScalarList(String.class)); +    } +     /**      * @param workflowName name of the workflow      * @param version the workflow version diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java index 597c3c7ff5..b69933cad0 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java @@ -374,6 +374,27 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {         return workflow;     } +    @Override +    public List<String> getAllWorkflowIds(int offset, int limit) { +        String GET_ALL_WORKFLOW_IDS = +                "SELECT workflow_id FROM workflow ORDER BY workflow_id LIMIT ? OFFSET ?"; +        return queryWithTransaction( +                GET_ALL_WORKFLOW_IDS, +                q -> q.addParameter(limit).addParameter(offset).executeScalarList(String.class)); +    } + +    @Override +    public long getWorkflowCount() { +        return queryWithTransaction("SELECT COUNT(*) FROM workflow", q -> q.executeCount()); +    } + +    @Override +    public List<String> getAllWorkflowIdsAfter(String cursor, int limit) { +        return queryWithTransaction( +                "SELECT workflow_id FROM workflow WHERE workflow_id > ? 
ORDER BY workflow_id LIMIT ?", +                q -> q.addParameter(cursor).addParameter(limit).executeScalarList(String.class)); +    } +     /**      * @param workflowName name of the workflow      * @param version the workflow version diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/AdminResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/AdminResource.java index bb1c6a6bbd..2f317f6bbf 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/AdminResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/AdminResource.java @@ -76,4 +76,20 @@ public String verifyAndRepairWorkflowConsistency(             boolean verbose) {         return adminService.getEventQueues(verbose);     } + +    @PostMapping("/reindex") +    @Operation( +            summary = +                    "Start async reindex of all workflows and tasks from database to search index") +    public Map<String, Object> startReindex( +            @RequestParam(value = "force", defaultValue = "false", required = false) +                    boolean force) { +        return adminService.startReindex(force); +    } + +    @GetMapping("/reindex/status") +    @Operation(summary = "Get the current reindex job status and progress") +    public Map<String, Object> getReindexStatus() { +        return adminService.getReindexStatus(); +    } } diff --git a/sqlite-persistence/src/main/java/com/netflix/conductor/sqlite/dao/SqliteExecutionDAO.java b/sqlite-persistence/src/main/java/com/netflix/conductor/sqlite/dao/SqliteExecutionDAO.java index eb8d58c2aa..4f920aaf93 100644 --- a/sqlite-persistence/src/main/java/com/netflix/conductor/sqlite/dao/SqliteExecutionDAO.java +++ b/sqlite-persistence/src/main/java/com/netflix/conductor/sqlite/dao/SqliteExecutionDAO.java @@ -374,6 +374,25 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {         return workflow;     } + +    @Override +    public List<String> getAllWorkflowIds(int offset, int limit) { +        return queryWithTransaction( +                "SELECT workflow_id FROM workflow ORDER BY workflow_id LIMIT ? 
OFFSET ?", +                q -> q.addParameter(limit).addParameter(offset).executeScalarList(String.class)); +    } + +    @Override +    public long getWorkflowCount() { +        return queryWithTransaction("SELECT COUNT(*) FROM workflow", q -> q.executeCount()); +    } + +    @Override +    public List<String> getAllWorkflowIdsAfter(String cursor, int limit) { +        return queryWithTransaction( +                "SELECT workflow_id FROM workflow WHERE workflow_id > ? ORDER BY workflow_id LIMIT ?", +                q -> q.addParameter(cursor).addParameter(limit).executeScalarList(String.class)); +    } +     /**      * @param workflowName name of the workflow      * @param version the workflow version