Skip to content

Conversation

luyuncheng
Copy link
Collaborator

@luyuncheng luyuncheng commented Feb 14, 2025

Description

When there is a scenarios:

  1. There is A Merge Task On Node1#Index1#Shard1(long time running)
  2. After merge task started, begin relocating from Node1#Index1#Shard1 TO Node2#Index1#Shard1
  3. At the finalize step, source need do closeShard, but the merge task would take a long time, stack as following shows.
  4. The clusterApplierService would wait for about N minutes(long time running), and mark the node stale, and master let node1 left because node1 long time no response.
opensearch[datanode1][clusterApplierService#updateTask][T#1]" #41 daemon prio=5 os_prio=0 cpu=5183.70ms elapsed=93132.85s tid=0x00007f3f392509d0 nid=0x101 in Object.wait()  [0x00007f3f6ddfb000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait([email protected]/Native Method)
	- waiting on <no object reference available>
	at org.apache.lucene.index.IndexWriter.doWait(IndexWriter.java:5410)
	- locked <0x0000001022b0abe8> (a org.apache.lucene.index.IndexWriter)
	at org.apache.lucene.index.IndexWriter.abortMerges(IndexWriter.java:2721)
	- locked <0x0000001022b0abe8> (a org.apache.lucene.index.IndexWriter)
	at org.apache.lucene.index.IndexWriter.rollbackInternalNoCommit(IndexWriter.java:2469)
	- locked <0x0000001022b0abe8> (a org.apache.lucene.index.IndexWriter)
	at org.apache.lucene.index.IndexWriter.rollbackInternal(IndexWriter.java:2449)
	- locked <0x0000001022bae6d0> (a java.lang.Object)
	at org.apache.lucene.index.IndexWriter.rollback(IndexWriter.java:2441)
	at org.opensearch.index.engine.InternalEngine.closeNoLock(InternalEngine.java:2370)
	at org.opensearch.index.engine.Engine.close(Engine.java:2000)
	at org.opensearch.index.engine.Engine.flushAndClose(Engine.java:1987)
	at org.opensearch.index.shard.IndexShard.close(IndexShard.java:1907)
	- locked <0x0000001022b07ea0> (a java.lang.Object)
	at org.opensearch.index.IndexService.closeShard(IndexService.java:623)
	at org.opensearch.index.IndexService.removeShard(IndexService.java:599)
	- locked <0x0000001022a976a8> (a org.opensearch.index.IndexService)
	at org.opensearch.index.IndexService.close(IndexService.java:374)
	- locked <0x0000001022a976a8> (a org.opensearch.index.IndexService)
	at org.opensearch.indices.IndicesService.removeIndex(IndicesService.java:993)
	at org.opensearch.indices.cluster.IndicesClusterStateService.removeIndices(IndicesClusterStateService.java:446)
	at org.opensearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:287)
	- locked <0x000000100b7da520> (a org.opensearch.indices.cluster.IndicesClusterStateService)
	at org.opensearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:606)
	at org.opensearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:593)

Proposal

i think we can introduce abort mechanism for long time merge task meanwhile close shard called.

i think we can introduce KNNMergeHelper class to check if merge aborted. and when build the graph, we can reuse faiss::InterruptCallback which is interrupt callback mechanism to check whether aborted or not

BUT ConcurrentMergeScheduler#MergeThread is a internal class, we can not call this directly. it throws org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread is in unnamed module of loader 'app'

we can added this static method into OpenSearch Core like OneMergeHelper

Related Issues

Resolves #[Issue number to be closed when this PR is merged]
#2530

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@jmazanec15
Copy link
Member

@luyuncheng This is interesting - but can the interrupt callback with faiss be per graph or would it be for all graphs? In other words, would it cancel all graph builds happening on the node instead of just the one for the closed shard.

@navneet1v
Copy link
Collaborator

navneet1v commented Feb 14, 2025

@luyuncheng thanks for creating the GH issue. and I think we ourselves have seen this problem in couple of places(ref: opensearch-project/OpenSearch#14828, @kotwanikunal created this), and seeing a solution around this problem is really great. I looked through the code and I want to know how this code is even working? Because as per my understanding of the code you are checking if merge is aborted if somehow write fails and then eating up that exception.

Is this PR only to see if the merge is aborted and write to directory fails how we can handle the errors in case shard is not present on the node because it moved? Because if that is the case then in the 2.19 version of k-NN plugin we added the support writing index using IndexInput/Output. So if a rather than checking if merge is aborted or not, can we not see if IndexInput/Output is closed or not?

@luyuncheng
Copy link
Collaborator Author

This is interesting - but can the interrupt callback with faiss be per graph or would it be for all graphs? In other words, would it cancel all graph builds happening on the node instead of just the one for the closed shard.

@jmazanec15 In Sample Code,

public static boolean isMergeAborted() {
Thread mergeThread = Thread.currentThread();
if (mergeThread instanceof ConcurrentMergeScheduler.MergeThread) {
return ((ConcurrentMergeScheduler.MergeThread) mergeThread).merge.isAborted();
}
return false;
which need include in OpenSearch repo, it shows that it only checked for the current merge thread like OS code https://github.com/opensearch-project/OpenSearch/blob/99a9a81da366173b0c2b963b26ea92e15ef34547/server/src/main/java/org/apache/lucene/index/OneMergeHelper.java#L65-L69

@luyuncheng
Copy link
Collaborator Author

I looked through the code and I want to know how this code is even working? Because as per my understanding of the code you are checking if merge is aborted if somehow write fails and then eating up that exception.

@navneet1v i think the call chain is like following shows

  1. When faiss is building the graph, meanwhile, close shard triggered.
  2. Lucene would do IndexWriter#rollbackInternal which would not commit current segment, and wait for current process end.
  3. faiss checked abort using InterruptCallback, and throw FaissException
  4. we need catch FaissException from native code, and throw MergeAbortedException to Lucene

we added the support writing index using IndexInput/Output. So if a rather than checking if merge is aborted or not, can we not see if IndexInput/Output is closed or not

Nice catch, we need to handle FaissException and do close indexinput/output manually, then throw MergeAbortedException to lucene

@jmazanec15
Copy link
Member

@luyuncheng right, but in the faiss InterruptCallback: https://github.com/facebookresearch/faiss/blob/657c563604c774461aed0394ae99210713145e03/faiss/impl/AuxIndexStructures.h#L135-L162, it operates globally. So, setting the instance in one thread, will set it in another thread.

So, for instance, here is the interrupt check in HNSW during graph build: https://github.com/facebookresearch/faiss/blob/657c563604c774461aed0394ae99210713145e03/faiss/IndexHNSW.cpp#L178-L180. If the interrupt gets set and another shard is building a graph on the node, wont this shard's graph build fail too?

@luyuncheng
Copy link
Collaborator Author

luyuncheng commented Feb 17, 2025

it operates globally. So, setting the instance in one thread, will set it in another thread.

@jmazanec15 exactly right, AuxIndexStructures is an singleton struct.

So, for instance, here is the interrupt check in HNSW during graph build: https://github.com/facebookresearch/faiss/blob/657c563604c774461aed0394ae99210713145e03/faiss/IndexHNSW.cpp#L178-L180. If the interrupt gets set and another shard is building a graph on the node, wont this shard's graph build fail too?

every different thread call want_interrupt have different return because current thread context is different because we override want_interrupt. also https://github.com/facebookresearch/faiss/blob/657c563604c774461aed0394ae99210713145e03/faiss/IndexHNSW.cpp#L139 interrupt is a local variable, so different thread would not throw exception

so every graph build would go into check if (InterruptCallback::is_interrupted()) but only the current merge thread which aborted would return true.

also, multi thread would get into lock area which would reduce graph build performance, but check_period would help reduce the impacts.

@jmazanec15
Copy link
Member

int[] parentIds
);

public static native void setMergeInterruptCallback();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove these and just have one interrupt callback registered at the time of library initialization? I dont think these need to be in the interface.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just prefer not to do this when its a global, static callback.

Copy link
Collaborator Author

@luyuncheng luyuncheng Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jmazanec15 how about put it into FaissService#initLibrary with global init

@jmazanec15
Copy link
Member

@luyuncheng are you still working on this?

@luyuncheng
Copy link
Collaborator Author

@luyuncheng are you still working on this?

Yes, Sorry for the late. i am trying to find a way bypass the KNNMergeHelper for protect calling into OpenSearch. and trying to find a way for testing

Signed-off-by: luyuncheng <[email protected]>
Signed-off-by: luyuncheng <[email protected]>
Signed-off-by: luyuncheng <[email protected]>
Signed-off-by: luyuncheng <[email protected]>
Signed-off-by: luyuncheng <[email protected]>
Signed-off-by: luyuncheng <[email protected]>
Signed-off-by: luyuncheng <[email protected]>
@luyuncheng
Copy link
Collaborator Author

@jmazanec15 i found a way using reflect, it would get Accessable into ConcurrentMergeScheduler.MergeThread.class like following code:

public static boolean isMergeAborted() {
Thread mergeThread = Thread.currentThread();
if (mergeThread instanceof ConcurrentMergeScheduler.MergeThread) {
// return ((ConcurrentMergeScheduler.MergeThread) mergeThread).merge.isAborted();
try {
Object mergeObject = LucenePackagePrivateCaller.callPrivateFieldWithMethod(
ConcurrentMergeScheduler.MergeThread.class,
"merge",
"isAborted",
mergeThread
);
return ((Boolean) mergeObject).booleanValue();
} catch (RuntimeException e) {
return false;
}
}
return false;
}

and call into reflect as following which using to get org.apache.lucene.index.MergePolicy.OneMerge in org.apache.lucene.index.ConcurrentMergeScheduler.MergeThread
public class LucenePackagePrivateCaller {
public static Object callPrivateFieldWithMethod(Class<?> clz, String fieldName, String methodName, Object called) {
return AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
try {
Field field = clz.getDeclaredField(fieldName);
field.setAccessible(true);
return callMethod(field.getType(), methodName, null, field.get(called), null);
} catch (Exception e) {
log.error("callPrivateFieldWithMethod", e);
throw new RuntimeException(e);
}
});
}

so we can test it without modified OpenSearch Core.
Also i added tests in KNN80DocValuesConsumerTests.java

@luyuncheng luyuncheng requested a review from jmazanec15 June 23, 2025 07:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants