Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 176 additions & 9 deletions libs/cli/langgraph_cli/cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""CLI entrypoint for LangGraph API server."""

import base64
Expand Down Expand Up @@ -26,6 +26,7 @@
from langgraph_cli.constants import DEFAULT_CONFIG, DEFAULT_PORT
from langgraph_cli.docker import DockerCapabilities
from langgraph_cli.exec import Runner, subp_exec
from langgraph_cli.helpers import format_log_entry, level_fg, resolve_deployment_id
from langgraph_cli.host_backend import HostBackendClient, HostBackendError
from langgraph_cli.progress import Progress
from langgraph_cli.templates import TEMPLATE_HELP_STRING, create_new
Expand Down Expand Up @@ -296,6 +297,17 @@
),
)


OPT_HOST_DEPLOYMENT_NAME = click.option(
"--name",
envvar=_DEPLOYMENT_NAME_ENV,
help=(
"Deployment name. Can also be set via LANGSMITH_DEPLOYMENT_NAME "
"environment variable or .env file. Defaults to current directory name "
"if --deployment-id is not provided."
),
)

OPT_HOST_URL = click.option(
"--host-url",
envvar="LANGGRAPH_HOST_URL",
Expand Down Expand Up @@ -684,15 +696,7 @@
def _apply(target: Callable) -> Callable:
decorators = [
OPT_HOST_API_KEY,
click.option(
"--name",
envvar="LANGSMITH_DEPLOYMENT_NAME",
help=(
"Deployment name. Can also be set via LANGSMITH_DEPLOYMENT_NAME "
"environment variable or .env file. Defaults to current directory name "
"if --deployment-id is not provided."
),
),
OPT_HOST_DEPLOYMENT_NAME,
click.option(
"--deployment-id",
help=(
Expand Down Expand Up @@ -1294,6 +1298,169 @@
return value


@OPT_HOST_API_KEY
@OPT_HOST_DEPLOYMENT_NAME
@click.option(
"--deployment-id",
help="Deployment ID. If omitted, --name is used to find the deployment.",
)
@click.option(
"--type",
"log_type",
type=click.Choice(["deploy", "build"]),
default="deploy",
show_default=True,
help="Type of logs to fetch.",
)
@click.option(
"--revision-id",
help="Specific revision ID. For build logs, defaults to latest revision.",
)
@click.option(
"--level",
type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR"], case_sensitive=False),
help="Filter by log level.",
)
@click.option(
"--limit",
type=int,
default=100,
show_default=True,
help="Max log entries to fetch.",
)
@click.option(
"--query",
"-q",
help="Search string filter.",
)
@click.option(
"--start-time",
help="ISO8601 start time (e.g. 2026-03-08T00:00:00Z).",
)
@click.option(
"--end-time",
help="ISO8601 end time.",
)
@click.option(
"--follow",
"-f",
is_flag=True,
default=False,
help="Continuously poll for new logs.",
)
@OPT_HOST_URL
@deploy.command(
"logs",
help="[Beta] Fetch build or deploy logs for a LangSmith deployment.",
)
@log_command
def deploy_logs(
api_key: str | None,
name: str | None,
deployment_id: str | None,
log_type: str,
revision_id: str | None,
level: str | None,
limit: int,
query: str | None,
start_time: str | None,
end_time: str | None,
follow: bool,
host_url: str,
):
client = _create_host_backend_client(host_url, api_key)
dep_id = resolve_deployment_id(client, deployment_id, name)

if log_type == "build" and not revision_id:
revisions_resp = client.list_revisions(dep_id, limit=1)
resources = (
revisions_resp.get("resources", [])
if isinstance(revisions_resp, dict)
else []
)
if not resources:
raise click.ClickException(
"No revisions found for this deployment. Cannot fetch build logs."
)
revision_id = str(resources[0]["id"])
click.secho(f"Using latest revision: {revision_id}", fg="cyan")

payload: dict = {"limit": limit, "order": "desc"}
if level:
payload["level"] = level.upper()
if query:
payload["query"] = query
if start_time:
payload["start_time"] = start_time
if end_time:
payload["end_time"] = end_time

def _fetch(request_payload: dict) -> list[dict]:
if log_type == "build":
resp = client.get_build_logs(dep_id, revision_id, request_payload)
else:
resp = client.get_deploy_logs(dep_id, request_payload, revision_id)

if isinstance(resp, dict):
return resp.get("logs", resp.get("entries", []))
elif isinstance(resp, list):
return resp
return []

def _print_entries(entries: list[dict], *, reverse: bool = False) -> None:
iterable = reversed(entries) if reverse else entries
for entry in iterable:
line = format_log_entry(entry)
fg = level_fg(entry.get("level", ""))
click.secho(line, fg=fg)

def _fetch_and_print(request_payload: dict, *, reverse: bool = False) -> list[dict]:
entries = _fetch(request_payload)
_print_entries(entries, reverse=reverse)
return entries

def _fetch_and_print_new(request_payload: dict, seen_ids: set[str]) -> list[dict]:
entries = _fetch(request_payload)
new = [e for e in entries if e.get("id", "") not in seen_ids]
if new:
_print_entries(new)
seen_ids.update(e.get("id", "") for e in new)
return new

entries = _fetch_and_print(payload, reverse=True)

if not follow:
if not entries:
click.secho("No log entries found.", fg="yellow")
return

payload["order"] = "asc"
seen_ids: set[str] = {e.get("id", "") for e in entries if e.get("id")}
from datetime import datetime, timezone

def _update_start_time(ts) -> None:
if ts is None:
return
if isinstance(ts, (int, float)):
dt = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
payload["start_time"] = dt.isoformat()
else:
payload["start_time"] = str(ts)

if entries:
# entries are in descending order here, so index 0 is the newest log
_update_start_time(entries[0].get("timestamp"))

try:
while True:
time.sleep(2)
new_entries = _fetch_and_print_new(payload, seen_ids)
if new_entries:
_update_start_time(new_entries[-1].get("timestamp"))
except KeyboardInterrupt:
click.echo("\nStopped.")


def _get_docker_ignore_content() -> str:
"""Return the content of a .dockerignore file.

Expand Down
59 changes: 59 additions & 0 deletions libs/cli/langgraph_cli/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Helpers for the ``langgraph logs`` CLI command."""

from __future__ import annotations

from datetime import datetime, timezone

import click

from langgraph_cli.host_backend import HostBackendClient


def resolve_deployment_id(
client: HostBackendClient,
deployment_id: str | None,
name: str | None,
) -> str:
"""Resolve a deployment ID from --deployment-id or --name."""
if deployment_id:
return deployment_id
if not name:
raise click.UsageError("Either --deployment-id or --name is required.")
existing = client.list_deployments(name_contains=name)
if isinstance(existing, dict):
for dep in existing.get("resources", []):
if isinstance(dep, dict) and dep.get("name") == name:
found_id = dep.get("id")
if found_id:
return str(found_id)
raise click.ClickException(f"Deployment '{name}' not found.")


def format_timestamp(ts) -> str:
"""Convert a timestamp (epoch ms or string) to a readable string."""
if isinstance(ts, (int, float)):
dt = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
return dt.strftime("%Y-%m-%d %H:%M:%S")
return str(ts) if ts else ""


def format_log_entry(entry: dict) -> str:
"""Format a single log entry for display."""
ts = format_timestamp(entry.get("timestamp", ""))
level = entry.get("level", "")
message = entry.get("message", "")
if ts and level:
return f"[{ts}] [{level}] {message}"
elif ts:
return f"[{ts}] {message}"
return message


def level_fg(level: str) -> str | None:
"""Return click color for a log level."""
level_upper = level.upper() if level else ""
if level_upper == "ERROR":
return "red"
if level_upper == "WARNING":
return "yellow"
return None
29 changes: 27 additions & 2 deletions libs/cli/langgraph_cli/host_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ def __init__(self, message: str, status_code: int | None = None):
class HostBackendClient:
"""Minimal JSON HTTP client for the host backend deployment service."""

def __init__(self, base_url: str, api_key: str, tenant_id: str | None = None):
def __init__(
self,
base_url: str,
api_key: str,
tenant_id: str | None = None,
):
if not base_url:
raise click.UsageError("Host backend URL is required")
transport = httpx.HTTPTransport(retries=3)
Expand All @@ -30,7 +35,6 @@ def __init__(self, base_url: str, api_key: str, tenant_id: str | None = None):
if tenant_id:
headers["X-Tenant-ID"] = tenant_id
self._base_url = base_url.rstrip("/")
self._api_key = api_key

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this removed?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its not used in any of the helper functions, we just use it to set the header

self._client = httpx.Client(
base_url=self._base_url,
headers=headers,
Expand Down Expand Up @@ -116,3 +120,24 @@ def get_revision(self, deployment_id: str, revision_id: str) -> dict[str, Any]:
"GET",
f"/v2/deployments/{deployment_id}/revisions/{revision_id}",
)

def get_build_logs(
self, project_id: str, revision_id: str, payload: dict[str, Any]
) -> Any:
return self._request(
"POST",
f"/v1/projects/{project_id}/revisions/{revision_id}/build_logs",
payload,
)

def get_deploy_logs(
self,
project_id: str,
payload: dict[str, Any],
revision_id: str | None = None,
) -> Any:
if revision_id:
path = f"/v1/projects/{project_id}/revisions/{revision_id}/deploy_logs"
else:
path = f"/v1/projects/{project_id}/deploy_logs"
return self._request("POST", path, payload)
38 changes: 38 additions & 0 deletions libs/cli/tests/unit_tests/test_host_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,41 @@ def test_list_revisions(client):
def test_get_revision(client):
result = client.get_revision("dep-123", "rev-456")
assert result == {"ok": True}


def test_get_build_logs(client):
result = client.get_build_logs("proj-1", "rev-1", {"limit": 10})
assert result == {"ok": True}


def test_get_deploy_logs_all_revisions():
def handler(req: httpx.Request) -> httpx.Response:
assert "/v1/projects/proj-1/deploy_logs" in str(req.url)
assert "/revisions/" not in str(req.url)
return httpx.Response(200, json={"logs": [{"message": "running"}]})

c = HostBackendClient("https://api.example.com", "key")
c._client = httpx.Client(
base_url="https://api.example.com",
transport=httpx.MockTransport(handler),
headers={"X-Api-Key": "key", "Accept": "application/json"},
timeout=30,
)
result = c.get_deploy_logs("proj-1", {"limit": 10})
assert result == {"logs": [{"message": "running"}]}


def test_get_deploy_logs_specific_revision():
def handler(req: httpx.Request) -> httpx.Response:
assert "/v1/projects/proj-1/revisions/rev-2/deploy_logs" in str(req.url)
return httpx.Response(200, json={"logs": []})

c = HostBackendClient("https://api.example.com", "key")
c._client = httpx.Client(
base_url="https://api.example.com",
transport=httpx.MockTransport(handler),
headers={"X-Api-Key": "key", "Accept": "application/json"},
timeout=30,
)
result = c.get_deploy_logs("proj-1", {"limit": 10}, revision_id="rev-2")
assert result == {"logs": []}
Loading
Loading