Description
When I run in YARN mode according to https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_YARN, my submit script is:
echo $KRYLOV_WF_HOME
echo $KRYLOV_WF_TASK_NAME/$1
export PYTHON_ROOT=./Python
export LD_LIBRARY_PATH=${PATH}
export PYSPARK_PYTHON=${PYTHON_ROOT}/bin/python3
export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=Python/bin/python3"
export PATH=${PYTHON_ROOT}/bin/:$PATH
export SPARK_WORKER_INSTANCES=4
export CORES_PER_WORKER=2
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))
/apache/releases/spark-3.1.1.0.9.0-bin-ebay/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue hdlq-business-ads-guidance-high-mem \
--num-executors ${SPARK_WORKER_INSTANCES} \
--executor-cores ${CORES_PER_WORKER} \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--executor-memory 24G \
--archives "hdfs://user/tfos/Python.zip#Python" \
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_JVM:$LIB_HDFS \
--conf spark.executorEnv.CLASSPATH=$(hadoop classpath --glob) \
--py-files $KRYLOV_WF_HOME/src/$KRYLOV_WF_TASK_NAME/TensorFlowOnSpark/tfspark.zip \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
$KRYLOV_WF_HOME/src/$KRYLOV_WF_TASK_NAME/$1 \
--images_labels "hdfs/mnist/csv/csv/train/" \
--model_dir "./mnist_model" \
--export_dir "./mnist_export" \
--cluster_size ${SPARK_WORKER_INSTANCES}
When I submit this to my Spark cluster, it runs without error as long as every executor has a distinct IP. But when two executors share the same IP, the program crashes at TFSparkNode.run(), which is invoked here:
nodeRDD.foreachPartition(TFSparkNode.run(map_fun,
                                         tf_args,
                                         cluster_meta,
                                         tensorboard,
                                         log_dir,
                                         queues,
                                         background=(input_mode == InputMode.SPARK)))
It fails because the partitions are processed by only two executors (one per distinct IP). The other two executors, which share IPs with the processing ones, then raise an error in util.read_executor_id():
def read_executor_id():
  """Read worker id from a local file in the executor's current working directory"""
  logger.info("read_executor_id os.listdir('./') is {0}".format(os.listdir('./')))
  logger.info("read_executor_id os.path.isfile(executor_id) {0}".format(os.path.isfile("executor_id")))
  if os.path.isfile("executor_id"):
    with open("executor_id", "r") as f:
      return int(f.read())
  else:
    msg = "No executor_id file found on this node, please ensure that:\n" + \
          "1. Spark num_executors matches TensorFlow cluster_size\n" + \
          "2. Spark tasks per executor is 1\n" + \
          "3. Spark dynamic allocation is disabled\n" + \
          "4. There are no other root-cause exceptions on other nodes\n"
    raise Exception(msg)
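To illustrate the failure mode I am seeing, here is a minimal standalone sketch (my own illustration, not TFSparkNode code): each executor is expected to have written an "executor_id" file into its own working directory before read_executor_id() runs, so an executor whose task never ran (because another executor on the same IP took its partition) finds no file and raises.

```python
import os
import tempfile

def read_executor_id(workdir):
    """Sketch of the lookup: read the per-executor id file from a working directory."""
    path = os.path.join(workdir, "executor_id")
    if os.path.isfile(path):
        with open(path, "r") as f:
            return int(f.read())
    raise RuntimeError("No executor_id file found in {}".format(workdir))

# Executor whose task ran and wrote its id: the read succeeds.
ok_dir = tempfile.mkdtemp()
with open(os.path.join(ok_dir, "executor_id"), "w") as f:
    f.write("2")
print(read_executor_id(ok_dir))

# Executor whose task never ran: no file was written, so the read raises.
missing_dir = tempfile.mkdtemp()
try:
    read_executor_id(missing_dir)
except RuntimeError as e:
    print("failed:", e)
```

This matches what I observe: the exception only appears on the executors that share an IP with an executor that already took a partition.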
below I pasted the detail info
2023-09-17 19:26:06,020 INFO (MainThread-736679) sorted_cluster_info : [{'executor_id': 0, 'host': '10.97.210.22', 'job_name': 'chief', 'task_index': 0, 'port': 37505, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-lh_bcdpi/listener-i_exgo0d', 'authkey': b'\x1d\xe1\xe21L5O\x1a\xb2\x14e3\x96\xc2\x02\x7f'}, {'executor_id': 1, 'host': '10.97.210.22', 'job_name': 'worker', 'task_index': 0, 'port': 36295, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-x86briu5/listener-lcfs6er9', 'authkey': b'\x9b\x838\xac2\xdfJ\\xa5\xd8\xd6Q\xf8e\xc8O'}, {'executor_id': 2, 'host': '10.183.5.149', 'job_name': 'worker', 'task_index': 1, 'port': 34805, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-gf6tu6eb/listener-sbphcavb', 'authkey': b'\xbfo\xc1_\x8d\xb9F%\xb7\xe5\xfa%\xd0\x9a\x18K'}, {'executor_id': 3, 'host': '10.183.5.149', 'job_name': 'worker', 'task_index': 2, 'port': 44817, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-v2ai241q/listener-ybhpgpqi', 'authkey': b'\xdd\xf8\xd6[\xb9\xdfH\xaf\xa6M\xe4aP\xa4\xa7\xc7'}]
2023-09-17 19:26:06,020 INFO (MainThread-736679) node: {'executor_id': 0, 'host': '10.97.210.22', 'job_name': 'chief', 'task_index': 0, 'port': 37505, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-lh_bcdpi/listener-i_exgo0d', 'authkey': b'\x1d\xe1\xe21L5O\x1a\xb2\x14e3\x96\xc2\x02\x7f'} : last_executor_id : -1
2023-09-17 19:26:06,020 INFO (MainThread-736679) node: {'executor_id': 1, 'host': '10.97.210.22', 'job_name': 'worker', 'task_index': 0, 'port': 36295, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-x86briu5/listener-lcfs6er9', 'authkey': b'\x9b\x838\xac2\xdfJ\\xa5\xd8\xd6Q\xf8e\xc8O'} : last_executor_id : 0
2023-09-17 19:26:06,020 INFO (MainThread-736679) node: {'executor_id': 2, 'host': '10.183.5.149', 'job_name': 'worker', 'task_index': 1, 'port': 34805, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-gf6tu6eb/listener-sbphcavb', 'authkey': b'\xbfo\xc1_\x8d\xb9F%\xb7\xe5\xfa%\xd0\x9a\x18K'} : last_executor_id : 1
2023-09-17 19:26:06,020 INFO (MainThread-736679) node: {'executor_id': 3, 'host': '10.183.5.149', 'job_name': 'worker', 'task_index': 2, 'port': 44817, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-v2ai241q/listener-ybhpgpqi', 'authkey': b'\xdd\xf8\xd6[\xb9\xdfH\xaf\xa6M\xe4aP\xa4\xa7\xc7'} : last_executor_id : 2
2023-09-17 19:26:06,020 INFO (MainThread-736679) export TF_CONFIG: {"cluster": {"chief": ["10.97.210.22:37505"], "worker": ["10.97.210.22:36295", "10.183.5.149:34805", "10.183.5.149:44817"]}, "task": {"type": "chief", "index": 0}, "environment": "cloud"}
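For reference, the TF_CONFIG cluster spec in the last log line can be reproduced from the sorted_cluster_info entries above by grouping host:port addresses by job name. The sketch below is my own reconstruction (the function name and grouping logic are illustrative, not TensorFlowOnSpark internals), but it shows that both duplicate-IP workers do end up in the cluster spec:

```python
# Node list copied from the sorted_cluster_info log line above
# (tb_pid/tb_port/addr/authkey fields omitted for brevity).
sorted_cluster_info = [
    {"executor_id": 0, "host": "10.97.210.22", "job_name": "chief", "task_index": 0, "port": 37505},
    {"executor_id": 1, "host": "10.97.210.22", "job_name": "worker", "task_index": 0, "port": 36295},
    {"executor_id": 2, "host": "10.183.5.149", "job_name": "worker", "task_index": 1, "port": 34805},
    {"executor_id": 3, "host": "10.183.5.149", "job_name": "worker", "task_index": 2, "port": 44817},
]

def build_cluster_spec(nodes):
    """Group node addresses by job name, ordered by task_index within each job."""
    cluster = {}
    for n in sorted(nodes, key=lambda n: (n["job_name"], n["task_index"])):
        cluster.setdefault(n["job_name"], []).append("{0}:{1}".format(n["host"], n["port"]))
    return cluster

print(build_cluster_spec(sorted_cluster_info))
```

So the cluster spec itself is consistent; the crash happens later, when the duplicate-IP executors look for their executor_id file.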