Skip to content

Fix SearchErrorTraceIT and friends to work with batched query execution #127150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ public static void setDebugLogLevel() {
@Before
public void setupMessageListener() {
hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
// TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP
}
}

private static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]";
public static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]";

static void registerNodeSearchAction(
SearchTransportService searchTransportService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ void sendResponse(
) {
assert assertValidTransportVersion(transportVersion);
assert response.hasReferences();
var messageListener = this.messageListener;
messageListener.onBeforeResponseSent(requestId, action, response);
try {
sendMessage(
channel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ default void onRequestReceived(long requestId, String action) {}
*/
default void onResponseSent(long requestId, String action) {}

/**
* Called for every action response sent before the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param response response instance
*/
default void onBeforeResponseSent(long requestId, String action, TransportResponse response) {}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DaveCTurner wdyt? this should be ok right? In prod the overhead is negligible I think.


/***
* Called for every failed action response after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@

import org.apache.logging.log4j.Level;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.util.Arrays;
Expand All @@ -38,20 +40,36 @@ public enum ErrorTraceHelper {

public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) {
final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false);
internalCluster.getDataNodeInstances(TransportService.class)
.forEach(ts -> asInstanceOf(MockTransportService.class, ts).addMessageListener(new TransportMessageListener() {
internalCluster.getDataNodeInstances(TransportService.class).forEach(ts -> {
var mockTs = asInstanceOf(MockTransportService.class, ts);
mockTs.addMessageListener(new TransportMessageListener() {
@Override
public void onResponseSent(long requestId, String action, Exception error) {
TransportMessageListener.super.onResponseSent(requestId, action, error);
if (action.startsWith("indices:data/read/search")) {
Optional<Throwable> throwable = ExceptionsHelper.unwrapCausesAndSuppressed(
error,
t -> t.getStackTrace().length > 0
);
transportMessageHasStackTrace.set(throwable.isPresent());
checkStacktraceStateAndRemove(error, mockTs);
}
}
}));

@Override
public void onBeforeResponseSent(long requestId, String action, TransportResponse response) {
if (SearchQueryThenFetchAsyncAction.NODE_SEARCH_ACTION_NAME.equals(action)) {
var r = asInstanceOf(SearchQueryThenFetchAsyncAction.NodeQueryResponse.class, response);
for (Object result : r.getResults()) {
if (result instanceof Exception error) {
checkStacktraceStateAndRemove(error, mockTs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is looping + setting the transportMessageHasStackTrace correct here? What if one result has a stack trace: transportMessageHasStackTrace.set(true), then a result in a future iteration doesn't: transportMessageHasStackTrace.set(false). Then transportMessageHasStackTrace really means "did the last-checked shard have a stack trace." I believe our current test cases trigger errors on all shards so the issue is not noticeable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not great, I sort of tried to point this out via the fact that we only see a single request per thread in the op too.
I just didn't want to refactor the test here, it seems we could invert the logic here easily and set a boolean "expectsStacktrace" or so in the transport message listener and then simply assert inline instead of after the fact. If we can only communicate one bit back from the listener there isn't really a mathematical way to cleanly assert on it for multiple things :P

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this do something like: capture the existence of a stack trace for ALL exception results, and:

  • if they're all true: transportMessageHasStackTrace.set(true)
  • if they're all false: transportMessageHasStackTrace.set(false)
  • else there's something wrong, so throw some assertion error

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right that would work but also make this even harder to follow? If we want to fix this my vote would be to invest 5 more minutes here and just assert inline so that we can pass the expectation for everything to the listener at the beginning of each test? :) Otherwise if we go for the 3 outcome logic, we'll have some inline assertions and some "at the end of the test" assertions mixed, that's just needlessly complex?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see what you mean now - throw out transportMessageHasStackTrace and pass the "expectsStacktrace" to assert against inside the onBeforeResponseSent override. I agree that's better than the mixed assertions.

}
}
}
}

private void checkStacktraceStateAndRemove(Exception error, MockTransportService mockTs) {
Optional<Throwable> throwable = ExceptionsHelper.unwrapCausesAndSuppressed(error, t -> t.getStackTrace().length > 0);
transportMessageHasStackTrace.set(throwable.isPresent());
mockTs.removeMessageListener(this);
}
});
});
return transportMessageHasStackTrace::get;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.netty4.Netty4Transport;
Expand Down Expand Up @@ -849,6 +850,12 @@ public void onResponseSent(long requestId, String action) {
messageListener.onResponseSent(requestId, action);
}

@Override
public void onBeforeResponseSent(long requestId, String action, TransportResponse response) {
super.onBeforeResponseSent(requestId, action, response);
messageListener.onBeforeResponseSent(requestId, action, response);
}

@Override
public void onResponseSent(long requestId, String action, Exception e) {
super.onResponseSent(requestId, action, e);
Expand Down Expand Up @@ -901,6 +908,13 @@ public void onRequestSent(
}
}

@Override
public void onBeforeResponseSent(long requestId, String action, TransportResponse response) {
for (TransportMessageListener listener : listeners) {
listener.onBeforeResponseSent(requestId, action, response);
}
}

@Override
@SuppressWarnings("rawtypes")
public void onResponseReceived(long requestId, Transport.ResponseContext holder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ public static void setDebugLogLevel() {
@Before
public void setupMessageListener() {
transportMessageHasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
// TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
}

@After
Expand Down