Skip to content

Commit 6c14321

Browse files
authored
Fix DuckDB LogStore 60GB memory usage (#3843)
- Split `log_store.py` into a `log_store/` package with two implementations:
  - **`mem_store.py`**: pure in-memory store (no DuckDB/Parquet/threads) — used automatically in test/CI
  - **`duckdb_store.py`**: production DuckDB+Parquet store, now single-connection with cursor concurrency instead of 8 independent databases
- `__init__.py` detects `PYTEST_CURRENT_TEST` or `CI` and exports the appropriate implementation as `LogStore`

**Root cause**: 8 uncapped `duckdb.connect()` instances reserved huge virtual memory, causing `[Errno 12] Cannot allocate memory` on `fork()` in CI smoke tests.

Fixes #3842
1 parent 5b7e34a commit 6c14321

File tree

6 files changed

+582
-135
lines changed

6 files changed

+582
-135
lines changed

lib/iris/scripts/benchmark_log_store.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121

2222
import click
2323

24-
from iris.cluster.log_store import LogStore, task_log_key
24+
from iris.cluster.log_store import task_log_key
25+
from iris.cluster.log_store.duckdb_store import DuckDBLogStore as LogStore
2526
from iris.cluster.types import JobName, TaskAttempt
2627
from iris.rpc import logging_pb2
2728

@@ -284,7 +285,6 @@ def print_summary(results: list[tuple[str, float, float]]) -> None:
284285
@click.option("--tasks", default=100, help="Number of tasks per job")
285286
@click.option("--lines", default=1000, help="Log lines per task")
286287
@click.option("--iterations", "-n", default=20, help="Query benchmark iterations")
287-
@click.option("--flush-threshold", default=500_000, help="Rows per Parquet segment")
288288
@click.option("--only", "only_group", type=click.Choice(["ingest", "query"]), help="Run only this phase")
289289
@click.option(
290290
"--log-dir", type=click.Path(path_type=Path), default=None, help="Persist logs to this dir (default: tmpdir)"
@@ -294,21 +294,20 @@ def main(
294294
tasks: int,
295295
lines: int,
296296
iterations: int,
297-
flush_threshold: int,
298297
only_group: str | None,
299298
log_dir: Path | None,
300299
) -> None:
301300
"""Benchmark LogStore write and query performance."""
302301
total = jobs * tasks * lines
303302
print(f"LogStore benchmark: {jobs} jobs x {tasks} tasks x {lines} lines = {total:,} entries")
304-
print(f" flush_threshold={flush_threshold:,} query_iterations={iterations}")
303+
print(f" query_iterations={iterations}")
305304

306305
tmp = None
307306
if log_dir is None:
308307
tmp = TemporaryDirectory(prefix="bench_log_store_")
309308
log_dir = Path(tmp.name) / "logs"
310309

311-
store = LogStore(log_dir=log_dir, flush_row_threshold=flush_threshold)
310+
store = LogStore(log_dir=log_dir)
312311

313312
try:
314313
if only_group is None or only_group == "ingest":
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Copyright The Marin Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Log store package.
5+
6+
Exports ``LogStore`` as the environment-appropriate implementation:
7+
- Tests / CI (``PYTEST_CURRENT_TEST`` or ``CI`` set): in-memory ``MemStore``
8+
- Production: DuckDB-backed ``DuckDBLogStore``
9+
10+
All consumers should import from this package:
11+
``from iris.cluster.log_store import LogStore, LogCursor, ...``
12+
"""
13+
14+
from __future__ import annotations
15+
16+
import logging
17+
import os
18+
19+
from iris.cluster.log_store._types import (
20+
PROCESS_LOG_KEY,
21+
LogReadResult,
22+
_EST_BYTES_PER_ROW,
23+
task_log_key,
24+
)
25+
from iris.logging import str_to_log_level
26+
from iris.rpc import logging_pb2
27+
28+
29+
def _is_test_environment() -> bool:
    """Return True when running under pytest or in CI.

    Detection is by *presence* of either environment variable
    (``PYTEST_CURRENT_TEST`` is set by pytest per-test; ``CI`` is set by
    most CI providers), regardless of its value.
    """
    return any(var in os.environ for var in ("PYTEST_CURRENT_TEST", "CI"))
31+
32+
33+
# Select the LogStore implementation once, at import time: tests/CI get the
# lightweight in-memory store (no DuckDB, Parquet files, or threads), while
# production gets the DuckDB-backed store. Consumers only ever see the name
# ``LogStore`` re-exported from this package.
if _is_test_environment():
    from iris.cluster.log_store.mem_store import MemStore as LogStore
else:
    from iris.cluster.log_store.duckdb_store import DuckDBLogStore as LogStore
37+
38+
39+
class LogCursor:
    """Incremental reader over a single LogStore key.

    Remembers the sequence position returned by the store after each read,
    so successive ``read()`` calls yield only entries appended since the
    previous call.
    """

    def __init__(self, store: LogStore, key: str) -> None:
        self._store = store
        self._key = key
        # Resume position handed back by the store; starts at the beginning.
        self._cursor: int = 0

    def read(self, max_entries: int = 5000) -> list[logging_pb2.LogEntry]:
        """Return up to ``max_entries`` new entries and advance the cursor."""
        batch = self._store.get_logs(self._key, cursor=self._cursor, max_lines=max_entries)
        self._cursor = batch.cursor
        return batch.entries
51+
52+
53+
class LogStoreHandler(logging.Handler):
    """logging.Handler that appends formatted records into a LogStore key.

    Once ``close()`` has run the handler becomes a permanent no-op, so late
    records (e.g. during interpreter shutdown) are dropped instead of being
    written to a torn-down store.
    """

    def __init__(self, log_store: LogStore, key: str = PROCESS_LOG_KEY):
        super().__init__()
        self._log_store = log_store
        self._key = key
        self._closed = False

    def emit(self, record: logging.LogRecord) -> None:
        """Format ``record`` into a LogEntry proto and append it to the store."""
        if self._closed:
            return
        try:
            proto = logging_pb2.LogEntry(
                source="process",
                level=str_to_log_level(record.levelname),
                data=self.format(record),
            )
            # record.created is seconds since the epoch; the proto wants ms.
            proto.timestamp.epoch_ms = int(record.created * 1000)
            self._log_store.append(self._key, [proto])
        except Exception:
            # Logging must never raise into application code; defer to the
            # standard Handler error hook.
            self.handleError(record)

    def close(self) -> None:
        # Flip the flag before the base-class teardown so emit() short-circuits.
        self._closed = True
        super().close()
79+
80+
81+
# Public surface of the log_store package.
# NOTE(review): ``_EST_BYTES_PER_ROW`` is underscore-private yet re-exported
# here — confirm it is intentionally part of the public API.
__all__ = [
    "PROCESS_LOG_KEY",
    "_EST_BYTES_PER_ROW",
    "LogCursor",
    "LogReadResult",
    "LogStore",
    "LogStoreHandler",
    "task_log_key",
]
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Copyright The Marin Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Shared types and constants for the log store implementations."""
5+
6+
from __future__ import annotations
7+
8+
from dataclasses import dataclass, field
9+
10+
from iris.cluster.types import TaskAttempt
11+
from iris.rpc import logging_pb2
12+
13+
# Reserved key under which the host process's own log records are stored
# (used as the default key by LogStoreHandler).
PROCESS_LOG_KEY = "/system/process"

# Rough per-row size estimate in bytes — presumably used to convert row
# counts into memory budgets in the store implementations; TODO confirm.
_EST_BYTES_PER_ROW = 256

# Translation table escaping the SQL LIKE wildcards (% and _) and the escape
# character itself. str.translate applies the table in a single pass, so
# escaping the backslash here cannot double-escape the other replacements.
_LIKE_ESCAPE_TABLE = str.maketrans({"%": "\\%", "_": "\\_", "\\": "\\\\"})
18+
19+
20+
def task_log_key(task_attempt: TaskAttempt) -> str:
    """Return the hierarchical log-store key for ``task_attempt``.

    Delegates the wire encoding to ``TaskAttempt.to_wire``; a concrete
    attempt number is mandatory, enforced via ``require_attempt()``.
    """
    task_attempt.require_attempt()  # reject attempts without an attempt number
    return task_attempt.to_wire()
24+
25+
26+
@dataclass
class LogReadResult:
    """Result of one log read: the entries plus a cursor to resume from."""

    # Entries returned by the read; empty when nothing new was available.
    entries: list[logging_pb2.LogEntry] = field(default_factory=list)
    # max seq seen — pass back as ``cursor`` on the next read to resume.
    cursor: int = 0

0 commit comments

Comments
 (0)