_collection.py
from __future__ import annotations
import datetime
import functools
import inspect
import warnings
from collections.abc import Callable, Hashable, Iterable, Mapping
from functools import wraps
from numbers import Integral, Number
from typing import Any, ClassVar, Literal
import dask.array as da
import dask.dataframe.methods as methods
import numpy as np
import pandas as pd
import pyarrow as pa
from dask import compute, get_annotations
from dask.array import Array
from dask.base import DaskMethodsMixin, is_dask_collection, named_schedulers
from dask.core import flatten
from dask.dataframe._compat import PANDAS_GE_210, PANDAS_GE_220, PANDAS_VERSION
from dask.dataframe.accessor import CachedAccessor
from dask.dataframe.core import (
_concat,
_convert_to_numeric,
_Frame,
_repr_data_series,
_sqrt_and_convert_to_timedelta,
check_divisions,
has_parallel_type,
is_arraylike,
is_dataframe_like,
is_series_like,
meta_warning,
new_dd_object,
)
from dask.dataframe.dispatch import is_categorical_dtype, make_meta, meta_nonempty
from dask.dataframe.multi import warn_dtype_mismatch
from dask.dataframe.utils import (
AttributeNotImplementedError,
has_known_categories,
index_summary,
insert_meta_param_description,
meta_frame_constructor,
meta_series_constructor,
pyarrow_strings_enabled,
)
from dask.delayed import Delayed, delayed
from dask.utils import (
IndexCallable,
M,
derived_from,
get_default_shuffle_method,
get_meta_library,
key_split,
maybe_pluralize,
memory_repr,
put_lines,
random_state_data,
typename,
)
from dask.widgets import get_template
from fsspec.utils import stringify_path
from packaging.version import parse as parse_version
from pandas import CategoricalDtype
from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_numeric_dtype
from pandas.api.types import is_scalar as pd_is_scalar
from pandas.api.types import is_timedelta64_dtype
from pandas.core.dtypes.common import is_extension_array_dtype
from pyarrow import fs as pa_fs
from tlz import first
import dask_expr._backends # noqa: F401
from dask_expr import _expr as expr
from dask_expr._backends import dataframe_creation_dispatch
from dask_expr._categorical import CategoricalAccessor, Categorize, GetCategories
from dask_expr._concat import Concat
from dask_expr._core import OptimizerStage
from dask_expr._datetime import DatetimeAccessor
from dask_expr._describe import DescribeNonNumeric, DescribeNumeric
from dask_expr._dispatch import get_collection_type
from dask_expr._expr import (
BFill,
Diff,
Eval,
FFill,
FillnaCheck,
Query,
Shift,
ToDatetime,
ToNumeric,
ToTimedelta,
no_default,
)
from dask_expr._merge import JoinRecursive, Merge
from dask_expr._quantile import SeriesQuantile
from dask_expr._quantiles import RepartitionQuantiles
from dask_expr._reductions import (
Corr,
Cov,
CustomReduction,
DropDuplicates,
IndexCount,
IsMonotonicDecreasing,
IsMonotonicIncreasing,
Len,
MemoryUsageFrame,
MemoryUsageIndex,
Moment,
NLargest,
NSmallest,
PivotTable,
Unique,
ValueCounts,
)
from dask_expr._repartition import Repartition, RepartitionFreq
from dask_expr._shuffle import (
RearrangeByColumn,
SetIndex,
SetIndexBlockwise,
SortValues,
)
from dask_expr._str_accessor import StringAccessor
from dask_expr._util import (
PANDAS_GE_300,
_BackendData,
_convert_to_list,
_get_shuffle_preferring_order,
_is_any_real_numeric_dtype,
_maybe_from_pandas,
_raise_if_object_series,
_tokenize_deterministic,
_validate_axis,
get_specified_shuffle,
is_scalar,
)
from dask_expr.io import FromPandasDivisions, FromScalars
#
# Utilities to wrap Expr API
# (Helps limit boiler-plate code in collection APIs)
#
def _wrap_expr_api(*args, wrap_api=None, **kwargs):
# Use Expr API, but convert to/from Expr objects
assert wrap_api is not None
result = wrap_api(
*[arg.expr if isinstance(arg, FrameBase) else arg for arg in args],
**kwargs,
)
if isinstance(result, expr.Expr):
return new_collection(result)
return result
def _wrap_expr_op(self, other, op=None):
# Wrap expr operator
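    # Convert the right-hand operand into an ``Expr`` when possible and, if the
    # two expressions are not already co-aligned, fall back to an aligning
    # operation (``OpAlignPartitions``).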
assert op is not None
if isinstance(other, FrameBase):
other = other.expr
elif isinstance(other, da.Array):
other = from_dask_array(other, index=self.index, columns=self.columns)
if self.ndim == 1 and len(self.columns):
other = other[self.columns[0]]
if (
not isinstance(other, expr.Expr)
and is_dataframe_like(other)
or is_series_like(other)
):
other = self._create_alignable_frame(other).expr
if not isinstance(other, expr.Expr):
return new_collection(getattr(self.expr, op)(other))
elif expr.are_co_aligned(self.expr, other):
return new_collection(getattr(self.expr, op)(other))
else:
return new_collection(expr.OpAlignPartitions(self, other, op))
def _wrap_expr_method_operator(name, class_):
"""
Add method operators to Series or DataFrame like DataFrame.add.
_wrap_expr_method_operator("add", DataFrame)
"""
if class_ == DataFrame:
def method(self, other, axis="columns", level=None, fill_value=None):
if level is not None:
raise NotImplementedError("level must be None")
axis = _validate_axis(axis)
if (
is_dataframe_like(other)
or is_series_like(other)
and axis in (0, "index")
) and not is_dask_collection(other):
other = self._create_alignable_frame(other)
if axis in (1, "columns"):
if isinstance(other, Series):
msg = f"Unable to {name} dd.Series with axis=1"
raise ValueError(msg)
frame = self
if isinstance(other, FrameBase) and not expr.are_co_aligned(
self.expr, other.expr
):
return new_collection(
expr.MethodOperatorAlign(
op=name,
frame=frame,
other=other,
axis=axis,
level=level,
fill_value=fill_value,
)
)
return new_collection(
expr.MethodOperator(
name=name,
left=frame,
right=other,
axis=axis,
level=level,
fill_value=fill_value,
)
)
elif class_ == Series:
def method(self, other, level=None, fill_value=None, axis=0):
if level is not None:
raise NotImplementedError("level must be None")
axis = _validate_axis(axis)
if is_series_like(other) and not is_dask_collection(other):
other = self._create_alignable_frame(other)
frame = self
if isinstance(other, FrameBase) and not expr.are_co_aligned(
self.expr, other.expr
):
return new_collection(
expr.MethodOperatorAlign(
op=name,
frame=frame,
other=other,
axis=axis,
level=level,
fill_value=fill_value,
)
)
return new_collection(
expr.MethodOperator(
name=name,
left=frame,
right=other,
axis=axis,
fill_value=fill_value,
level=level,
)
)
else:
raise NotImplementedError(f"Cannot create method operator for {class_=}")
method.__name__ = name
return method
def _wrap_unary_expr_op(self, op=None):
# Wrap expr operator
assert op is not None
return new_collection(getattr(self.expr, op)())
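# Module-level flag ensuring the warning about ignored Dask annotations in
# ``FrameBase.__init__`` is only emitted once per session.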
_WARN_ANNOTATIONS = True
#
# Collection classes
#
class FrameBase(DaskMethodsMixin):
"""Base class for Expr-backed Collections"""
__dask_scheduler__ = staticmethod(
named_schedulers.get("threads", named_schedulers["sync"])
)
__dask_optimize__ = staticmethod(lambda dsk, keys, **kwargs: dsk)
def __init__(self, expr):
global _WARN_ANNOTATIONS
if _WARN_ANNOTATIONS and (annot := get_annotations()):
_WARN_ANNOTATIONS = False
warnings.warn(
f"Dask annotations {annot} detected. Annotations will be ignored when using query-planning."
)
self._expr = expr
@property
def expr(self) -> expr.Expr:
return self._expr
@property
def _meta(self):
return self.expr._meta
@functools.cached_property
def _meta_nonempty(self):
return meta_nonempty(self._meta)
@property
def divisions(self):
"""
Tuple of ``npartitions + 1`` values, in ascending order, marking the
lower/upper bounds of each partition's index. Divisions allow Dask
to know which partition will contain a given value, significantly
speeding up operations like `loc`, `merge`, and `groupby` by not
having to search the full dataset.
Example: for ``divisions = (0, 10, 50, 100)``, there are three partitions,
where the index in each partition contains values [0, 10), [10, 50),
and [50, 100], respectively. Dask therefore knows ``df.loc[45]``
will be in the second partition.
When every item in ``divisions`` is ``None``, the divisions are unknown.
Most operations can still be performed, but some will be much slower,
and a few may fail.
It is not supported to set ``divisions`` directly. Instead, use ``set_index``,
which sorts and splits the data as needed.
See https://docs.dask.org/en/latest/dataframe-design.html#partitions.
"""
return self.expr.divisions
@property
def npartitions(self):
"""Return number of partitions"""
return self.expr.npartitions
@property
def dtypes(self):
"""Return data types"""
return self.expr._meta.dtypes
@property
def size(self):
"""Size of the Series or DataFrame as a Delayed object.
Examples
--------
>>> series.size # doctest: +SKIP
<dask_expr.expr.Scalar: expr=df.size(), dtype=int64>
"""
return new_collection(self.expr.size)
@property
def columns(self):
return self._meta.columns
@columns.setter
def columns(self, columns):
if len(columns) != len(self.columns):
# surface pandas error
self._expr._meta.columns = columns
self._expr = expr.ColumnsSetter(self, columns)
def clear_divisions(self):
"""Forget division information.
This is useful if the divisions are no longer meaningful.
"""
return new_collection(expr.ClearDivisions(self))
def __len__(self):
return new_collection(Len(self)).compute()
@property
def nbytes(self):
raise NotImplementedError("nbytes is not implemented on DataFrame")
def __reduce__(self):
return new_collection, (self._expr,)
def __getitem__(self, other):
if isinstance(other, FrameBase):
return new_collection(self.expr.__getitem__(other.expr))
elif isinstance(other, slice):
from pandas.api.types import is_float_dtype
is_integer_slice = any(
isinstance(i, Integral) for i in (other.start, other.step, other.stop)
)
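            # For a DataFrame, integer slices select positions via ``iloc``
            # unless the index has a float dtype on pandas < 3.0; everything
            # else falls back to label-based ``loc`` slicing.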
if (
self.ndim == 2
and is_integer_slice
and (not is_float_dtype(self.index.dtype) or PANDAS_GE_300)
):
return self.iloc[other]
else:
return self.loc[other]
if isinstance(other, np.ndarray) or is_series_like(other):
other = list(other)
elif isinstance(other, list):
other = other.copy()
return new_collection(self.expr.__getitem__(other))
def __dask_tokenize__(self):
return type(self).__name__, self._expr._name
def __repr__(self):
data = self._repr_data().to_string(max_rows=5)
_str_fmt = """Dask {klass} Structure:
{data}
Dask Name: {name}, {n_expr}
Expr={expr}"""
if not isinstance(self, Series) and not len(self.columns):
data = data.partition("\n")[-1].replace("Index", "Divisions")
_str_fmt = f"Empty {_str_fmt}"
n_expr = len({e._name for e in self.expr.walk()})
return _str_fmt.format(
klass=self.__class__.__name__,
data=data,
name=key_split(self._name),
n_expr=maybe_pluralize(n_expr, "expression"),
expr=self.expr,
)
def __bool__(self):
raise ValueError(
f"The truth value of a {self.__class__.__name__} is ambiguous. "
"Use a.any() or a.all()."
)
def __array__(self, dtype=None, **kwargs):
return np.array(self.compute())
def persist(self, fuse=True, **kwargs):
out = self.optimize(fuse=fuse)
return DaskMethodsMixin.persist(out, **kwargs)
def compute(self, fuse=True, concatenate=True, **kwargs):
"""Compute this DataFrame.
This turns a lazy Dask DataFrame into an in-memory pandas DataFrame.
The entire dataset must fit into memory before calling this operation.
The optimizer runs over the DataFrame before triggering the computation.
The optimizer injects a repartition operation that reduces the partition
count to 1 to enable better optimization strategies.
Parameters
----------
fuse : bool, default True
Whether to fuse the expression tree before computing. Fusing significantly
reduces the number of tasks and improves performance. It shouldn't be
disabled unless absolutely necessary.
concatenate : bool, default True
Whether to concatenate all partitions into a single one before computing.
Concatenating enables more powerful optimizations but it also incurs additional
data transfer cost. Generally, it should be enabled.
kwargs
Extra keywords to forward to the base compute function.
See Also
--------
dask.compute
"""
out = self
if not isinstance(out, Scalar) and concatenate:
out = out.repartition(npartitions=1)
out = out.optimize(fuse=fuse)
return DaskMethodsMixin.compute(out, **kwargs)
def analyze(self, filename: str | None = None, format: str | None = None) -> None:
"""Outputs statistics about every node in the expression.
analyze optimizes the expression and triggers a computation. It records statistics
        like memory usage per partition to analyze how data flows through the graph.
.. warning::
analyze adds plugins to the scheduler and the workers that have a non-trivial
cost. This method should not be used in production workflows.
Parameters
----------
filename: str, None
File to store the graph representation.
format: str, default is png
File format for the graph representation.
Returns
-------
None, but writes a graph representation of the expression enriched with
statistics to disk.
"""
out = self
if not isinstance(out, Scalar):
out = out.repartition(npartitions=1)
return out.expr.analyze(filename=filename, format=format)
def explain(self, stage: OptimizerStage = "fused", format: str | None = None):
"""Create a graph representation of the Expression.
explain runs the optimizer and creates a graph of the optimized expression
with graphviz. No computation is triggered.
Parameters
----------
stage: {"logical", "simplified-logical", "tuned-logical", "physical", "simplified-physical", "fused"}
The optimizer stage that is returned. Default is "fused".
- logical: outputs the expression as is
- simplified-logical: simplifies the expression which includes predicate
pushdown and column projection.
- tuned-logical: applies additional optimizations like partition squashing
- physical: outputs the physical expression; this expression can actually
be computed
- simplified-physical: runs another simplification after the physical
plan is generated
            - fused: fuses the physical expression to reduce the nodes in the graph.
.. warning::
The optimizer stages are subject to change.
format: str, default None
The format of the output. Default is "png".
Returns
-------
None, but opens a new window with the graph visualization and outputs
a file with the graph representation.
"""
out = self
if not isinstance(out, Scalar):
out = out.repartition(npartitions=1)
return out.expr.explain(stage, format)
def pprint(self):
"""Outputs a string representation of the DataFrame.
The expression is returned as is. Please run optimize manually if necessary.
Returns
-------
        None; the representation is printed to stdout.
"""
return self.expr.pprint()
@property
def dask(self):
return self.__dask_graph__()
def __dask_graph__(self):
out = self.expr
out = out.lower_completely()
return out.__dask_graph__()
def __dask_keys__(self):
out = self.expr
out = out.lower_completely()
return out.__dask_keys__()
def simplify(self):
return new_collection(self.expr.simplify())
def lower_once(self):
return new_collection(self.expr.lower_once({}))
def optimize(self, fuse: bool = True):
"""Optimizes the DataFrame.
Runs the optimizer with all steps over the DataFrame and wraps the result in a
new DataFrame collection. Only use this method if you want to analyze the
optimized expression.
Parameters
----------
fuse: bool, default True
Whether to fuse the expression tree after running the optimizer.
It is often easier to look at the non-fused expression when analyzing
the result.
Returns
-------
The optimized Dask Dataframe
"""
return new_collection(self.expr.optimize(fuse=fuse))
def __dask_postcompute__(self):
state = new_collection(self.expr.lower_completely())
if type(self) != type(state):
return state.__dask_postcompute__()
return _concat, ()
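    # After ``persist``, the collection is rebuilt from the materialized graph
    # keys via ``from_graph``, using the lowered expression's metadata.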
def __dask_postpersist__(self):
state = new_collection(self.expr.lower_completely())
return from_graph, (
state._meta,
state.divisions,
state.__dask_keys__(),
key_split(state._name),
)
def __getattr__(self, key):
try:
# Prioritize `FrameBase` attributes
return object.__getattribute__(self, key)
except AttributeError as err:
try:
# Fall back to `expr` API
# (Making sure to convert to/from Expr)
val = getattr(self.expr, key)
if callable(val):
return functools.partial(_wrap_expr_api, wrap_api=val)
return val
except AttributeError:
# Raise original error
raise err
def visualize(self, tasks: bool = False, **kwargs):
"""Visualize the expression or task graph
Parameters
----------
tasks:
Whether to visualize the task graph. By default
the expression graph will be visualized instead.
"""
if tasks:
return super().visualize(**kwargs)
return self.expr.visualize(**kwargs)
@property
def known_divisions(self):
"""Whether the divisions are known.
This check can be expensive if the division calculation is expensive.
DataFrame.set_index is a good example where the calculation needs an
inspection of the data.
"""
return self.expr.known_divisions
@property
def index(self):
"""Return dask Index instance"""
return new_collection(self.expr.index)
@index.setter
def index(self, value):
assert expr.are_co_aligned(
self.expr, value.expr
), "value needs to be aligned with the index"
_expr = expr.AssignIndex(self, value)
self._expr = _expr
def reset_index(self, drop: bool = False):
"""Reset the index to the default index.
Note that unlike in ``pandas``, the reset index for a Dask DataFrame will
not be monotonically increasing from 0. Instead, it will restart at 0
for each partition (e.g. ``index1 = [0, ..., 10], index2 = [0, ...]``).
This is due to the inability to statically know the full length of the
index.
For DataFrame with multi-level index, returns a new DataFrame with
labeling information in the columns under the index names, defaulting
to 'level_0', 'level_1', etc. if any are None. For a standard index,
the index name will be used (if set), otherwise a default 'index' or
'level_0' (if 'index' is already taken) will be used.
Parameters
----------
drop : boolean, default False
Do not try to insert index into dataframe columns.
"""
return new_collection(expr.ResetIndex(self, drop))
def head(self, n: int = 5, npartitions=1, compute: bool = True):
"""First n rows of the dataset
Parameters
----------
n : int, optional
The number of rows to return. Default is 5.
npartitions : int, optional
Elements are only taken from the first ``npartitions``, with a
default of 1. If there are fewer than ``n`` rows in the first
``npartitions`` a warning will be raised and any found rows
returned. Pass -1 to use all partitions.
compute : bool, optional
Whether to compute the result, default is True.
"""
out = new_collection(expr.Head(self, n=n, npartitions=npartitions))
if compute:
out = out.compute()
return out
def tail(self, n: int = 5, compute: bool = True):
"""Last n rows of the dataset
        Caveat: this only checks the last n rows of the last partition.
"""
out = new_collection(expr.Tail(self, n=n))
if compute:
out = out.compute()
return out
def copy(self, deep: bool = False):
"""Make a copy of the dataframe
This is strictly a shallow copy of the underlying computational graph.
It does not affect the underlying data
Parameters
----------
deep : boolean, default False
The deep value must be `False` and it is declared as a parameter just for
compatibility with third-party libraries like cuDF and pandas
"""
if deep is not False:
raise ValueError(
"The `deep` value must be False. This is strictly a shallow copy "
"of the underlying computational graph."
)
return new_collection(self.expr)
@derived_from(pd.DataFrame)
def isin(self, values):
if isinstance(self, DataFrame):
# DataFrame.isin does weird alignment stuff
bad_types = (FrameBase, pd.Series, pd.DataFrame)
else:
bad_types = (FrameBase,)
if isinstance(values, bad_types):
if (
isinstance(values, FrameBase)
and values.ndim == 1
and values.npartitions == 1
):
# Can broadcast
return new_collection(expr.Isin(self, values=values))
raise NotImplementedError("Passing a %r to `isin`" % typename(type(values)))
# We wrap values in a delayed for two reasons:
# - avoid serializing data in every task
# - avoid cost of traversal of large list in optimizations
if isinstance(values, list):
# Motivated by https://github.com/dask/dask/issues/9411. This appears to be
# caused by https://github.com/dask/distributed/issues/6368, and further
# exacerbated by the fact that the list contains duplicates. This is a patch until
# we can create a better fix for Serialization.
try:
values = list(set(values))
except TypeError:
pass
if not any(is_dask_collection(v) for v in values):
try:
values = np.fromiter(values, dtype=object)
except ValueError:
                    # Numpy 1.23 supports creating arrays of iterables, while
                    # older versions (1.21.x and 1.22.x) do not
pass
return new_collection(
expr.Isin(
self,
values=expr._DelayedExpr(
delayed(values, name="delayed-" + _tokenize_deterministic(values))
),
)
)
def _partitions(self, index):
# Used by `partitions` for partition-wise slicing
# Convert index to list
if isinstance(index, int):
index = [index]
index = np.arange(self.npartitions, dtype=object)[index].tolist()
# Check that selection makes sense
assert set(index).issubset(range(self.npartitions))
# Return selected partitions
return new_collection(expr.Partitions(self, index))
@property
def partitions(self):
"""Slice dataframe by partitions
This allows partitionwise slicing of a Dask Dataframe. You can perform normal
Numpy-style slicing, but now rather than slice elements of the array you
slice along partitions so, for example, ``df.partitions[:5]`` produces a new
Dask Dataframe of the first five partitions. Valid indexers are integers, sequences
of integers, slices, or boolean masks.
Examples
--------
>>> df.partitions[0] # doctest: +SKIP
>>> df.partitions[:3] # doctest: +SKIP
>>> df.partitions[::10] # doctest: +SKIP
Returns
-------
A Dask DataFrame
"""
return IndexCallable(self._partitions)
def get_partition(self, n):
"""
Get a dask DataFrame/Series representing the `nth` partition.
Parameters
----------
n : int
The 0-indexed partition number to select.
Returns
-------
Dask DataFrame or Series
The same type as the original object.
See Also
--------
DataFrame.partitions
"""
if not 0 <= n < self.npartitions:
msg = f"n must be 0 <= n < {self.npartitions}"
raise ValueError(msg)
return self.partitions[n]
def shuffle(
self,
on: str | list | no_default = no_default,
ignore_index: bool = False,
npartitions: int | None = None,
shuffle_method: str | None = None,
on_index: bool = False,
**options,
):
"""Rearrange DataFrame into new partitions
Uses hashing of `on` to map rows to output partitions. After this
operation, rows with the same value of `on` will be in the same
partition.
Parameters
----------
on : str, list of str, or Series, Index, or DataFrame
Column names to shuffle by.
ignore_index : optional
Whether to ignore the index. Default is ``False``.
npartitions : optional
Number of output partitions. The partition count will
be preserved by default.
shuffle_method : optional
Desired shuffle method. Default chosen at optimization time.
on_index : bool, default False
Whether to shuffle on the index. Mutually exclusive with 'on'.
Set this to ``True`` if 'on' is not provided.
**options : optional
Algorithm-specific options.
Notes
-----
This does not preserve a meaningful index/partitioning scheme. This
is not deterministic if done in parallel.
Examples
--------
>>> df = df.shuffle(df.columns[0]) # doctest: +SKIP
"""
if on is no_default and not on_index:
raise TypeError(
"Must shuffle on either columns or the index; currently shuffling on "
"neither. Pass column(s) to 'on' or set 'on_index' to True."
)
elif on is not no_default and on_index:
raise TypeError(
"Cannot shuffle on both columns and the index. Do not pass column(s) "
"to 'on' or set 'on_index' to False."
)
# Preserve partition count by default
npartitions = npartitions or self.npartitions
if isinstance(on, FrameBase):
if not expr.are_co_aligned(self.expr, on.expr):
raise TypeError(
"index must be aligned with the DataFrame to use as shuffle index."
)
else:
if pd.api.types.is_list_like(on) and not is_dask_collection(on):
on = list(on)
elif isinstance(on, str) or isinstance(on, int):
on = [on]
elif on_index:
on = []
bad_cols = [
index_col
for index_col in on
if (index_col not in self.columns) and (index_col != self.index.name)
]
if bad_cols:
raise KeyError(
f"Cannot shuffle on {bad_cols}, column(s) not in dataframe to shuffle"
)
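        # The p2p shuffle backend requires Arrow-serializable dtypes and string
        # column names, so validate those requirements up front.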
if (shuffle_method or get_default_shuffle_method()) == "p2p":
from distributed.shuffle._arrow import check_dtype_support
check_dtype_support(self._meta)
if any(not isinstance(c, str) for c in self._meta.columns):
unsupported = {
c: type(c) for c in self._meta.columns if not isinstance(c, str)
}
raise TypeError(
f"p2p requires all column names to be str, found: {unsupported}",
)
        # Return the shuffled result
return new_collection(
RearrangeByColumn(
self,
on,
npartitions,
ignore_index,
get_specified_shuffle(shuffle_method),
options,
index_shuffle=on_index,
)
)
@derived_from(pd.DataFrame)
def resample(self, rule, closed=None, label=None):
from dask_expr._resample import Resampler
return Resampler(self, rule, **{"closed": closed, "label": label})
def rolling(self, window, **kwargs):
"""Provides rolling transformations.
Parameters
----------
window : int, str, offset
Size of the moving window. This is the number of observations used
for calculating the statistic. When not using a ``DatetimeIndex``,
the window size must not be so large as to span more than one
adjacent partition. If using an offset or offset alias like '5D',
the data must have a ``DatetimeIndex``
min_periods : int, default None
Minimum number of observations in window required to have a value
(otherwise result is NA).
center : boolean, default False
Set the labels at the center of the window.
win_type : string, default None
Provide a window type. The recognized window types are identical
to pandas.
axis : int, str, None, default 0
This parameter is deprecated with ``pandas>=2.1``.
Returns
-------
a Rolling object on which to call a method to compute a statistic
"""
from dask_expr._rolling import Rolling
return Rolling(self, window, **kwargs)
@insert_meta_param_description(pad=12)
def map_partitions(
self,
func,
*args,
meta=no_default,
enforce_metadata=True,
transform_divisions=True,
clear_divisions=False,
align_dataframes=False,
parent_meta=None,
**kwargs,
):
"""Apply a Python function to each partition
Parameters
----------
func : function
Function applied to each partition.
args, kwargs :