Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions lib/iris/OPS.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,14 @@ SELECT slice_id, lifecycle, scale_group, worker_ids FROM slices WHERE lifecycle=
-- Task attempt history (debugging retries)
SELECT task_id, attempt_id, state, exit_code, error FROM task_attempts
WHERE task_id LIKE '%<job_fragment>%' ORDER BY attempt_id;
```

Controller audit events (`event=<action> entity=<id> trigger=<parent> k=v ...`)
are emitted as structured `logger.info` lines — query them through
`iris process logs` with a substring filter, not via SQL. Example:

-- What the controller has been doing
SELECT kind, count(*) FROM txn_log GROUP BY kind ORDER BY count(*) DESC LIMIT 10;
```bash
iris process logs --since 24h | grep 'event=worker_heartbeat_failed'
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Grep audit events using the event= key

The new audit logging format emitted by controller transitions uses event=<action> (for example event=worker_heartbeat_failed), but this OPS example filters on action=worker_heartbeat_failed; operators following it will miss the intended audit entries during debugging. Update the filter/example to match the emitted field name.

Useful? React with 👍 / 👎.

Comment on lines +123 to +128
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The grammar description and grep example don't match what log_event actually emits. log_event in transitions.py emits event=<action> entity=<entity_id> trigger=<trigger> k=v ... — there is no separate action= token. The documented grep 'action=worker_heartbeat_failed' will match nothing. The grammar line should drop the phantom action=<action>, and the example should be grep 'event=worker_heartbeat_failed'.

Suggested change
Controller audit events (`event=<kind> action=<action> entity=<id> ...`) are
emitted as structured `logger.info` lines — query them through
`iris process logs` with a substring filter, not via SQL. Example:
-- What the controller has been doing
SELECT kind, count(*) FROM txn_log GROUP BY kind ORDER BY count(*) DESC LIMIT 10;
```bash
iris process logs --since 24h | grep 'action=worker_heartbeat_failed'
Controller audit events (`event=<action> entity=<id> trigger=<parent> k=v ...`)
are emitted as structured `logger.info` lines — query them through
`iris process logs` with a substring filter, not via SQL. Example:
```bash
iris process logs --since 24h | grep 'event=worker_heartbeat_failed'

```

Full table list: `iris query "SELECT name FROM sqlite_master WHERE type='table'"`.
Expand Down
3 changes: 0 additions & 3 deletions lib/iris/dashboard/src/App.vue
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ const WORKER_TABS: Tab[] = [
{ key: 'fleet', label: 'Workers', to: '/fleet' },
{ key: 'endpoints', label: 'Endpoints', to: '/endpoints' },
{ key: 'autoscaler', label: 'Autoscaler', to: '/autoscaler' },
{ key: 'transactions', label: 'Transactions', to: '/transactions' },
{ key: 'account', label: 'Account', to: '/account' },
{ key: 'status', label: 'Status', to: '/status' },
]
Expand All @@ -30,7 +29,6 @@ const KUBERNETES_TABS: Tab[] = [
{ key: 'scheduler', label: 'Scheduler', to: '/scheduler' },
{ key: 'cluster', label: 'Cluster', to: '/cluster' },
{ key: 'endpoints', label: 'Endpoints', to: '/endpoints' },
{ key: 'transactions', label: 'Transactions', to: '/transactions' },
{ key: 'account', label: 'Account', to: '/account' },
{ key: 'status', label: 'Status', to: '/status' },
]
Expand All @@ -46,7 +44,6 @@ const PATH_TO_TAB: Record<string, string> = {
'/cluster': 'cluster',
'/endpoints': 'endpoints',
'/autoscaler': 'autoscaler',
'/transactions': 'transactions',
'/account': 'account',
'/status': 'status',
}
Expand Down
102 changes: 0 additions & 102 deletions lib/iris/dashboard/src/components/controller/TransactionsTab.vue

This file was deleted.

4 changes: 0 additions & 4 deletions lib/iris/dashboard/src/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ const routes = [
path: '/status',
component: () => import('./components/controller/StatusTab.vue'),
},
{
path: '/transactions',
component: () => import('./components/controller/TransactionsTab.vue'),
},
{
path: '/scheduler',
component: () => import('./components/controller/SchedulerTab.vue'),
Expand Down
13 changes: 0 additions & 13 deletions lib/iris/dashboard/src/types/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -458,19 +458,6 @@ export interface GetProcessStatusResponse {
logEntries?: LogEntry[]
}

// -- Transactions --

export interface TransactionAction {
timestamp?: ProtoTimestamp
action?: string
entityId?: string
details?: string
}

export interface GetTransactionsResponse {
actions: TransactionAction[]
}

// -- Task State Counts (used in job summaries and user summaries) --

/** Mapping from lowercase state name to count, e.g. { running: 2, pending: 5 } */
Expand Down
2 changes: 0 additions & 2 deletions lib/iris/scripts/benchmark_db_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@
"task_resource_history",
"endpoints",
"reservation_claims",
"txn_log",
"txn_actions",
"meta",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file has two more dangling references to the deleted _transaction_actions helper that this PR missed:

  • line 70 — still imported in the from iris.cluster.controller.service import (...) block
  • line 442 — still called via bench("_transaction_actions", lambda: _transaction_actions(db))

Since _transaction_actions was removed from service.py, running the script now raises ImportError: cannot import name '_transaction_actions' from 'iris.cluster.controller.service' at module load. Drop both the import entry and the bench(...) call.

Fix this →

"schema_migrations",
]
Expand Down
17 changes: 16 additions & 1 deletion lib/iris/src/iris/cluster/controller/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ def create_api_key(
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(key_id, key_hash, key_prefix, user_id, name, now.epoch_ms(), expires_at.epoch_ms() if expires_at else None),
)
logger.info(
"event=api_key_created entity=%s trigger=- user=%s name=%s expires_at_ms=%s",
key_id,
user_id,
name,
expires_at.epoch_ms() if expires_at else "-",
)


def lookup_api_key_by_hash(db: ControllerDB, key_hash: str) -> ApiKeyRow | None:
Expand All @@ -81,7 +88,10 @@ def revoke_api_key(db: ControllerDB, key_id: str, now: Timestamp) -> bool:
f"UPDATE {db.api_keys_table} SET revoked_at_ms = ? WHERE key_id = ? AND revoked_at_ms IS NULL",
(now.epoch_ms(), key_id),
)
return cur._cursor.rowcount > 0
revoked = cur._cursor.rowcount > 0
if revoked:
logger.info("event=api_key_revoked entity=%s trigger=-", key_id)
return revoked


def list_api_keys(db: ControllerDB, user_id: str | None = None) -> list[ApiKeyRow]:
Expand Down Expand Up @@ -111,6 +121,11 @@ def revoke_login_keys_for_user(db: ControllerDB, user_id: str, now: Timestamp) -
" WHERE user_id = ? AND name LIKE 'login-%' AND revoked_at_ms IS NULL",
(now.epoch_ms(), user_id),
)
logger.info(
"event=login_keys_revoked entity=%s trigger=- count=%d",
user_id,
len(revoked_ids),
)
return revoked_ids


Expand Down
4 changes: 0 additions & 4 deletions lib/iris/src/iris/cluster/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,9 +936,6 @@ class ControllerConfig:
worker_retention: Duration = field(default_factory=lambda: Duration.from_seconds(86400))
"""Delete inactive/unhealthy workers whose last heartbeat exceeds this (default: 24 hours)."""

txn_action_retention: Duration = field(default_factory=lambda: Duration.from_seconds(3 * 86400))
"""Delete txn_actions older than this (default: 3 days)."""

profile_retention: Duration = field(default_factory=lambda: Duration.from_seconds(86400))
"""Delete task_profiles older than this (default: 24 hours)."""

Expand Down Expand Up @@ -1419,7 +1416,6 @@ def _run_prune_loop(self, stop_event: threading.Event) -> None:
self._transitions.prune_old_data(
job_retention=self._config.job_retention,
worker_retention=self._config.worker_retention,
txn_action_retention=self._config.txn_action_retention,
profile_retention=self._config.profile_retention,
stop_event=stop_event,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,9 @@


def migrate(conn: sqlite3.Connection) -> None:
conn.executescript(
"""
DROP TRIGGER IF EXISTS trg_txn_log_retention;
CREATE TRIGGER IF NOT EXISTS trg_txn_log_retention
AFTER INSERT ON txn_log
WHEN (SELECT COUNT(*) FROM txn_log) > 1100
BEGIN
DELETE FROM txn_log WHERE id <= (
SELECT id FROM txn_log ORDER BY id DESC LIMIT 1 OFFSET 1000
);
END;

CREATE INDEX IF NOT EXISTS idx_workers_healthy_active ON workers(healthy, active);
"""
)
# Originally this migration also rewrote the `trg_txn_log_retention`
# trigger; those statements were removed once migration 0037 dropped the
# `txn_log` / `txn_actions` tables entirely. On DBs that already ran the
# old form the trigger survives until 0037 executes; 0037 is idempotent
# (`DROP TRIGGER IF EXISTS`) so no fixup is needed here.
conn.execute("CREATE INDEX IF NOT EXISTS idx_workers_healthy_active ON workers(healthy, active)")
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright The Marin Authors
# SPDX-License-Identifier: Apache-2.0

"""Drop the `txn_log` and `txn_actions` audit tables.

The controller no longer records mutating transitions to SQLite; every
state change now emits a semi-structured `logger.info` line via
``transitions.log_event`` that is captured by the Iris log server and
queried through the normal log-store DuckDB interface.
"""

import sqlite3


def migrate(conn: sqlite3.Connection) -> None:
conn.execute("DROP TRIGGER IF EXISTS trg_txn_log_retention")
conn.execute("DROP INDEX IF EXISTS idx_txn_actions_txn")
conn.execute("DROP TABLE IF EXISTS txn_actions")
conn.execute("DROP TABLE IF EXISTS txn_log")
77 changes: 0 additions & 77 deletions lib/iris/src/iris/cluster/controller/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1112,62 +1112,6 @@ def generate_full_ddl(tables: Sequence[Table]) -> str:
indexes=("CREATE INDEX IF NOT EXISTS idx_dispatch_worker ON dispatch_queue(worker_id, id)",),
)

TXN_LOG = Table(
"txn_log",
"tl",
columns=(
Column("id", "INTEGER", "PRIMARY KEY AUTOINCREMENT"),
Column("kind", "TEXT", "NOT NULL", python_type=str, decoder=str),
Column("payload_json", "TEXT", "NOT NULL", python_name="payload", python_type=dict, decoder=_decode_json_dict),
Column(
"created_at_ms",
"INTEGER",
"NOT NULL",
python_name="created_at",
python_type=Timestamp,
decoder=decode_timestamp_ms,
),
),
triggers=(
# Migration 0004_worker_indexes rewrote the trigger from 0001
"""CREATE TRIGGER IF NOT EXISTS trg_txn_log_retention
AFTER INSERT ON txn_log
WHEN (SELECT COUNT(*) FROM txn_log) > 1100
BEGIN
DELETE FROM txn_log WHERE id <= (
SELECT id FROM txn_log ORDER BY id DESC LIMIT 1 OFFSET 1000
);
END;""",
),
)

TXN_ACTIONS = Table(
"txn_actions",
"ta2",
columns=(
Column("id", "INTEGER", "PRIMARY KEY AUTOINCREMENT"),
Column(
"txn_id",
"INTEGER",
"NOT NULL REFERENCES txn_log(id) ON DELETE CASCADE",
python_type=int,
decoder=int,
),
Column("action", "TEXT", "NOT NULL", python_type=str, decoder=str),
Column("entity_id", "TEXT", "NOT NULL", python_type=str, decoder=str),
Column("details_json", "TEXT", "NOT NULL", python_name="details", python_type=dict, decoder=_decode_json_dict),
Column(
"created_at_ms",
"INTEGER",
"NOT NULL",
python_name="timestamp",
python_type=Timestamp,
decoder=decode_timestamp_ms,
),
),
indexes=("CREATE INDEX IF NOT EXISTS idx_txn_actions_txn ON txn_actions(txn_id, id)",),
)

# Migration 0003: restructured scaling_groups
SCALING_GROUPS = Table(
"scaling_groups",
Expand Down Expand Up @@ -1370,8 +1314,6 @@ def generate_full_ddl(tables: Sequence[Table]) -> str:
TASK_RESOURCE_HISTORY,
ENDPOINTS,
DISPATCH_QUEUE,
TXN_LOG,
TXN_ACTIONS,
SCALING_GROUPS,
SLICES,
RESERVATION_CLAIMS,
Expand Down Expand Up @@ -1632,16 +1574,6 @@ class EndpointRow:
registered_at: Timestamp


@dataclass(frozen=True, slots=True)
class TransactionActionRow:
"""Transaction action log entry."""

timestamp: Timestamp
action: str
entity_id: str
details: dict


@dataclass(frozen=True, slots=True)
class ApiKeyRow:
"""API key record."""
Expand Down Expand Up @@ -1947,15 +1879,6 @@ def _job_columns(*names: str) -> tuple[tuple[Column, ...], tuple[str, ...]]:
row_cls=EndpointRow,
)

# Transaction action row.
TXN_ACTION_PROJECTION = TXN_ACTIONS.projection(
"created_at_ms",
"action",
"entity_id",
"details_json",
row_cls=TransactionActionRow,
)

# API key row.
API_KEY_PROJECTION = AUTH_API_KEYS.projection(
"key_id",
Expand Down
Loading
Loading