Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 1 addition & 57 deletions lib/iris/src/iris/cluster/controller/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import json
import logging
import re
import secrets
import threading
import time
Expand Down Expand Up @@ -95,7 +94,7 @@
task_updates_from_proto,
)
from iris.cluster.controller.provider import ProviderError
from iris.cluster.log_store import build_log_source, worker_log_key
from iris.cluster.log_store import worker_log_key
from iris.cluster.process_status import get_process_status
from iris.cluster.runtime.profile import is_system_target, parse_profile_target, profile_local_process
from iris.cluster.types import (
Expand All @@ -119,8 +118,6 @@

logger = logging.getLogger(__name__)

DEFAULT_MAX_TOTAL_LINES = 100000

# Maximum bundle size in bytes (25 MB) - matches client-side limit
MAX_BUNDLE_SIZE_BYTES = 25 * 1024 * 1024

Expand Down Expand Up @@ -1816,59 +1813,6 @@ def get_kubernetes_cluster_status(

# --- VM Logs ---

# --- Task/Job Logs (batch fetching) ---

def get_task_logs(
    self,
    request: controller_pb2.Controller.GetTaskLogsRequest,
    ctx: RequestContext,
) -> controller_pb2.Controller.GetTaskLogsResponse:
    """Serve the legacy GetTaskLogs RPC by delegating to ``fetch_logs``.

    DEPRECATED: use FetchLogs with regex patterns instead. Scheduled for removal 2026-05-01.

    The legacy request fields are translated into a single ``FetchLogsRequest``
    whose ``source`` is a pattern derived from ``request.id``; the entries that
    come back are wrapped in one ``TaskLogBatch`` keyed by ``request.id``.

    Args:
        request: Legacy request; ``id`` is parsed with ``JobName.from_wire``.
        ctx: Request context, passed through unchanged to the log service.

    Returns:
        A response holding exactly one batch, the cursor reported by the log
        service, and ``truncated`` set when the number of returned entries
        reached the effective line limit.
    """
    target = JobName.from_wire(request.id)

    # Translate the legacy addressing fields into a log-source pattern.
    if target.is_task:
        source_pattern = build_log_source(target, request.attempt_id)
    elif request.include_children:
        source_pattern = build_log_source(target)
    else:
        # Only the job's own tasks: keys shaped like /user/job/0:attempt match,
        # keys of nested jobs such as /user/job/child-job/0:attempt do not.
        # Requiring a numeric task index (\d+) lets the log store apply the
        # restriction itself (the original note says this is pushed into DuckDB).
        wire_pattern = re.escape(target.to_wire())
        source_pattern = f"{wire_pattern}/\\d+:.*"

    # A non-positive max_total_lines means "use the controller default".
    line_limit = request.max_total_lines
    if not line_limit > 0:
        line_limit = DEFAULT_MAX_TOTAL_LINES

    log_response = self._log_service.fetch_logs(
        logging_pb2.FetchLogsRequest(
            source=source_pattern,
            since_ms=request.since_ms,
            cursor=request.cursor,
            substring=request.substring,
            max_lines=line_limit,
            tail=request.tail,
            min_level=request.min_level,
        ),
        ctx,
    )

    # The legacy wire format expects a list of batches; all entries go in one.
    single_batch = controller_pb2.Controller.TaskLogBatch(
        task_id=request.id,
        logs=log_response.entries,
    )

    # Reaching the limit is treated as "there may be more lines".
    hit_limit = line_limit > 0 and len(log_response.entries) >= line_limit

    return controller_pb2.Controller.GetTaskLogsResponse(
        task_logs=[single_batch],
        truncated=hit_limit,
        cursor=log_response.cursor,
    )

# --- Profiling ---

def profile_task(
Expand Down
36 changes: 0 additions & 36 deletions lib/iris/src/iris/rpc/controller.proto
Original file line number Diff line number Diff line change
Expand Up @@ -333,38 +333,6 @@ message Controller {
repeated UserSummary users = 1;
}

// --- Task/Job Logs (batch fetching) ---
// Request for the deprecated GetTaskLogs RPC. The controller handler translates
// these fields into a single FetchLogs request.
message GetTaskLogsRequest {
string id = 1; // Job ID or Task ID (detected by trailing numeric)
bool include_children = 2; // Include child jobs (only for job IDs)
int64 since_ms = 3; // Only return logs after this timestamp (exclusive)
int64 max_total_lines = 4; // Max lines total (<= 0 = controller default, DEFAULT_MAX_TOTAL_LINES = 100000)
string substring = 5; // Literal substring match on log data
int32 attempt_id = 6; // Filter to specific attempt (-1 = all attempts)
string min_level = 7; // Minimum log level filter (DEBUG/INFO/WARNING/ERROR/CRITICAL)
int64 cursor = 8; // Autoincrement id cursor for incremental polling
bool tail = 9; // If true, return the last N lines instead of the first N
}

// One group of log entries returned by GetTaskLogs.
// NOTE(review): the forwarding handler in controller/service.py sets only
// task_id (copied from the request id) and logs; confirm whether any other
// producer fills in error or worker_id.
message TaskLogBatch {
string task_id = 1; // Set to the request's id by the controller handler
repeated iris.logging.LogEntry logs = 2;
string error = 3; // If fetch failed for this task
string worker_id = 4; // Worker that produced these logs
}

// Response for the deprecated GetTaskLogs RPC.
message GetTaskLogsResponse {
repeated TaskLogBatch task_logs = 1; // The controller handler returns exactly one batch
bool truncated = 2; // True if max_total_lines hit (returned entry count >= effective limit)

// Child job statuses (populated when include_children=true).
// Included on every response so the client can detect state transitions
// (e.g. child moved to FAILED) without a separate ListJobs RPC.
// NOTE(review): the forwarding handler in controller/service.py does not set
// this field; confirm whether it is still populated anywhere.
repeated iris.job.JobStatus child_job_statuses = 3;

int64 cursor = 4; // Max autoincrement id seen (cursor for next poll)
}

// --- Worker Detail (unified VM + worker view) ---
message GetWorkerStatusRequest {
string id = 1; // VM ID or worker ID — resolved against both
Expand Down Expand Up @@ -580,10 +548,6 @@ service ControllerService {

rpc ListUsers(Controller.ListUsersRequest) returns (Controller.ListUsersResponse);

// DEPRECATED: use LogService.FetchLogs with regex patterns instead.
// Forwards to LogService.FetchLogs internally. Scheduled for removal 2026-05-01.
rpc GetTaskLogs(Controller.GetTaskLogsRequest) returns (Controller.GetTaskLogsResponse);

// Profiling (proxied to worker)
rpc ProfileTask(iris.job.ProfileTaskRequest) returns (iris.job.ProfileTaskResponse);

Expand Down
65 changes: 0 additions & 65 deletions lib/iris/src/iris/rpc/controller_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ async def get_autoscaler_status(self, request: controller__pb2.Controller.GetAut
async def list_users(
    self,
    request: controller__pb2.Controller.ListUsersRequest,
    ctx: RequestContext,
) -> controller__pb2.Controller.ListUsersResponse:
    """Placeholder ListUsers handler; always raises ``ConnectError`` (UNIMPLEMENTED).

    Presumably overridden by concrete service implementations.
    """
    not_implemented = ConnectError(Code.UNIMPLEMENTED, "Not implemented")
    raise not_implemented

async def get_task_logs(
    self,
    request: controller__pb2.Controller.GetTaskLogsRequest,
    ctx: RequestContext,
) -> controller__pb2.Controller.GetTaskLogsResponse:
    """Placeholder GetTaskLogs handler; always raises ``ConnectError`` (UNIMPLEMENTED).

    Presumably overridden by concrete service implementations.
    """
    not_implemented = ConnectError(Code.UNIMPLEMENTED, "Not implemented")
    raise not_implemented

async def profile_task(
    self,
    request: job__pb2.ProfileTaskRequest,
    ctx: RequestContext,
) -> job__pb2.ProfileTaskResponse:
    """Placeholder ProfileTask handler; always raises ``ConnectError`` (UNIMPLEMENTED).

    Presumably overridden by concrete service implementations.
    """
    not_implemented = ConnectError(Code.UNIMPLEMENTED, "Not implemented")
    raise not_implemented

Expand Down Expand Up @@ -270,16 +267,6 @@ def __init__(self, service: ControllerService | AsyncGenerator[ControllerService
),
function=svc.list_users,
),
"/iris.cluster.ControllerService/GetTaskLogs": Endpoint.unary(
method=MethodInfo(
name="GetTaskLogs",
service_name="iris.cluster.ControllerService",
input=controller__pb2.Controller.GetTaskLogsRequest,
output=controller__pb2.Controller.GetTaskLogsResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
function=svc.get_task_logs,
),
"/iris.cluster.ControllerService/ProfileTask": Endpoint.unary(
method=MethodInfo(
name="ProfileTask",
Expand Down Expand Up @@ -773,26 +760,6 @@ async def list_users(
timeout_ms=timeout_ms,
)

async def get_task_logs(
    self,
    request: controller__pb2.Controller.GetTaskLogsRequest,
    *,
    headers: Headers | Mapping[str, str] | None = None,
    timeout_ms: int | None = None,
) -> controller__pb2.Controller.GetTaskLogsResponse:
    """Call the unary ``GetTaskLogs`` method of ``iris.cluster.ControllerService``.

    Args:
        request: The GetTaskLogs request message.
        headers: Optional headers forwarded to ``execute_unary``.
        timeout_ms: Optional timeout forwarded to ``execute_unary``.

    Returns:
        Whatever ``execute_unary`` yields for this method descriptor.
    """
    # Descriptor identifying the RPC and its request/response message types.
    rpc_descriptor = MethodInfo(
        name="GetTaskLogs",
        service_name="iris.cluster.ControllerService",
        input=controller__pb2.Controller.GetTaskLogsRequest,
        output=controller__pb2.Controller.GetTaskLogsResponse,
        idempotency_level=IdempotencyLevel.UNKNOWN,
    )
    response = await self.execute_unary(
        request=request,
        method=rpc_descriptor,
        headers=headers,
        timeout_ms=timeout_ms,
    )
    return response

async def profile_task(
self,
request: job__pb2.ProfileTaskRequest,
Expand Down Expand Up @@ -1223,8 +1190,6 @@ def get_autoscaler_status(self, request: controller__pb2.Controller.GetAutoscale
raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
def list_users(
    self,
    request: controller__pb2.Controller.ListUsersRequest,
    ctx: RequestContext,
) -> controller__pb2.Controller.ListUsersResponse:
    """Placeholder synchronous ListUsers handler; always raises ``ConnectError`` (UNIMPLEMENTED).

    Presumably overridden by concrete service implementations.
    """
    not_implemented = ConnectError(Code.UNIMPLEMENTED, "Not implemented")
    raise not_implemented
def get_task_logs(
    self,
    request: controller__pb2.Controller.GetTaskLogsRequest,
    ctx: RequestContext,
) -> controller__pb2.Controller.GetTaskLogsResponse:
    """Placeholder synchronous GetTaskLogs handler; always raises ``ConnectError`` (UNIMPLEMENTED).

    Presumably overridden by concrete service implementations.
    """
    not_implemented = ConnectError(Code.UNIMPLEMENTED, "Not implemented")
    raise not_implemented
def profile_task(
    self,
    request: job__pb2.ProfileTaskRequest,
    ctx: RequestContext,
) -> job__pb2.ProfileTaskResponse:
    """Placeholder synchronous ProfileTask handler; always raises ``ConnectError`` (UNIMPLEMENTED).

    Presumably overridden by concrete service implementations.
    """
    not_implemented = ConnectError(Code.UNIMPLEMENTED, "Not implemented")
    raise not_implemented
def exec_in_container(self, request: controller__pb2.Controller.ExecInContainerRequest, ctx: RequestContext) -> controller__pb2.Controller.ExecInContainerResponse:
Expand Down Expand Up @@ -1411,16 +1376,6 @@ def __init__(self, service: ControllerServiceSync, interceptors: Iterable[Interc
),
function=service.list_users,
),
"/iris.cluster.ControllerService/GetTaskLogs": EndpointSync.unary(
method=MethodInfo(
name="GetTaskLogs",
service_name="iris.cluster.ControllerService",
input=controller__pb2.Controller.GetTaskLogsRequest,
output=controller__pb2.Controller.GetTaskLogsResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
function=service.get_task_logs,
),
"/iris.cluster.ControllerService/ProfileTask": EndpointSync.unary(
method=MethodInfo(
name="ProfileTask",
Expand Down Expand Up @@ -1914,26 +1869,6 @@ def list_users(
timeout_ms=timeout_ms,
)

def get_task_logs(
    self,
    request: controller__pb2.Controller.GetTaskLogsRequest,
    *,
    headers: Headers | Mapping[str, str] | None = None,
    timeout_ms: int | None = None,
) -> controller__pb2.Controller.GetTaskLogsResponse:
    """Synchronously call the unary ``GetTaskLogs`` method of ``iris.cluster.ControllerService``.

    Args:
        request: The GetTaskLogs request message.
        headers: Optional headers forwarded to ``execute_unary``.
        timeout_ms: Optional timeout forwarded to ``execute_unary``.

    Returns:
        Whatever ``execute_unary`` returns for this method descriptor.
    """
    # Descriptor identifying the RPC and its request/response message types.
    rpc_descriptor = MethodInfo(
        name="GetTaskLogs",
        service_name="iris.cluster.ControllerService",
        input=controller__pb2.Controller.GetTaskLogsRequest,
        output=controller__pb2.Controller.GetTaskLogsResponse,
        idempotency_level=IdempotencyLevel.UNKNOWN,
    )
    response = self.execute_unary(
        request=request,
        method=rpc_descriptor,
        headers=headers,
        timeout_ms=timeout_ms,
    )
    return response

def profile_task(
self,
request: job__pb2.ProfileTaskRequest,
Expand Down
Loading
Loading