[iris] tolerate Finelog being unreachable during controller startup #5766

Open
ravwojdyla-agent wants to merge 3 commits into main from worktree-rav-iris-controller

Conversation

@ravwojdyla-agent
Contributor

@ravwojdyla-agent ravwojdyla-agent commented May 15, 2026

  • make iris controller resilient to finelog being unreachable at startup
  • LogClient.get_table issues a synchronous RegisterTable RPC; previously a finelog outage at startup crashed the controller even though runtime writers already null-check the resulting Table¹
  • add Controller._register_finelog_table helper that wraps get_table, catches failures, and spawns a background ManagedThread that retries registration with bounded exponential backoff (1s → 60s); a rough sketch of this shape appears after the footnotes below
    • on retry success the helper invokes an on_late_success callback so the caller can install the Table where runtime writers look for it
    • retry thread joins the controller's ThreadContainer and stops with the controller
  • route the three startup get_table sites through the helper: K8s provider's task_stats_table and profile_table, and ControllerServiceImpl's controller-process profile_table
  • move ControllerServiceImpl._profile_table registration out of __init__; add set_profile_table so the controller installs it after construction (and re-installs from the retry thread)
  • add test_controller_starts_when_finelog_table_registration_fails covering both halves: controller comes up with _profile_table = None, then the background retry installs it

Footnotes

  1. asymmetry confirmed by reading the codebase — Table.write enqueues into an in-memory buffer with bg-thread flush + retry, so once registered the table tolerates finelog outages; only the upfront synchronous register_table was unprotected.
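For reference, a minimal sketch of that retry shape, with iris's ManagedThread/ThreadContainer stood in for by a plain threading.Thread and Event; the names and defaults below are illustrative, not the actual diff:

import logging
import threading
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def register_finelog_table_with_retry(
    get_table: Callable[[], object],            # e.g. lambda: log_client.get_table(ns, schema)
    on_late_success: Callable[[object], None],  # installs the Table where runtime writers look
    *,
    initial_backoff: float = 1.0,
    max_backoff: float = 60.0,
    stop: Optional[threading.Event] = None,
) -> Optional[object]:
    """Try to register a finelog table now; on failure, keep retrying in the background."""
    stop = stop or threading.Event()
    try:
        return get_table()                      # fast path: finelog reachable at startup
    except Exception as exc:
        logger.warning("finelog table registration failed at startup, will retry: %s", exc)

    def _retry() -> None:
        backoff = initial_backoff
        while not stop.is_set():
            if stop.wait(backoff):              # interruptible sleep; stops with the controller
                return
            try:
                table = get_table()
            except Exception as exc:
                logger.debug("finelog registration retry failed: %s", exc)
                backoff = min(backoff * 2, max_backoff)
                continue
            on_late_success(table)              # e.g. ControllerServiceImpl.set_profile_table
            return

    threading.Thread(target=_retry, name="finelog-register-retry", daemon=True).start()
    return None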

LogClient.get_table issues a synchronous RegisterTable RPC; previously a
finelog outage at startup crashed the controller, even though every
runtime writer already null-checks the resulting Table. Wrap the three
startup get_table calls (K8s task_stats + profile, controller-process
profile) in a helper that catches the failure and schedules a background
ManagedThread to retry registration with bounded exponential backoff.
On retry success the helper installs the Table where runtime writers
look for it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@ravwojdyla-agent ravwojdyla-agent added the agent-generated Created by automation/agent label May 15, 2026
ravwojdyla and others added 2 commits May 15, 2026 11:08
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@ravwojdyla ravwojdyla requested a review from rjpower May 15, 2026 18:15
@rjpower
Collaborator

rjpower commented May 15, 2026

Could we fix this on the FineLog side? I think if we make the table creation async, that would get rid of the need for the special helpers.

So you'd always get the table and be able to write to it, and we'd just move the register into the background thread.

@claude can you sketch that out for me in a comment?

@claude
Contributor

claude Bot commented May 15, 2026

Claude finished @rjpower's task in 2m 25s


Sketch: move RegisterTable into the background, on the FineLog side

The asymmetry that motivated this PR is in LogClient: Table.write already tolerates a finelog outage (in-memory queue + bg flush with retry + endpoint invalidation), but LogClient.get_table does a synchronous RegisterTable RPC before returning the Table (log_client.py L562–588). Fixing it there means every caller — not just iris — gets a Table that is "write-now, register-when-finelog-is-up", and the controller-side scaffolding in this PR disappears.

Shape of the change in lib/finelog/src/finelog/client/log_client.py

get_table becomes a fast, non-blocking factory:

def get_table(self, namespace: str, schema: type | Schema) -> Table:
    if namespace == LOG_NAMESPACE:
        raise InvalidNamespaceError(...)
    requested = schema if isinstance(schema, Schema) else schema_from_dataclass(schema)

    with self._lock:
        existing = self._tables.get(namespace)
        if existing is not None:
            return existing
        if self._closed:
            raise RuntimeError("LogClient is closed")
        table = Table(
            namespace=namespace,
            requested_schema=requested,            # used for arrow encoding until reconciled
            flusher=self._stats_flush,
            querier=self._stats_query,
            registrar=self._register_table_once,   # new: callable the bg thread invokes
        )
        self._tables[namespace] = table
        return table

def _register_table_once(self, namespace: str, requested: Schema) -> Schema:
    """Single RegisterTable RPC. Returns effective schema or raises."""
    client = self._get_stats_client()
    try:
        response = client.register_table(
            stats_pb2.RegisterTableRequest(
                namespace=namespace, schema=schema_to_proto(requested),
            )
        )
    except ConnectError as exc:
        raise _translate_connect_error(exc) from exc
    return schema_from_proto(response.effective_schema)

Shape of the change in Table

The flush thread already exists and already does retry/backoff. Reuse it: have it gate on registration before it sends.

class Table:
    def __init__(self, *, namespace, requested_schema, flusher, registrar, querier=None, ...):
        self._namespace = namespace
        self._schema = requested_schema                 # may be replaced after RegisterTable
        self._arrow_schema = schema_to_arrow(requested_schema)
        self._registrar = registrar
        self._registered = threading.Event()
        self._register_error: BaseException | None = None  # latched permanent error
        # ... existing queue/backoff/thread setup unchanged

In _run, prepend a registration phase:

def _run(self) -> None:
    self._register_loop()           # blocks until registered, dead, or closing
    if self._register_error is not None or self._closing:
        return
    # ... existing send loop unchanged

def _register_loop(self) -> None:
    while not self._closing:
        try:
            effective = self._registrar(self._namespace, self._schema)
        except (SchemaConflictError, SchemaValidationError, InvalidNamespaceError) as exc:
            # Permanent; surface on next write() and stop bg work.
            with self._cond:
                self._register_error = exc
                self._cond.notify_all()
            return
        except Exception as exc:
            retryable = is_retryable_error(exc) or isinstance(exc, (ConnectionError, OSError, TimeoutError))
            if not retryable:
                with self._cond:
                    self._register_error = exc
                    self._cond.notify_all()
                return
            logger.debug("Table(%s) register pending: %s", self._namespace, _format_exc_summary(exc))
            self._wait_backoff()
            continue
        # success
        if effective != self._schema:
            with self._cond:
                self._schema = effective
                self._arrow_schema = schema_to_arrow(effective)
        self._registered.set()
        self._backoff.reset()
        return

write() consults the latched error so programmer mistakes (schema conflict, invalid namespace) surface on the next write rather than silently hanging:

def write(self, rows):
    if self._register_error is not None:
        raise self._register_error
    # ... existing buffer/enqueue path unchanged

flush() and close() need small tweaks:

  • flush(timeout) already waits on _processed_seq; that condition is naturally never reached pre-registration, so timeout semantics carry over. No code change.
  • close() should set _closing and the bg thread must bail out of _register_loop promptly — the existing _cond-based wait already does this if we wait on the condition instead of time.sleep.

query() is the one synchronous read path; the cleanest answer is to keep it raising StatsError("table not yet registered") (or briefly wait on self._registered) until it lands. Callers that need read-after-write semantics can call flush() first.
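Put together, a sketch of those tweaks, continuing the assumptions from the snippets above (next_delay is a stand-in for whatever the existing backoff helper exposes, _thread is assumed to be the existing flush thread, and StatsError is the error type mentioned above):

def _wait_backoff(self) -> None:
    # Condition-based wait so close() interrupts the sleep immediately.
    delay = self._backoff.next_delay()          # stand-in for the existing backoff helper
    with self._cond:
        self._cond.wait_for(lambda: self._closing, timeout=delay)

def close(self) -> None:
    with self._cond:
        self._closing = True
        self._cond.notify_all()                 # wakes _register_loop / _wait_backoff
    self._thread.join()
    # ... existing drain/teardown unchanged

def query(self, *args, **kwargs):
    if self._register_error is not None:
        raise self._register_error              # permanent failure latched by the bg thread
    if not self._registered.is_set():
        raise StatsError("table not yet registered")
    # ... existing query path unchanged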

What this lets us drop from this PR

Once LogClient does this, the controller side collapses back to the pre-PR shape:

  • Controller._register_finelog_table — gone.
  • K8sTaskProvider.task_stats_table / profile_table go back to plain get_table assignments, no callbacks.
  • ControllerServiceImpl._profile_table goes back to a plain field set in __init__, no set_profile_table, no Table | None typing, no null-checks at the writer.
  • tests/cluster/controller/test_finelog_startup.py moves into lib/finelog/tests/test_client.py as a much cleaner test (no make_controller, no monkeypatching LogClient.get_table — just point a LogClient at a closed port, write rows, bring up a server, assert rows land).

Failure modes to think through before committing to this

  1. Pre-registration writes that violate the eventual effective schema. Today this fails synchronously at get_table. With async registration, rows the caller wrote between get_table and RegisterTable succeeding are already in the queue, encoded against the requested schema. Mitigation: server reconciliation is meant to be additive (nullable extensions). For real conflicts, the bg thread latches the error and write() raises on the next call — programmer sees it within one write cycle, not at startup. Worth confirming what _log_store.register_table actually changes vs. requested.
  2. Buffer pressure during a long outage. Already a problem today for post-registration writes — _trim_oldest_locked drops the oldest rows past max_buffer_rows/max_buffer_bytes. Behavior would be identical for pre-registration writes. Fine.
  3. query() semantics. Currently Table.query assumes the namespace exists. If a caller does t = lc.get_table(...); t.query(...) while finelog is down, today it raises at get_table; with the change it raises at query. Same failure surface, different line — acceptable.
  4. Test ergonomics. A test like test_register.py can spin up a LogClient with no reachable server, call get_table, assert it returns without blocking, write some rows, then start the server and assert the rows arrive after flush(timeout=...). No monkeypatching needed. A rough shape is sketched just after this list.
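For concreteness, a rough shape for that test; every fixture and constructor argument here (free_port, run_finelog_server, StatsRow, the LogClient endpoint kwarg) is hypothetical, and the point is only the ordering of get_table / write / server start / flush:

def test_get_table_returns_before_finelog_is_up():
    port = free_port()                                        # hypothetical helper
    client = LogClient(endpoint=f"http://127.0.0.1:{port}")   # nothing listening yet

    table = client.get_table("unit_test_stats", StatsRow)     # returns without blocking
    table.write([StatsRow(task="t0", cpu_ms=12)])             # buffered; no server yet

    with run_finelog_server(port):                            # hypothetical server fixture
        table.flush(timeout=10.0)                             # bg thread registers, then drains
        rows = table.query()
        assert any(r.task == "t0" for r in rows)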

Net

Worth doing. The controller diff in this PR effectively re-implements the bg-retry pattern that Table already has for writes, just for one extra RPC at the boundary — pushing it into LogClient removes ~80 lines of single-purpose plumbing here and gives the same fix to every other get_table caller (workers, zephyr coordinator, anything spinning up a Table in __init__).

Happy to write the FineLog-side patch as a follow-up PR if you want.
