Skip to content

Commit a3061e3

Browse files
haoshan98jiahueideafnv
committed
[owl] Handle code execution upstream errors (#853)
* Replace upstream error with None for code execution * Handle regen error propagation by saving error column to state * Error state cleared when regen succeed * Refactor error state handling (#865) * [UI] fix build error; remove electron build (#861) * [owl] Implement endpoints to update owner (#864) * Don't log expected JamaiExceptions in code executor * Refactor error state handling * update * Bug fix: `ModelConfigCreate.id` must not have a default value * update * [owl] Bug fixes and updates (#849) Backend - owl (API server) - Bug fixes: - Always update reasoning content in state - Only record reasoning time if there's reasoning content - Anthropic 4.1 and 4.5 models cannot specify both `temperature` and `top_p` - Delete org secret upon org deletion - Bump dependencies - Deps: Pin Pydantic to `2.11.x` - Test-LLM: Support reasoning content * update * Handle possible mid-stream provider error --------- Co-authored-by: Okiniri <[email protected]> --------- Co-authored-by: Tan Jia Huei <[email protected]> Co-authored-by: Okiniri <[email protected]>
1 parent df0f10e commit a3061e3

File tree

4 files changed

+238
-25
lines changed

4 files changed

+238
-25
lines changed

clients/python/src/jamaibase/types/db.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,14 @@ def check_rerank_cost_per_ksearch(self) -> Self:
720720

721721
class ModelConfigCreate(ModelConfigUpdate):
722722
# Overrides to make these field required in ModelConfigCreate.
723+
id: SanitisedNonEmptyStr = Field(
724+
description=(
725+
"Unique identifier. "
726+
"Users will specify this to select a model. "
727+
"Must follow the following format: `{provider}/{model_id}`. "
728+
"Examples=['openai/gpt-4o-mini', 'Qwen/Qwen2.5-0.5B']"
729+
),
730+
)
723731
type: _ModelType = Field(
724732
description="Model type. Can be completion, chat, embed, or rerank.",
725733
)

services/api/src/owl/db/gen_executor.py

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,8 @@ async def _execute_chat_task(self, task: Task, q: Queue[ResultT | None]) -> None
693693
pass
694694

695695
# Perform completion
696+
state_col = f"{task.output_column_name}_"
697+
state: dict[str, Any] = self._column_dict.get(state_col, {})
696698
result = ""
697699
reasoning = ""
698700
references = None
@@ -765,7 +767,9 @@ async def _execute_chat_task(self, task: Task, q: Queue[ResultT | None]) -> None
765767
)
766768
)
767769
if chunk.finish_reason == "error":
768-
self._error_columns.append(output_column)
770+
raise BadInputError(
771+
"LLM provider encountered unknown error during streaming."
772+
)
769773
else:
770774
response = await self.lm.chat_completion(
771775
messages=req.messages,
@@ -783,14 +787,15 @@ async def _execute_chat_task(self, task: Task, q: Queue[ResultT | None]) -> None
783787
reasoning = response.reasoning_content
784788

785789
except Exception as e:
790+
result = f"[ERROR] {str(e)}"
786791
response_kwargs = dict(
787792
id=self._request_id,
788793
created=int(time()),
789794
model="",
790795
usage=ChatCompletionUsage(),
791796
choices=[
792797
ChatCompletionChoice(
793-
message=ChatCompletionMessage(content=f"[ERROR] {str(e)}"),
798+
message=ChatCompletionMessage(content=result),
794799
index=0,
795800
finish_reason="error",
796801
)
@@ -811,17 +816,16 @@ async def _execute_chat_task(self, task: Task, q: Queue[ResultT | None]) -> None
811816
row_id=self._row_id,
812817
)
813818
)
814-
result = response.content
815-
reasoning = response.reasoning_content
819+
state["error"] = {"message": result}
816820
self._error_columns.append(output_column)
817821
self.log_exception(
818822
f'Table "{self._table_id}": Failed to generate completion for column "{output_column}": {repr(e)}',
819823
e,
820824
)
825+
else:
826+
state.pop("error", None)
821827
finally:
822828
await q.put(None)
823-
state_col = f"{task.output_column_name}_"
824-
state = self._column_dict.get(state_col, {})
825829
# Always update state
826830
state["references"] = references.model_dump(mode="json") if references else None
827831
state["reasoning_content"] = reasoning if reasoning else None
@@ -911,21 +915,25 @@ async def _execute_code_task(self, task: Task, q: Queue[ResultT | None]) -> None
911915
pass
912916

913917
# Perform code execution
918+
state_col = f"{task.output_column_name}_"
919+
state: dict[str, Any] = self._column_dict.get(state_col, {})
914920
result = ""
915921
try:
916-
# Error circuit breaker
917-
self._check_upstream_error([body.source_column])
922+
error_cols = self._get_upstream_error([body.source_column])
918923
source_code = self._column_dict.get(body.source_column, "")
919924

920925
# Extract bytes from ColumnDtype.AUDIO and ColumnDtype.IMAGE and put it into a dictionary
921926
row_data = self._column_dict.copy()
922927
self.table.postprocess_rows([row_data], include_state=False)
928+
# Replace error columns with None value
929+
for ec in error_cols:
930+
row_data[ec] = None
923931
for k, v in row_data.items():
924932
col = next((col for col in self.table.column_metadata if col.column_id == k), None)
925933
if col and (col.dtype == ColumnDtype.AUDIO or col.dtype == ColumnDtype.IMAGE):
926934
row_data[k] = await _load_uri_as_bytes(v)
927935

928-
if source_code and row_data:
936+
if source_code:
929937
result = await code_executor(
930938
request=self.request,
931939
organization_id=self.organization.id,
@@ -945,7 +953,7 @@ async def _execute_code_task(self, task: Task, q: Queue[ResultT | None]) -> None
945953
usage=ChatCompletionUsage(),
946954
choices=[
947955
ChatCompletionChoice(
948-
message=ChatCompletionMessage(content=result),
956+
message=ChatCompletionMessage(content=str(result)),
949957
index=0,
950958
)
951959
],
@@ -966,10 +974,10 @@ async def _execute_code_task(self, task: Task, q: Queue[ResultT | None]) -> None
966974
row_id=self._row_id,
967975
)
968976
)
969-
970977
self.log(f'Executed code for column "{output_column}": <{mask_string(result)}>.')
971978

972979
except Exception as e:
980+
result = None
973981
response_kwargs = dict(
974982
id=self._request_id,
975983
created=int(time()),
@@ -998,14 +1006,17 @@ async def _execute_code_task(self, task: Task, q: Queue[ResultT | None]) -> None
9981006
row_id=self._row_id,
9991007
)
10001008
)
1001-
result = response.content
1009+
state["error"] = {"message": f"[ERROR] {str(e)}"}
10021010
self._error_columns.append(output_column)
10031011
self.log_exception(
10041012
f'Table "{self._table_id}": Failed to execute code for column "{output_column}": {repr(e)}',
10051013
e,
10061014
)
1015+
else:
1016+
state.pop("error", None)
10071017
finally:
10081018
await q.put(None)
1019+
self._column_dict[state_col] = state
10091020
await self._signal_task_completion(task, result)
10101021

10111022
async def _execute_python_task(self, task: Task, q: Queue[ResultT | None]) -> None:
@@ -1052,21 +1063,27 @@ async def _execute_python_task(self, task: Task, q: Queue[ResultT | None]) -> No
10521063
pass
10531064

10541065
# Perform python fixed function execution
1066+
state_col = f"{task.output_column_name}_"
1067+
state: dict[str, Any] = self._column_dict.get(state_col, {})
10551068
result = ""
10561069
try:
1057-
# Error circuit breaker
1058-
# Extract all columns to the left and check for upstream errors
1059-
self._check_upstream_error(self._extract_all_upstream_columns(output_column))
1070+
# Extract all columns to the left and get upstream error columns
1071+
error_cols = self._get_upstream_error(
1072+
self._extract_all_upstream_columns(output_column)
1073+
)
10601074

10611075
# Extract bytes from ColumnDtype.AUDIO and ColumnDtype.IMAGE and put it into a dictionary
10621076
row_data = self._column_dict.copy()
10631077
self.table.postprocess_rows([row_data], include_state=False)
1078+
# Replace error columns with None value
1079+
for ec in error_cols:
1080+
row_data[ec] = None
10641081
for k, v in row_data.items():
10651082
col = next((col for col in self.table.column_metadata if col.column_id == k), None)
10661083
if col and (col.dtype == ColumnDtype.AUDIO or col.dtype == ColumnDtype.IMAGE):
10671084
row_data[k] = await _load_uri_as_bytes(v)
10681085

1069-
if body.python_code and row_data:
1086+
if body.python_code:
10701087
result = await code_executor(
10711088
request=self.request,
10721089
organization_id=self.organization.id,
@@ -1084,7 +1101,7 @@ async def _execute_python_task(self, task: Task, q: Queue[ResultT | None]) -> No
10841101
usage=ChatCompletionUsage(),
10851102
choices=[
10861103
ChatCompletionChoice(
1087-
message=ChatCompletionMessage(content=result),
1104+
message=ChatCompletionMessage(content=str(result)),
10881105
index=0,
10891106
)
10901107
],
@@ -1104,12 +1121,12 @@ async def _execute_python_task(self, task: Task, q: Queue[ResultT | None]) -> No
11041121
row_id=self._row_id,
11051122
)
11061123
)
1107-
11081124
self.log(
11091125
f'Executed python code for column "{output_column}": <{mask_string(result)}>.'
11101126
)
11111127

11121128
except Exception as e:
1129+
result = None
11131130
response_kwargs = dict(
11141131
id=self._request_id,
11151132
created=int(time()),
@@ -1138,14 +1155,17 @@ async def _execute_python_task(self, task: Task, q: Queue[ResultT | None]) -> No
11381155
row_id=self._row_id,
11391156
)
11401157
)
1141-
result = response.content
1158+
state["error"] = {"message": f"[ERROR] {str(e)}"}
11421159
self._error_columns.append(output_column)
11431160
self.log_exception(
11441161
f'Table "{self._table_id}": Failed to execute python code for column "{output_column}": {repr(e)}',
11451162
e,
11461163
)
1164+
else:
1165+
state.pop("error", None)
11471166
finally:
11481167
await q.put(None)
1168+
self._column_dict[state_col] = state
11491169
await self._signal_task_completion(task, result)
11501170

11511171
async def _signal_task_completion(self, task: Task, result: Any) -> None:
@@ -1197,12 +1217,25 @@ async def _load_files(self, message: ChatThreadEntry) -> ChatThreadEntry | ChatE
11971217
# logger.warning(f"{message=}")
11981218
return message
11991219

1200-
def _check_upstream_error(self, upstream_cols: list[str]) -> None:
1220+
def _get_upstream_error(self, upstream_cols: list[str]) -> list[str]:
12011221
if not isinstance(upstream_cols, list):
12021222
raise TypeError(f"`upstream_cols` must be a list, got: {type(upstream_cols)}")
1203-
error_cols = [f'"{col}"' for col in upstream_cols if col in self._error_columns]
1223+
error_cols = [
1224+
col
1225+
for col in upstream_cols
1226+
if col in self._error_columns
1227+
or (
1228+
f"{col}_" in self._column_dict
1229+
and self._column_dict[f"{col}_"].get("error", {}).get("message", None)
1230+
)
1231+
]
1232+
return list(set(error_cols))
1233+
1234+
def _check_upstream_error(self, upstream_cols: list[str]) -> None:
1235+
error_cols = self._get_upstream_error(upstream_cols)
1236+
formatted_error_cols = [f'"{col}"' for col in error_cols]
12041237
if len(error_cols) > 0:
1205-
raise UpStreamError(f"Upstream columns errored out: {', '.join(error_cols)}")
1238+
raise UpStreamError(f"Upstream columns errored out: {', '.join(formatted_error_cols)}")
12061239

12071240
@classmethod
12081241
async def setup_rag(

services/api/src/owl/utils/code.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from owl.types import AUDIO_FILE_EXTENSIONS, IMAGE_FILE_EXTENSIONS, ColumnDtype
1919
from owl.utils.billing import OPENTELEMETRY_CLIENT
2020
from owl.utils.crypt import decrypt
21-
from owl.utils.exceptions import BadInputError
21+
from owl.utils.exceptions import BadInputError, JamaiException
2222
from owl.utils.io import s3_upload
2323

2424
REQ_COUNTER = OPENTELEMETRY_CLIENT.get_counter("code_executor_requests_total")
@@ -126,7 +126,7 @@ async def code_executor(
126126
output_column: str,
127127
row_data: dict | None,
128128
dtype: str,
129-
) -> str:
129+
) -> str | None:
130130
async with observe_code_execution(
131131
organization_id=organization_id,
132132
project_id=project_id,
@@ -160,7 +160,7 @@ async def code_executor(
160160
logger.info(
161161
f"Code Executor: {request.state.id} - Python code execution completed for column {output_column}"
162162
)
163-
return str(result)
163+
return None if result is None else str(result)
164164

165165
if not isinstance(result, bytes):
166166
raise BadInputError(
@@ -195,6 +195,9 @@ async def code_executor(
195195
)
196196
return uri
197197

198+
except JamaiException:
199+
# Don't log expected JamaiExceptions
200+
raise
198201
except Exception as e:
199202
logger.error(
200203
f"Code Executor: {request.state.id} - Python code execution encountered error for column {output_column} : {e}"

0 commit comments

Comments
 (0)