Skip to content

[ML] Prevent retention classes from failing when deleting documents in read-only indices #125408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c40723a
unit test passes
valeriy42 Mar 21, 2025
f32c3be
clean up
valeriy42 Mar 21, 2025
ca253b8
Update docs/changelog/125408.yaml
valeriy42 Mar 21, 2025
0419085
[ML] Enhance UnusedStatsRemover by integrating IndexNameExpressionRes…
valeriy42 Apr 7, 2025
f3058f3
Merge branch 'enhancement/1532-unused-stats-remover' of https://githu…
valeriy42 Apr 7, 2025
2ddc240
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
valeriy42 Apr 8, 2025
1b1f97f
[ML] Introduce WritableIndexExpander to manage writable indices for j…
valeriy42 Apr 8, 2025
bec1653
Merge branch 'main' into enhancement/1532-unused-stats-remover
davidkyle Apr 11, 2025
a4405ef
[ML] Refactor job data removers to utilize WritableIndexExpander for …
valeriy42 Apr 14, 2025
c500317
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
valeriy42 Apr 14, 2025
546b635
Merge branch 'enhancement/1532-unused-stats-remover' of https://githu…
valeriy42 Apr 14, 2025
c98414d
[CI] Auto commit changes from spotless
elasticsearchmachine Apr 14, 2025
482b243
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
valeriy42 May 19, 2025
2a10a63
fix typo
valeriy42 May 21, 2025
64ed211
add unit test for ExpiredAnnotationsRemover
valeriy42 May 21, 2025
8a28cb0
add unit test for ExpiredModelSnapshotsRemover
valeriy42 May 22, 2025
5db2496
add unit test for ExpiredResultsRemoverTests
valeriy42 May 22, 2025
1ad69b3
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
valeriy42 May 22, 2025
576318e
[CI] Auto commit changes from spotless
elasticsearchmachine May 22, 2025
0064d5e
fix integration test
valeriy42 May 22, 2025
26bdb36
Merge remote-tracking branch 'origin/enhancement/1532-unused-stats-re…
valeriy42 May 22, 2025
bed8f1e
update docs
valeriy42 May 22, 2025
3d8a61b
fix logger usage failure
valeriy42 May 22, 2025
400b69d
Merge branch 'main' into enhancement/1532-unused-stats-remover
valeriy42 May 22, 2025
4447a52
making WritableIndexExpander a singleton
valeriy42 May 23, 2025
18808b4
prevent JobDataDeleter from attempting to delete from read-only indices
valeriy42 May 23, 2025
0317323
Merge branch 'main' into enhancement/1532-unused-stats-remover
valeriy42 May 23, 2025
532bb6a
fit test failure
valeriy42 May 23, 2025
264b581
Merge branch 'main' into enhancement/1532-unused-stats-remover
valeriy42 May 23, 2025
7b98158
fit test failure
valeriy42 May 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/125408.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 125408
summary: Prevent `UnusedStatsRemover` from failing when deleting documents in read-only
indices
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -37,6 +38,7 @@
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
import org.elasticsearch.xpack.ml.job.retention.UnusedStatsRemover;
import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.junit.Before;

Expand All @@ -47,10 +49,12 @@
public class UnusedStatsRemoverIT extends BaseMlIntegTestCase {

private OriginSettingClient client;
private WritableIndexExpander writableIndexExpander;

@Before
public void createComponents() {
client = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
writableIndexExpander = new WritableIndexExpander(clusterService(), TestIndexNameExpressionResolver.newInstance());
PlainActionFuture<Boolean> future = new PlainActionFuture<>();
MlStatsIndex.createStatsIndexAndAliasIfNecessary(
client(),
Expand All @@ -63,91 +67,118 @@ public void createComponents() {
}

// Verifies that UnusedStatsRemover deletes stats documents whose owning
// analytics job / trained model no longer exists, while preserving stats for
// entities that do exist (including models stored as resources).
// NOTE(review): this span is rendered from a PR diff view and appears to
// interleave pre-refactor and post-refactor lines — e.g. the inline
// PutDataFrameAnalyticsAction / PutTrainedModelAction setup below duplicates
// what putDFA(modelId) now does, and "missing-analytics-with-stats" is indexed
// twice (once under each comment). Confirm against the merged file.
public void testRemoveUnusedStats() throws Exception {
String modelId = "model-with-stats";
putDFA(modelId);

prepareIndex("foo").setId("some-empty-doc").setSource("{}", XContentType.JSON).get();

PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(
new DataFrameAnalyticsConfig.Builder().setId("analytics-with-stats")
.setModelMemoryLimit(ByteSizeValue.ofGb(1))
.setSource(new DataFrameAnalyticsSource(new String[] { "foo" }, null, null, null))
.setDest(new DataFrameAnalyticsDest("bar", null))
.setAnalysis(new Regression("prediction"))
.build()
);
client.execute(PutDataFrameAnalyticsAction.INSTANCE, request).actionGet();

client.execute(
PutTrainedModelAction.INSTANCE,
new PutTrainedModelAction.Request(
TrainedModelConfig.builder()
.setModelId("model-with-stats")
.setInferenceConfig(RegressionConfig.EMPTY_PARAMS)
.setInput(new TrainedModelInput(Arrays.asList("foo", "bar")))
.setParsedDefinition(
new TrainedModelDefinition.Builder().setPreProcessors(Collections.emptyList())
.setTrainedModel(
Tree.builder()
.setFeatureNames(Arrays.asList("foo", "bar"))
.setRoot(TreeNode.builder(0).setLeafValue(42))
.build()
)
)
.validate(true)
.build(),
false
)
).actionGet();

// Existing analytics and models
indexStatDocument(new DataCounts("analytics-with-stats", 1, 1, 1), DataCounts.documentId("analytics-with-stats"));
indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1), DataCounts.documentId("missing-analytics-with-stats"));
indexStatDocument(new InferenceStats(1, 1, 1, 1, modelId, "test", Instant.now()), InferenceStats.docId(modelId, "test"));
indexStatDocument(
new InferenceStats(1, 1, 1, 1, TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test", Instant.now()),
InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")
);

// Unused analytics/model stats
indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1), DataCounts.documentId("missing-analytics-with-stats"));
indexStatDocument(
new InferenceStats(1, 1, 1, 1, "missing-model", "test", Instant.now()),
InferenceStats.docId("missing-model", "test")
);

refreshStatsIndex();
runUnusedStatsRemover();

// First backing index of the ML stats data stream/alias.
final String index = MlStatsIndex.TEMPLATE_NAME + "-000001";

// Validate expected docs
assertDocExists(index, InferenceStats.docId(modelId, "test"));
assertDocExists(index, DataCounts.documentId("analytics-with-stats"));
assertDocExists(index, InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test"));

// Validate removed docs
assertDocDoesNotExist(index, InferenceStats.docId("missing-model", "test"));
assertDocDoesNotExist(index, DataCounts.documentId("missing-analytics-with-stats"));
}

// Regression test for the PR's core fix: running the UnusedStatsRemover
// against a read-only stats index must not fail; the unused doc simply
// remains in place.
// NOTE(review): diff-view interleaving — lines for both the old stat doc
// ("model-with-stats") and the new one ("missing-model") appear inside the
// same indexStatDocument call, and the old inline remover invocation
// (PlainActionFuture / new UnusedStatsRemover with the two-arg constructor)
// appears alongside the new runUnusedStatsRemover() helper. Confirm against
// the merged file.
public void testRemovingUnusedStatsFromReadOnlyIndexShouldFailSilently() throws Exception {
String modelId = "model-with-stats";
putDFA(modelId);

indexStatDocument(
new InferenceStats(1, 1, 1, 1, "model-with-stats", "test", Instant.now()),
InferenceStats.docId("model-with-stats", "test")
new InferenceStats(1, 1, 1, 1, "missing-model", "test", Instant.now()),
InferenceStats.docId("missing-model", "test")
);
client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
makeIndexReadOnly();
refreshStatsIndex();

PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
UnusedStatsRemover statsRemover = new UnusedStatsRemover(client, new TaskId("test", 0L));
statsRemover.remove(10000.0f, deletionListener, () -> false);
deletionListener.actionGet();
runUnusedStatsRemover();
refreshStatsIndex();

client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
final String index = MlStatsIndex.TEMPLATE_NAME + "-000001";
assertDocExists(index, InferenceStats.docId("missing-model", "test")); // should still exist
}

final String initialStateIndex = MlStatsIndex.TEMPLATE_NAME + "-000001";
// Test fixture: indexes a trivial source doc, registers a data frame
// analytics config ("analytics-with-stats") and a trained regression model
// under the given modelId, so their stats documents count as "used".
// NOTE(review): diff-view interleaving — the assertTrue/assertFalse lines on
// initialStateIndex below are removed lines from the old testRemoveUnusedStats
// body that the scrape merged into this method. Confirm against the merged
// file.
private void putDFA(String modelId) {
prepareIndex("foo").setId("some-empty-doc").setSource("{}", XContentType.JSON).get();

// Make sure that stats that should exist still exist
assertTrue(client().prepareGet(initialStateIndex, InferenceStats.docId("model-with-stats", "test")).get().isExists());
assertTrue(
client().prepareGet(
initialStateIndex,
InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")
).get().isExists()
PutDataFrameAnalyticsAction.Request analyticsRequest = new PutDataFrameAnalyticsAction.Request(
new DataFrameAnalyticsConfig.Builder().setId("analytics-with-stats")
.setModelMemoryLimit(ByteSizeValue.ofGb(1))
.setSource(new DataFrameAnalyticsSource(new String[] { "foo" }, null, null, null))
.setDest(new DataFrameAnalyticsDest("bar", null))
.setAnalysis(new Regression("prediction"))
.build()
);
assertTrue(client().prepareGet(initialStateIndex, DataCounts.documentId("analytics-with-stats")).get().isExists());

// make sure that unused stats were deleted
assertFalse(client().prepareGet(initialStateIndex, DataCounts.documentId("missing-analytics-with-stats")).get().isExists());
assertFalse(client().prepareGet(initialStateIndex, InferenceStats.docId("missing-model", "test")).get().isExists());
client.execute(PutDataFrameAnalyticsAction.INSTANCE, analyticsRequest).actionGet();

// Minimal single-node decision tree so the model definition validates.
TrainedModelDefinition.Builder definition = new TrainedModelDefinition.Builder().setPreProcessors(Collections.emptyList())
.setTrainedModel(
Tree.builder().setFeatureNames(Arrays.asList("foo", "bar")).setRoot(TreeNode.builder(0).setLeafValue(42)).build()
);

TrainedModelConfig modelConfig = TrainedModelConfig.builder()
.setModelId(modelId)
.setInferenceConfig(RegressionConfig.EMPTY_PARAMS)
.setInput(new TrainedModelInput(Arrays.asList("foo", "bar")))
.setParsedDefinition(definition)
.validate(true)
.build();

client.execute(PutTrainedModelAction.INSTANCE, new PutTrainedModelAction.Request(modelConfig, false)).actionGet();
}

// Indexes a single stats document (serialized with FOR_INTERNAL_STORAGE) into
// the ML stats write alias under the given doc id, blocking until indexed.
// NOTE(review): diff-view interleaving — both the old two-statement
// IndexRequest construction and the new chained one appear below, as do both
// toXContent variants (named `params` vs inline MapParams). Confirm against
// the merged file.
private void indexStatDocument(ToXContentObject object, String docId) throws Exception {
ToXContent.Params params = new ToXContent.MapParams(
Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, Boolean.toString(true))
);
IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias());
doc.id(docId);
IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias()).id(docId);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
object.toXContent(builder, params);
object.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")));
doc.source(builder);
client.index(doc).actionGet();
}
}

/** Forces a refresh of every ML stats index so recent writes become searchable. */
private void refreshStatsIndex() {
    var refreshRequest = client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern());
    refreshRequest.get();
}

/**
 * Runs a fresh {@link UnusedStatsRemover} (wired with this test's
 * writableIndexExpander) and blocks until the deletion round completes.
 */
private void runUnusedStatsRemover() {
    var completionFuture = new PlainActionFuture<Boolean>();
    var remover = new UnusedStatsRemover(client, new TaskId("test", 0L), writableIndexExpander);
    // High requests-per-second cap; the shouldStop supplier never aborts the run.
    remover.remove(10000.0f, completionFuture, () -> false);
    completionFuture.actionGet();
}

/** Applies a write block to all ML stats indices, making them read-only for documents. */
private void makeIndexReadOnly() {
    var writeBlock = Settings.builder().put("index.blocks.write", true);
    client().admin()
        .indices()
        .prepareUpdateSettings(MlStatsIndex.indexPattern())
        .setSettings(writeBlock)
        .get();
}

/** Asserts that the document with the given id is present in the given index. */
private void assertDocExists(String index, String docId) {
    var getResponse = client().prepareGet(index, docId).get();
    assertTrue(getResponse.isExists());
}

/** Asserts that no document with the given id exists in the given index. */
private void assertDocDoesNotExist(String index, String docId) {
    var getResponse = client().prepareGet(index, docId).get();
    assertFalse(getResponse.isExists());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
Expand All @@ -40,6 +41,7 @@
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover;
import org.elasticsearch.xpack.ml.job.retention.UnusedStatsRemover;
import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
import org.elasticsearch.xpack.ml.utils.persistence.WrappedBatchedJobsIterator;
Expand All @@ -62,16 +64,19 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<

private final ThreadPool threadPool;
private final Executor executor;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final OriginSettingClient client;
private final ClusterService clusterService;
private final Clock clock;
private final JobConfigProvider jobConfigProvider;
private final JobResultsProvider jobResultsProvider;
private final AnomalyDetectionAuditor auditor;
private final WritableIndexExpander writableIndexExpander;

@Inject
public TransportDeleteExpiredDataAction(
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
TransportService transportService,
ActionFilters actionFilters,
Client client,
Expand All @@ -83,6 +88,7 @@ public TransportDeleteExpiredDataAction(
this(
threadPool,
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME),
indexNameExpressionResolver,
transportService,
actionFilters,
client,
Expand All @@ -97,6 +103,7 @@ public TransportDeleteExpiredDataAction(
TransportDeleteExpiredDataAction(
ThreadPool threadPool,
Executor executor,
IndexNameExpressionResolver indexNameExpressionResolver,
TransportService transportService,
ActionFilters actionFilters,
Client client,
Expand All @@ -109,12 +116,14 @@ public TransportDeleteExpiredDataAction(
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
this.threadPool = threadPool;
this.executor = executor;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.client = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
this.clusterService = clusterService;
this.clock = clock;
this.jobConfigProvider = jobConfigProvider;
this.jobResultsProvider = jobResultsProvider;
this.auditor = auditor;
this.writableIndexExpander = new WritableIndexExpander(clusterService, indexNameExpressionResolver);
}

@Override
Expand Down Expand Up @@ -240,13 +249,15 @@ private List<MlDataRemover> createDataRemovers(
TaskId parentTaskId,
AnomalyDetectionAuditor anomalyDetectionAuditor
) {

return Arrays.asList(
new ExpiredResultsRemover(
originClient,
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
parentTaskId,
anomalyDetectionAuditor,
threadPool
threadPool,
writableIndexExpander
),
new ExpiredForecastsRemover(originClient, threadPool, parentTaskId),
new ExpiredModelSnapshotsRemover(
Expand All @@ -257,9 +268,9 @@ private List<MlDataRemover> createDataRemovers(
jobResultsProvider,
anomalyDetectionAuditor
),
new UnusedStateRemover(originClient, parentTaskId),
new UnusedStateRemover(originClient, parentTaskId, writableIndexExpander),
new EmptyStateIndexRemover(originClient, parentTaskId),
new UnusedStatsRemover(originClient, parentTaskId),
new UnusedStatsRemover(originClient, parentTaskId, writableIndexExpander),
new ExpiredAnnotationsRemover(
originClient,
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
Expand All @@ -272,7 +283,14 @@ private List<MlDataRemover> createDataRemovers(

private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTaskId, AnomalyDetectionAuditor anomalyDetectionAuditor) {
return Arrays.asList(
new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, anomalyDetectionAuditor, threadPool),
new ExpiredResultsRemover(
client,
new VolatileCursorIterator<>(jobs),
parentTaskId,
anomalyDetectionAuditor,
threadPool,
writableIndexExpander
),
new ExpiredForecastsRemover(client, threadPool, parentTaskId),
new ExpiredModelSnapshotsRemover(
client,
Expand All @@ -282,9 +300,9 @@ private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTask
jobResultsProvider,
anomalyDetectionAuditor
),
new UnusedStateRemover(client, parentTaskId),
new UnusedStateRemover(client, parentTaskId, writableIndexExpander),
new EmptyStateIndexRemover(client, parentTaskId),
new UnusedStatsRemover(client, parentTaskId),
new UnusedStatsRemover(client, parentTaskId, writableIndexExpander),
new ExpiredAnnotationsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, anomalyDetectionAuditor, threadPool)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
this.client = client;
this.jobIterator = jobIterator;
this.parentTaskId = parentTaskId;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

}

protected TaskId getParentTaskId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,20 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {

private final AnomalyDetectionAuditor auditor;
private final ThreadPool threadPool;
private final WritableIndexExpander writableIndexExpander;

/**
 * Creates a remover for expired anomaly-detection results.
 * NOTE(review): diff-view interleaving — both the old trailing parameter
 * ({@code ThreadPool threadPool} without a comma) and the new pair
 * ({@code ThreadPool threadPool,} + {@code WritableIndexExpander}) appear
 * below; the merged signature has six parameters. Confirm against the merged
 * file.
 *
 * @param client                originating client used for delete-by-query
 * @param jobIterator           iterates the jobs whose results are inspected
 * @param parentTaskId          parent task for the spawned requests
 * @param auditor               anomaly-detection audit logger (non-null)
 * @param threadPool            thread pool for async listeners (non-null)
 * @param writableIndexExpander filters result indices down to writable ones (non-null)
 */
public ExpiredResultsRemover(
OriginSettingClient client,
Iterator<Job> jobIterator,
TaskId parentTaskId,
AnomalyDetectionAuditor auditor,
ThreadPool threadPool
ThreadPool threadPool,
WritableIndexExpander writableIndexExpander
) {
super(client, jobIterator, parentTaskId);
this.auditor = Objects.requireNonNull(auditor);
this.threadPool = Objects.requireNonNull(threadPool);
this.writableIndexExpander = Objects.requireNonNull(writableIndexExpander);
}

@Override
Expand Down Expand Up @@ -136,7 +139,7 @@ public void onFailure(Exception e) {
});
}

private static DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs) {
private DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs) {
QueryBuilder excludeFilter = QueryBuilders.termsQuery(
Result.RESULT_TYPE.getPreferredName(),
ModelSizeStats.RESULT_TYPE_VALUE,
Expand All @@ -148,7 +151,15 @@ private static DeleteByQueryRequest createDBQRequest(Job job, float requestsPerS
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis"))
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
.mustNot(excludeFilter);
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())).setSlices(

var indicesToQuery = writableIndexExpander.getWritableIndices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));

if (indicesToQuery.isEmpty()) {
LOGGER.warn("No writable indices found for job [{}]", job.getId());
return new DeleteByQueryRequest();
}

DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setSlices(
AbstractBulkByScrollRequest.AUTO_SLICES
)
.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE)
Expand Down
Loading
Loading