@@ -446,18 +446,22 @@ def engine_adapter(self) -> EngineAdapter:
446446 self ._engine_adapter = self .connection_config .create_engine_adapter ()
447447 return self ._engine_adapter
448448
449- def snapshot_evaluator (self , job_id : t .Optional [CorrelationId ] = None ) -> SnapshotEvaluator :
450- # Cache snapshot evaluators by job_id to avoid old job_ids being attached to future Context operations
451- if job_id not in self ._snapshot_evaluators :
452- self ._snapshot_evaluators [job_id ] = SnapshotEvaluator (
449+ def snapshot_evaluator (
450+ self , correlation_id : t .Optional [CorrelationId ] = None
451+ ) -> SnapshotEvaluator :
452+ # Cache snapshot evaluators by correlation_id to avoid old correlation_ids being attached to future Context operations
453+ if correlation_id not in self ._snapshot_evaluators :
454+ self ._snapshot_evaluators [correlation_id ] = SnapshotEvaluator (
453455 {
454- gateway : adapter .with_settings (log_level = logging .INFO , job_id = job_id )
456+ gateway : adapter .with_settings (
457+ log_level = logging .INFO , correlation_id = correlation_id
458+ )
455459 for gateway , adapter in self .engine_adapters .items ()
456460 },
457461 ddl_concurrent_tasks = self .concurrent_tasks ,
458462 selected_gateway = self .selected_gateway ,
459463 )
460- return self ._snapshot_evaluators [job_id ]
464+ return self ._snapshot_evaluators [correlation_id ]
461465
462466 def execution_context (
463467 self ,
@@ -539,7 +543,7 @@ def scheduler(self, environment: t.Optional[str] = None) -> Scheduler:
539543 return self .create_scheduler (snapshots )
540544
541545 def create_scheduler (
542- self , snapshots : t .Iterable [Snapshot ], job_id : t .Optional [CorrelationId ] = None
546+ self , snapshots : t .Iterable [Snapshot ], correlation_id : t .Optional [CorrelationId ] = None
543547 ) -> Scheduler :
544548 """Creates the built-in scheduler.
545549
@@ -551,7 +555,7 @@ def create_scheduler(
551555 """
552556 return Scheduler (
553557 snapshots ,
554- self .snapshot_evaluator (job_id ),
558+ self .snapshot_evaluator (correlation_id ),
555559 self .state_sync ,
556560 default_catalog = self .default_catalog ,
557561 max_workers = self .concurrent_tasks ,
@@ -1592,7 +1596,9 @@ def apply(
15921596 )
15931597 explainer .evaluate (
15941598 plan .to_evaluatable (),
1595- snapshot_evaluator = self .snapshot_evaluator (job_id = CorrelationId .from_plan (plan )),
1599+ snapshot_evaluator = self .snapshot_evaluator (
1600+ correlation_id = CorrelationId .from_plan (plan )
1601+ ),
15961602 )
15971603 return
15981604
@@ -2341,9 +2347,12 @@ def close(self) -> None:
23412347 """Releases all resources allocated by this context."""
23422348 for evaluator in self ._snapshot_evaluators .values ():
23432349 evaluator .close ()
2350+
23442351 if self ._state_sync :
23452352 self ._state_sync .close ()
23462353
2354+ self ._snapshot_evaluators .clear ()
2355+
23472356 def _run (
23482357 self ,
23492358 environment : str ,
@@ -2395,7 +2404,9 @@ def _run(
23952404 def _apply (self , plan : Plan , circuit_breaker : t .Optional [t .Callable [[], bool ]]) -> None :
23962405 self ._scheduler .create_plan_evaluator (self ).evaluate (
23972406 plan .to_evaluatable (),
2398- snapshot_evaluator = self .snapshot_evaluator (job_id = CorrelationId .from_plan (plan )),
2407+ snapshot_evaluator = self .snapshot_evaluator (
2408+ correlation_id = CorrelationId .from_plan (plan )
2409+ ),
23992410 circuit_breaker = circuit_breaker ,
24002411 )
24012412
0 commit comments