Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/utils/connectors/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import json
import logging
import math
import types
import os
import re
import threading
Expand Down Expand Up @@ -446,15 +447,12 @@ def execute_fetch_command(self, command: str,
cur = conn.cursor(
cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute(command, args)
# Create a pydantic instance from dictionary pairs
rows = cur.fetchall()
if not return_raw:
# Pydantic cannot deep copy memoryview object, so cast it to bytes object
# Cast memoryview objects to bytes and provide attribute access
rows = [
pydantic.create_model(
'DynamicModel', **{k: common.handle_memoryview(v) or \
(Any, common.handle_memoryview(v))
for k, v in row.items()})() # type: ignore
types.SimpleNamespace(**{k: common.handle_memoryview(v)
for k, v in row.items()})
for row in rows]
cur.close()
conn.commit()
Expand Down
3 changes: 2 additions & 1 deletion src/utils/job/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,8 @@ def execute(self, context: JobExecutionContext,

workflow_config = context.postgres.get_workflow_configs()
backend_config_cache = connectors.BackendConfigCache()
workflow_obj = workflow.Workflow.fetch_from_db(context.postgres, self.workflow_id)
workflow_obj = workflow.Workflow.fetch_from_db(
context.postgres, self.workflow_id, fetch_groups=False)
total_timeout = task_common.calculate_total_timeout(
workflow_obj.workflow_id,
workflow_obj.timeout.queue_timeout, workflow_obj.timeout.exec_timeout)
Expand Down