Skip to content

Commit 157a90d

Browse files
authored
chore: ud-container env variable for formalising the errors (numaproj#220)
1 parent a6322f8 commit 157a90d

File tree

13 files changed

+40
-40
lines changed

13 files changed

+40
-40
lines changed

pynumaflow/_constants.py

+5-9
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,12 @@
55
from pynumaflow import setup_logging
66

77
SIDE_INPUT_DIR_PATH = "/var/numaflow/side-inputs"
8+
ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"
89

9-
# UDF execution error prefixes
10-
ERR_SOURCE_EXCEPTION = "UDF_EXECUTION_ERROR(source)"
11-
ERR_TRANSFORMER_EXCEPTION = "UDF_EXECUTION_ERROR(transformer)"
12-
ERR_SINK_EXCEPTION = "UDF_EXECUTION_ERROR(sink)"
13-
ERR_MAP_STREAM_EXCEPTION = "UDF_EXECUTION_ERROR(mapstream)"
14-
ERR_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(map)"
15-
ERR_BATCH_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(batchmap)"
16-
ERR_REDUCE_EXCEPTION = "UDF_EXECUTION_ERROR(reduce)"
17-
ERR_SIDE_INPUT_RETRIEVAL_EXCEPTION = "UDF_EXECUTION_ERROR(sideinput)"
10+
# Get container type from env var, default to unknown-container
11+
CONTAINER_TYPE = os.getenv(ENV_UD_CONTAINER_TYPE, "unknown-container")
12+
# UDF exception error string with container type
13+
ERR_UDF_EXCEPTION_STRING = f"UDF_EXECUTION_ERROR({CONTAINER_TYPE})"
1814

1915
# Socket configs
2016
MAP_SOCK_PATH = "/var/run/numaflow/map.sock"

pynumaflow/batchmapper/servicer/async_servicer.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from pynumaflow.shared.asynciter import NonBlockingIterator
1010
from pynumaflow.shared.server import handle_async_error
1111
from pynumaflow.types import NumaflowServicerContext
12-
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_BATCH_MAP_EXCEPTION
12+
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING
1313

1414

1515
class AsyncBatchMapServicer(map_pb2_grpc.MapServicer):
@@ -98,7 +98,7 @@ async def MapFn(
9898

9999
except BaseException as err:
100100
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
101-
await handle_async_error(context, err, ERR_BATCH_MAP_EXCEPTION)
101+
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
102102
return
103103

104104
async def IsReady(

pynumaflow/mapper/_servicer/_async_servicer.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from google.protobuf import empty_pb2 as _empty_pb2
55
from pynumaflow.shared.asynciter import NonBlockingIterator
66

7-
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_MAP_EXCEPTION
7+
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING
88
from pynumaflow.mapper._dtypes import MapAsyncCallable, Datum, MapError
99
from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc
1010
from pynumaflow.shared.server import handle_async_error
@@ -56,7 +56,7 @@ async def MapFn(
5656
async for msg in consumer:
5757
# If the message is an exception, we raise the exception
5858
if isinstance(msg, BaseException):
59-
await handle_async_error(context, msg, ERR_MAP_EXCEPTION)
59+
await handle_async_error(context, msg, ERR_UDF_EXCEPTION_STRING)
6060
return
6161
# Send window response back to the client
6262
else:
@@ -65,7 +65,7 @@ async def MapFn(
6565
await producer
6666
except BaseException as e:
6767
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
68-
await handle_async_error(context, e, ERR_MAP_EXCEPTION)
68+
await handle_async_error(context, e, ERR_UDF_EXCEPTION_STRING)
6969
return
7070

7171
async def _process_inputs(

pynumaflow/mapper/_servicer/_sync_servicer.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from google.protobuf import empty_pb2 as _empty_pb2
66
from pynumaflow.shared.server import exit_on_error
77

8-
from pynumaflow._constants import NUM_THREADS_DEFAULT, STREAM_EOF, _LOGGER, ERR_MAP_EXCEPTION
8+
from pynumaflow._constants import NUM_THREADS_DEFAULT, STREAM_EOF, _LOGGER, ERR_UDF_EXCEPTION_STRING
99
from pynumaflow.mapper._dtypes import MapSyncCallable, Datum, MapError
1010
from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc
1111
from pynumaflow.shared.synciter import SyncIterator
@@ -58,7 +58,7 @@ def MapFn(
5858
if isinstance(res, BaseException):
5959
# Terminate the current server process due to exception
6060
exit_on_error(
61-
context, f"{ERR_MAP_EXCEPTION}: {repr(res)}", parent=self.multiproc
61+
context, f"{ERR_UDF_EXCEPTION_STRING}: {repr(res)}", parent=self.multiproc
6262
)
6363
return
6464
# return the result
@@ -71,7 +71,9 @@ def MapFn(
7171
except BaseException as err:
7272
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
7373
# Terminate the current server process due to exception
74-
exit_on_error(context, f"{ERR_MAP_EXCEPTION}: {repr(err)}", parent=self.multiproc)
74+
exit_on_error(
75+
context, f"{ERR_UDF_EXCEPTION_STRING}: {repr(err)}", parent=self.multiproc
76+
)
7577
return
7678

7779
def _process_requests(

pynumaflow/mapstreamer/servicer/async_servicer.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from pynumaflow.proto.mapper import map_pb2_grpc, map_pb2
88
from pynumaflow.shared.server import handle_async_error
99
from pynumaflow.types import NumaflowServicerContext
10-
from pynumaflow._constants import _LOGGER, ERR_MAP_STREAM_EXCEPTION
10+
from pynumaflow._constants import _LOGGER, ERR_UDF_EXCEPTION_STRING
1111

1212

1313
class AsyncMapStreamServicer(map_pb2_grpc.MapServicer):
@@ -59,7 +59,7 @@ async def MapFn(
5959
yield map_pb2.MapResponse(status=map_pb2.TransmissionStatus(eot=True), id=req.id)
6060
except BaseException as err:
6161
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
62-
await handle_async_error(context, err, ERR_MAP_STREAM_EXCEPTION)
62+
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
6363
return
6464

6565
async def __invoke_map_stream(self, keys: list[str], req: Datum):

pynumaflow/reducer/servicer/async_servicer.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from google.protobuf import empty_pb2 as _empty_pb2
55

6-
from pynumaflow._constants import _LOGGER, ERR_REDUCE_EXCEPTION
6+
from pynumaflow._constants import _LOGGER, ERR_UDF_EXCEPTION_STRING
77
from pynumaflow.proto.reducer import reduce_pb2, reduce_pb2_grpc
88
from pynumaflow.reducer._dtypes import (
99
Datum,
@@ -105,7 +105,7 @@ async def ReduceFn(
105105
_LOGGER.critical("Reduce Error", exc_info=True)
106106
# Send a context abort signal for the rpc, this is required for numa container to get
107107
# the correct grpc error
108-
await handle_async_error(context, e, ERR_REDUCE_EXCEPTION)
108+
await handle_async_error(context, e, ERR_UDF_EXCEPTION_STRING)
109109

110110
# send EOF to all the tasks once the request iterator is exhausted
111111
# This will signal the tasks to stop reading the data on their
@@ -136,7 +136,7 @@ async def ReduceFn(
136136
_LOGGER.critical("Reduce Error", exc_info=True)
137137
# Send a context abort signal for the rpc, this is required for numa container to get
138138
# the correct grpc error
139-
await handle_async_error(context, e, ERR_REDUCE_EXCEPTION)
139+
await handle_async_error(context, e, ERR_UDF_EXCEPTION_STRING)
140140

141141
async def IsReady(
142142
self, request: _empty_pb2.Empty, context: NumaflowServicerContext

pynumaflow/reducestreamer/servicer/async_servicer.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from google.protobuf import empty_pb2 as _empty_pb2
66

7-
from pynumaflow._constants import ERR_REDUCE_EXCEPTION
7+
from pynumaflow._constants import ERR_UDF_EXCEPTION_STRING
88
from pynumaflow.proto.reducer import reduce_pb2, reduce_pb2_grpc
99
from pynumaflow.reducestreamer._dtypes import (
1010
Datum,
@@ -95,20 +95,20 @@ async def ReduceFn(
9595
async for msg in consumer:
9696
# If the message is an exception, we raise the exception
9797
if isinstance(msg, BaseException):
98-
await handle_async_error(context, msg, ERR_REDUCE_EXCEPTION)
98+
await handle_async_error(context, msg, ERR_UDF_EXCEPTION_STRING)
9999
return
100100
# Send window EOF response or Window result response
101101
# back to the client
102102
else:
103103
yield msg
104104
except BaseException as e:
105-
await handle_async_error(context, e, ERR_REDUCE_EXCEPTION)
105+
await handle_async_error(context, e, ERR_UDF_EXCEPTION_STRING)
106106
return
107107
# Wait for the process_input_stream task to finish for a clean exit
108108
try:
109109
await producer
110110
except BaseException as e:
111-
await handle_async_error(context, e, ERR_REDUCE_EXCEPTION)
111+
await handle_async_error(context, e, ERR_UDF_EXCEPTION_STRING)
112112
return
113113

114114
async def IsReady(

pynumaflow/sideinput/servicer/servicer.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from pynumaflow._constants import (
44
_LOGGER,
5-
ERR_SIDE_INPUT_RETRIEVAL_EXCEPTION,
5+
ERR_UDF_EXCEPTION_STRING,
66
)
77
from pynumaflow.proto.sideinput import sideinput_pb2_grpc, sideinput_pb2
88
from pynumaflow.shared.server import exit_on_error
@@ -28,7 +28,7 @@ def RetrieveSideInput(
2828
try:
2929
rspn = self.__retrieve_handler()
3030
except BaseException as err:
31-
err_msg = f"{ERR_SIDE_INPUT_RETRIEVAL_EXCEPTION}: {repr(err)}"
31+
err_msg = f"{ERR_UDF_EXCEPTION_STRING}: {repr(err)}"
3232
_LOGGER.critical(err_msg, exc_info=True)
3333
exit_on_error(context, err_msg)
3434
return

pynumaflow/sinker/servicer/async_servicer.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
build_sink_resp_results,
1414
)
1515
from pynumaflow.types import NumaflowServicerContext
16-
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_SINK_EXCEPTION
16+
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING
1717

1818

1919
class AsyncSinkServicer(sink_pb2_grpc.SinkServicer):
@@ -85,7 +85,7 @@ async def SinkFn(
8585
# if there is an exception, we will mark all the responses as a failure
8686
err_msg = f"UDSinkError: {repr(err)}"
8787
_LOGGER.critical(err_msg, exc_info=True)
88-
await handle_async_error(context, err, ERR_SINK_EXCEPTION)
88+
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
8989
return
9090

9191
async def __invoke_sink(

pynumaflow/sourcer/servicer/async_servicer.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from pynumaflow.proto.sourcer import source_pb2
1212
from pynumaflow.proto.sourcer import source_pb2_grpc
1313
from pynumaflow.types import NumaflowServicerContext
14-
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_SOURCE_EXCEPTION
14+
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING
1515

1616

1717
def _create_read_handshake_response():
@@ -119,7 +119,7 @@ async def ReadFn(
119119
yield _create_eot_response()
120120
except BaseException as err:
121121
_LOGGER.critical("User-Defined Source ReadFn error", exc_info=True)
122-
await handle_async_error(context, err, ERR_SOURCE_EXCEPTION)
122+
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
123123

124124
async def __invoke_read(self, req, niter):
125125
"""Invoke the read handler and manage the iterator."""
@@ -165,7 +165,7 @@ async def AckFn(
165165
yield _create_ack_response()
166166
except BaseException as err:
167167
_LOGGER.critical("User-Defined Source AckFn error", exc_info=True)
168-
await handle_async_error(context, err, ERR_SOURCE_EXCEPTION)
168+
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
169169

170170
async def IsReady(
171171
self, request: _empty_pb2.Empty, context: NumaflowServicerContext
@@ -187,7 +187,7 @@ async def PendingFn(
187187
count = await self.__source_pending_handler()
188188
except BaseException as err:
189189
_LOGGER.critical("PendingFn Error", exc_info=True)
190-
await handle_async_error(context, err, ERR_SOURCE_EXCEPTION)
190+
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
191191
return
192192
resp = source_pb2.PendingResponse.Result(count=count.count)
193193
return source_pb2.PendingResponse(result=resp)
@@ -202,7 +202,7 @@ async def PartitionsFn(
202202
partitions = await self.__source_partitions_handler()
203203
except BaseException as err:
204204
_LOGGER.critical("PartitionsFn Error", exc_info=True)
205-
await handle_async_error(context, err, ERR_SOURCE_EXCEPTION)
205+
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
206206
return
207207
resp = source_pb2.PartitionsResponse.Result(partitions=partitions.partitions)
208208
return source_pb2.PartitionsResponse(result=resp)

pynumaflow/sourcetransformer/servicer/_servicer.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
_LOGGER,
1717
STREAM_EOF,
1818
NUM_THREADS_DEFAULT,
19-
ERR_TRANSFORMER_EXCEPTION,
19+
ERR_UDF_EXCEPTION_STRING,
2020
)
2121

2222

@@ -79,7 +79,7 @@ def SourceTransformFn(
7979
if isinstance(res, BaseException):
8080
# Terminate the current server process due to exception
8181
exit_on_error(
82-
context, f"{ERR_TRANSFORMER_EXCEPTION}: {repr(res)}", parent=self.multiproc
82+
context, f"{ERR_UDF_EXCEPTION_STRING}: {repr(res)}", parent=self.multiproc
8383
)
8484
return
8585
# return the result
@@ -93,7 +93,7 @@ def SourceTransformFn(
9393
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
9494
# Terminate the current server process due to exception
9595
exit_on_error(
96-
context, f"{ERR_TRANSFORMER_EXCEPTION}: {repr(err)}", parent=self.multiproc
96+
context, f"{ERR_UDF_EXCEPTION_STRING}: {repr(err)}", parent=self.multiproc
9797
)
9898
return
9999

tests/sink/test_async_sink.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from pynumaflow import setup_logging
1313
from pynumaflow._constants import (
14+
ERR_UDF_EXCEPTION_STRING,
1415
UD_CONTAINER_FALLBACK_SINK,
1516
FALLBACK_SINK_SOCK_PATH,
1617
FALLBACK_SINK_SERVER_INFO_FILE_PATH,
@@ -183,7 +184,7 @@ def test_sink_err(self) -> None:
183184
pass
184185
except BaseException as e:
185186
self.assertTrue(
186-
"UDF_EXECUTION_ERROR(sink): ValueError('test_mock_err_message')" in e.__str__()
187+
f"{ERR_UDF_EXCEPTION_STRING}: ValueError('test_mock_err_message')" in e.__str__()
187188
)
188189
return
189190
except grpc.RpcError as e:

tests/source/test_async_source_err.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from grpc.aio._server import Server
1010

1111
from pynumaflow import setup_logging
12+
from pynumaflow._constants import ERR_UDF_EXCEPTION_STRING
1213
from pynumaflow.sourcer import SourceAsyncServer
1314
from pynumaflow.proto.sourcer import source_pb2_grpc
1415
from google.protobuf import empty_pb2 as _empty_pb2
@@ -93,7 +94,7 @@ def test_read_error(self) -> None:
9394
pass
9495
except BaseException as e:
9596
self.assertTrue(
96-
"UDF_EXECUTION_ERROR(source): TypeError("
97+
f"{ERR_UDF_EXCEPTION_STRING}: TypeError("
9798
'"handle_async_error() missing 1 required positional argument: '
9899
"'exception_type'\")" in e.__str__()
99100
)

0 commit comments

Comments
 (0)