Description
What happened?
I'm trying to get the SparkRunner working properly with pre-compiled JARs from my Python job (created with `--output_executable=job.jar`) that can be run with `spark-submit` on Kubernetes. Unfortunately, there seems to be a problem with the log routing/plumbing, so most of the job logs just get lost in the weeds.
I first tried the `PROCESS` environment type in a custom Spark image, but any output from the `boot` binary in the container gets lost entirely. Hence the job either fails or succeeds, but I have no idea why unless I attach to the executor container and strace the `boot` process.
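For reference, here is a minimal sketch of how the `PROCESS` variant's options can be assembled. With `--environment_type=PROCESS`, `--environment_config` is a JSON object whose `"command"` key points at the SDK harness boot binary, not a `host:port` string. The `/opt/apache/beam/boot` path matches the sidecar log below and is an assumption about the custom Spark image. The helper name `process_environment_args` is hypothetical.

```python
import json
import shlex


def process_environment_args(boot_path="/opt/apache/beam/boot"):
    """Build pipeline arguments for compiling the JAR with a PROCESS environment.

    Assumption: boot_path is where the Beam SDK images put the boot binary
    (as in the sidecar log); a custom Spark image may use another path.
    """
    config = {"command": boot_path}
    return [
        "--runner=SparkRunner",
        "--output_executable=job.jar",
        "--environment_type=PROCESS",
        # PROCESS expects a JSON object here, not a host:port string.
        "--environment_config=" + json.dumps(config),
        "--spark_version=3",
    ]


if __name__ == "__main__":
    # shlex.join quotes the JSON argument so it survives a shell.
    print(shlex.join(["python", "test.py", *process_environment_args()]))
```

The `boot` binary executed this way runs as a child of the Spark executor, so its stdout/stderr ends up wherever the executor's own process output goes.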
I then tried `EXTERNAL`, because there I can at least check the logs of the sidecar SDK container. The example job runs, but the stdout/stderr of the job itself isn't fed back to the Spark driver, so even in client mode I have no idea what's going on.
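With `EXTERNAL`, the executor connects to the worker pool named in `environment_config` (here `localhost:50000`, served by the sidecar's `worker_pool_main` shown in the SDK container log). A hypothetical helper like the one below can be used from inside the executor container to confirm that the sidecar is listening before blaming the log routing. It only checks TCP reachability and does not speak the Beam worker-pool gRPC protocol.

```python
import socket


def worker_pool_reachable(address="localhost:50000", timeout=2.0):
    """Return True if a TCP connection to host:port succeeds.

    Hypothetical diagnostic helper, not part of Beam: it proves that something
    is listening on the port, not that it is a healthy Beam worker pool.
    """
    host, _, port = address.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unresolvable host, and timeouts.
        return False


if __name__ == "__main__":
    print(worker_pool_reachable())
```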
There seems to be some sort of problem with the gRPC logging server. The Spark executor container shows a bunch of these stack traces:
23/12/08 09:52:19 INFO GrpcLoggingService: Beam Fn Logging client connected.
23/12/08 09:52:31 INFO GrpcLoggingService: Beam Fn Logging client connected.
23/12/08 09:52:31 WARN py:291: Not setting flag with value None: runner
23/12/08 09:52:31 INFO py:111: semi_persistent_directory: /tmp
23/12/08 09:52:31 WARN py:356: No session file found: /tmp/staged/pickled_main_session. Functions defined in __main__ (interactive session) may fail.
23/12/08 09:52:31 WARN py:367: Discarding unparseable args: ['--app_name=BeamApp-roce3528-1208092121-c24b0136_accc3bf6-1ac5-4869-93df-9a9553150bef', '--direct_runner_use_stacked_bundle', '--enable_spark_metric_sinks', '--log_mdc', '--options_id=2', '--pipeline_type_check']
23/12/08 09:52:31 INFO py:135: Pipeline_options: {'job_name': 'BeamApp-roce3528-1208092121-c24b0136', 'gcp_oauth_scopes': ['https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/devstorage.full_control', 'https://www.googleapis.com/auth/userinfo.email', 'https://www.googleapis.com/auth/datastore', 'https://www.googleapis.com/auth/spanner.admin', 'https://www.googleapis.com/auth/spanner.data', 'https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/devstorage.full_control', 'https://www.googleapis.com/auth/userinfo.email', 'https://www.googleapis.com/auth/datastore', 'https://www.googleapis.com/auth/spanner.admin', 'https://www.googleapis.com/auth/spanner.data'], 'experiments': ['beam_fn_api'], 'sdk_location': 'container', 'environment_type': 'EXTERNAL', 'environment_config': 'localhost:50000', 'sdk_worker_parallelism': '1', 'environment_cache_millis': '0', 'output_executable_path': 'job.jar'}
23/12/08 09:52:31 INFO py:234: Creating state cache with size 104857600
23/12/08 09:52:31 INFO py:187: Creating insecure control channel for localhost:33659.
23/12/08 09:52:31 INFO py:195: Control channel established.
23/12/08 09:52:31 INFO py:243: Initializing SDKHarness with unbounded number of workers.
23/12/08 09:52:31 INFO py:211: Python sdk harness starting.
23/12/08 09:52:31 INFO FnApiControlClientPoolService: Beam Fn Control client connected with id 1-1
23/12/08 09:52:31 INFO FnApiControlClientPoolService: getProcessBundleDescriptor request with id 1-2
23/12/08 09:52:31 INFO py:885: Creating insecure state channel for localhost:45221.
23/12/08 09:52:31 INFO py:892: State channel established.
23/12/08 09:52:31 INFO py:770: Creating client data channel for localhost:37017
23/12/08 09:52:31 INFO GrpcDataService: Beam Fn Data client connected.
23/12/08 09:52:32 INFO DefaultJobBundleFactory: Closing environment urn: "beam:env:external:v1"
payload: "\n\021\n\017localhost:50000"
capabilities: "beam:coder:bytes:v1"
capabilities: "beam:coder:string_utf8:v1"
capabilities: "beam:coder:kv:v1"
capabilities: "beam:coder:bool:v1"
capabilities: "beam:coder:varint:v1"
capabilities: "beam:coder:double:v1"
capabilities: "beam:coder:iterable:v1"
capabilities: "beam:coder:timer:v1"
capabilities: "beam:coder:interval_window:v1"
capabilities: "beam:coder:length_prefix:v1"
capabilities: "beam:coder:global_window:v1"
capabilities: "beam:coder:windowed_value:v1"
capabilities: "beam:coder:param_windowed_value:v1"
capabilities: "beam:coder:state_backed_iterable:v1"
capabilities: "beam:coder:custom_window:v1"
capabilities: "beam:coder:row:v1"
capabilities: "beam:coder:sharded_key:v1"
capabilities: "beam:coder:nullable:v1"
capabilities: "beam:protocol:progress_reporting:v0"
capabilities: "beam:protocol:harness_monitoring_infos:v1"
capabilities: "beam:protocol:worker_status:v1"
capabilities: "beam:combinefn:packed_python:v1"
capabilities: "beam:version:sdk_base:apache/beam_python3.11_sdk:2.52.0"
capabilities: "beam:transform:sdf_truncate_sized_restrictions:v1"
capabilities: "beam:transform:to_string:v1"
capabilities: "beam:protocol:data_sampling:v1"
23/12/08 09:52:32 INFO GrpcLoggingService: 2 Beam Fn Logging clients still connected during shutdown.
23/12/08 09:52:32 WARN BeamFnDataGrpcMultiplexer: Hanged up for unknown endpoint.
23/12/08 09:52:32 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@721ac343
java.lang.IllegalStateException: call already closed
at org.apache.beam.vendor.grpc.v1p54p0.com.google.common.base.Preconditions.checkState(Preconditions.java:502)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl.closeInternal(ServerCallImpl.java:219)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl.close(ServerCallImpl.java:212)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:395)
at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onCompleted(GrpcStateService.java:150)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onHalfClose(ServerCalls.java:273)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:355)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:867)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
23/12/08 09:52:33 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 12698 bytes result sent to driver
and the SDK container shows
2023/12/08 09:51:24 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:43205', '--artifact_endpoint=localhost:37223', '--provision_endpoint=localhost:46595', '--control_endpoint=localhost:33659']
2023/12/08 09:52:19 Provision info:
pipeline_options:{fields:{key:"beam:option:allow_non_deterministic_key_coders:v1" value:{bool_value:false}} fields:{key:"beam:option:allow_unsafe_triggers:v1" value:{bool_value:false}} fields:{key:"beam:option:app_name:v1" value:{string_value:"BeamApp-roce3528-1208092121-c24b0136_accc3bf6-1ac5-4869-93df-9a9553150bef"}} fields:{key:"beam:option:artifact_port:v1" value:{string_value:"0"}} fields:{key:"beam:option:auto_unique_labels:v1" value:{bool_value:false}} fields:{key:"beam:option:beam_services:v1" value:{struct_value:{}}} fields:{key:"beam:option:cache_disabled:v1" value:{bool_value:false}} fields:{key:"beam:option:dataflow_endpoint:v1" value:{string_value:"https://dataflow.googleapis.com"}} fields:{key:"beam:option:direct_embed_docker_python:v1" value:{bool_value:false}} fields:{key:"beam:option:direct_num_workers:v1" value:{string_value:"1"}} fields:{key:"beam:option:direct_runner_bundle_repeat:v1" value:{string_value:"0"}} fields:{key:"beam:option:direct_runner_use_stacked_bundle:v1" value:{bool_value:true}} fields:{key:"beam:option:direct_running_mode:v1" value:{string_value:"in_memory"}} fields:{key:"beam:option:direct_test_splits:v1" value:{struct_value:{}}} fields:{key:"beam:option:dry_run:v1" value:{bool_value:false}} fields:{key:"beam:option:enable_artifact_caching:v1" value:{bool_value:false}} fields:{key:"beam:option:enable_heap_dumps:v1" value:{bool_value:false}} fields:{key:"beam:option:enable_hot_key_logging:v1" value:{bool_value:false}} fields:{key:"beam:option:enable_spark_metric_sinks:v1" value:{bool_value:true}} fields:{key:"beam:option:enable_streaming_engine:v1" value:{bool_value:false}} fields:{key:"beam:option:environment_cache_millis:v1" value:{string_value:"0"}} fields:{key:"beam:option:environment_config:v1" value:{string_value:"localhost:50000"}} fields:{key:"beam:option:environment_type:v1" value:{string_value:"EXTERNAL"}} fields:{key:"beam:option:expansion_port:v1" value:{string_value:"0"}} fields:{key:"beam:option:experiments:v1" 
value:{list_value:{values:{string_value:"beam_fn_api"}}}} fields:{key:"beam:option:flink_master:v1" value:{string_value:"[auto]"}} fields:{key:"beam:option:flink_submit_uber_jar:v1" value:{bool_value:false}} fields:{key:"beam:option:flink_version:v1" value:{string_value:"1.16"}} fields:{key:"beam:option:gcp_oauth_scopes:v1" value:{list_value:{values:{string_value:"https://www.googleapis.com/auth/bigquery"} values:{string_value:"https://www.googleapis.com/auth/cloud-platform"} values:{string_value:"https://www.googleapis.com/auth/devstorage.full_control"} values:{string_value:"https://www.googleapis.com/auth/userinfo.email"} values:{string_value:"https://www.googleapis.com/auth/datastore"} values:{string_value:"https://www.googleapis.com/auth/spanner.admin"} values:{string_value:"https://www.googleapis.com/auth/spanner.data"}}}} fields:{key:"beam:option:gcs_performance_metrics:v1" value:{bool_value:false}} fields:{key:"beam:option:hdfs_full_urls:v1" value:{bool_value:false}} fields:{key:"beam:option:job_name:v1" value:{string_value:"BeamApp-roce3528-1208092121-c24b0136"}} fields:{key:"beam:option:job_port:v1" value:{string_value:"0"}} fields:{key:"beam:option:job_server_java_launcher:v1" value:{string_value:"java"}} fields:{key:"beam:option:job_server_jvm_properties:v1" value:{list_value:{}}} fields:{key:"beam:option:job_server_timeout:v1" value:{string_value:"60"}} fields:{key:"beam:option:load_balance_bundles:v1" value:{bool_value:false}} fields:{key:"beam:option:log_mdc:v1" value:{bool_value:true}} fields:{key:"beam:option:max_cache_memory_usage_mb:v1" value:{string_value:"100"}} fields:{key:"beam:option:max_parallelism:v1" value:{string_value:"-1"}} fields:{key:"beam:option:no_auth:v1" value:{bool_value:false}} fields:{key:"beam:option:options_id:v1" value:{number_value:2}} fields:{key:"beam:option:output_executable_path:v1" value:{string_value:"job.jar"}} fields:{key:"beam:option:parallelism:v1" value:{string_value:"-1"}} 
fields:{key:"beam:option:performance_runtime_type_check:v1" value:{bool_value:false}} fields:{key:"beam:option:pickle_library:v1" value:{string_value:"default"}} fields:{key:"beam:option:pipeline_type_check:v1" value:{bool_value:true}} fields:{key:"beam:option:profile_cpu:v1" value:{bool_value:false}} fields:{key:"beam:option:profile_memory:v1" value:{bool_value:false}} fields:{key:"beam:option:profile_sample_rate:v1" value:{number_value:1}} fields:{key:"beam:option:requirements_cache_only_sources:v1" value:{bool_value:false}} fields:{key:"beam:option:resource_hints:v1" value:{list_value:{}}} fields:{key:"beam:option:retain_docker_containers:v1" value:{bool_value:false}} fields:{key:"beam:option:runner:v1" value:{null_value:NULL_VALUE}} fields:{key:"beam:option:runtime_type_check:v1" value:{bool_value:false}} fields:{key:"beam:option:s3_disable_ssl:v1" value:{bool_value:false}} fields:{key:"beam:option:save_main_session:v1" value:{bool_value:false}} fields:{key:"beam:option:sdk_location:v1" value:{string_value:"container"}} fields:{key:"beam:option:sdk_worker_parallelism:v1" value:{string_value:"1"}} fields:{key:"beam:option:spark_master:v1" value:{string_value:"local[4]"}} fields:{key:"beam:option:spark_master_url:v1" value:{string_value:"local[4]"}} fields:{key:"beam:option:spark_submit_uber_jar:v1" value:{bool_value:false}} fields:{key:"beam:option:spark_version:v1" value:{string_value:"3"}} fields:{key:"beam:option:streaming:v1" value:{bool_value:false}} fields:{key:"beam:option:test_mode:v1" value:{bool_value:false}} fields:{key:"beam:option:type_check_additional:v1" value:{string_value:""}} fields:{key:"beam:option:type_check_strictness:v1" value:{string_value:"DEFAULT_TO_ANY"}} fields:{key:"beam:option:update:v1" value:{bool_value:false}} fields:{key:"beam:option:use_active_spark_session:v1" value:{bool_value:false}} fields:{key:"beam:option:use_transform_service:v1" value:{bool_value:false}} fields:{key:"beam:option:uses_provided_spark_context:v1" 
value:{bool_value:false}}} retrieval_token:"__no_artifacts_staged__" logging_endpoint:{url:"localhost:43205"} artifact_endpoint:{url:"localhost:37223"} control_endpoint:{url:"localhost:33659"} runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
2023/12/08 09:52:32 boot.go: error logging message over FnAPI. endpoint localhost:43205 error: EOF message follows
2023/12/08 09:52:32 DEBUG Python (worker 1-1) exited.
2023/12/08 09:52:32 boot.go: error logging message over FnAPI. endpoint localhost:43205 error: EOF message follows
2023/12/08 09:52:32 DEBUG Received signal: terminated
2023/12/08 09:52:32 boot.go: error logging message over FnAPI. endpoint localhost:43205 error: EOF message follows
2023/12/08 09:52:32 DEBUG Cleaned up temporary venv for worker 1-1.
This is repeated several times with different port numbers (I guess for different stages).
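To see how many distinct logging endpoints are affected, the `boot.go` failures can be tallied per endpoint. This is a hypothetical helper. The regular expression mirrors the `boot.go: error logging message over FnAPI` lines quoted above and assumes their format stays the same.

```python
import re
from collections import Counter

# Hypothetical pattern, mirroring the boot.go lines in the SDK container log.
BOOT_LOG_ERROR = re.compile(
    r"boot\.go: error logging message over FnAPI\. "
    r"endpoint (?P<endpoint>\S+) error: (?P<error>\S+)"
)


def failing_log_endpoints(lines):
    """Count boot.go FnAPI logging failures per (endpoint, error) pair."""
    counts = Counter()
    for line in lines:
        match = BOOT_LOG_ERROR.search(line)
        if match:
            counts[(match["endpoint"], match["error"])] += 1
    return counts
```

Usage, with a hypothetical file name for the captured sidecar output: `with open("sdk-container.log") as f: print(failing_log_endpoints(f))`.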
With the fruit sampling example from the website, I finally get
2023/12/08 09:53:29 boot.go: error logging message over FnAPI. endpoint localhost:43597 error: EOF message follows
2023/12/08 09:53:29 DEBUG ['🍓 Strawberry', '🥕 Carrot', '🍅 Tomato']
2023/12/08 09:53:29 boot.go: error logging message over FnAPI. endpoint localhost:43597 error: EOF message follows
2023/12/08 09:53:29 DEBUG Python (worker 5-1) exited.
2023/12/08 09:53:29 boot.go: error logging message over FnAPI. endpoint localhost:43597 error: EOF message follows
2023/12/08 09:53:29 DEBUG Received signal: terminated
2023/12/08 09:53:29 boot.go: error logging message over FnAPI. endpoint localhost:43597 error: EOF message follows
2023/12/08 09:53:29 DEBUG Cleaned up temporary venv for worker 5-1
in one of the (two) SDK container outputs, but it's never fed back to the Spark executor or the Spark driver, probably due to this log endpoint error.
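In the executor log above, records emitted through the Python harness's own `logging` calls (the `py:291`, `py:111`, ... lines) do reach the Spark executor. Output from `print`, by contrast, is forwarded by `boot.go`, and that path is the one hitting `EOF`. A possible workaround, untested here, is to log elements instead of printing them. It assumes the harness keeps forwarding `logging` records over its own Fn API logging connection. The function name `log_element` is hypothetical.

```python
import logging

# Module-level logger; the name is arbitrary.
LOGGER = logging.getLogger(__name__)


def log_element(element):
    """Log an element at INFO level and pass it through unchanged.

    Assumption: the SDK harness forwards Python logging records to the runner,
    as the py:NNN lines in the executor log suggest; print() output instead
    goes through boot.go.
    """
    LOGGER.info("Sampled element: %s", element)
    return element
```

In the job code below, the last step would then read `| beam.Map(log_element)` instead of `| beam.Map(print)`.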
Here's the job code:
```python
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions(sys.argv[1:])) as pipeline:
    sample = (
        pipeline
        | 'Create produce' >> beam.Create([
            '🍓 Strawberry',
            '🥕 Carrot',
            '🍆 Eggplant',
            '🍅 Tomato',
            '🥔 Potato',
        ])
        | 'Sample N elements' >> beam.combiners.Sample.FixedSizeGlobally(3)
        | beam.Map(print))
```
I compile the JAR with

```shell
python test.py --runner=SparkRunner --output_executable=job.jar --environment_type=EXTERNAL --environment_config=localhost:50000 --spark_version=3
```

and then `spark-submit` that to the cluster.
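The exact `spark-submit` invocation isn't part of this report. For anyone reproducing it, here is a sketch of one possible client-mode submission to Kubernetes. The master URL and image name are hypothetical placeholders, and the sketch assumes the executable JAR's manifest supplies the main class, so no `--class` flag is passed.

```python
import shlex


def spark_submit_command(
    jar="job.jar",
    # Placeholders: substitute your API server URL and Spark image.
    master="k8s://https://kubernetes.default.svc:443",
    image="my-registry/spark-beam:latest",
    deploy_mode="client",
):
    """Assemble a spark-submit argument list for the pre-compiled Beam JAR.

    Assumption: the JAR's manifest declares the main class, so --class is omitted.
    """
    return [
        "spark-submit",
        "--master", master,
        "--deploy-mode", deploy_mode,
        "--conf", f"spark.kubernetes.container.image={image}",
        jar,
    ]


if __name__ == "__main__":
    print(shlex.join(spark_submit_command()))
```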
Issue Priority
Priority: 2 (default)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner