Skip to content
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
91c8eec
feat: Add POST /api/admin/reindex endpoint to rebuild ES indices from…
Li-Xingyu Mar 16, 2026
4fae1c8
refactor: Make reindex async with progress tracking endpoint
Li-Xingyu Mar 16, 2026
e82280e
docs: Add reindex API documentation
Li-Xingyu Mar 16, 2026
41bfc81
fix: Use workflow_id instead of id in getAllWorkflowIds query
Li-Xingyu Mar 16, 2026
78b1404
Merge branch 'main' into feature/admin-reindex
Li-Xingyu Mar 22, 2026
d8b8671
Merge branch 'main' into feature/admin-reindex
Li-Xingyu Mar 24, 2026
425df93
fix: Harden reindex - OOM prevention, multi-DB support, unsupported b…
Li-Xingyu Mar 25, 2026
072cde1
feat: Update Docker configuration and add reindex API documentation
Li-Xingyu Mar 25, 2026
08a11ef
Merge remote-tracking branch 'origin/feature/admin-reindex' into feat…
Li-Xingyu Mar 25, 2026
a8b97ed
Merge branch 'main' into feature/admin-reindex
Li-Xingyu Mar 25, 2026
6f4bbef
feat: Add reason for incompletion to mapping and update reindex API d…
Li-Xingyu Mar 25, 2026
5586042
Merge remote-tracking branch 'origin/feature/admin-reindex' into feat…
Li-Xingyu Mar 25, 2026
6d08b76
Merge branch 'main' into feature/admin-reindex
Li-Xingyu Mar 25, 2026
d2ad776
Merge branch 'main' into feature/admin-reindex
Li-Xingyu Apr 2, 2026
c95f83b
Merge branch 'main' into feature/admin-reindex
nthmost-orkes Apr 6, 2026
7be5167
Merge branch 'main' into feature/admin-reindex
nthmost-orkes Apr 7, 2026
708bde3
Merge branch 'main' into feature/admin-reindex
nthmost-orkes Apr 17, 2026
e360403
fix: correct MySQL ORDER BY column and add reindex unit tests
Li-Xingyu Apr 22, 2026
df2c991
perf: replace OFFSET pagination with keyset pagination in reindex
Li-Xingyu Apr 22, 2026
9e88dad
feat: add configurable rate limiting to reindex via Guava RateLimiter
Li-Xingyu Apr 22, 2026
fc687fa
Merge remote-tracking branch 'origin/feature/admin-reindex' into feat…
Li-Xingyu Apr 22, 2026
57d49f2
Merge branch 'main' into feature/admin-reindex
Li-Xingyu Apr 22, 2026
4620bb1
style: fix spotless import order violations in AdminServiceImpl
Li-Xingyu Apr 22, 2026
027ceaf
Merge branch 'main' into feature/admin-reindex
Li-Xingyu Apr 28, 2026
0f3b639
Merge branch 'main' into feature/admin-reindex
nthmost-orkes May 5, 2026
9042870
Merge branch 'main' into feature/admin-reindex
nthmost-orkes May 5, 2026
6c6fdc3
Merge branch 'main' into feature/admin-reindex
nthmost-orkes May 5, 2026
e71555b
Merge branch 'main' into feature/admin-reindex
nthmost-orkes May 5, 2026
e65981c
feat(reindex): add cluster-health pre-flight, force override, and dan…
Li-Xingyu May 6, 2026
095525b
Merge branch 'main' into feature/admin-reindex
nthmost-orkes May 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,41 @@ List<WorkflowModel> getWorkflowsByCorrelationId(
*/
boolean canSearchAcrossWorkflows();

/**
* Get all workflow ids with pagination. Used for reindexing.
*
* @param offset start offset
* @param limit max number of ids to return
* @return list of workflow ids
*/
default List<String> getAllWorkflowIds(int offset, int limit) {
throw new UnsupportedOperationException(
getClass().getSimpleName() + " does not support getAllWorkflowIds");
}

/**
* Get the total count of workflows. Used for reindex progress tracking.
*
* @return total number of workflows
*/
default long getWorkflowCount() {
throw new UnsupportedOperationException(
getClass().getSimpleName() + " does not support getWorkflowCount");
}

/**
* Return workflow ids whose workflow_id is strictly greater than {@code cursor}, ordered by
* workflow_id, up to {@code limit} entries. Pass an empty string as the initial cursor.
*
* @param cursor exclusive lower bound (empty string to start from the beginning)
* @param limit max number of ids to return
* @return list of workflow ids
*/
default List<String> getAllWorkflowIdsAfter(String cursor, int limit) {
throw new UnsupportedOperationException(
getClass().getSimpleName() + " does not support getAllWorkflowIdsAfter");
}

// Events

/**
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/com/netflix/conductor/service/AdminService.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,18 @@ boolean verifyAndRepairWorkflowConsistency(
* @return map of event queues
*/
Map<String, ?> getEventQueues(boolean verbose);

    /**
     * Starts an asynchronous reindex job and returns immediately; use {@link #getReindexStatus()}
     * to track progress. The returned map's {@code state} key reports the outcome of the start
     * attempt (started, already running, or unsupported by the persistence backend).
     *
     * @return initial status map
     */
    Map<String, Object> startReindex();

    /**
     * Returns the current reindex job status and progress.
     *
     * @return status map with {@code state}, {@code processed}, {@code errors}, {@code total}
     *     and {@code message} fields
     */
    Map<String, Object> getReindexStatus();
}
165 changes: 165 additions & 0 deletions core/src/main/java/com/netflix/conductor/service/AdminServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,90 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.info.BuildProperties;
import org.springframework.stereotype.Service;

import com.netflix.conductor.annotations.Audit;
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueueManager;
import com.netflix.conductor.core.reconciliation.WorkflowRepairService;
import com.netflix.conductor.core.utils.Utils;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import com.google.common.util.concurrent.RateLimiter;

@Audit
@Trace
@Service
public class AdminServiceImpl implements AdminService {

private static final Logger LOGGER = LoggerFactory.getLogger(AdminServiceImpl.class);

    // ---- reindex state ----
    /** Lifecycle of the (at most one) background reindex job. */
    private enum ReindexState {
        IDLE, // no job has been started since this bean was created
        RUNNING, // a job is currently executing on the reindex worker thread
        COMPLETED, // the last job finished (individual workflows may still have errored)
        FAILED // the last job aborted with an unexpected exception
    }

    /** Current job state; transitions to RUNNING are claimed via CAS in startReindex(). */
    private final AtomicReference<ReindexState> reindexState =
            new AtomicReference<>(ReindexState.IDLE);
    // Progress counters: written by the worker thread, read by getReindexStatus().
    private final AtomicLong reindexProcessed = new AtomicLong(0);
    private final AtomicLong reindexErrors = new AtomicLong(0);
    private final AtomicLong reindexTotal = new AtomicLong(0);
    // volatile: published by the worker thread, read by status calls.
    private volatile String reindexMessage = "";
    // Recreated on every start; null means rate limiting is disabled.
    private volatile RateLimiter reindexRateLimiter = null;
    // Single daemon thread: enforces at most one concurrent job and never blocks JVM shutdown.
    private final ExecutorService reindexExecutor =
            Executors.newSingleThreadExecutor(
                    r -> {
                        Thread t = new Thread(r, "reindex-worker");
                        t.setDaemon(true);
                        return t;
                    });

    private final ConductorProperties properties;
    private final ExecutionService executionService;
    private final QueueDAO queueDAO;
    private final ExecutionDAO executionDAO;
    private final IndexDAO indexDAO;
    private final WorkflowRepairService workflowRepairService;
    private final EventQueueManager eventQueueManager;
    private final BuildProperties buildProperties;

    // Max workflows indexed per second during reindex; values <= 0 disable rate limiting.
    @Value("${conductor.reindex.rateLimitPerSecond:0}")
    private double reindexRateLimitPerSecond;

public AdminServiceImpl(
ConductorProperties properties,
ExecutionService executionService,
QueueDAO queueDAO,
ExecutionDAO executionDAO,
IndexDAO indexDAO,
Optional<WorkflowRepairService> workflowRepairService,
Optional<EventQueueManager> eventQueueManager,
Optional<BuildProperties> buildProperties) {
this.properties = properties;
this.executionService = executionService;
this.queueDAO = queueDAO;
this.executionDAO = executionDAO;
this.indexDAO = indexDAO;
this.workflowRepairService = workflowRepairService.orElse(null);
this.eventQueueManager = eventQueueManager.orElse(null);
this.buildProperties = buildProperties.orElse(null);
Expand Down Expand Up @@ -135,4 +184,120 @@ public String requeueSweep(String workflowId) {
}
return (verbose ? eventQueueManager.getQueueSizes() : eventQueueManager.getQueues());
}

    /**
     * Starts the asynchronous reindex job if the persistence backend supports it and no job is
     * currently running. Returns immediately; the {@code state} key of the returned map is one
     * of UNSUPPORTED, ALREADY_RUNNING, or STARTED.
     *
     * @return status map describing the outcome of the start attempt
     */
    @Override
    public Map<String, Object> startReindex() {
        // Fail fast if the DAO doesn't support reindexing
        try {
            // Probe both DAO methods the worker relies on; unsupported backends throw here
            // instead of failing asynchronously after the job has been reported as started.
            executionDAO.getWorkflowCount();
            executionDAO.getAllWorkflowIdsAfter("", 1);
        } catch (UnsupportedOperationException e) {
            Map<String, Object> result = new HashMap<>();
            result.put("state", "UNSUPPORTED");
            result.put(
                    "message",
                    "Reindex is not supported by the current persistence backend ("
                            + executionDAO.getClass().getSimpleName()
                            + ")");
            return result;
        }

        // Atomically claim the job: only IDLE/COMPLETED/FAILED may transition to RUNNING.
        // If the state is RUNNING, all three CAS attempts fail and we report ALREADY_RUNNING.
        if (!reindexState.compareAndSet(ReindexState.IDLE, ReindexState.RUNNING)
                && !reindexState.compareAndSet(ReindexState.COMPLETED, ReindexState.RUNNING)
                && !reindexState.compareAndSet(ReindexState.FAILED, ReindexState.RUNNING)) {
            Map<String, Object> result = new HashMap<>();
            result.put("state", "ALREADY_RUNNING");
            result.put("message", "A reindex job is already in progress");
            return result;
        }

        // Reset counters
        // Safe to reset non-atomically: the worker has not been submitted yet, and the CAS
        // above guarantees no other thread can claim the job concurrently.
        reindexProcessed.set(0);
        reindexErrors.set(0);
        reindexTotal.set(0);
        reindexMessage = "Starting...";
        reindexRateLimiter =
                reindexRateLimitPerSecond > 0
                        ? RateLimiter.create(reindexRateLimitPerSecond)
                        : null;

        reindexExecutor.submit(this::doReindex);

        Map<String, Object> result = new HashMap<>();
        result.put("state", "STARTED");
        result.put(
                "message",
                "Reindex job started. Use GET /api/admin/reindex/status to track progress.");
        return result;
    }

@Override
public Map<String, Object> getReindexStatus() {
Map<String, Object> result = new HashMap<>();
result.put("state", reindexState.get().name());
result.put("processed", reindexProcessed.get());
result.put("errors", reindexErrors.get());
result.put("total", reindexTotal.get());
result.put("message", reindexMessage);
return result;
}

private void doReindex() {
LOGGER.info("Reindex job started");
int batchSize = 100;
String cursor = "";

try {
// Count total using lightweight COUNT query (no heap allocation)
long total = executionDAO.getWorkflowCount();
reindexTotal.set(total);
reindexMessage = "Indexing 0 / " + total;
LOGGER.info("Reindex: {} workflows to process", total);

while (true) {
List<String> workflowIds = executionDAO.getAllWorkflowIdsAfter(cursor, batchSize);
if (workflowIds.isEmpty()) {
break;
}
cursor = workflowIds.get(workflowIds.size() - 1);

for (String workflowId : workflowIds) {
if (reindexRateLimiter != null) {
reindexRateLimiter.acquire();
}
try {
WorkflowModel wfModel = executionDAO.getWorkflow(workflowId, true);
if (wfModel == null) {
LOGGER.warn("Workflow {} not found, skipping", workflowId);
reindexErrors.incrementAndGet();
continue;
}
indexDAO.indexWorkflow(new WorkflowSummary(wfModel.toWorkflow()));
for (TaskModel task : wfModel.getTasks()) {
indexDAO.indexTask(new TaskSummary(task.toTask()));
}
long done = reindexProcessed.incrementAndGet();
reindexMessage = "Indexing " + done + " / " + reindexTotal.get();
} catch (Exception e) {
reindexErrors.incrementAndGet();
LOGGER.error("Failed to reindex workflow {}", workflowId, e);
}
}

LOGGER.info("Reindex progress: {}/{}", reindexProcessed.get(), reindexTotal.get());
}

reindexMessage =
"Completed. processed="
+ reindexProcessed.get()
+ ", errors="
+ reindexErrors.get();
reindexState.set(ReindexState.COMPLETED);
LOGGER.info("Reindex job completed. {}", reindexMessage);
} catch (Exception e) {
reindexMessage = "Failed: " + e.getMessage();
reindexState.set(ReindexState.FAILED);
LOGGER.error("Reindex job failed", e);
}
}
}
Loading