
Commit 46acc59

[BACKPORT] Fix crash when storing data inside Docker containers (#1429) (#1432)
1 parent 29e2b00 commit 46acc59

28 files changed, +274 -147 lines

.github/workflows/ci.yml

+2-2
@@ -83,7 +83,7 @@ jobs:
             conda install -n test --quiet --yes -c pytorch python=$PYTHON faiss-cpu
           fi
           if [[ $UNAME == "linux" ]] && [[ ! "$PYTHON" =~ "3.8" ]]; then
-            pip install tensorflow
+            pip install tensorflow\<2.3.0
             pip install torch torchvision
             pip install tsfresh
           fi
@@ -112,7 +112,7 @@ jobs:
       run: |
         source ./.github/workflows/reload-env.sh
         # stop the build if there are Python syntax errors or undefined names
-        flake8 mars --count --select=E9,E111,E225,E302,E303,E901,E999,F7,F63,F82,F401,F821,F822,F823,F841,W291,W292,W391 --show-source --statistics
+        flake8 mars --count --select=E9,E111,E225,E302,E303,E901,E999,F7,F63,F82,F401,F821,F822,F823,F841,W291,W292,W391,W605 --show-source --statistics
         # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
         flake8 mars --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
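The new W605 check flags invalid escape sequences in string literals, which Python treats as deprecated; the TensorFlow pin keeps CI on releases below 2.3.0 (the backslash merely escapes `<` from the shell). A minimal illustration of what W605 catches (snippet is illustrative, not from the Mars codebase):

    import re

    # W605: '\d' is an invalid escape sequence in a normal string literal
    bad = re.compile('\d+')

    # fix: a raw string passes the backslash through to the regex engine
    good = re.compile(r'\d+')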

mars/dataframe/core.py

+5
@@ -105,11 +105,16 @@ class Index(IndexBase):
     class RangeIndex(IndexBase):
         _name = AnyField('name')
         _slice = SliceField('slice')
+        _dtype = DataTypeField('dtype')
 
         @property
         def slice(self):
             return self._slice
 
+        @property
+        def dtype(self):
+            return getattr(self, '_dtype', np.dtype(np.intc))
+
         def to_pandas(self):
             slc = self._slice
             return pd.RangeIndex(slc.start, slc.stop, slc.step,
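The getattr fallback keeps deserialization backward compatible: index metadata serialized by an older Mars release carries no _dtype attribute, so reading it back must not raise. A minimal sketch of the pattern (class name and constructor are illustrative):

    import numpy as np

    class RangeIndexMeta:
        def __init__(self, dtype=None):
            if dtype is not None:
                self._dtype = dtype  # only present in newly serialized metadata

        @property
        def dtype(self):
            # metadata from older versions falls back to a default dtype
            return getattr(self, '_dtype', np.dtype(np.intc))

    assert RangeIndexMeta().dtype == np.dtype(np.intc)
    assert RangeIndexMeta(np.dtype(np.int64)).dtype == np.dtype(np.int64)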

mars/dataframe/sort/psrs.py

+11-3
@@ -16,9 +16,10 @@
 import pandas as pd
 
 from ... import opcodes as OperandDef
+from ...context import RunningMode
 from ...utils import lazy_import, get_shuffle_input_keys_idxes
 from ...operands import OperandStage
-from ...serialize import ValueType, Int32Field, ListField, StringField, BoolField
+from ...serialize import Int32Field, ListField, StringField, BoolField
 from ...tensor.base.psrs import PSRSOperandMixin
 from ..utils import standardize_range_index
 from ..operands import DataFrameOperandMixin, DataFrameOperand, DataFrameShuffleProxy, \
@@ -254,7 +255,7 @@ class DataFramePSRSChunkOperand(DataFrameOperand):
     _sort_type = StringField('sort_type')
 
     _axis = Int32Field('axis')
-    _by = ListField('by', ValueType.string)
+    _by = ListField('by')
     _ascending = BoolField('ascending')
     _inplace = BoolField('inplace')
     _kind = StringField('kind')
@@ -381,7 +382,7 @@ class DataFramePSRSShuffle(DataFrameMapReduceOperand, DataFrameOperandMixin):
 
     # for shuffle map
     _axis = Int32Field('axis')
-    _by = ListField('by', ValueType.string)
+    _by = ListField('by')
     _ascending = BoolField('ascending')
     _inplace = BoolField('inplace')
     _na_position = StringField('na_position')
@@ -459,6 +460,7 @@ def _execute_dataframe_map(cls, ctx, op):
             poses = records.searchsorted(p_records, side='right')
         else:
             poses = len(records) - records[::-1].searchsorted(p_records, side='right')
+        del records, p_records
 
         poses = (None,) + tuple(poses) + (None,)
         for i in range(op.n_partition):
@@ -529,6 +531,12 @@ def _execute_reduce(cls, ctx, op):
         raw_inputs = [ctx[(input_key, op.shuffle_key)] for input_key in input_keys]
         xdf = pd if isinstance(raw_inputs[0], (pd.DataFrame, pd.Series)) else cudf
         concat_values = xdf.concat(raw_inputs, axis=op.axis)
+
+        del raw_inputs[:]
+        if getattr(ctx, 'running_mode', None) == RunningMode.distributed:
+            for input_key in input_keys:
+                ctx.pop((input_key, op.shuffle_key), None)
+
         if op.sort_type == 'sort_values':
            ctx[op.outputs[0].key] = execute_sort_values(concat_values, op)
         else:
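The del statements are the memory-relevant part of this file: once the shuffle inputs are concatenated, every extra reference is dropped (and, in distributed mode, evicted from the storage context) before the expensive sort runs, lowering the reduce step's peak footprint, which matters in Docker containers where shared memory is tightly capped. A standalone sketch of the pattern, with ctx modeled as a plain dict:

    import pandas as pd

    def reduce_concat(ctx, input_keys, shuffle_key):
        # gather the mapper outputs destined for this reducer
        raw_inputs = [ctx[(k, shuffle_key)] for k in input_keys]
        concat_values = pd.concat(raw_inputs, axis=0)

        # drop the local references so chunks are not kept alive twice ...
        del raw_inputs[:]
        # ... and evict them from the storage context as well
        for k in input_keys:
            ctx.pop((k, shuffle_key), None)

        return concat_values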

mars/dataframe/utils.py

+1
@@ -237,6 +237,7 @@ def _serialize_range_index(index):
             '_max_val_close': False,
             '_key': key or _tokenize_index(index, *args),
             '_name': index.name,
+            '_dtype': index.dtype,
         }
     else:
         properties = _extract_property(index, IndexValue.RangeIndex, False)

mars/learn/cluster/_k_means_common.py

+1-1
@@ -288,7 +288,7 @@ def execute(cls, ctx, op):
         ctx[op.outputs[1].key] = out_weight_in_clusters
 
 
-def _relocate_empty_clusters(X ,sample_weight, centers_old, centers_new,
+def _relocate_empty_clusters(X, sample_weight, centers_old, centers_new,
                              weight_in_clusters, labels, to_run=None,
                              session=None, run_kwargs=None):
     to_run = to_run or list()

mars/learn/cluster/_kmeans.py

+1-1
@@ -482,7 +482,7 @@ class KMeans(TransformerMixin, ClusterMixin, BaseEstimator):
         The number of clusters to form as well as the number of
         centroids to generate.
 
-    init : {'k-means++', 'random'} or tensor of shape \
+    init : {'k-means++', 'k-means||', 'random'} or tensor of shape \
         (n_clusters, n_features), default='k-means||'
         Method for initialization, defaults to 'k-means||':
 
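The docstring now lists all three accepted initializations. A usage sketch (data and parameter values illustrative):

    import mars.tensor as mt
    from mars.learn.cluster import KMeans

    X = mt.random.rand(1000, 8, chunk_size=250)
    # 'k-means||' is the scalable default; 'k-means++' and 'random' also work
    km = KMeans(n_clusters=5, init='k-means||').fit(X)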

mars/learn/tests/integrated/base.py

+3-2
@@ -46,11 +46,12 @@ def start_distributed_env(self, *args, **kwargs):
                 self._start_distributed_env(*args, **kwargs)
                 break
             except ProcessRequirementUnmetError:
+                self.terminate_processes()
                 fail_count += 1
-                if fail_count >= 3:
+                if fail_count >= 10:
                     raise
+                time.sleep(5)
                 logger.error('Failed to start service, retrying')
-                self.terminate_processes()
 
     def _start_distributed_env(self, n_workers=2):
         scheduler_port = self.scheduler_port = str(get_next_port())
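The retry loop now cleans up before counting the failure, allows 10 attempts instead of 3, and sleeps between attempts so ports and half-started processes can be released. The same shape as a generic sketch (exception type and helpers are stand-ins):

    import time
    import logging

    logger = logging.getLogger(__name__)

    def start_with_retries(start, cleanup, max_failures=10, delay=5):
        fail_count = 0
        while True:
            try:
                start()
                break
            except RuntimeError:  # stand-in for ProcessRequirementUnmetError
                cleanup()         # tear down half-started processes first
                fail_count += 1
                if fail_count >= max_failures:
                    raise         # budget exhausted, surface the error
                time.sleep(delay)
                logger.error('Failed to start service, retrying')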

mars/operands.py

+15-14
@@ -27,13 +27,12 @@
 from .serialize import SerializableMetaclass, ValueType, ProviderType, IdentityField, \
     ListField, DictField, Int32Field, BoolField, StringField
 from .tiles import NotSupportTile
-from .utils import AttributeDict, to_str, calc_data_size, is_eager_mode, is_object_dtype
+from .utils import AttributeDict, to_str, calc_data_size, is_eager_mode, calc_object_overhead
 
 
 operand_type_to_oprand_cls = {}
 OP_TYPE_KEY = '_op_type_'
 OP_MODULE_KEY = '_op_module_'
-OBJECT_FIELD_OVERHEAD = 50
 T = TypeVar('T')
 
 
@@ -373,27 +372,26 @@ def execute(cls, ctx, op):
 
     @classmethod
     def estimate_size(cls, ctx, op):
-        from .dataframe.core import DATAFRAME_CHUNK_TYPE, SERIES_CHUNK_TYPE, INDEX_CHUNK_TYPE
-
         exec_size = 0
         outputs = op.outputs
         if all(not c.is_sparse() and hasattr(c, 'nbytes') and not np.isnan(c.nbytes) for c in outputs):
             for out in outputs:
                 ctx[out.key] = (out.nbytes, out.nbytes)
 
+        all_overhead = 0
         for inp in op.inputs or ():
             try:
+                if isinstance(inp.op, FetchShuffle):
+                    keys_and_shapes = inp.extra_params.get('_shapes', dict()).items()
+                else:
+                    keys_and_shapes = [(inp.key, getattr(inp, 'shape', None))]
+
                 # execution size of a specific data chunk may be
                 # larger than stored type due to objects
-                obj_overhead = n_strings = 0
-                if getattr(inp, 'shape', None) and not np.isnan(inp.shape[0]):
-                    if isinstance(inp, DATAFRAME_CHUNK_TYPE) and inp.dtypes is not None:
-                        n_strings = len([dt for dt in inp.dtypes if is_object_dtype(dt)])
-                    elif isinstance(inp, (INDEX_CHUNK_TYPE, SERIES_CHUNK_TYPE)) and inp.dtype is not None:
-                        n_strings = 1 if is_object_dtype(inp.dtype) else 0
-                    obj_overhead += n_strings * inp.shape[0] * OBJECT_FIELD_OVERHEAD
-
-                exec_size += ctx[inp.key][0] + obj_overhead
+                for key, shape in keys_and_shapes:
+                    overhead = calc_object_overhead(inp, shape)
+                    all_overhead += overhead
+                    exec_size += ctx[key][0] + overhead
             except KeyError:
                 if not op.sparse:
                     inp_size = calc_data_size(inp)
@@ -405,7 +403,10 @@ def estimate_size(cls, ctx, op):
         chunk_sizes = dict()
         for out in outputs:
             try:
-                chunk_size = calc_data_size(out) if not out.is_sparse() else exec_size
+                if not out.is_sparse():
+                    chunk_size = calc_data_size(out) + all_overhead // len(outputs)
+                else:
+                    chunk_size = exec_size
                 if np.isnan(chunk_size):
                     raise TypeError
                 chunk_sizes[out.key] = chunk_size
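The inline overhead arithmetic moved into a reusable helper, calc_object_overhead, so the same estimate can be applied per shuffle chunk using the shapes carried in FetchShuffle metadata, and the estimated overhead now also inflates the output size estimates instead of being dropped. Reconstructed from the deleted lines, the helper plausibly looks like this (a sketch under that assumption, not the actual implementation):

    import numpy as np

    OBJECT_FIELD_OVERHEAD = 50  # bytes per object field, as in the removed constant

    def calc_object_overhead(chunk, shape):
        # estimate extra bytes from object-dtype (e.g. string) fields,
        # which nbytes-style size estimates do not account for
        if not shape or np.isnan(shape[0]):
            return 0
        dtypes = getattr(chunk, 'dtypes', None)
        if dtypes is not None:  # DataFrame chunk: count object columns
            n_strings = len([dt for dt in dtypes if dt == np.dtype(object)])
        elif getattr(chunk, 'dtype', None) == np.dtype(object):
            n_strings = 1       # Series or Index chunk holding objects
        else:
            n_strings = 0
        return n_strings * shape[0] * OBJECT_FIELD_OVERHEAD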

mars/scheduler/operands/base.py

+1-1
@@ -236,7 +236,7 @@ def stop_operand(self, state=OperandState.CANCELLING):
     def add_running_predecessor(self, op_key, worker):
         self._running_preds.add(op_key)
 
-    def add_finished_predecessor(self, op_key, worker, output_sizes=None):
+    def add_finished_predecessor(self, op_key, worker, output_sizes=None, output_shapes=None):
         self._finish_preds.add(op_key)
 
     def add_finished_successor(self, op_key, worker):

mars/scheduler/operands/common.py

+8-4
@@ -54,6 +54,7 @@ def __init__(self, session_id, graph_id, op_key, op_info, worker=None, allocated
         self._pred_workers = set()
 
         self._data_sizes = None
+        self._data_shapes = None
 
         self._input_worker_scores = dict()
         self._worker_scores = dict()
@@ -118,13 +119,14 @@ def add_running_predecessor(self, op_key, worker):
         self.update_demand_depths(self._info.get('optimize', {}).get('depth', 0))
 
     @log_unhandled
-    def add_finished_predecessor(self, op_key, worker, output_sizes=None):
+    def add_finished_predecessor(self, op_key, worker, output_sizes=None, output_shapes=None):
         """
         This function shall return whether current node is ready. The return values will
         be collected by the predecessor to judge if a node with lower-priority can be
         scheduled.
         """
-        super().add_finished_predecessor(op_key, worker, output_sizes=output_sizes)
+        super().add_finished_predecessor(op_key, worker, output_sizes=output_sizes,
+                                         output_shapes=output_shapes)
         if all(k in self._finish_preds for k in self._pred_keys):
             # all predecessors done, the operand can be executed now
             if self.state == OperandState.UNSCHEDULED:
@@ -411,14 +413,15 @@ def _on_running(self):
                 self._op_key, self.worker, _tell=True, _wait=False)
 
         @log_unhandled
-        def _acceptor(data_sizes):
+        def _acceptor(data_sizes, data_shapes):
            self._allocated = False
            if not self._is_worker_alive():
                 return
             self._resource_ref.deallocate_resource(
                 self._session_id, self._op_key, self.worker, _tell=True, _wait=False)
 
             self._data_sizes = data_sizes
+            self._data_shapes = data_shapes
             self._io_meta['data_targets'] = list(data_sizes)
             self.start_operand(OperandState.FINISHED)
 
@@ -482,7 +485,8 @@ def _on_finished(self):
         # record if successors can be executed
         for out_key in self._succ_keys:
             succ_futures.append(self._get_operand_actor(out_key).add_finished_predecessor(
-                self._op_key, self.worker, output_sizes=self._data_sizes, _wait=False))
+                self._op_key, self.worker, output_sizes=self._data_sizes,
+                output_shapes=self._data_shapes, _wait=False))
 
         pred_futures = []
         for in_key in self._pred_keys:

mars/scheduler/operands/shuffle.py

+5-3
@@ -41,8 +41,9 @@ def __init__(self, session_id, graph_id, op_key, op_info, **kwargs):
         self._mapper_op_to_chunk = dict()
         self._reducer_to_mapper = defaultdict(dict)
 
-    def add_finished_predecessor(self, op_key, worker, output_sizes=None):
-        super().add_finished_predecessor(op_key, worker, output_sizes=output_sizes)
+    def add_finished_predecessor(self, op_key, worker, output_sizes=None, output_shapes=None):
+        super().add_finished_predecessor(op_key, worker, output_sizes=output_sizes,
+                                         output_shapes=output_shapes)
 
         from ..chunkmeta import WorkerMeta
         chunk_key = next(iter(output_sizes.keys()))[0]
@@ -62,7 +63,8 @@ def add_finished_predecessor(self, op_key, worker, output_sizes=None):
         for (chunk_key, shuffle_key), data_size in output_sizes.items() or ():
             succ_op_key = shuffle_keys_to_op[shuffle_key]
             meta = self._reducer_to_mapper[succ_op_key][op_key] = \
-                WorkerMeta(chunk_size=data_size, workers=(worker,))
+                WorkerMeta(chunk_size=data_size, workers=(worker,),
+                           chunk_shape=output_shapes.get((chunk_key, shuffle_key)))
             reducer_worker = reducer_workers.get(succ_op_key)
             if reducer_worker and reducer_worker != worker:
                 data_to_addresses[(chunk_key, shuffle_key)] = [reducer_worker]
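Recording shapes next to sizes lets the scheduler estimate object overhead for shuffle outputs it never materializes. Assuming WorkerMeta is a simple record (a stand-in sketch, not the actual definition in mars.scheduler.chunkmeta):

    from collections import namedtuple

    # stand-in for mars.scheduler.chunkmeta.WorkerMeta
    WorkerMeta = namedtuple('WorkerMeta', ['chunk_size', 'chunk_shape', 'workers'])

    meta = WorkerMeta(chunk_size=4096,       # bytes reported by the mapper
                      chunk_shape=(512, 4),  # rows x cols, drives overhead estimates
                      workers=('worker-1',))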

mars/scheduler/operands/successors_exclusive.py

+4-3
@@ -31,11 +31,12 @@ def __init__(self, session_id, graph_id, op_key, op_info, **kwargs):
         self._finished_sucessors = set()
         self._is_successor_running = False
 
-    def add_finished_predecessor(self, op_key, worker, output_sizes=None):
-        super().add_finished_predecessor(op_key, worker, output_sizes=output_sizes)
+    def add_finished_predecessor(self, op_key, worker, output_sizes=None, output_shapes=None):
+        super().add_finished_predecessor(op_key, worker, output_sizes=output_sizes,
+                                         output_shapes=output_shapes)
 
         from ..chunkmeta import WorkerMeta
-        data_meta = {k: WorkerMeta(chunk_size=v, workers=(worker,))
+        data_meta = {k: WorkerMeta(chunk_size=v, workers=(worker,), chunk_shape=output_shapes.get(k))
                      for k, v in output_sizes.items()}
         sucessor_op_key = self._predecessors_to_sucessors[op_key]
         self._ready_successors_queue.append((sucessor_op_key, data_meta))

mars/scheduler/operands/tests/test_common_exec.py

+2-2
@@ -87,9 +87,9 @@ def actual_exec(self, session_id, graph_key):
         for tk in rec.data_targets:
             for n in key_to_chunks[tk]:
                 self.chunk_meta.add_worker(session_id, n.key, 'localhost:12345')
-        self._results[graph_key] = ((dict(),), dict())
+        self._results[graph_key] = ((dict(),), dict(), dict())
         for cb in rec.finish_callbacks:
-            self.tell_promise(cb, {})
+            self.tell_promise(cb, {}, {})
         rec.finish_callbacks = []
 
     @log_unhandled

mars/scheduler/tests/integrated/base.py

+3-2
@@ -108,11 +108,12 @@ def start_processes(self, *args, **kwargs):
                 self._start_processes(*args, **kwargs)
                 break
             except ProcessRequirementUnmetError:
+                self.terminate_processes()
                 fail_count += 1
-                if fail_count >= 3:
+                if fail_count >= 10:
                     raise
+                time.sleep(5)
                 logger.error('Failed to start service, retrying')
-                self.terminate_processes()
 
     def _start_processes(self, n_schedulers=2, n_workers=2, etcd=False, cuda=False, modules=None,
                          log_scheduler=True, log_worker=True, env=None, scheduler_args=None,

mars/scheduler/tests/integrated/test_normal_execution.py

+7
@@ -135,6 +135,13 @@ def testMainDataFrameWithoutEtcd(self):
         result = r.execute(session=sess, timeout=self.timeout).fetch(session=sess)
         pd.testing.assert_frame_equal(result, raw1 + raw2)
 
+        raw1 = pd.DataFrame(np.random.rand(10, 10))
+        raw1[0] = raw1[0].apply(str)
+        df1 = md.DataFrame(raw1, chunk_size=5)
+        r = df1.sort_values(0)
+        result = r.execute(session=sess, timeout=self.timeout).fetch(session=sess)
+        pd.testing.assert_frame_equal(result, raw1.sort_values(0))
+
         s1 = pd.Series(np.random.rand(10), index=[11, 1, 2, 5, 7, 6, 8, 9, 10, 3])
         series1 = md.Series(s1, chunk_size=6)
         result = series1.execute(session=sess, timeout=self.timeout).fetch(session=sess)

mars/serialize/protos/indexvalue.proto

+1
@@ -24,6 +24,7 @@ message IndexValue {
     message RangeIndex {
         Value name = 1;
         Value slice = 2;
+        Value dtype = 3;
         // public fields
         string key = 51;
         bool is_monotonic_increasing = 52;
