From 6be6c5d813775412688132bbfafaca0b756ed699 Mon Sep 17 00:00:00 2001 From: Ethan Yu Date: Tue, 10 Mar 2026 18:00:20 -0700 Subject: [PATCH] Speedup UpdateGroup Jobs --- src/utils/connectors/postgres.py | 10 ++++------ src/utils/job/jobs.py | 3 ++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/utils/connectors/postgres.py b/src/utils/connectors/postgres.py index 3856734ae..2f744614d 100644 --- a/src/utils/connectors/postgres.py +++ b/src/utils/connectors/postgres.py @@ -24,6 +24,7 @@ import json import logging import math +import types import os import re import threading @@ -446,15 +447,12 @@ def execute_fetch_command(self, command: str, cur = conn.cursor( cursor_factory=psycopg2.extras.RealDictCursor) cur.execute(command, args) - # Create a pydantic instance from dictionary pairs rows = cur.fetchall() if not return_raw: - # Pydantic cannot deep copy memoryview object, so cast it to bytes object + # Cast memoryview objects to bytes and provide attribute access rows = [ - pydantic.create_model( - 'DynamicModel', **{k: common.handle_memoryview(v) or \ - (Any, common.handle_memoryview(v)) - for k, v in row.items()})() # type: ignore + types.SimpleNamespace(**{k: common.handle_memoryview(v) + for k, v in row.items()}) for row in rows] cur.close() conn.commit() diff --git a/src/utils/job/jobs.py b/src/utils/job/jobs.py index 2be44391c..472166a53 100644 --- a/src/utils/job/jobs.py +++ b/src/utils/job/jobs.py @@ -855,7 +855,8 @@ def execute(self, context: JobExecutionContext, workflow_config = context.postgres.get_workflow_configs() backend_config_cache = connectors.BackendConfigCache() - workflow_obj = workflow.Workflow.fetch_from_db(context.postgres, self.workflow_id) + workflow_obj = workflow.Workflow.fetch_from_db( + context.postgres, self.workflow_id, fetch_groups=False) total_timeout = task_common.calculate_total_timeout( workflow_obj.workflow_id, workflow_obj.timeout.queue_timeout, workflow_obj.timeout.exec_timeout)