
Log stack traces on data nodes before they are cleared for transport #125732

Open · wants to merge 9 commits into main
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -467,7 +467,7 @@ expensive messages that will usually be discarded:

Logging is an important behaviour of the system and sometimes deserves its own
unit tests, especially if there is complex logic for computing what is logged
-and when to log it. You can use a `org.elasticsearch.test.MockLogAppender` to
+and when to log it. You can use a `org.elasticsearch.test.MockLog` to
make assertions about the logs that are being emitted.

Logging is a powerful diagnostic technique, but it is not the only possibility.
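For background on the MockLog pattern this change points to: a test captures the logger for a class, registers expectations, runs the code under test, and then verifies the expectations. A minimal sketch, assuming MockLog's SeenEventExpectation API; the logger, level, and message pattern here are illustrative, not taken from this PR:

    import org.apache.logging.log4j.Level;
    import org.elasticsearch.search.SearchService;
    import org.elasticsearch.test.MockLog;

    // Minimal sketch: assert that the code under test emits a matching log event.
    try (var mockLog = MockLog.capture(SearchService.class)) {
        mockLog.addExpectation(
            new MockLog.SeenEventExpectation(
                "failure is logged",                     // expectation name
                SearchService.class.getCanonicalName(),  // logger to watch
                Level.DEBUG,                             // expected level
                "*failed to execute search request*"     // message pattern (assumed)
            )
        );
        // ... trigger the code path that should log ...
        mockLog.assertAllExpectationsMatched();
    }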
5 changes: 5 additions & 0 deletions docs/changelog/125732.yaml
@@ -0,0 +1,5 @@
pr: 125732
summary: Log stack traces on data nodes before they are cleared for transport
area: Search
type: bug
issues: []
@@ -11,16 +11,25 @@

import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.config.Configurator;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.ErrorTraceHelper;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

import java.io.IOException;
import java.nio.charset.Charset;
@@ -32,24 +41,41 @@
public class SearchErrorTraceIT extends HttpSmokeTestCase {
private BooleanSupplier hasStackTrace;

private static final String loggerName = "org.elasticsearch.search.SearchService";
Contributor: I'd probably use SearchService.class here.

Contributor Author: Good call, done.

private static Level originalLogLevel;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class);
}

@BeforeClass
public static void setDebugLogLevel() {
originalLogLevel = LogManager.getLogger(loggerName).getLevel();
Configurator.setLevel(loggerName, Level.DEBUG);
}

@AfterClass
public static void resetLogLevel() {
Configurator.setLevel(loggerName, originalLogLevel);
}

@Before
public void setupMessageListener() {
hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
}

-private void setupIndexWithDocs() {
-    createIndex("test1", "test2");
+private int setupIndexWithDocs() {
+    int numShards = between(DEFAULT_MIN_NUM_SHARDS, DEFAULT_MAX_NUM_SHARDS);
Member: The number of shards is already randomized if you call createIndex. See ESIntegTestCase#numberOfShards.

createIndex("test1", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build());
createIndex("test2", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build());
indexRandom(
true,
prepareIndex("test1").setId("1").setSource("field", "foo"),
prepareIndex("test2").setId("10").setSource("field", 5)
);
refresh();
+return numShards;
}

public void testSearchFailingQueryErrorTraceDefault() throws IOException {
@@ -108,6 +134,80 @@ public void testSearchFailingQueryErrorTraceFalse() throws IOException {
assertFalse(hasStackTrace.getAsBoolean());
}

public void testLoggingInSearchFailingQueryErrorTraceDefault() throws IOException {
Member: You could probably fold this test into the error_trace=true test? You could randomly set the flag to true, otherwise not; the result should be the same?

Contributor Author: (done)
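One possible fold, as a hypothetical sketch: the default (no error_trace parameter) and an explicit error_trace=false both expect the stack trace to be logged on the data node, so a single test can randomize between them. randomBoolean and the helper calls below appear in this diff; the folded structure itself is the assumption.

    // Hypothetical merged test body: randomize between the default and explicit
    // error_trace=false, since both paths expect identical data-node logging.
    if (randomBoolean()) {
        searchRequest.addParameter("error_trace", "false");
    }
    try (var mockLog = MockLog.capture(SearchService.class)) {
        ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
        getRestClient().performRequest(searchRequest);
        mockLog.assertAllExpectationsMatched();
    }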

int numShards = setupIndexWithDocs();

Request searchRequest = new Request("POST", "/_search");
searchRequest.setJsonEntity("""
{
"query": {
"simple_query_string" : {
"query": "foo",
"fields": ["field"]
}
}
}
""");

String errorTriggeringIndex = "test2";
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);

getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}
}

public void testNoLoggingInSearchFailingQueryErrorTraceTrue() throws IOException {
int numShards = setupIndexWithDocs();

Request searchRequest = new Request("POST", "/_search");
searchRequest.setJsonEntity("""
{
"query": {
"simple_query_string" : {
"query": "foo",
"fields": ["field"]
}
}
}
""");

String errorTriggeringIndex = "test2";
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);

searchRequest.addParameter("error_trace", "true");
getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}
}

public void testLoggingInSearchFailingQueryErrorTraceFalse() throws IOException {
int numShards = setupIndexWithDocs();

Request searchRequest = new Request("POST", "/_search");
searchRequest.setJsonEntity("""
{
"query": {
"simple_query_string" : {
"query": "foo",
"fields": ["field"]
}
}
}
""");

String errorTriggeringIndex = "test2";
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);

searchRequest.addParameter("error_trace", "false");
getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}
}

public void testMultiSearchFailingQueryErrorTraceDefault() throws IOException {
setupIndexWithDocs();

@@ -158,4 +258,73 @@ public void testMultiSearchFailingQueryErrorTraceFalse() throws IOException {

assertFalse(hasStackTrace.getAsBoolean());
}

public void testLoggingInMultiSearchFailingQueryErrorTraceDefault() throws IOException {
int numShards = setupIndexWithDocs();

XContentType contentType = XContentType.JSON;
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(
new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field")))
);
Request searchRequest = new Request("POST", "/_msearch");
byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
searchRequest.setEntity(
new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
);

String errorTriggeringIndex = "test2";
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);

getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}
}

public void testLoggingInMultiSearchFailingQueryErrorTraceTrue() throws IOException {
int numShards = setupIndexWithDocs();

XContentType contentType = XContentType.JSON;
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(
new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field")))
);
Request searchRequest = new Request("POST", "/_msearch");
byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
searchRequest.setEntity(
new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
);

searchRequest.addParameter("error_trace", "true");

String errorTriggeringIndex = "test2";
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);

getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}
}

public void testLoggingInMultiSearchFailingQueryErrorTraceFalse() throws IOException {
int numShards = setupIndexWithDocs();

XContentType contentType = XContentType.JSON;
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(
new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field")))
);
Request searchRequest = new Request("POST", "/_msearch");
byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
searchRequest.setEntity(
new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
);
searchRequest.addParameter("error_trace", "false");

String errorTriggeringIndex = "test2";
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);

getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}
}
}
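The ErrorTraceHelper.addSeenLoggingExpectations and addUnseenLoggingExpectations helpers used above are not shown in this diff. A plausible sketch of the "seen" variant, assuming MockLog's SeenEventExpectation API and an illustrative message pattern, with one expectation per shard of the failing index:

    // Hypothetical reconstruction (not part of this diff): register one "seen"
    // expectation per shard of the index that triggers the failure.
    public static void addSeenLoggingExpectations(int numShards, MockLog mockLog, String errorTriggeringIndex) {
        for (int shardId = 0; shardId < numShards; shardId++) {
            mockLog.addExpectation(
                new MockLog.SeenEventExpectation(
                    "stack trace logged for [" + errorTriggeringIndex + "][" + shardId + "]",
                    SearchService.class.getCanonicalName(),
                    Level.DEBUG,
                    "*[" + errorTriggeringIndex + "][" + shardId + "]*" // exact message text is an assumption
                )
            );
        }
    }

The "unseen" variant would presumably register MockLog.UnseenEventExpectation instances instead, asserting that no such events are emitted when error_trace=true.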
@@ -456,8 +456,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
(request, channel, task) -> searchService.executeQueryPhase(
request,
(SearchShardTask) task,
-new ChannelActionListener<>(channel),
-channel.getVersion()
+new ChannelActionListener<>(channel)
)
);
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, true, QuerySearchResult::new);
@@ -469,8 +468,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
(request, channel, task) -> searchService.executeQueryPhase(
request,
(SearchShardTask) task,
-new ChannelActionListener<>(channel),
-channel.getVersion()
+new ChannelActionListener<>(channel)
)
);
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, true, ScrollQuerySearchResult::new);
38 changes: 19 additions & 19 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -18,7 +18,6 @@
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ResolvedIndices;
@@ -156,6 +155,7 @@
import java.util.function.Supplier;

import static org.elasticsearch.TransportVersions.ERROR_TRACE_IN_TRANSPORT_HEADER;
+import static org.elasticsearch.common.Strings.format;
import static org.elasticsearch.core.TimeValue.timeValueHours;
import static org.elasticsearch.core.TimeValue.timeValueMillis;
import static org.elasticsearch.core.TimeValue.timeValueMinutes;
@@ -537,21 +537,27 @@ protected void doClose() {
*
* @param <T> the type of the response
* @param listener the action listener to be wrapped
-* @param version channel version of the request
+* @param request the shard request being executed
+* @param threadPool with context where to write the new header
+* @param clusterService the cluster service
* @return the wrapped action listener
*/
static <T> ActionListener<T> maybeWrapListenerForStackTrace(
ActionListener<T> listener,
-TransportVersion version,
-ThreadPool threadPool
+ShardSearchRequest request,
+ThreadPool threadPool,
+ClusterService clusterService
) {
boolean header = true;
-if (version.onOrAfter(ERROR_TRACE_IN_TRANSPORT_HEADER) && threadPool.getThreadContext() != null) {
+if (request.getChannelVersion().onOrAfter(ERROR_TRACE_IN_TRANSPORT_HEADER) && threadPool.getThreadContext() != null) {
header = Boolean.parseBoolean(threadPool.getThreadContext().getHeaderOrDefault("error_trace", "false"));
}
if (header == false) {
Member: I do not see where we are logging the trace? It seems we are only logging that it's actually removed.

I am thinking we should log the exception as a WARN if:

  • header == false (meaning the trace wouldn't be provided user side)
  • the exception isn't a user exception (e.g. should be considered a 5xx)

However, I defer to @javanna here. He has much better context around what we are trying to achieve.

Contributor Author: Passing e to the logger here outputs the stack trace. Maybe my log message isn't clear... would something like "search failed with exception:" be better?

Interesting point on WARN - do you think a WARN log per shard for the same underlying error is acceptable here? Luca and I decided it would be difficult to dedupe these at the moment. It might become easier with batched query execution.

Member: Ah, I see. I think this shouldn't be debug. Maybe debug for all exceptions, but we should log WARN for things that are 5xx, indicating the failure.

I think adding a "Clearing stack trace before..." message is unnecessary. Logging isn't just for our debugging, but also for users. I am not sure indicating that the trace is being removed is overly useful here.

Contributor Author: I've updated the log message to be clearer for users, and raised the level to WARN for 5xx. Thanks for the idea there - I think it makes sense for this new message to log at the same level as rest.suppressed.
return listener.delegateResponse((l, e) -> {
+logger.debug(
+() -> format("[%s]%s Clearing stack trace before transport:", clusterService.localNode().getId(), request.shardId()),
Member: This looks like the best place to add the logging indeed, because we ensure that we do the additional logging exclusively in the cases where we suppress the stack trace, before doing so.

If we log this at debug, we are not going to see it with the default log level, are we? I think we should use warn instead, at least?

The error message looks a little misleading also; all we are interested in is the error itself, so I would log the same thing we'd get on the coord node, but this time with the stack trace.

There are a couple more aspects that deserve attention, I think:

  1. If we keep on logging on the coord node, we should probably only log on the data nodes when the error trace is not requested, otherwise we just add redundant logging?
  2. If we keep on logging on the coord node, it may happen that the node acting as coord node also acts as a data node as part of serving a search request. That would lead to duplicated logging on that node, which may be ok but is not ideal.

Contributor Author: I've updated the log message to be clearer for users and raised the level to WARN on the same condition under which the rest.suppressed logger logs at WARN.

  1. Agreed, and that is the current behavior, as this log is only emitted inside the if (header == false) block.
  2. That is true. I think the shard failure logs on the coord node (see my example below) are important, but an argument could be made to remove the rest.suppressed log when error_trace=false. Then again, rest.suppressed is only one log line. But I imagine removing any of these logs would count as a breaking change (?), as alerts out there (like our own) might rely on them.
+e
+);
ExceptionsHelper.unwrapCausesAndSuppressed(e, err -> {
err.setStackTrace(EMPTY_STACK_TRACE_ARRAY);
return false;
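Per the resolution in the thread above, the level selection presumably mirrors rest.suppressed: WARN when the failure maps to a 5xx status, DEBUG otherwise. A sketch of that logic, assuming ExceptionsHelper.status is used to classify the exception; this is not the final PR code:

    // Sketch only: WARN for server-side (5xx) failures, DEBUG for everything else,
    // matching the level rest.suppressed would use on the coordinating node.
    RestStatus status = ExceptionsHelper.status(e);
    Level level = status.getStatus() >= 500 ? Level.WARN : Level.DEBUG;
    logger.log(
        level,
        () -> format("[%s]%s: failed to execute search request", clusterService.localNode().getId(), request.shardId()),
        e
    );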
@@ -563,7 +569,7 @@ static <T> ActionListener<T> maybeWrapListenerForStackTrace(
}

public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
-listener = maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool);
+listener = maybeWrapListenerForStackTrace(listener, request, threadPool, clusterService);
final IndexShard shard = getShard(request);
rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, rewritten) -> {
// fork the execution in the search thread pool
@@ -607,7 +613,7 @@ public void executeQueryPhase(ShardSearchRequest request, CancellableTask task,
rewriteAndFetchShardRequest(
shard,
request,
-maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool).delegateFailure((l, orig) -> {
+maybeWrapListenerForStackTrace(listener, request, threadPool, clusterService).delegateFailure((l, orig) -> {
// check if we can shortcut the query phase entirely.
if (orig.canReturnNullResponseIfMatchNoDocs()) {
assert orig.scroll() == null;
@@ -805,9 +811,9 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, Cancella
}

public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShardTask task, ActionListener<RankFeatureResult> listener) {
-listener = maybeWrapListenerForStackTrace(listener, request.getShardSearchRequest().getChannelVersion(), threadPool);
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
+listener = maybeWrapListenerForStackTrace(listener, shardSearchRequest, threadPool, clusterService);
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
runAsync(getExecutor(readerContext.indexShard()), () -> {
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.RANK_FEATURE, false)) {
@@ -853,11 +859,11 @@ private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchCon
public void executeQueryPhase(
InternalScrollSearchRequest request,
SearchShardTask task,
ActionListener<ScrollQuerySearchResult> listener,
TransportVersion version
ActionListener<ScrollQuerySearchResult> listener
) {
listener = maybeWrapListenerForStackTrace(listener, version, threadPool);
final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
listener = maybeWrapListenerForStackTrace(listener, shardSearchRequest, threadPool, clusterService);
final Releasable markAsUsed;
try {
markAsUsed = readerContext.markAsUsed(getScrollKeepAlive(request.scroll()));
@@ -867,7 +873,6 @@ public void executeQueryPhase(
throw e;
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
-final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false);) {
var opsListener = searchContext.indexShard().getSearchOperationListener();
final long beforeQueryTime = System.nanoTime();
@@ -899,15 +904,10 @@
* It is the responsibility of the caller to ensure that the ref count is correctly decremented
* when the object is no longer needed.
*/
-public void executeQueryPhase(
-    QuerySearchRequest request,
-    SearchShardTask task,
-    ActionListener<QuerySearchResult> listener,
-    TransportVersion version
-) {
-    listener = maybeWrapListenerForStackTrace(listener, version, threadPool);
+public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener<QuerySearchResult> listener) {
final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
+listener = maybeWrapListenerForStackTrace(listener, shardSearchRequest, threadPool, clusterService);
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
rewriteAndFetchShardRequest(readerContext.indexShard(), shardSearchRequest, listener.delegateFailure((l, rewritten) -> {
// fork the execution in the search thread pool
@@ -2630,8 +2630,7 @@ public void testDfsQueryPhaseRewrite() {
service.executeQueryPhase(
new QuerySearchRequest(null, context.id(), request, new AggregatedDfs(Map.of(), Map.of(), 10)),
new SearchShardTask(42L, "", "", "", null, emptyMap()),
-plainActionFuture,
-TransportVersion.current()
+plainActionFuture
);

plainActionFuture.actionGet();