@@ -420,7 +420,11 @@ def _locate_externals(self, parameters):
420420 :return: found parameters as dict(StreamKey, Parameter), unfulfilled parameters as set(Parameter)
421421 """
422422 log .debug ('<%s> _locate_externals: %r' , self .request_id , parameters )
423- external_to_process = set (parameters )
423+ # A set of tuples of the dependant stream and the required parameters that it depends on.
424+ # Initially, it is just the requested stream and external parameters that it needs.
425+ external_to_process = set ()
426+ for param in parameters :
427+ external_to_process .add ((self .stream_key , param ))
424428 found = {}
425429 external_unfulfilled = set ()
426430 stream_parameters = {}
@@ -443,19 +447,19 @@ def process_found_stream(stream_key, parameter):
443447 # Add externals not yet processed to the to_process set
444448 for sub_need in sk_needs_external :
445449 if sub_need not in external_unfulfilled :
446- external_to_process .add (sub_need )
450+ external_to_process .add (( stream_key , sub_need ) )
447451 # Add internal parameters to the corresponding stream set
448452 stream_parameters .setdefault (stream_key , set ()).update (sk_needs_internal )
449453
450454 while external_to_process :
451455 # Pop an external from the list of externals to process
452- external = external_to_process .pop ()
456+ stream_key , external = external_to_process .pop ()
453457 stream , poss_params = external
454458 # all non-virtual streams define PD7, skip
455459 if poss_params [0 ].id == 7 :
456460 continue
457461 log .debug ('<%s> _locate_externals: STREAM: %r POSS_PARAMS: %r' , self .request_id , stream , poss_params )
458- found_sk , found_param = self .find_stream (self . stream_key , poss_params , stream = stream )
462+ found_sk , found_param = self .find_stream (stream_key , poss_params , stream = stream )
459463 if found_sk :
460464 process_found_stream (found_sk , found_param )
461465 else :
0 commit comments