@@ -311,25 +311,27 @@ void validatePipeline(Map<DiscoveryNode, SearchPipelineInfo> searchPipelineInfos
311311 new Processor .PipelineContext (Processor .PipelineSource .VALIDATE_PIPELINE )
312312 );
313313 List <Exception > exceptions = new ArrayList <>();
314- for (SearchRequestProcessor processor : pipeline .getSearchRequestProcessors ()) {
315- for (Map .Entry <DiscoveryNode , SearchPipelineInfo > entry : searchPipelineInfos .entrySet ()) {
316- String type = processor .getType ();
317- if (entry .getValue ().containsProcessor (Pipeline .REQUEST_PROCESSORS_KEY , type ) == false ) {
318- String message = "Processor type [" + processor .getType () + "] is not installed on node [" + entry .getKey () + "]" ;
319- exceptions .add (ConfigurationUtils .newConfigurationException (processor .getType (), processor .getTag (), null , message ));
320- }
321- }
322- }
323- for (SearchResponseProcessor processor : pipeline .getSearchResponseProcessors ()) {
314+ validateProcessors (searchPipelineInfos , exceptions , Pipeline .REQUEST_PROCESSORS_KEY , pipeline .getSearchRequestProcessors ());
315+ validateProcessors (searchPipelineInfos , exceptions , Pipeline .RESPONSE_PROCESSORS_KEY , pipeline .getSearchResponseProcessors ());
316+ validateProcessors (searchPipelineInfos , exceptions , Pipeline .PHASE_PROCESSORS_KEY , pipeline .getSearchPhaseResultsProcessors ());
317+ ExceptionsHelper .rethrowAndSuppress (exceptions );
318+ }
319+
320+ private void validateProcessors (
321+ Map <DiscoveryNode , SearchPipelineInfo > searchPipelineInfos ,
322+ List <Exception > exceptions ,
323+ String processorKey ,
324+ List <? extends Processor > processors
325+ ) {
326+ for (Processor processor : processors ) {
324327 for (Map .Entry <DiscoveryNode , SearchPipelineInfo > entry : searchPipelineInfos .entrySet ()) {
325328 String type = processor .getType ();
326- if (entry .getValue ().containsProcessor (Pipeline . RESPONSE_PROCESSORS_KEY , type ) == false ) {
329+ if (entry .getValue ().containsProcessor (processorKey , type ) == false ) {
327330 String message = "Processor type [" + processor .getType () + "] is not installed on node [" + entry .getKey () + "]" ;
328331 exceptions .add (ConfigurationUtils .newConfigurationException (processor .getType (), processor .getTag (), null , message ));
329332 }
330333 }
331334 }
332- ExceptionsHelper .rethrowAndSuppress (exceptions );
333335 }
334336
335337 public void deletePipeline (DeleteSearchPipelineRequest request , ActionListener <AcknowledgedResponse > listener ) throws Exception {
@@ -460,6 +462,10 @@ Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessorFact
460462 return responseProcessorFactories ;
461463 }
462464
465+ Map <String , Processor .Factory <SearchPhaseResultsProcessor >> getSearchPhaseResultsProcessorFactories () {
466+ return phaseInjectorProcessorFactories ;
467+ }
468+
463469 @ Override
464470 public SearchPipelineInfo info () {
465471 List <ProcessorInfo > requestProcessorInfoList = requestProcessorFactories .keySet ()
0 commit comments