Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions backend/app/gateway/routers/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from __future__ import annotations

import logging
import time
import uuid
from typing import Any

Expand All @@ -27,6 +26,7 @@
from deerflow.config.paths import Paths, get_paths
from deerflow.runtime import serialize_channel_values
from deerflow.runtime.user_context import get_effective_user_id
from deerflow.utils.time import coerce_iso, now_iso

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/threads", tags=["threads"])
Expand Down Expand Up @@ -234,7 +234,7 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
checkpointer = get_checkpointer(request)
thread_store = get_thread_store(request)
thread_id = body.thread_id or str(uuid.uuid4())
now = time.time()
now = now_iso()
# ``body.metadata`` is already stripped of server-reserved keys by
# ``ThreadCreateRequest._strip_reserved`` — see the model definition.

Expand All @@ -244,8 +244,8 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
return ThreadResponse(
thread_id=thread_id,
status=existing_record.get("status", "idle"),
created_at=str(existing_record.get("created_at", "")),
updated_at=str(existing_record.get("updated_at", "")),
created_at=coerce_iso(existing_record.get("created_at", "")),
updated_at=coerce_iso(existing_record.get("updated_at", "")),
metadata=existing_record.get("metadata", {}),
)

Expand Down Expand Up @@ -280,8 +280,8 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
return ThreadResponse(
thread_id=thread_id,
status="idle",
created_at=str(now),
updated_at=str(now),
created_at=now,
updated_at=now,
metadata=body.metadata,
)

Expand All @@ -306,8 +306,11 @@ async def search_threads(body: ThreadSearchRequest, request: Request) -> list[Th
ThreadResponse(
thread_id=r["thread_id"],
status=r.get("status", "idle"),
created_at=r.get("created_at", ""),
updated_at=r.get("updated_at", ""),
# ``coerce_iso`` heals legacy unix-second values that
# ``MemoryThreadMetaStore`` historically wrote with ``time.time()``;
# SQL-backed rows already arrive as ISO strings and pass through.
created_at=coerce_iso(r.get("created_at", "")),
updated_at=coerce_iso(r.get("updated_at", "")),
metadata=r.get("metadata", {}),
values={"title": r["display_name"]} if r.get("display_name") else {},
interrupts={},
Expand Down Expand Up @@ -339,8 +342,8 @@ async def patch_thread(thread_id: str, body: ThreadPatchRequest, request: Reques
return ThreadResponse(
thread_id=thread_id,
status=record.get("status", "idle"),
created_at=str(record.get("created_at", "")),
updated_at=str(record.get("updated_at", "")),
created_at=coerce_iso(record.get("created_at", "")),
updated_at=coerce_iso(record.get("updated_at", "")),
metadata=record.get("metadata", {}),
)

Expand Down Expand Up @@ -380,8 +383,8 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse:
record = {
"thread_id": thread_id,
"status": "idle",
"created_at": ckpt_meta.get("created_at", ""),
"updated_at": ckpt_meta.get("updated_at", ckpt_meta.get("created_at", "")),
"created_at": coerce_iso(ckpt_meta.get("created_at", "")),
"updated_at": coerce_iso(ckpt_meta.get("updated_at", ckpt_meta.get("created_at", ""))),
"metadata": {k: v for k, v in ckpt_meta.items() if k not in ("created_at", "updated_at", "step", "source", "writes", "parents")},
}

Expand All @@ -395,8 +398,8 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse:
return ThreadResponse(
thread_id=thread_id,
status=status,
created_at=str(record.get("created_at", "")),
updated_at=str(record.get("updated_at", "")),
created_at=coerce_iso(record.get("created_at", "")),
updated_at=coerce_iso(record.get("updated_at", "")),
metadata=record.get("metadata", {}),
values=serialize_channel_values(channel_values),
)
Expand Down Expand Up @@ -447,10 +450,10 @@ async def get_thread_state(thread_id: str, request: Request) -> ThreadStateRespo
values=values,
next=next_tasks,
metadata=metadata,
checkpoint={"id": checkpoint_id, "ts": str(metadata.get("created_at", ""))},
checkpoint={"id": checkpoint_id, "ts": coerce_iso(metadata.get("created_at", ""))},
checkpoint_id=checkpoint_id,
parent_checkpoint_id=parent_checkpoint_id,
created_at=str(metadata.get("created_at", "")),
created_at=coerce_iso(metadata.get("created_at", "")),
tasks=tasks,
)

Expand Down Expand Up @@ -500,7 +503,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
channel_values.update(body.values)

checkpoint["channel_values"] = channel_values
metadata["updated_at"] = time.time()
metadata["updated_at"] = now_iso()

if body.as_node:
metadata["source"] = "update"
Expand Down Expand Up @@ -541,7 +544,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
next=[],
metadata=metadata,
checkpoint_id=new_checkpoint_id,
created_at=str(metadata.get("created_at", "")),
created_at=coerce_iso(metadata.get("created_at", "")),
)


Expand Down Expand Up @@ -608,7 +611,7 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
parent_checkpoint_id=parent_id,
metadata=user_meta,
values=values,
created_at=str(metadata.get("created_at", "")),
created_at=coerce_iso(metadata.get("created_at", "")),
next=next_tasks,
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

from __future__ import annotations

import time
from typing import Any

from langgraph.store.base import BaseStore

from deerflow.persistence.thread_meta.base import ThreadMetaStore
from deerflow.runtime.user_context import AUTO, _AutoSentinel, resolve_user_id
from deerflow.utils.time import coerce_iso, now_iso

THREADS_NS: tuple[str, ...] = ("threads",)

Expand Down Expand Up @@ -48,7 +48,7 @@ async def create(
metadata: dict | None = None,
) -> dict:
resolved_user_id = resolve_user_id(user_id, method_name="MemoryThreadMetaStore.create")
now = time.time()
now = now_iso()
record: dict[str, Any] = {
"thread_id": thread_id,
"assistant_id": assistant_id,
Expand Down Expand Up @@ -106,15 +106,15 @@ async def update_display_name(self, thread_id: str, display_name: str, *, user_i
if record is None:
return
record["display_name"] = display_name
record["updated_at"] = time.time()
record["updated_at"] = now_iso()
await self._store.aput(THREADS_NS, thread_id, record)

async def update_status(self, thread_id: str, status: str, *, user_id: str | None | _AutoSentinel = AUTO) -> None:
record = await self._get_owned_record(thread_id, user_id, "MemoryThreadMetaStore.update_status")
if record is None:
return
record["status"] = status
record["updated_at"] = time.time()
record["updated_at"] = now_iso()
await self._store.aput(THREADS_NS, thread_id, record)

async def update_metadata(self, thread_id: str, metadata: dict, *, user_id: str | None | _AutoSentinel = AUTO) -> None:
Expand All @@ -124,7 +124,7 @@ async def update_metadata(self, thread_id: str, metadata: dict, *, user_id: str
merged = dict(record.get("metadata") or {})
merged.update(metadata)
record["metadata"] = merged
record["updated_at"] = time.time()
record["updated_at"] = now_iso()
await self._store.aput(THREADS_NS, thread_id, record)

async def delete(self, thread_id: str, *, user_id: str | None | _AutoSentinel = AUTO) -> None:
Expand All @@ -144,6 +144,8 @@ def _item_to_dict(item) -> dict[str, Any]:
"display_name": val.get("display_name"),
"status": val.get("status", "idle"),
"metadata": val.get("metadata", {}),
"created_at": str(val.get("created_at", "")),
"updated_at": str(val.get("updated_at", "")),
# ``coerce_iso`` heals legacy unix-second values written by
# earlier Gateway versions that called ``str(time.time())``.
"created_at": coerce_iso(val.get("created_at", "")),
"updated_at": coerce_iso(val.get("updated_at", "")),
}
7 changes: 2 additions & 5 deletions backend/packages/harness/deerflow/runtime/runs/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import logging
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import TYPE_CHECKING

from deerflow.utils.time import now_iso as _now_iso

from .schemas import DisconnectMode, RunStatus

if TYPE_CHECKING:
Expand All @@ -17,10 +18,6 @@
logger = logging.getLogger(__name__)


def _now_iso() -> str:
return datetime.now(UTC).isoformat()


@dataclass
class RunRecord:
"""Mutable record for a single run."""
Expand Down
75 changes: 75 additions & 0 deletions backend/packages/harness/deerflow/utils/time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""ISO 8601 timestamp helpers for the Gateway and embedded runtime.

DeerFlow stores and serializes thread/run timestamps as ISO 8601 UTC
strings to match the LangGraph Platform schema (see
``langgraph_sdk.schema.Thread``, where ``created_at`` / ``updated_at``
are ``datetime`` and JSON-encode to ISO 8601). All timestamp generation
should funnel through :func:`now_iso` so the wire format stays
consistent across endpoints, the embedded ``RunManager``, and the
checkpoint metadata written by the Gateway.

:func:`coerce_iso` provides a forward-compatible read path for legacy
records that historically stored ``str(time.time())`` floats.
"""

from __future__ import annotations

import re
from datetime import UTC, datetime

__all__ = ["coerce_iso", "now_iso"]

_UNIX_TIMESTAMP_PATTERN = re.compile(r"^\d{10}(?:\.\d+)?$")
"""Matches the unix-timestamp string shape historically written by
``str(time.time())`` (10-digit seconds with optional fractional part).
The 10-digit anchor avoids accidentally rewriting ISO years like
``"2026"`` and stays valid until the year 2286.
"""


def now_iso() -> str:
"""Return the current UTC time as an ISO 8601 string.

Example: ``"2026-04-27T03:19:46.511479+00:00"``.
"""
return datetime.now(UTC).isoformat()


def coerce_iso(value: object) -> str:
"""Best-effort coerce a stored timestamp to an ISO 8601 string.

Translates legacy unix-timestamp floats / strings written by older
DeerFlow versions into ISO without a one-shot migration. ISO strings
pass through unchanged; ``datetime`` instances are normalised to UTC
(tz-naive values are assumed to be UTC) and emitted via
``isoformat()`` so the wire format always uses the ``T`` separator;
empty values become ``""``; unrecognised values are stringified as a
last resort.
"""
if value is None or value == "":
return ""
if isinstance(value, bool):
# ``bool`` is a subclass of ``int`` — treat as garbage, not 0/1.
return str(value)
if isinstance(value, datetime):
# ``datetime`` must be handled before the ``int``/``float`` check;
# str(datetime) would produce ``"YYYY-MM-DD HH:MM:SS+00:00"``
# (space separator), which breaks strict ISO 8601 consumers.
if value.tzinfo is None:
value = value.replace(tzinfo=UTC)
else:
value = value.astimezone(UTC)
return value.isoformat()
if isinstance(value, (int, float)):
try:
return datetime.fromtimestamp(float(value), UTC).isoformat()
except (ValueError, OverflowError, OSError):
return str(value)
if isinstance(value, str):
if _UNIX_TIMESTAMP_PATTERN.match(value):
try:
return datetime.fromtimestamp(float(value), UTC).isoformat()
except (ValueError, OverflowError, OSError):
return value
return value
return str(value)
Comment on lines +38 to +75
Copy link

Copilot AI May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coerce_iso() currently stringifies non-str/non-numeric inputs (including datetime objects). If created_at / updated_at ever come through as datetime (e.g., from LangGraph internals or in-memory stores), str(datetime) produces a space-separated format (YYYY-MM-DD HH:MM:SS+00:00) rather than ISO-8601 with T, which can break consumers expecting strict ISO. Consider handling datetime explicitly (and normalizing to UTC if tz-naive) by returning value.astimezone(UTC).isoformat() / value.replace(tzinfo=UTC).isoformat() as appropriate.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in ed9026f. coerce_iso now branches on datetime before the int/float check and routes through astimezone(UTC).isoformat() (or replace(tzinfo=UTC) when tz-naive), so the output always uses the T separator regardless of how an upstream component handed us the value.

Three new test cases cover the contract:

  • test_coerce_iso_handles_tz_aware_datetime — explicit assertion that T is in the output and a space is not.
  • test_coerce_iso_handles_tz_naive_datetime_as_utc — tz-naive input is treated as UTC.
  • test_coerce_iso_normalises_non_utc_datetime_to_utc — a +08:00 value gets normalised so the wire format stays UTC.

Verification: uv run pytest tests/test_utils_time.py -v → 12 passed (3 new).

Loading
Loading