-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Log stack traces on data nodes before they are cleared for transport #125732
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
fdaba56
79a06aa
e38d20c
6cb677f
78d58a4
b34afc1
9f527eb
8eca23f
93eddcc
47751a8
b4c5baa
93e4625
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 125732 | ||
summary: Log stack traces on data nodes before they are cleared for transport | ||
area: Search | ||
type: bug | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,6 @@ | |
import org.apache.lucene.search.TopDocs; | ||
import org.elasticsearch.ElasticsearchException; | ||
import org.elasticsearch.ExceptionsHelper; | ||
import org.elasticsearch.TransportVersion; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.ActionRunnable; | ||
import org.elasticsearch.action.ResolvedIndices; | ||
|
@@ -156,6 +155,7 @@ | |
import java.util.function.Supplier; | ||
|
||
import static org.elasticsearch.TransportVersions.ERROR_TRACE_IN_TRANSPORT_HEADER; | ||
import static org.elasticsearch.common.Strings.format; | ||
import static org.elasticsearch.core.TimeValue.timeValueHours; | ||
import static org.elasticsearch.core.TimeValue.timeValueMillis; | ||
import static org.elasticsearch.core.TimeValue.timeValueMinutes; | ||
|
@@ -537,21 +537,27 @@ protected void doClose() { | |
* | ||
* @param <T> the type of the response | ||
* @param listener the action listener to be wrapped | ||
* @param version channel version of the request | ||
* @param request the shard request being executed | ||
* @param threadPool with context where to write the new header | ||
* @param clusterService the cluster service | ||
* @return the wrapped action listener | ||
*/ | ||
static <T> ActionListener<T> maybeWrapListenerForStackTrace( | ||
ActionListener<T> listener, | ||
TransportVersion version, | ||
ThreadPool threadPool | ||
ShardSearchRequest request, | ||
ThreadPool threadPool, | ||
ClusterService clusterService | ||
javanna marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) { | ||
boolean header = true; | ||
if (version.onOrAfter(ERROR_TRACE_IN_TRANSPORT_HEADER) && threadPool.getThreadContext() != null) { | ||
if (request.getChannelVersion().onOrAfter(ERROR_TRACE_IN_TRANSPORT_HEADER) && threadPool.getThreadContext() != null) { | ||
header = Boolean.parseBoolean(threadPool.getThreadContext().getHeaderOrDefault("error_trace", "false")); | ||
} | ||
if (header == false) { | ||
javanna marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return listener.delegateResponse((l, e) -> { | ||
logger.debug( | ||
() -> format("[%s]%s Clearing stack trace before transport:", clusterService.localNode().getId(), request.shardId()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like the best place to add the logging indeed, because we ensure that we do the additional logging exclusively for the cases where we suppress the stack trace, before doing so. If we log this at debug, we are not going to see it with the default log level, are we? I think we should use warn instead at least? The error message looks a little misleading also, all we are interested in is the error itself, so I would log the same that we'd get on the coord node, but this time we'd get the stacktrace. There's a couple more aspects that deserve attention I think:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've updated the log message to be more clear for users and raised the level to
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wouldn't think changing the way we log is a breaking change. But I think this could be a follow-up. |
||
e | ||
); | ||
ExceptionsHelper.unwrapCausesAndSuppressed(e, err -> { | ||
err.setStackTrace(EMPTY_STACK_TRACE_ARRAY); | ||
return false; | ||
|
@@ -563,7 +569,7 @@ static <T> ActionListener<T> maybeWrapListenerForStackTrace( | |
} | ||
|
||
public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) { | ||
listener = maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool); | ||
listener = maybeWrapListenerForStackTrace(listener, request, threadPool, clusterService); | ||
final IndexShard shard = getShard(request); | ||
rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, rewritten) -> { | ||
// fork the execution in the search thread pool | ||
|
@@ -607,7 +613,7 @@ public void executeQueryPhase(ShardSearchRequest request, CancellableTask task, | |
rewriteAndFetchShardRequest( | ||
shard, | ||
request, | ||
maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool).delegateFailure((l, orig) -> { | ||
maybeWrapListenerForStackTrace(listener, request, threadPool, clusterService).delegateFailure((l, orig) -> { | ||
// check if we can shortcut the query phase entirely. | ||
if (orig.canReturnNullResponseIfMatchNoDocs()) { | ||
assert orig.scroll() == null; | ||
|
@@ -805,9 +811,9 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, Cancella | |
} | ||
|
||
public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShardTask task, ActionListener<RankFeatureResult> listener) { | ||
listener = maybeWrapListenerForStackTrace(listener, request.getShardSearchRequest().getChannelVersion(), threadPool); | ||
final ReaderContext readerContext = findReaderContext(request.contextId(), request); | ||
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); | ||
listener = maybeWrapListenerForStackTrace(listener, shardSearchRequest, threadPool, clusterService); | ||
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); | ||
runAsync(getExecutor(readerContext.indexShard()), () -> { | ||
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.RANK_FEATURE, false)) { | ||
|
@@ -853,11 +859,11 @@ private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchCon | |
public void executeQueryPhase( | ||
InternalScrollSearchRequest request, | ||
SearchShardTask task, | ||
ActionListener<ScrollQuerySearchResult> listener, | ||
TransportVersion version | ||
ActionListener<ScrollQuerySearchResult> listener | ||
) { | ||
listener = maybeWrapListenerForStackTrace(listener, version, threadPool); | ||
final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request); | ||
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); | ||
listener = maybeWrapListenerForStackTrace(listener, shardSearchRequest, threadPool, clusterService); | ||
javanna marked this conversation as resolved.
Show resolved
Hide resolved
|
||
final Releasable markAsUsed; | ||
try { | ||
markAsUsed = readerContext.markAsUsed(getScrollKeepAlive(request.scroll())); | ||
|
@@ -867,7 +873,6 @@ public void executeQueryPhase( | |
throw e; | ||
} | ||
runAsync(getExecutor(readerContext.indexShard()), () -> { | ||
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); | ||
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false);) { | ||
var opsListener = searchContext.indexShard().getSearchOperationListener(); | ||
final long beforeQueryTime = System.nanoTime(); | ||
|
@@ -899,15 +904,10 @@ public void executeQueryPhase( | |
* It is the responsibility of the caller to ensure that the ref count is correctly decremented | ||
* when the object is no longer needed. | ||
*/ | ||
public void executeQueryPhase( | ||
QuerySearchRequest request, | ||
SearchShardTask task, | ||
ActionListener<QuerySearchResult> listener, | ||
TransportVersion version | ||
) { | ||
listener = maybeWrapListenerForStackTrace(listener, version, threadPool); | ||
public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener<QuerySearchResult> listener) { | ||
final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest()); | ||
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest()); | ||
listener = maybeWrapListenerForStackTrace(listener, shardSearchRequest, threadPool, clusterService); | ||
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); | ||
rewriteAndFetchShardRequest(readerContext.indexShard(), shardSearchRequest, listener.delegateFailure((l, rewritten) -> { | ||
// fork the execution in the search thread pool | ||
|
Uh oh!
There was an error while loading. Please reload this page.