@@ -1180,33 +1180,39 @@ async def test_statistical_profiling_2(c, s, a, b):
1180
1180
1181
1181
1182
1182
@gen_cluster (
1183
- nthreads = [("127.0.0.1 " , 1 )],
1183
+ nthreads = [("" , 1 )],
1184
1184
client = True ,
1185
- worker_kwargs = {"memory_monitor_interval" : 10 },
1185
+ config = {
1186
+ "distributed.worker.memory.target" : False ,
1187
+ "distributed.worker.memory.spill" : 0.7 ,
1188
+ },
1189
+ worker_kwargs = {"memory_monitor_interval" : "10ms" },
1186
1190
)
1187
1191
async def test_robust_to_bad_sizeof_estimates (c , s , a ):
1188
- np = pytest .importorskip ("numpy" )
1189
- memory = psutil .Process ().memory_info ().rss
1192
+ """Test that the spill threshold uses the process memory and not the managed memory
1193
+ reported by sizeof(), which may be inaccurate
1194
+ """
1195
+ memory = s .workers [a .address ].memory .process
1196
+ # Put the 'spill' threshold (0.7 * memory_limit) ~280MB (0.7 * 400MB) above the current process memory
1190
1197
a .memory_limit = memory / 0.7 + 400e6
1191
1198
1192
1199
class BadAccounting :
1193
- def __init__ (self , data ):
1194
- self .data = data
1200
+ """100 MB process memory, 10 bytes reported managed memory"""
1201
+
1202
+ def __init__ (self , * args ):
1203
+ self .data = "x" * int (100e6 )
1195
1204
1196
1205
def __sizeof__ (self ):
1197
1206
return 10
1198
1207
1199
- def f (n ):
1200
- x = np .ones (int (n ), dtype = "u1" )
1201
- result = BadAccounting (x )
1202
- return result
1208
+ def __reduce__ (self ):
1209
+ """Speed up test by writing very little to disk when spilling"""
1210
+ return BadAccounting , ()
1203
1211
1204
- futures = c .map (f , [ 100e6 ] * 8 , pure = False )
1212
+ futures = c .map (BadAccounting , range ( 8 ) )
1205
1213
1206
- start = time ()
1207
1214
while not a .data .disk :
1208
- await asyncio .sleep (0.1 )
1209
- assert time () < start + 5
1215
+ await asyncio .sleep (0.01 )
1210
1216
1211
1217
1212
1218
@pytest .mark .slow
0 commit comments