-
Notifications
You must be signed in to change notification settings - Fork 819
SOLR-18124: Add tracing spans for UpdateLog replay #4216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8eaf9d1
2f4ebeb
ae3003f
d588332
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| # Learnings History | ||
|
|
||
| > Add a new dated entry for every contribution, rerun, or discovery so future agents can read the evolving context without scanning every PR. | ||
|
|
||
| ## 2026-03-15 | ||
| - Created and pushed branch `SOLR-18124-updatelog-replay-tracing`; added tracing spans around UpdateLog replay, per-log spans, and metadata along with the `UpdateLogReplayTracingTest` verification. | ||
| - Documented instrumentation details and test failures/mitigations in AGENTS.md for future readers. | ||
| - Opened PR #4216 targeting SOLR-18124 and noted the need to rerun the new tracing tests before merge. | ||
|
|
||
| ## 2026-03-15 (follow-up) | ||
| - Resolved test setup issues in `UpdateLogReplayTracingTest` by wiring leader-distributed params, `jsonAdd` calls, and run metadata in isolation; test iterations now compile. | ||
| - Installed Java 21 locally, set `GRADLE_USER_HOME` to avoid permission issues, and ran: | ||
| - `./gradlew :solr:core:test --tests org.apache.solr.update.UpdateLogReplayTracingTest` | ||
| - `./gradlew :solr:core:test --tests org.apache.solr.update.PeerSyncWithBufferUpdatesTest` | ||
| both succeeded after the fix. | ||
| - Pushed update to PR #4216 and left a comment summarizing the completed local checks. | ||
|
|
||
| ## 2026-03-16 | ||
| - Reintroduced an automated `solr.security.allow.paths` preload inside `SolrTestCase.beforeSolrTestCase()` so tests that reference `ExternalPaths.SERVER_HOME` no longer need to set the system property manually, and cleaned up the few remaining test files accordingly (including the `RenameCoreAPITest` formatting fix). | ||
| - Verified `RenameCoreAPITest` again under Java 21 using `./gradlew :solr:core:test --tests org.apache.solr.handler.admin.api.RenameCoreAPITest` (build succeeded). |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,9 @@ | |
| import com.carrotsearch.hppc.LongHashSet; | ||
| import com.carrotsearch.hppc.LongSet; | ||
| import io.opentelemetry.api.common.Attributes; | ||
| import io.opentelemetry.api.trace.Span; | ||
| import io.opentelemetry.api.trace.StatusCode; | ||
| import io.opentelemetry.context.Scope; | ||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.io.UncheckedIOException; | ||
|
|
@@ -88,10 +91,12 @@ | |
| import org.apache.solr.update.processor.DistributedUpdateProcessor; | ||
| import org.apache.solr.update.processor.UpdateRequestProcessor; | ||
| import org.apache.solr.update.processor.UpdateRequestProcessorChain; | ||
| import org.apache.solr.update.processor.UpdateRequestProcessorFactory; | ||
| import org.apache.solr.util.OrderedExecutor; | ||
| import org.apache.solr.util.RefCounted; | ||
| import org.apache.solr.util.TestInjection; | ||
| import org.apache.solr.util.TimeOut; | ||
| import org.apache.solr.util.tracing.TraceUtils; | ||
| import org.apache.solr.util.plugin.PluginInfoInitialized; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
@@ -107,6 +112,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { | |
| public static String LOG_FILENAME_PATTERN = "%s.%019d"; | ||
| public static String TLOG_NAME = "tlog"; | ||
| public static String BUFFER_TLOG_NAME = "buffer.tlog"; | ||
| private static final String UPDATELOG_REPLAY_SPAN_NAME = "updatelog.replay"; | ||
| private static final String UPDATELOG_REPLAY_LOG_SPAN_NAME = "updatelog.replay.log"; | ||
|
|
||
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); | ||
| private boolean debug = log.isDebugEnabled(); | ||
|
|
@@ -2129,36 +2136,69 @@ public void run() { | |
| // setting request info will help logging | ||
| SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); | ||
|
|
||
| try { | ||
| for (; ; ) { | ||
| TransactionLog translog = translogs.pollFirst(); | ||
| if (translog == null) break; | ||
| doReplay(translog); | ||
| } | ||
| } catch (SolrException e) { | ||
| if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) { | ||
| log.error("Replay failed service unavailable", e); | ||
| recoveryInfo.failed = true; | ||
| } else { | ||
| final int initialLogCount = translogs.size(); | ||
| int logsReplayed = 0; | ||
| long replayedOps = 0; | ||
| final int replayErrorsStart = recoveryInfo.errors.get(); | ||
| final Span replaySpan = | ||
| TraceUtils.getGlobalTracer().spanBuilder(UPDATELOG_REPLAY_SPAN_NAME).startSpan(); | ||
| TraceUtils.ifNotNoop( | ||
| replaySpan, | ||
| span -> { | ||
| span.setAttribute("updatelog.replay.state", state.toString()); | ||
| span.setAttribute("updatelog.replay.active_log", activeLog); | ||
| span.setAttribute("updatelog.replay.in_sorted_order", inSortedOrder); | ||
| span.setAttribute("updatelog.replay.logs_total", initialLogCount); | ||
| span.setAttribute("updatelog.replay.core", req.getCore().getName()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably remove, assuming redundant with the line below |
||
| TraceUtils.setDbInstance(span, req.getCore().getName()); | ||
| }); | ||
|
|
||
| try (Scope scope = replaySpan.makeCurrent()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we avoid adding a try-finally when I see one here already? |
||
| assert scope != null; | ||
| try { | ||
| for (; ; ) { | ||
| TransactionLog translog = translogs.pollFirst(); | ||
| if (translog == null) break; | ||
| replayedOps += doReplay(translog); | ||
| logsReplayed++; | ||
| } | ||
| } catch (SolrException e) { | ||
| if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) { | ||
| log.error("Replay failed service unavailable", e); | ||
| recoveryInfo.failed = true; | ||
| } else { | ||
| recoveryInfo.errors.incrementAndGet(); | ||
| log.error("Replay failed due to exception", e); | ||
| } | ||
| replaySpan.recordException(e); | ||
| replaySpan.setStatus(StatusCode.ERROR); | ||
| } catch (Exception e) { | ||
| recoveryInfo.errors.incrementAndGet(); | ||
| log.error("Replay failed due to exception", e); | ||
| replaySpan.recordException(e); | ||
| replaySpan.setStatus(StatusCode.ERROR); | ||
| } finally { | ||
| // change the state while updates are still blocked to prevent races | ||
| state = State.ACTIVE; | ||
| if (finishing) { | ||
| updateLocks.unblockUpdates(); | ||
| } | ||
|
|
||
| // clean up in case we hit some unexpected exception and didn't get | ||
| // to more transaction logs | ||
| for (TransactionLog translog : translogs) { | ||
| log.error("ERROR: didn't get to recover from tlog {}", translog); | ||
| translog.decref(); | ||
| } | ||
| } | ||
| } catch (Exception e) { | ||
| recoveryInfo.errors.incrementAndGet(); | ||
| log.error("Replay failed due to exception", e); | ||
| } finally { | ||
| // change the state while updates are still blocked to prevent races | ||
| state = State.ACTIVE; | ||
| if (finishing) { | ||
| updateLocks.unblockUpdates(); | ||
| } | ||
|
|
||
| // clean up in case we hit some unexpected exception and didn't get | ||
| // to more transaction logs | ||
| for (TransactionLog translog : translogs) { | ||
| log.error("ERROR: didn't get to recover from tlog {}", translog); | ||
| translog.decref(); | ||
| if (replaySpan.isRecording()) { | ||
| replaySpan.setAttribute("updatelog.replay.logs_replayed", logsReplayed); | ||
| replaySpan.setAttribute("updatelog.replay.ops_replayed", replayedOps); | ||
| replaySpan.setAttribute( | ||
| "updatelog.replay.errors", recoveryInfo.errors.get() - replayErrorsStart); | ||
| } | ||
| replaySpan.end(); | ||
| } | ||
|
|
||
| loglog.warn("Log replay finished. recoveryInfo={}", recoveryInfo); | ||
|
|
@@ -2168,8 +2208,24 @@ public void run() { | |
| SolrRequestInfo.clearRequestInfo(); | ||
| } | ||
|
|
||
| public void doReplay(TransactionLog translog) { | ||
| try { | ||
| public long doReplay(TransactionLog translog) { | ||
| long replayedOps = 0L; | ||
| final int replayErrorsStart = recoveryInfo.errors.get(); | ||
| final Span replayLogSpan = | ||
| TraceUtils.getGlobalTracer().spanBuilder(UPDATELOG_REPLAY_LOG_SPAN_NAME).startSpan(); | ||
| TraceUtils.ifNotNoop( | ||
| replayLogSpan, | ||
| span -> { | ||
| if (translog.tlog != null) { | ||
| span.setAttribute("updatelog.replay.log_file", translog.tlog.getFileName().toString()); | ||
| } | ||
| span.setAttribute("updatelog.replay.log_size_bytes", translog.getLogSize()); | ||
| span.setAttribute("updatelog.replay.active_log", activeLog); | ||
| span.setAttribute("updatelog.replay.in_sorted_order", inSortedOrder); | ||
| }); | ||
| boolean replayLogSucceeded = false; | ||
| try (Scope scope = replayLogSpan.makeCurrent()) { | ||
| assert scope != null; | ||
| loglog.warn( | ||
| "Starting log replay {} active={} starting pos={} inSortedOrder={}", | ||
| translog, | ||
|
|
@@ -2192,6 +2248,11 @@ public void doReplay(TransactionLog translog) { | |
|
|
||
| // Use a pool of URPs using a ThreadLocal to have them per-thread. URPs aren't threadsafe. | ||
| UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null); | ||
| TraceUtils.ifNotNoop( | ||
| replayLogSpan, | ||
| span -> | ||
| span.setAttribute( | ||
| "updatelog.replay.urp_chain", summarizeProcessorChain(processorChain))); | ||
| Collection<UpdateRequestProcessor> procPool = | ||
| Collections.synchronizedList(new ArrayList<>()); | ||
| ThreadLocal<UpdateRequestProcessor> procThreadLocal = | ||
|
|
@@ -2287,6 +2348,7 @@ public void doReplay(TransactionLog translog) { | |
| case UpdateLog.ADD: | ||
| { | ||
| recoveryInfo.adds++; | ||
| replayedOps++; | ||
| AddUpdateCommand cmd = | ||
| convertTlogEntryToAddUpdateCommand(req, entry, oper, version); | ||
| cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT); | ||
|
|
@@ -2297,6 +2359,7 @@ public void doReplay(TransactionLog translog) { | |
| case UpdateLog.DELETE: | ||
| { | ||
| recoveryInfo.deletes++; | ||
| replayedOps++; | ||
| byte[] idBytes = (byte[]) entry.get(2); | ||
| DeleteUpdateCommand cmd = new DeleteUpdateCommand(req); | ||
| cmd.setIndexedId(new BytesRef(idBytes)); | ||
|
|
@@ -2310,6 +2373,7 @@ public void doReplay(TransactionLog translog) { | |
| case UpdateLog.DELETE_BY_QUERY: | ||
| { | ||
| recoveryInfo.deleteByQuery++; | ||
| replayedOps++; | ||
| String query = (String) entry.get(2); | ||
| DeleteUpdateCommand cmd = new DeleteUpdateCommand(req); | ||
| cmd.query = query; | ||
|
|
@@ -2346,10 +2410,12 @@ public void doReplay(TransactionLog translog) { | |
| } catch (ClassCastException cl) { | ||
| recoveryInfo.errors.incrementAndGet(); | ||
| loglog.warn("REPLAY_ERR: Unexpected log entry or corrupt log. Entry={}", o, cl); | ||
| replayLogSpan.recordException(cl); | ||
| // would be caused by a corrupt transaction log | ||
| } catch (Exception ex) { | ||
| recoveryInfo.errors.incrementAndGet(); | ||
| loglog.warn("REPLAY_ERR: Exception replaying log", ex); | ||
| replayLogSpan.recordException(ex); | ||
| // something wrong with the request? | ||
| } | ||
| assert TestInjection.injectUpdateLogReplayRandomPause(); | ||
|
|
@@ -2389,11 +2455,38 @@ public void doReplay(TransactionLog translog) { | |
| IOUtils.closeQuietly(proc); | ||
| } | ||
| } | ||
| replayLogSucceeded = true; | ||
|
|
||
| } finally { | ||
| if (tlogReader != null) tlogReader.close(); | ||
| translog.decref(); | ||
| final int replayErrors = recoveryInfo.errors.get() - replayErrorsStart; | ||
| if (replayLogSpan.isRecording()) { | ||
| replayLogSpan.setAttribute("updatelog.replay.log_ops", replayedOps); | ||
| replayLogSpan.setAttribute("updatelog.replay.log_errors", replayErrors); | ||
| replayLogSpan.setAttribute("updatelog.replay.log_success", replayLogSucceeded); | ||
| } | ||
| if (!replayLogSucceeded || replayErrors > 0) { | ||
| replayLogSpan.setStatus(StatusCode.ERROR); | ||
| } | ||
| replayLogSpan.end(); | ||
| } | ||
| return replayedOps; | ||
| } | ||
|
|
||
| private String summarizeProcessorChain(UpdateRequestProcessorChain processorChain) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lets put this on UpdateRequestProcessorChain.toString(). |
||
| if (processorChain == null) { | ||
| return "none"; | ||
| } | ||
| List<UpdateRequestProcessorFactory> processors = processorChain.getProcessors(); | ||
| if (processors == null || processors.isEmpty()) { | ||
| return "empty"; | ||
| } | ||
| List<String> names = new ArrayList<>(processors.size()); | ||
| for (UpdateRequestProcessorFactory processor : processors) { | ||
| names.add(processor.getClass().getSimpleName()); | ||
| } | ||
| return String.join(">", names); | ||
| } | ||
|
|
||
| private void waitForAllUpdatesGetExecuted(AtomicInteger pendingTasks) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
honestly just inline these. The constants serve no purpose. I know this is a matter of taste. The practice of constants spreads readability around thus reducing readability.