Open
Description
Hello Team,
I am using a TF dataset built with the tfio BigQuery (BQ) connector. After training for 2-3 hours, my training job fails every time. Can someone please take a look? The failure occurs only after a prolonged period of training. Is there something I am doing wrong when creating the TF dataset?
from typing import Any, Dict
import tensorflow as tf
from tensorflow_io.bigquery import BigQueryClient
def get_tfds_from_bq(
    tableref: str,
    # Annotations are quoted so the definition does not require
    # FeatureSpecConfig to be resolvable at import time.
    feature_spec_config: "FeatureSpecConfig",
    num_streams: int = 4,
    add_blacklist_cols: bool = False,
) -> "tf.data.Dataset":
    """Build a ``tf.data.Dataset`` of ``(features, label)`` pairs from a BigQuery table.

    Opens a tensorflow-io BigQuery read session on the referenced table,
    selecting the feature columns (plus the blacklist columns when requested)
    and the label column, reads the rows with ``parallel_read_rows()``, and
    maps every row to a ``(features_dict, label)`` tuple.

    Args:
        tableref: Table reference of the form ``"project.dataset.table"``.
            The project part is also used as the session parent
            (``"projects/<project>"``).
        feature_spec_config: Object providing
            ``get_all_feature_tftypes()`` and
            ``get_blacklist_feature_tftypes()`` (mappings of column name to
            TF dtype) as well as the ``get_label_col`` / ``get_label_dtype``
            attributes.
        num_streams: Forwarded to ``read_session`` as ``requested_streams``.
        add_blacklist_cols: When true, the blacklist columns are read as well
            and end up in the features dict next to the regular features.

    Returns:
        The mapped and prefetched dataset yielding ``(features, label)``.

    Raises:
        ValueError: If ``tableref`` does not consist of exactly three
            non-empty, dot-separated parts.
    """
    # Fail early with a readable message: indexing the split result blindly
    # raised a bare IndexError for short references and silently dropped
    # trailing parts for long ones (reading from an unintended table).
    tableref_splits = tableref.split(".")
    if len(tableref_splits) != 3 or not all(tableref_splits):
        raise ValueError(
            "tableref must look like 'project.dataset.table', "
            f"got {tableref!r}"
        )
    project_id, dataset_id, table_id = tableref_splits

    # ---- Gather column names and TF dtypes --------------------------------
    f_dtypes = feature_spec_config.get_all_feature_tftypes()
    column_names = list(f_dtypes.keys())
    column_dtypes = list(f_dtypes.values())

    if add_blacklist_cols:
        extraf_dtypes = feature_spec_config.get_blacklist_feature_tftypes()
        column_names.extend(extraf_dtypes.keys())
        column_dtypes.extend(extraf_dtypes.values())

    # Read as plain attributes (no call), exactly as the original code did;
    # presumably these are properties on the config -- TODO confirm.
    label_name = feature_spec_config.get_label_col
    label_dtype = feature_spec_config.get_label_dtype

    # ---- Read the BQ table through a tfio read session --------------------
    # Positional order is parent, project, table, dataset (table before
    # dataset), matching the original call.
    read_session = BigQueryClient().read_session(
        f"projects/{project_id}",
        project_id,
        table_id,
        dataset_id,
        selected_fields=column_names + [label_name],
        output_types=column_dtypes + [label_dtype],
        requested_streams=num_streams,
    )
    # NOTE(review): nothing here retries or re-opens a stream; if the
    # connection to the service drops mid-training, the error surfaces from
    # the reader itself, not from this function.
    dataset = read_session.parallel_read_rows()

    # ---- Split every row into a features dict and its label ---------------
    def extract_features_and_labels(row: Dict[str, Any]):
        features = {col: row[col] for col in column_names}
        # Use the same label column that was put into selected_fields.
        return features, row[label_name]

    dataset = dataset.map(
        extract_features_and_labels,
        num_parallel_calls=tf.data.AUTOTUNE,
    )
    return dataset.prefetch(tf.data.AUTOTUNE)
These are the gRPC debug logs from just before the training fails:
I0819 20:32:57.854700572 1127204 completion_queue.cc:1298] RETURN_EVENT[0x707df4008120]: OP_COMPLETE: tag:0x707e1dffa060 OK
I0819 20:32:57.858648025 1127212 call.cc:1964] grpc_call_start_batch(call=0x707ea8008520, ops=0x707deeffbdc0, nops=1, tag=0x707deeffc060, reserved=(nil))
I0819 20:32:57.858673705 1127212 call.cc:1565] ops[0]: RECV_MESSAGE ptr=0x707deeffc088
I0819 20:32:57.858719982 1127212 completion_queue.cc:764] cq_end_op_for_pluck(cq=0x707ea8008190, tag=0x707deeffc060, error="No Error", done=0x707f4120f53f, done_arg=0x707ea800b560, storage=0x707ea800b5d0)
I0819 20:32:57.858736681 1127212 completion_queue.cc:1298] RETURN_EVENT[0x707ea8008190]: OP_COMPLETE: tag:0x707deeffc060 OK
I0819 20:32:57.862616068 1127212 call.cc:1964] grpc_call_start_batch(call=0x707e6000c760, ops=0x707deeffbdc0, nops=1, tag=0x707deeffc060, reserved=(nil))
I0819 20:32:57.862655671 1127212 call.cc:1565] ops[0]: RECV_MESSAGE ptr=0x707deeffc088
I0819 20:32:57.862723114 1127212 completion_queue.cc:764] cq_end_op_for_pluck(cq=0x707e60004500, tag=0x707deeffc060, error="No Error", done=0x707f4120f53f, done_arg=0x707e6000f7a0, storage=0x707e6000f810)
I0819 20:32:57.862748906 1127212 completion_queue.cc:1298] RETURN_EVENT[0x707e60004500]: OP_COMPLETE: tag:0x707deeffc060 OK
I0819 20:32:57.875126214 1127212 call.cc:1964] grpc_call_start_batch(call=0x707ed00086a0, ops=0x707deeffbdc0, nops=1, tag=0x707deeffc060, reserved=(nil))
I0819 20:32:57.875163922 1127212 call.cc:1565] ops[0]: RECV_MESSAGE ptr=0x707deeffc088
I0819 20:32:57.875238139 1127212 completion_queue.cc:764] cq_end_op_for_pluck(cq=0x707ed0008330, tag=0x707deeffc060, error="No Error", done=0x707f4120f53f, done_arg=0x707ed000b6e0, storage=0x707ed000b750)
I0819 20:32:57.875256129 1127212 completion_queue.cc:1298] RETURN_EVENT[0x707ed0008330]: OP_COMPLETE: tag:0x707deeffc060 OK
I0819 20:32:57.875265760 1127212 call.cc:1964] grpc_call_start_batch(call=0x707ed00086a0, ops=0x707deeffbda0, nops=1, tag=0x707deeffc040, reserved=(nil))
I0819 20:32:57.875274654 1127212 call.cc:1565] ops[0]: RECV_STATUS_ON_CLIENT metadata=0x707ed0001740 status=0x707deeffc070 details=0x707deeffc078
D0819 20:32:57.875358205 1127212 call.cc:733] set_final_status CLI
D0819 20:32:57.875381066 1127212 call.cc:734] {"created":"@1724099577.875340028","description":"Error received from peer ipv4:74.125.197.95:443","file":"external/com_github_grpc_grpc/src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Connection reset by peer","grpc_status":14}
I0819 20:32:57.875394529 1127212 completion_queue.cc:764] cq_end_op_for_pluck(cq=0x707ed0008330, tag=0x707deeffc040, error="No Error", done=0x707f4120f53f, done_arg=0x707dd071ea30, storage=0x707dd071eaa0)
I0819 20:32:57.875404808 1127212 completion_queue.cc:1298] RETURN_EVENT[0x707ed0008330]: OP_COMPLETE: tag:0x707deeffc040 OK
I0819 20:32:57.875576937 1127212 call.cc:1964] grpc_call_start_batch(call=0x707ed00086a0, ops=0x707deeffbdc0, nops=1, tag=0x707deeffc060, reserved=(nil))
I0819 20:32:57.875586053 1127212 call.cc:1565] ops[0]: RECV_MESSAGE ptr=0x707deeffc088
I0819 20:32:57.875595125 1127212 completion_queue.cc:764] cq_end_op_for_pluck(cq=0x707ed0008330, tag=0x707deeffc060, error="No Error", done=0x707f4120f53f, done_arg=0x707ed000b6e0, storage=0x707ed000b750)
I0819 20:32:57.875606287 1127212 completion_queue.cc:1298] RETURN_EVENT[0x707ed0008330]: OP_COMPLETE: tag:0x707deeffc060 OK
I0819 20:32:57.875615791 1127212 call.cc:1964] grpc_call_start_batch(call=0x707ed00086a0, ops=0x707deeffbda0, nops=1, tag=0x707deeffc040, reserved=(nil))
I0819 20:32:57.875623693 1127212 call.cc:1565] ops[0]: RECV_STATUS_ON_CLIENT metadata=0x707ed0001740 status=0x707deeffc070 details=0x707deeffc078
E0819 20:32:57.875628783 1127212 call_op_set.h:947] assertion failed: false
[1] 1126500 IOT instruction (core dumped) python -m train_model --n_jobs 8
Library versions
tensorboard==2.15.2
tensorboard-data-server==0.7.2
tensorboard-plugin-profile==2.16.0
tensorboard-plugin-wit==1.8.1
tensorflow==2.15.1
tensorflow-addons==0.21.0
tensorflow-data-validation==1.15.1
tensorflow-decision-forests==1.8.1
tensorflow-estimator==2.15.0
tensorflow-gpu==2.11.0
tensorflow-hub==0.16.1
tensorflow-io==0.36.0
tensorflow-io-gcs-filesystem==0.36.0
tensorflow-metadata==1.14.0
tensorflow-ranking==0.5.5
tensorflow-serving-api==2.14.1
tensorflow-transform==1.14.0
tensorflowjs==4.17.0
Metadata
Metadata
Assignees
Labels
No labels
Activity