Fix IndexError that arises when partition ids are not continuous with multiple executors per machine #742
Conversation
Force-pushed from c1e902d to 4b235cb
Diff excerpt from gen_data_distributed.py:

    logging.warning("cupy import failed; falling back to numpy.")
    ...
    partition_index = pyspark.TaskContext().partitionId()
    my_seed = partition_seeds[partition_index % len(partition_seeds)]
Good finding on this, but is there a way to guarantee unique seeds across the partitions? With this, we could have two partition indexes that differ by a multiple of len(partition_seeds), and then they would get the same seed.
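To illustrate the concern, a minimal sketch with a hypothetical seed list of length 4; indexes 1 and 5 differ by len(partition_seeds) and collapse onto the same seed:

    # Hypothetical: 4 intended partitions, so 4 precomputed seeds.
    partition_seeds = [101, 202, 303, 404]

    # Two partition indexes that differ by a multiple of len(partition_seeds).
    idx_a, idx_b = 1, 5

    seed_a = partition_seeds[idx_a % len(partition_seeds)]  # 202
    seed_b = partition_seeds[idx_b % len(partition_seeds)]  # 202
    assert seed_a == seed_b  # same seed -> two partitions generate identical data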
Maybe we can set the task id as the random seed. However, the generated dataset would then be non-deterministic because the set of task ids changes between runs.
Will think about whether there is a way to guarantee both unique seeds and a deterministic dataset.
Will you be revising this PR?
Revised to "partition_seed = global_random_state + partition id".
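A minimal sketch of that revised scheme, assuming global_random_state is the generator's configured integer seed (the helper name is illustrative, not the PR's code):

    import numpy as np
    import pyspark

    def partition_rng(global_random_state: int) -> np.random.RandomState:
        # Offset the global seed by the partition id: ids are unique per
        # partition, so seeds cannot collide, and reruns with the same
        # global_random_state reproduce the same data. Must run inside a
        # Spark task for the TaskContext to be populated.
        partition_id = pyspark.TaskContext().partitionId()
        return np.random.RandomState(global_random_state + partition_id)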
This is tricky. I think the main problem is that the final RDD, when the computation happens, can have a different number of partitions than the intended num_partitions.
Maybe we can do this a different way and simply run a double-precision random number generator, starting from an initial deterministic seed, partitionId number of times to get the partition's seed. Then there is no need to know the total number of partitions, and partition seeds will be unique with high probability.
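A rough sketch of that idea, assuming the final double draw is mapped to an integer before being used as a seed (names here are illustrative, not the PR's code):

    import numpy as np
    import pyspark

    def derive_partition_seed(global_seed: int) -> int:
        # Advance a deterministically seeded generator partition_id + 1 times;
        # each partition lands on a different point of the stream, so seeds are
        # distinct with high probability and reproducible across runs.
        partition_id = pyspark.TaskContext().partitionId()
        rng = np.random.RandomState(global_seed)
        draws = rng.random_sample(partition_id + 1)
        # Convert the final double in [0, 1) to an integer seed.
        return int(draws[-1] * (2**31 - 1))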
The first stage has set numPartitions explicitly, so even if there is a repartition, the number of partitions of the shuffle-write stage should be equal to numPartitions, which should not be a problem here.

    # Initial DataFrame with only row numbers
    init = spark.range(rows, numPartitions=num_partitions)
    res = init.mapInPandas(make_sparse_regression_udf, schema)
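For context, a rough sketch of how the partition-level seed might be consumed inside the mapInPandas UDF; the body of make_sparse_regression_udf below is an assumption for illustration, not the repository's implementation:

    import numpy as np
    import pandas as pd
    import pyspark

    global_random_state = 42  # illustrative global seed

    def make_sparse_regression_udf(batch_iter):
        # Seed once per partition using the scheme discussed above, then
        # generate feature/label values for every incoming batch of row numbers.
        partition_id = pyspark.TaskContext().partitionId()
        rng = np.random.RandomState(global_random_state + partition_id)
        for pdf in batch_iter:
            n = len(pdf)
            yield pd.DataFrame({
                "feature": rng.random_sample(n),
                "label": rng.random_sample(n),
            })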
build |
Does "double precision random number" mean integer random number generator? The random seed expects an integer value. |
👍
…rminism of SparseRegressionDataGen (#894)

Issue: #892
Relevant PR: #742

Signed-off-by: Jinfeng <[email protected]>
The partition ids are not continuous when Spark standalone mode launches multiple executors per machine, and this triggers an IndexError in gen_data_distributed.py.
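A minimal sketch of the failure mode, assuming the pre-fix code indexed a seed list of length num_partitions directly by partitionId():

    # Four intended partitions, so four precomputed seeds.
    partition_seeds = [11, 22, 33, 44]

    # In standalone mode with multiple executors per machine, the observed
    # partition ids may be non-continuous, e.g. 0, 1, 4, 5.
    partition_index = 5

    # Direct indexing (assumed pre-fix behavior) overflows the list ...
    try:
        seed = partition_seeds[partition_index]
    except IndexError:
        print("IndexError: id 5 vs. len(partition_seeds) == 4")

    # ... while the fixed lookup wraps the index:
    seed = partition_seeds[partition_index % len(partition_seeds)]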