diff --git a/dbt/adapters/flink/handler.py b/dbt/adapters/flink/handler.py index adb3605..e7033ad 100644 --- a/dbt/adapters/flink/handler.py +++ b/dbt/adapters/flink/handler.py @@ -106,7 +106,12 @@ def execute(self, sql: str, bindings: Optional[Sequence[Any]] = None) -> None: ) ) if status == "ERROR": - raise Exception("Statement execution failed") + errors = operation_handle.get_errors() + if len(errors) > 0: + raise Exception(f"{errors[0]}: {errors[1]}") + else: + raise Exception("Statement execution failed") + self.last_query_start_time = self._get_current_timestamp() diff --git a/flink/sqlgateway/operation.py b/flink/sqlgateway/operation.py index cdacab0..f9f7f48 100644 --- a/flink/sqlgateway/operation.py +++ b/flink/sqlgateway/operation.py @@ -51,6 +51,16 @@ def get_status(self) -> str: else: raise Exception("SQL gateway error: ", response.status_code) + def get_errors(self) -> str: + response = requests.get( + url=f"{self.statement_endpoint_url()}/result/0", + headers={ + "Content-Type": "application/json", + }, + ) + + return response.json()["errors"] + def cancel(self) -> str: response = requests.post( url=f"{self.statement_endpoint_url()}/cancel", diff --git a/flink/sqlgateway/result_parser.py b/flink/sqlgateway/result_parser.py index 61150cb..f491409 100644 --- a/flink/sqlgateway/result_parser.py +++ b/flink/sqlgateway/result_parser.py @@ -31,7 +31,7 @@ class SqlGatewayResultParser: def parse_result(data: Dict[str, Any]) -> SqlGatewayResult: columns = data["results"]["columns"] rows: List[Dict[str, Any]] = [] - next_result_url = data["nextResultUri"] + next_result_url = data.get("nextResultUri") column_names: List[str] = list(map(lambda c: c["name"], columns)) is_end_of_steam = data["resultType"] == "EOS"