1
- from .progress import Progress
2
1
import functools
3
2
import igraph as ig
4
3
import leidenalg
5
4
from math import ceil
6
5
from multiprocessing import Pool , cpu_count
6
+ from tqdm import tqdm
7
7
import numpy as np
8
8
import psutil
9
9
@@ -49,6 +49,11 @@ def singlelayer_leiden(G, gamma, return_partition=False):
49
49
return tuple (partition .membership )
50
50
51
51
52
+ def _wrapped_singlelayer_leiden (args ):
53
+ """Wrapped singlelayer_leiden() for use in multiprocessing.Pool.imap_unordered."""
54
+ return singlelayer_leiden (* args )
55
+
56
+
52
57
def leiden_part (G ):
53
58
return leidenalg .RBConfigurationVertexPartition (G )
54
59
@@ -145,6 +150,11 @@ def multilayer_leiden(G_intralayer, G_interlayer, layer_vec, gamma, omega, optim
145
150
return tuple (intralayer_parts [0 ].membership )
146
151
147
152
153
+ def _wrapped_multilayer_leiden (args ):
154
+ """Wrapped multilayer_leiden() for use in multiprocessing.Pool.imap_unordered."""
155
+ return multilayer_leiden (* args )
156
+
157
+
148
158
def multilayer_leiden_part (G_intralayer , G_interlayer , layer_membership ):
149
159
if 'weight' not in G_intralayer .es :
150
160
G_intralayer .es ['weight' ] = [1.0 ] * G_intralayer .ecount ()
@@ -173,51 +183,29 @@ def repeated_leiden_from_gammas(G, gammas):
173
183
return {sorted_tuple (singlelayer_leiden (G , gamma )) for gamma in gammas }
174
184
175
185
176
- def repeated_parallel_leiden_from_gammas (G , gammas , show_progress = True , chunk_dispatch = True ):
186
+ def repeated_parallel_leiden_from_gammas (G , gammas , show_progress = True ):
177
187
r"""Runs the Leiden modularity maximization algorithm at each provided :math:`\gamma` value, using all CPU cores.
178
188
179
189
:param G: graph of interest
180
190
:type G: igraph.Graph
181
191
:param gammas: list of gammas (resolution parameters) to run Leiden at
182
192
:type gammas: list[float]
183
- :param show_progress: if True, render a progress bar. This will only work if ``chunk_dispatch`` is also True
193
+ :param show_progress: if True, render a progress bar
184
194
:type show_progress: bool
185
- :param chunk_dispatch: if True, dispatch parallel work in chunks. Setting this to False may increase performance,
186
- but can lead to out-of-memory issues
187
- :type chunk_dispatch: bool
188
195
:return: a set of all unique partitions returned by the Leiden algorithm
189
196
:rtype: set of tuple[int]
190
197
"""
191
-
192
- pool = Pool (processes = cpu_count ())
193
198
total = set ()
194
-
195
- chunk_size = len (gammas ) // 99
196
- if chunk_size > 0 and chunk_dispatch :
197
- chunk_params = ([(G , g ) for g in gammas [i :i + chunk_size ]] for i in range (0 , len (gammas ), chunk_size ))
198
- else :
199
- chunk_params = [[(G , g ) for g in gammas ]]
200
- chunk_size = len (gammas )
201
-
202
- if show_progress :
203
- progress = Progress (ceil (len (gammas ) / chunk_size ))
204
-
205
- for chunk in chunk_params :
206
- for partition in pool .starmap (singlelayer_leiden , chunk ):
207
- total .add (sorted_tuple (partition ))
208
-
199
+ pool_chunk_size = max (1 , len (gammas ) // (cpu_count () * 100 ))
200
+ with Pool (processes = cpu_count ()) as pool :
201
+ pool_iterator = pool .imap_unordered (_wrapped_singlelayer_leiden , [(G , g ) for g in gammas ],
202
+ chunksize = pool_chunk_size )
209
203
if show_progress :
210
- progress .increment ()
211
-
212
- if psutil .virtual_memory ().available < LOW_MEMORY_THRESHOLD :
213
- # Reinitialize pool to get around an apparent memory leak in multiprocessing
214
- pool .close ()
215
- pool = Pool (processes = cpu_count ())
204
+ pool_iterator = tqdm (pool_iterator , total = len (gammas ))
216
205
217
- if show_progress :
218
- progress . done ( )
206
+ for partition in pool_iterator :
207
+ total . add ( sorted_tuple ( partition ) )
219
208
220
- pool .close ()
221
209
return total
222
210
223
211
@@ -227,7 +215,7 @@ def repeated_leiden_from_gammas_omegas(G_intralayer, G_interlayer, layer_vec, ga
227
215
228
216
229
217
def repeated_parallel_leiden_from_gammas_omegas (G_intralayer , G_interlayer , layer_vec , gammas , omegas ,
230
- show_progress = True , chunk_dispatch = True ):
218
+ show_progress = True ):
231
219
"""
232
220
Runs leidenalg at each gamma and omega in ``gammas`` and ``omegas``, using all CPU cores available.
233
221
@@ -246,44 +234,23 @@ def repeated_parallel_leiden_from_gammas_omegas(G_intralayer, G_interlayer, laye
246
234
:type omegas: list[float]
247
235
:param show_progress: if True, render a progress bar
248
236
:type show_progress: bool
249
- :param chunk_dispatch: if True, dispatch parallel work in chunks. Setting this to False may increase performance,
250
- but can lead to out-of-memory issues
251
- :type chunk_dispatch: bool
252
237
:return: a set of all unique partitions encountered
253
238
:rtype: set of tuple[int]
254
239
"""
255
240
resolution_parameter_points = [(gamma , omega ) for gamma in gammas for omega in omegas ]
256
241
257
- pool = Pool (processes = cpu_count ())
258
242
total = set ()
259
-
260
- chunk_size = len (resolution_parameter_points ) // 99
261
- if chunk_size > 0 and chunk_dispatch :
262
- chunk_params = ([(G_intralayer , G_interlayer , layer_vec , gamma , omega )
263
- for gamma , omega in resolution_parameter_points [i :i + chunk_size ]]
264
- for i in range (0 , len (resolution_parameter_points ), chunk_size ))
265
- else :
266
- chunk_params = [[(G_intralayer , G_interlayer , layer_vec , gamma , omega )
267
- for gamma , omega in resolution_parameter_points ]]
268
- chunk_size = len (gammas )
269
-
270
- if show_progress :
271
- progress = Progress (ceil (len (resolution_parameter_points ) / chunk_size ))
272
-
273
- for chunk in chunk_params :
274
- for partition in pool .starmap (multilayer_leiden , chunk ):
275
- total .add (sorted_tuple (partition ))
276
-
243
+ pool_chunk_size = max (1 , len (resolution_parameter_points ) // (cpu_count () * 100 ))
244
+ with Pool (processes = cpu_count ()) as pool :
245
+ pool_iterator = pool .imap_unordered (
246
+ _wrapped_multilayer_leiden ,
247
+ [(G_intralayer , G_interlayer , layer_vec , gamma , omega ) for gamma , omega in resolution_parameter_points ],
248
+ chunksize = pool_chunk_size
249
+ )
277
250
if show_progress :
278
- progress . increment ( )
251
+ pool_iterator = tqdm ( pool_iterator , total = len ( resolution_parameter_points ) )
279
252
280
- if psutil .virtual_memory ().available < LOW_MEMORY_THRESHOLD :
281
- # Reinitialize pool to get around an apparent memory leak in multiprocessing
282
- pool .close ()
283
- pool = Pool (processes = cpu_count ())
284
-
285
- if show_progress :
286
- progress .done ()
253
+ for partition in pool_iterator :
254
+ total .add (sorted_tuple (partition ))
287
255
288
- pool .close ()
289
256
return total
0 commit comments