@@ -60,10 +60,16 @@ def test_basic(self):
6060 map_op_no_concurrency : MagicMock (),
6161 }
6262
63+ mock_resource_manager = MagicMock ()
64+ # Return None to skip dynamic output queue size backpressure check
65+ mock_resource_manager .get_op_usage .return_value = None
66+ mock_resource_manager .get_budget .return_value = None
67+ mock_resource_manager .is_op_eligible .return_value = False
68+
6369 policy = ConcurrencyCapBackpressurePolicy (
6470 DataContext .get_current (),
6571 topology ,
66- MagicMock () ,
72+ mock_resource_manager ,
6773 )
6874
6975 self .assertEqual (policy ._concurrency_caps [map_op ], concurrency )
@@ -177,6 +183,67 @@ def test_can_add_input_with_non_map_operator(self):
177183 # InputDataBuffer has infinite concurrency cap, so should always allow
178184 self .assertTrue (policy .can_add_input (input_op ))
179185
186+ def test_can_add_input_with_ineligible_op (self ):
187+ """Test can_add_input when op is not eligible for backpressure."""
188+ input_op = InputDataBuffer (DataContext .get_current (), input_data = [MagicMock ()])
189+ map_op = TaskPoolMapOperator (
190+ map_transformer = MagicMock (),
191+ data_context = DataContext .get_current (),
192+ input_op = input_op ,
193+ max_concurrency = 5 ,
194+ )
195+ map_op .metrics .num_tasks_running = 3
196+
197+ topology = {map_op : MagicMock (), input_op : MagicMock ()}
198+
199+ mock_resource_manager = MagicMock ()
200+ # Op is not eligible for backpressure
201+ mock_resource_manager .is_op_eligible .return_value = False
202+
203+ policy = ConcurrencyCapBackpressurePolicy (
204+ DataContext .get_current (),
205+ topology ,
206+ mock_resource_manager ,
207+ )
208+ policy .enable_dynamic_output_queue_size_backpressure = True
209+
210+ # Should skip dynamic backpressure and use basic cap check
211+ self .assertTrue (policy .can_add_input (map_op )) # 3 < 5
212+
213+ map_op .metrics .num_tasks_running = 5
214+ self .assertFalse (policy .can_add_input (map_op )) # 5 >= 5
215+
216+ def test_can_add_input_with_materializing_downstream_op (self ):
217+ """Test can_add_input when downstream op is a materializing operator."""
218+ input_op = InputDataBuffer (DataContext .get_current (), input_data = [MagicMock ()])
219+ map_op = TaskPoolMapOperator (
220+ map_transformer = MagicMock (),
221+ data_context = DataContext .get_current (),
222+ input_op = input_op ,
223+ max_concurrency = 5 ,
224+ )
225+ map_op .metrics .num_tasks_running = 3
226+
227+ topology = {map_op : MagicMock (), input_op : MagicMock ()}
228+
229+ mock_resource_manager = MagicMock ()
230+ mock_resource_manager .is_op_eligible .return_value = True
231+ mock_resource_manager .has_materializing_downstream_op .return_value = True
232+
233+ policy = ConcurrencyCapBackpressurePolicy (
234+ DataContext .get_current (),
235+ topology ,
236+ mock_resource_manager ,
237+ )
238+ policy .enable_dynamic_output_queue_size_backpressure = True
239+
240+ # Should skip dynamic backpressure and use basic cap check
241+ # to avoid starving materializing operators
242+ self .assertTrue (policy .can_add_input (map_op )) # 3 < 5
243+
244+ map_op .metrics .num_tasks_running = 5
245+ self .assertFalse (policy .can_add_input (map_op )) # 5 >= 5
246+
180247 def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold (self ):
181248 """Test can_add_input when object store memory usage ratio is above threshold."""
182249 input_op = InputDataBuffer (DataContext .get_current (), input_data = [MagicMock ()])
@@ -193,8 +260,10 @@ def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(self
193260 mock_resource_manager = MagicMock ()
194261
195262 # Mock object store memory usage ratio above threshold
196- # Ratio = budget / (usage + budget) > OBJECT_STORE_BUDGET_RATIO
197- threshold = ConcurrencyCapBackpressurePolicy .OBJECT_STORE_BUDGET_RATIO
263+ # Ratio = budget / (usage + budget) > AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD
264+ threshold = (
265+ ConcurrencyCapBackpressurePolicy .AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD
266+ )
198267 mock_usage = MagicMock ()
199268 mock_usage .object_store_memory = 1000 # usage
200269 mock_budget = MagicMock ()
@@ -207,6 +276,8 @@ def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(self
207276
208277 mock_resource_manager .get_op_usage .return_value = mock_usage
209278 mock_resource_manager .get_budget .return_value = mock_budget
279+ mock_resource_manager .is_op_eligible .return_value = True
280+ mock_resource_manager .has_materializing_downstream_op .return_value = False
210281
211282 policy = ConcurrencyCapBackpressurePolicy (
212283 DataContext .get_current (),
@@ -249,8 +320,10 @@ def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self
249320 mock_resource_manager = MagicMock ()
250321
251322 # Mock object store memory usage ratio below threshold
252- # Ratio = budget / (usage + budget) < OBJECT_STORE_BUDGET_RATIO
253- threshold = ConcurrencyCapBackpressurePolicy .OBJECT_STORE_BUDGET_RATIO
323+ # Ratio = budget / (usage + budget) < AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD
324+ threshold = (
325+ ConcurrencyCapBackpressurePolicy .AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD
326+ )
254327 mock_usage = MagicMock ()
255328 mock_usage .object_store_memory = 1000 # usage
256329 mock_budget = MagicMock ()
@@ -263,9 +336,11 @@ def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self
263336
264337 mock_resource_manager .get_op_usage .return_value = mock_usage
265338 mock_resource_manager .get_budget .return_value = mock_budget
339+ mock_resource_manager .is_op_eligible .return_value = True
340+ mock_resource_manager .has_materializing_downstream_op .return_value = False
266341
267342 # Mock queue size methods
268- mock_resource_manager .get_op_internal_object_store_usage .return_value = 100
343+ mock_resource_manager .get_mem_op_internal .return_value = 100
269344 mock_resource_manager .get_op_outputs_object_store_usage_with_downstream .return_value = (
270345 200
271346 )
@@ -286,9 +361,10 @@ def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self
286361 policy ._q_level_dev [map_op ] = initial_dev
287362
288363 result = policy .can_add_input (map_op )
289- # With queue size 300, initial level=200, dev=50, bounds=[100, 300]
290- # Queue size 300 is at the upper bound, so should hold.
291- # running=3 < effective_cap=3 should be False
364+ # With queue size 300, initial level=200, dev=50, bounds=[150, 250]
365+ # Queue size 300 is above the upper bound, so the policy should back off.
366+ # running=3, backoff by 1 -> effective_cap=2
367+ # running=3 < effective_cap=2 is False, so can_add_input should return False
292368 self .assertFalse (result )
293369 # EWMA state should be updated when ratio < threshold
294370 # Level should move toward 300 (queue size)
@@ -310,7 +386,9 @@ def test_can_add_input_effective_cap_calculation(self):
310386 topology = {map_op : MagicMock (), input_op : MagicMock ()}
311387
312388 mock_resource_manager = MagicMock ()
313- threshold = ConcurrencyCapBackpressurePolicy .OBJECT_STORE_BUDGET_RATIO
389+ threshold = (
390+ ConcurrencyCapBackpressurePolicy .AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD
391+ )
314392 mock_usage = MagicMock ()
315393 mock_usage .object_store_memory = 1000
316394 mock_budget = MagicMock ()
@@ -323,6 +401,8 @@ def test_can_add_input_effective_cap_calculation(self):
323401
324402 mock_resource_manager .get_op_usage .return_value = mock_usage
325403 mock_resource_manager .get_budget .return_value = mock_budget
404+ mock_resource_manager .is_op_eligible .return_value = True
405+ mock_resource_manager .has_materializing_downstream_op .return_value = False
326406
327407 policy = ConcurrencyCapBackpressurePolicy (
328408 DataContext .get_current (),
@@ -369,9 +449,7 @@ def test_can_add_input_effective_cap_calculation(self):
369449 description ,
370450 ) in test_cases :
371451 with self .subTest (description = description ):
372- mock_resource_manager .get_op_internal_object_store_usage .return_value = (
373- internal_usage
374- )
452+ mock_resource_manager .get_mem_op_internal .return_value = internal_usage
375453 mock_resource_manager .get_op_outputs_object_store_usage_with_downstream .return_value = (
376454 downstream_usage
377455 )
0 commit comments