Skip to content

Conversation

ajleong623
Copy link
Contributor

@ajleong623 ajleong623 commented Aug 12, 2025

Description

For requesting a regularly scheduled search evaluation, the user could add an cron parameter to denote the cron job schedule for running search evaluation.

Some changes that are made are that there are now 3 new APIs for interacting with scheduling experiments. The endpoints are experiment/<job_id>/schedule which is applied to the GET and DELETE methods and experiment/schedule which is applied to the GET and POST methods.

There are 2 new indices, .scheduled-jobs and search-relevance-scheduled-experiment-history. The purpose of the .scheduled-jobs index is to store the currently running experiment schedules. The search-relevance-scheduled-experiment-history index stores the historical experiment results with timestamps which were resulted from the scheduled job runner.

Unit and integration tests are provided, however, additions such as workload management, integration with alerting and resource monitoring are not available in this pull request, but I would like to add those into a future pull request.

Please let me know if there are any questions or concerns.

Issues Resolved

#213 #226

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Anthony Leong <[email protected]>
@ajleong623 ajleong623 marked this pull request as draft August 12, 2025 23:22
Signed-off-by: Anthony Leong <[email protected]>
Signed-off-by: Anthony Leong <[email protected]>
@epugh
Copy link
Collaborator

epugh commented Aug 13, 2025

Post discussion with @wrigleyDan and @epugh we are going to change direction a bit and make the API take in an ALREADY EXISTING Experiment ID, and use that (and it's associated settings) to run the experiment every iteration.

Let's move to a cron pattern versus a interval.

We need to think about if we need a limit to how many experiments can be run...

@epugh epugh linked an issue Aug 13, 2025 that may be closed by this pull request
@epugh epugh added the v3.3.0 label Aug 13, 2025
epugh
epugh previously requested changes Aug 20, 2025
Copy link
Collaborator

@epugh epugh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Progress! We are now on the cron pattern. Now to think about nesting the API under the /experiment/{experiment_id}/schedule name space.

ajleong623 and others added 5 commits August 20, 2025 16:43
Co-authored-by: Eric Pugh <[email protected]>
Signed-off-by: Anthony Leong <[email protected]>
This reverts commit 7f6352d.

Signed-off-by: Anthony Leong <[email protected]>
@ajleong623
Copy link
Contributor Author

I believe I have addressed the comments. One of them, I did add a TODO comment so that it can be addressed in the future. Right now, the solution to refactoring the logic of running experiments is a bit involved.

@ajleong623 ajleong623 marked this pull request as ready for review September 1, 2025 06:49
Signed-off-by: Anthony Leong <[email protected]>
Signed-off-by: Anthony Leong <[email protected]>
@epugh
Copy link
Collaborator

epugh commented Sep 2, 2025

You now just need to add soemthing to highlight this new Feature in the change log!

https://github.com/opensearch-project/search-relevance/blob/main/CHANGELOG.md#features

Copy link
Member

@martin-gaievski martin-gaievski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for addressing comments. Some area still need improvements:

  • empty finally blocks - no resource cleanup implemented
  • nested async operations lack individual timeouts
  • no mechanism to cancel in-progress experiments during timeout

FutureUtils.cancel(searchEvaluationTask); // Attempt to interrupt the running task
} catch (InterruptedException | ExecutionException e) {
log.error("Interrupt for scheduled experiment has occured!");
} finally {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're missing resource cleanup in finally block. yopu need to do something like this:

    if (currentExperimentTask != null && !currentExperimentTask.isDone()) {
        currentExperimentTask.cancel(true);
    }
    manager.cleanupResources(parameter.getExperimentId());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I fixed this issue. For handling cleanups, I update the experiment result that was registered to the new async status of TIMEOUT.

// TODO: A lot of the logic here is reused from PutTransportExperiment.
// Eventually we have to abstract it in another class to reduce complexity.

Runnable runnable = () -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can implement timeout with a simple future wrapper task, something like:

public <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutSeconds) {
        CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
        
        ScheduledFuture<?> timeout = scheduler.schedule(() -> {
            if (timeoutFuture.cancel(false)) {
                future.cancel(true);
            }
        }, timeoutSeconds, TimeUnit.SECONDS);
        
        // complete when original completes
        future.whenComplete((result, throwable) -> {
            timeout.cancel(false); // Cancel timeout
            if (throwable == null) {
                timeoutFuture.complete(result);
            } else {
                timeoutFuture.completeExceptionally(throwable);
            }
        });
        
        return timeoutFuture;
    }

// If any of the futures fails, the exception would be handled
// in the logic of that future. Therefore, no action for failure
// is necessary here.
CompletableFuture.allOf(configFutures.toArray(new CompletableFuture[0])).join();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like here you don't have timeout control for individual operations, this could lead to resource leaks in high-load scenarios: main task will be interrupted but nested task will keep running. You can solve it by adding timeout wrapper to all async operations.

Copy link
Contributor Author

@ajleong623 ajleong623 Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the timeout wrapper. The only issue is when I looked at FutureUtils, the API does not allow threads to be interrupted. However, not interrupting the threads defeats the purpose of the timeout.

@ajleong623
Copy link
Contributor Author

ajleong623 commented Sep 9, 2025

@martin-gaievski one last issue is that cancelled timeouts with thread interruption is not allowed. Does this mean that my solution would have to involve adding an atomic boolean that denotes whether the task was cancelled and then checking it at key points? This might not be sufficient in cancelling long running calculations such as the hybrid optimizer one. Additionally, I think would have to make sure all background tasks such as the asynchronous point wise experiment processing are cancelled manually. Please let me know of any feedback on the ideas I mentioned.

@ajleong623
Copy link
Contributor Author

Just a personal reminder to add documentation comments throughout code changes before submitting.

@martin-gaievski
Copy link
Member

@martin-gaievski one last issue is that cancelled timeouts with thread interruption is not allowed. Does this mean that my solution would have to involve adding an atomic boolean that denotes whether the task was cancelled and then checking it at key points? This might not be sufficient in cancelling long running calculations such as the hybrid optimizer one. Additionally, I think would have to make sure all background tasks such as the asynchronous point wise experiment processing are cancelled manually. Please let me know of any feedback on the ideas I mentioned.

@ajleong623, excellent question about the thread interruption limitations. You're absolutely right to be concerned about this, especially for long-running operations like the hybrid optimizer. Let me provide some guidance on the best approach here.
Your intuition about using an AtomicBoolean for cancellation is spot-on. This is indeed the recommended pattern in OpenSearch for handling timeouts without thread interruption. Here's my suggested implementation strategy:

start from introducing the cancellation token approach/pattern:

public class ExperimentCancellationToken {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private final List<Runnable> cancellationCallbacks = new CopyOnWriteArrayList<>();
    
    public boolean isCancelled() {
        return cancelled.get();
    }
    
    public void cancel() {
        if (cancelled.compareAndSet(false, true)) {
            cancellationCallbacks.forEach(Runnable::run);
        }
    }
    
    public void onCancel(Runnable callback) {
        cancellationCallbacks.add(callback);
        if (isCancelled()) {
            callback.run();
        }
    }
}
  • identify check points in long-running operations

For the hybrid optimizer and other long-running calculations, you'll need to add cancellation check points at strategic locations:

// In HybridOptimizerExperimentProcessor
public void processHybridOptimizerExperiment(..., ExperimentCancellationToken cancellationToken) {
    for (String queryText : queryTexts) {
        // check before each query processing
        if (cancellationToken.isCancelled()) {
            handleCancellation();
            return;
        }
        
        for (SearchConfiguration config : configurations) {
            if (cancellationToken.isCancelled()) {
                handleCancellation();
                return;
            }
            // process configuration...
        }
    }
}

- special manual handling for asyn operations
async operations like pointwise experiments, may need manual cancellation. I suggest:
```java
// part ExperimentRunningManager
private final Map<String, List<CompletableFuture<?>>> runningFutures = new ConcurrentHashMap<>();

public void startExperimentRun(String experimentId, PutExperimentRequest request, ExperimentCancellationToken token) {
    List<CompletableFuture<?>> futures = new ArrayList<>();
    
    // register cancellation callback
    token.onCancel(() -> {
        futures.forEach(f -> f.cancel(false));
        runningFutures.remove(experimentId);
    });
    
    // track all async operations
    CompletableFuture<QuerySet> querySetFuture = fetchQuerySetAsync(...);
    futures.add(querySetFuture);
    runningFutures.put(experimentId, futures);
    
    // continue experiment...
}
  • integrate cancellation token into ConcurrencyUtil. For instance, ConcurrencyUtil.withTimeout is a good place to such integration:
public static <T> CompletableFuture<T> withTimeout(
    CompletableFuture<T> future, 
    long timeoutSeconds, 
    ThreadPool threadPool,
    ExperimentCancellationToken cancellationToken) {
    
    CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
    
    ScheduledFuture<?> timeout = threadPool.scheduler().schedule(() -> {
        cancellationToken.cancel();  // Signal cancellation instead of interrupting
        timeoutFuture.completeExceptionally(new TimeoutException());
    }, timeoutSeconds, TimeUnit.SECONDS);
    
    ...
}

Some other important considerations:

  • granularity of checks, for the hybrid optimizer, consider adding checks:
    • before/after each query evaluation
    • inside any loops that process multiple configurations
    • before expensive operations (network calls, large computations)
  • ensure all resources (connections, temporary data) are properly cleaned up when cancellation occurs.
  • consider whether to save partial results when an experiment is cancelled due to timeout.
  • add unit tests that specifically test the cancellation behavior at various points in the execution.

One alternative to cancellation token approach is chucking. For very long-running operation we may want to break it into smaller chunks. To me this is more complex then tokens, mainly due to uncertainty of how exactly break the task and how to handle partial results.

Overall, your proposed solution with AtomicBoolean is the right direction, great work on identifying this important consideration


ScheduledFuture<?> timeout = threadPool.scheduler().schedule(() -> {
if (timeoutFuture.cancel(false)) {
future.cancel(true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please resolve the build failure

Forbidden method invocation: java.util.concurrent.Future#cancel(boolean) [Don't interrupt threads use FutureUtils#cancel(Future<T>) instead]

sample code:

    // Before:
    // future.cancel(true);

    // After:
    FutureUtils.cancel(future);

Copy link
Contributor Author

@ajleong623 ajleong623 Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, I looked into the FutureUtils implementation and noticed that the thread cannot be interrupted on cancel which means the long-running task might still be running after cancel. Martin and I have been working on a workaround for that issue, and I will use the proper api.

@ajleong623
Copy link
Contributor Author

ajleong623 commented Sep 17, 2025

@martin-gaievski SearchRelevanceJobRunner is where the scheduler starts. In line 95, when creating the future with timeout, I attached a countdown latch to indicate when all the asynchronous results finish. This either happens during updateFinalExperiment or handleAsyncFailure in ExperimentRunningManager. That way, we can wait on that latch and only clean up after all the asynchronous operations have finished. I noticed the code flow of completing the task itself is actually much faster because once a future is scheduled, the code moves on. Therefore, the asynchronous tasks could still be running even after the task completes or times out. Additionally, I took your suggestion of grouping the futures in a map called runningFutures so that they can cancel right when the cancellation token is cancelled.

In ScheduledExperimentRunnerManager, the placeholder for the ScheduledExperimentResult is created and placed into the index. The checks will be in lines 89 and 95. (before and after the put).

In ExperimentRunningManager, the query set is first fetched, then search configurations are fetched, and finally for all the queries in the query set, one of the experiments is run. The checks are in lines 145 (adding async futures to be cancelled), 209 (for each search configuration fetch), 345 (each experiment evaluation loop around query text), and 443 (right before results are processed for each evaluation).

In HybridOptimizerExperimentProcessor the only check is in line 243 which is where the loop for scheduling a variant set for each search configuration is processed.

In ExperimentTaskManager, line 257 (for each time an experiment is scheduled), 291/299 (before and after submitting into the threadpool), and 326 (scheduling each variant asynchronously) are where the checks are available. The most important checks are around 291 and 299 because the tasks submitted to the threadpool in hybrid optimization are the longest running.

For cleanup, I handled each case similarly to how failures detections are handled. However, in the final cleanup in SearchRelevanceJobRunner line 113, the scheduled experiment result is simply updated. I do not know about temporary data being created, and index connections are not interrupted.

I also have not handled partial results, but it will be null if timeout occurs

Let me know if you have any questions or comments. I need another look because I have been working on this for a while, and my brain is currently fried.

@epugh epugh added v3.4.0 and removed v3.3.0 labels Sep 19, 2025
/**
* List scheduled jobs by source builder
* @param sourceBuilder - source builder to be searched
* @param listener - action lister for async operation
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @param listener - action lister for async operation
* @param listener - action listener for async operation

import lombok.extern.log4j.Log4j2;

/**
* ExperimentRunningManager helps isolate the logic for running the logic in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slight awk phrasing.

@ajleong623
Copy link
Contributor Author

@martin-gaievski @fen-qin I think I am ready for the next round of code reviews as I believe I addressed the comments mentioned prior. Please let me know about any other suggestions or concerns.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE] Scheduling for running evaluations regularly

7 participants