Skip to content

Commit c590b83

Browse files
Fixing BWC tests
Signed-off-by: Martin Gaievski <[email protected]>
1 parent 3a9ff77 commit c590b83

File tree

2 files changed

+23
-6
lines changed

2 files changed

+23
-6
lines changed

server/src/main/java/org/opensearch/search/pipeline/Pipeline.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,13 +289,19 @@ <Result extends SearchPhaseResult> void runSearchPhaseResultsTransformer(
289289
PipelineProcessingContext requestContext
290290
) throws SearchPipelineProcessingException {
291291
long pipelineStart = relativeTimeSupplier.getAsLong();
292+
boolean processorsExecuted = false;
293+
boolean processorsFailed = false;
294+
295+
// Always track phase results metrics to maintain consistent state
292296
beforeTransformPhaseResults();
297+
293298
try {
294299
for (SearchPhaseResultsProcessor searchPhaseResultsProcessor : searchPhaseResultsProcessors) {
295300
beforePhaseResultsProcessor(searchPhaseResultsProcessor);
296301
long start = relativeTimeSupplier.getAsLong();
297302
if (currentPhase.equals(searchPhaseResultsProcessor.getBeforePhase().getName())
298303
&& nextPhase.equals(searchPhaseResultsProcessor.getAfterPhase().getName())) {
304+
processorsExecuted = true;
299305
try {
300306
searchPhaseResultsProcessor.process(searchPhaseResult, context, requestContext);
301307
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
@@ -315,6 +321,7 @@ <Result extends SearchPhaseResult> void runSearchPhaseResultsTransformer(
315321
);
316322
} else {
317323
// Only track pipeline-level failure when processor failure is not ignored
324+
processorsFailed = true;
318325
onTransformPhaseResultsFailure();
319326
throw e;
320327
}
@@ -327,9 +334,15 @@ <Result extends SearchPhaseResult> void runSearchPhaseResultsTransformer(
327334
} finally {
328335
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
329336
afterTransformPhaseResults(took);
337+
// Notify that processors actually executed for pipeline request tracking
338+
if (processorsExecuted) {
339+
onPhaseResultsProcessorsExecuted(took, processorsFailed);
340+
}
330341
}
331342
}
332343

344+
protected void onPhaseResultsProcessorsExecuted(long timeInNanos, boolean failed) {}
345+
333346
static final Pipeline NO_OP_PIPELINE = new Pipeline(
334347
SearchPipelineService.NOOP_PIPELINE_ID,
335348
"Pipeline that does not transform anything",

server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -249,24 +249,18 @@ protected void onResponseProcessorFailed(Processor processor) {
249249
protected void beforeTransformPhaseResults() {
250250
super.beforeTransformPhaseResults();
251251
totalPhaseResultsMetrics.before();
252-
// Phase results processing is also tracked as part of individual pipeline request processing
253-
pipelineRequestMetrics.before();
254252
}
255253

256254
@Override
257255
protected void afterTransformPhaseResults(long timeInNanos) {
258256
super.afterTransformPhaseResults(timeInNanos);
259257
totalPhaseResultsMetrics.after(timeInNanos);
260-
// Phase results processing is also tracked as part of individual pipeline request processing
261-
pipelineRequestMetrics.after(timeInNanos);
262258
}
263259

264260
@Override
265261
protected void onTransformPhaseResultsFailure() {
266262
super.onTransformPhaseResultsFailure();
267263
totalPhaseResultsMetrics.failed();
268-
// Phase results processing failures are also tracked as part of individual pipeline request processing
269-
pipelineRequestMetrics.failed();
270264
}
271265

272266
protected void beforePhaseResultsProcessor(Processor processor) {
@@ -281,6 +275,16 @@ protected void onPhaseResultsProcessorFailed(Processor processor) {
281275
phaseResultsProcessorMetrics.get(getProcessorKey(processor)).failed();
282276
}
283277

278+
@Override
279+
protected void onPhaseResultsProcessorsExecuted(long timeInNanos, boolean failed) {
280+
// Only track as pipeline request when phase results processors actually execute
281+
pipelineRequestMetrics.before();
282+
pipelineRequestMetrics.after(timeInNanos);
283+
if (failed) {
284+
pipelineRequestMetrics.failed();
285+
}
286+
}
287+
284288
void copyMetrics(PipelineWithMetrics oldPipeline) {
285289
pipelineRequestMetrics.add(oldPipeline.pipelineRequestMetrics);
286290
pipelineResponseMetrics.add(oldPipeline.pipelineResponseMetrics);

0 commit comments

Comments
 (0)