refactor(monitoring): replace SQL monitoring with DuckDB Parquet pipeline #2600

Open

tlgimenes wants to merge 13 commits into main from refact/monitoring
Conversation

tlgimenes (Contributor) commented Mar 6, 2026

What is this contribution about?

Replaces the dual monitoring system (OTel for ops + monitoring_logs DB table for dashboards) with a unified pipeline: OTel instrumentation → Parquet files → DuckDB queries.

Key changes:

  • DuckDB Foundation: DuckDBProvider wrapper around @duckdb/node-api, time-partitioned Parquet path utilities, schema constants, SQL injection safety validators
  • ParquetSpanExporter: Custom OTel SpanExporter that buffers monitoring spans and flushes to ZSTD-compressed Parquet files via DuckDB COPY
  • Span Enrichment: emitMonitoringSpan() shared utility replaces direct DB writes in both MonitoringTransport and proxy monitoring middleware
  • DuckDBMonitoringStorage: Implements MonitoringStorage interface via read_parquet() with query, getStats, aggregate, countMatched, property filters, and date range glob narrowing
  • Integration: ParquetSpanExporter wired into OTel SDK (local mode), DebugSampler fixed to always record monitoring spans, context factory switched to DuckDB storage
  • Cleanup: Migration 036 drops monitoring_logs table, SqlMonitoringStorage deleted (-1146 lines), Parquet retention cleanup utility added
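The time-partitioned layout behind these utilities can be sketched roughly as follows; the function and parameter names are illustrative, not the actual parquet-paths.ts API, and UTC partitioning is an assumption:

```typescript
// Hypothetical sketch of an hourly Parquet partition path builder
// (YYYY/MM/DD/HH). The real parquet-paths.ts utilities may differ.
function partitionPath(basePath: string, date: Date): string {
  const pad = (n: number) => String(n).padStart(2, "0");
  const y = date.getUTCFullYear();
  const m = pad(date.getUTCMonth() + 1);
  const d = pad(date.getUTCDate());
  const h = pad(date.getUTCHours());
  return `${basePath}/${y}/${m}/${d}/${h}`;
}
```

With such a layout, dashboard queries can narrow the set of files with a directory glob (for example `./data/monitoring/2026/03/*/*/*.parquet`) before DuckDB ever opens them.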

Architecture:

Tool Call → MonitoringTransport/proxy-monitoring
         → emitMonitoringSpan() (OTel span with mesh.monitoring.* attributes)
         → ParquetSpanExporter (buffers → DuckDB COPY → ZSTD Parquet)
         → DuckDBMonitoringStorage (read_parquet() for dashboard queries)
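The enrichment step in the middle of this pipeline can be sketched as below. The interfaces, event fields, and attribute names are assumptions based on the mesh.monitoring.* prefix mentioned above, not the actual shared utility:

```typescript
// Illustrative sketch only: the real emitMonitoringSpan signature and
// attribute names may differ from what is shown here.
interface MonitoringEvent {
  tool: string;
  durationMs: number;
  isError: boolean;
}

interface SpanLike {
  setAttribute(key: string, value: string | number | boolean): void;
  end(): void;
}

function emitMonitoringSpan(span: SpanLike, event: MonitoringEvent): void {
  // Attributes carry the monitoring payload; the exporter later filters
  // spans by this prefix when deciding what to write to Parquet.
  span.setAttribute("mesh.monitoring.tool", event.tool);
  span.setAttribute("mesh.monitoring.duration_ms", event.durationMs);
  span.setAttribute("mesh.monitoring.is_error", event.isError);
  span.end();
}
```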

Screenshots/Demonstration

N/A — backend infrastructure change, no UI changes.

How to Test

  1. Run the monitoring test suite:

    bun test apps/mesh/src/monitoring/ apps/mesh/src/api/routes/proxy-monitoring.test.ts

    Expected: 61 tests pass across 7 files

  2. Run type checking:

    bun run check

    Expected: All workspaces pass

  3. Run full test suite:

    bun test

    Expected: 967 pass, 32 skip (1 pre-existing e2e failure unrelated to this PR)

  4. Verify Parquet writes (manual):

    • Start dev server with bun run dev
    • Make tool calls through an MCP connection
    • Check ./data/monitoring/YYYY/MM/DD/HH/ for .parquet files
    • Verify monitoring dashboard still shows tool call logs

Migration Notes

  • Migration 036 drops the monitoring_logs table and its 6 indexes
  • Data loss: Existing monitoring data in monitoring_logs will be lost. Export to Parquet before migrating if historical data is needed
  • New dependency: @duckdb/node-api added to apps/mesh
  • New env vars (optional):
    • MONITORING_PARQUET_PATH — override Parquet file location (default: ./data/monitoring)
    • When OTEL_EXPORTER_OTLP_ENDPOINT is set, ParquetSpanExporter is NOT added (cloud mode uses Collector)
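The local-vs-cloud switch described above amounts to a single check. A sketch, under the assumption that only the presence of the OTLP endpoint variable drives the decision:

```typescript
// Sketch of the mode decision: local mode writes Parquet directly,
// cloud mode defers to the OTel Collector. Assumed logic, not the
// actual SDK wiring.
function useParquetExporter(env: Record<string, string | undefined>): boolean {
  return !env.OTEL_EXPORTER_OTLP_ENDPOINT;
}
```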

Review Checklist

  • PR title is clear and descriptive
  • Changes are tested and working (61 tests)
  • Documentation is updated (if needed)
  • No breaking changes (MonitoringStorage interface preserved)

🤖 Generated with Claude Code


Summary by cubic

Replaces SQL-based monitoring with a unified OTel → Parquet → DuckDB pipeline for faster queries and lower DB load. Spans are enriched at source and written to time-partitioned Parquet files, which power the monitoring dashboard.

  • Refactors

    • Added ParquetSpanExporter to buffer monitoring spans and write ZSTD Parquet on an hourly partitioned path.
    • Introduced emitMonitoringSpan used by MonitoringTransport and proxy middleware; DebugSampler always records these spans.
    • Implemented DuckDBMonitoringStorage (read_parquet with glob narrowing, property filters, aggregate/countMatched with limit and ordering).
    • Wired exporter in local mode and switched context to DuckDB storage; removed SqlMonitoringStorage and dropped DB writes.
    • Added Parquet retention cleanup utility.
  • Migration

    • Drops monitoring_logs table and its indexes (migration 036).
    • Existing monitoring data will be lost; export to Parquet before migrating if needed.
    • New dependency: @duckdb/node-api. Optional env: MONITORING_PARQUET_PATH (default ./data/monitoring).
    • When OTLP endpoint is set, Parquet exporter is not enabled (cloud uses the OTel Collector).

Written for commit 576baaf. Summary will update on new commits.

tlgimenes and others added 13 commits March 6, 2026 08:18
Proposes unified architecture to eliminate infrastructure divergence between local and cloud deployments:
- Single SQL dialect (PostgreSQL everywhere via PGlite locally)
- Unified monitoring pipeline (OTel → Parquet → DuckDB)
- Zero external dependencies for local development
- Maintains cloud-native scaling with Kubernetes, NATS, and OTel Collector

Addresses monitoring data bloat, SQLite/PostgreSQL divergence, and local setup complexity.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix(billing): optimize monitoring queries to reduce memory usage

Billing page was causing excessive memory consumption by running
unbounded GROUP BY queries on monitoring_logs and loading all tabs
simultaneously. This adds LIMIT/ORDER BY support to aggregate queries,
skips unnecessary countMatched calls, and makes inactive tabs lazy.

Made-with: Cursor

* fix(monitoring): filter NULL group keys in SQL before LIMIT

Move NULL exclusion from JS post-filter to SQL WHERE clause so that
LIMIT returns exactly N valid groups instead of potentially fewer
when NULL keys consume LIMIT slots.

Made-with: Cursor
…ce exhaustion (#2588)

COLLECTION_CONNECTIONS_GET was changed from a pure DB read to spawning
background MCP client connections (with 10-15s internal timeouts) for
every connection with tools: null. When the connections management UI
loads many such connections simultaneously, lingering background
connections exhaust file descriptors and memory, causing health check
failures and "no healthy upstream" errors.

Also fix race condition in ProjectConnectionsStorage.add() where
executeTakeFirstOrThrow() could crash if the conflicting row was
deleted between INSERT and SELECT.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
After reverting the 2.138.x releases back to 2.137.0, the CI kept
re-bumping from 2.137.x which conflicts with already-published npm
versions. Jump to 2.139.0 to establish a clean version sequence.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…hema constants, SQL safety

- Install @duckdb/node-api (NAPI-based, no native compilation needed)
- Add parquet-paths.ts: time-partitioned directory utilities
- Add parquet-schema.ts: span attribute keys and staging table SQL
- Add duckdb-provider.ts: async wrapper around @duckdb/node-api
- Add sql-safety.ts: JSONPath/identifier validation against SQL injection
- Consolidate AggregationParams/Result types in storage/types.ts
- Extend MonitoringStorage interface with aggregate() and countMatched()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Custom OTel SpanExporter that buffers monitoring-enriched spans and
flushes them as time-partitioned Parquet files via DuckDB COPY.
Includes flush mutex, configurable threshold/interval, and bulk INSERT.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Extract shared emitMonitoringSpan utility for span attribute setting
- Update MonitoringTransport to emit spans instead of DB writes
- Update proxy-monitoring middleware to use emitMonitoringSpan
- Update tests to verify span attributes instead of DB log calls
- PII redaction now applied before setting span attributes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Implements MonitoringStorage interface backed by DuckDB read_parquet().
Supports query, getStats, aggregate, and countMatched with property
filters, date range glob narrowing, and SQL injection protection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add ParquetSpanExporter to OTel SDK for local mode (no OTLP endpoint)
- Fix DebugSampler to always record monitoring spans
- Replace SqlMonitoringStorage with DuckDBMonitoringStorage in context factory
- Update monitoring tool imports to use consolidated types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…leanup

- Add migration 036 to drop monitoring_logs table and indexes
- Remove SqlMonitoringStorage and its tests
- Remove MonitoringLogTable from Database interface
- Add cleanupOldParquetFiles utility for time-based retention

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
github-actions bot (Contributor) commented Mar 6, 2026

🧪 Benchmark

Should we run the Virtual MCP strategy benchmark for this PR?

React with 👍 to run the benchmark.

Reaction Action
👍 Run quick benchmark (10 & 128 tools)

Benchmark will run on the next push after you react.

github-actions bot (Contributor) commented Mar 6, 2026

Release Options

Should a new version be published when this PR is merged?

React with an emoji to vote on the release type:

Reaction Type Next Version
👍 Prerelease 2.139.4-alpha.1
🎉 Patch 2.139.4
❤️ Minor 2.140.0
🚀 Major 3.0.0

Current version: 2.139.3

Deployment

  • Deploy to production (triggers ArgoCD sync after Docker image is published)

cubic-dev-ai bot (Contributor) left a comment

9 issues found across 33 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="apps/mesh/src/api/routes/proxy-monitoring.test.ts">

<violation number="1" location="apps/mesh/src/api/routes/proxy-monitoring.test.ts:116">
P3: Avoid fixed-time sleeps in async tests; this can cause flaky failures and unnecessary test latency.</violation>
</file>

<file name="apps/mesh/src/monitoring/parquet-span-exporter.test.ts">

<violation number="1" location="apps/mesh/src/monitoring/parquet-span-exporter.test.ts:142">
P2: The test relies on a fixed 200ms delay, which makes the async flush assertion timing-sensitive and flaky.</violation>
</file>

<file name="apps/mesh/src/monitoring/parquet-span-exporter.ts">

<violation number="1" location="apps/mesh/src/monitoring/parquet-span-exporter.ts:127">
P1: `forceFlush()` can return before an in-flight flush finishes, allowing shutdown to close DuckDB too early.</violation>

<violation number="2" location="apps/mesh/src/monitoring/parquet-span-exporter.ts:134">
P1: Flush errors are swallowed, causing export callbacks to report success even when Parquet writes fail.</violation>
</file>

<file name="apps/mesh/src/monitoring/duckdb-monitoring-storage.ts">

<violation number="1" location="apps/mesh/src/monitoring/duckdb-monitoring-storage.ts:51">
P2: The error catch in `safeQuery` is overly broad: `msg.includes("read_parquet")` will swallow any DuckDB error mentioning `read_parquet`, not just the "no files found" case. This could silently mask real query errors (type mismatches, permission issues, schema problems). Consider removing the `read_parquet` fallback and only catching the specific "No files found" message.</violation>

<violation number="2" location="apps/mesh/src/monitoring/duckdb-monitoring-storage.ts:522">
P1: The `queryTimeseriesGrouped` method silently discards the `groupByExpr` and `limit` parameters, falling back to ungrouped timeseries. Callers requesting grouped time-series aggregations will receive incorrect (ungrouped) results with no error or warning. At minimum, this should be documented with a TODO, or throw an unsupported-operation error so callers know the feature isn't implemented rather than returning misleading data.</violation>

<violation number="3" location="apps/mesh/src/monitoring/duckdb-monitoring-storage.ts:544">
P2: `"5m"` interval maps to `date_trunc('minute', ...)`, producing 1-minute buckets instead of 5-minute buckets. Consider using DuckDB's `time_bucket(INTERVAL '5 minutes', "timestamp")` for correct 5-minute bucketing.</violation>
</file>

<file name="apps/mesh/migrations/036-drop-monitoring-logs.ts">

<violation number="1" location="apps/mesh/migrations/036-drop-monitoring-logs.ts:8">
P2: Down migration is incomplete: three indexes dropped in `up()` are not recreated in `down()`, so rollback does not restore the original schema.</violation>
</file>

<file name="apps/mesh/src/monitoring/retention.ts">

<violation number="1" location="apps/mesh/src/monitoring/retention.ts:20">
P1: Retention comparison mixes local-time cutoff with UTC partition dates, which can delete data too early or too late depending on server timezone.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

   * from corrupting the staging table.
   */
  private async flush(): Promise<void> {
    if (this.buffer.length === 0) return;

cubic-dev-ai bot commented Mar 6, 2026

P1: forceFlush() can return before an in-flight flush finishes, allowing shutdown to close DuckDB too early. (apps/mesh/src/monitoring/parquet-span-exporter.ts, line 127)

    this.buffer = [];

    // Chain onto the flush lock to serialize concurrent flushes
    this.flushLock = this.flushLock

cubic-dev-ai bot commented Mar 6, 2026

P1: Flush errors are swallowed, causing export callbacks to report success even when Parquet writes fail. (apps/mesh/src/monitoring/parquet-span-exporter.ts, line 134)
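One way to address both flush findings, sketched under the assumption that flushes are already chained on a shared promise: have forceFlush() await the chain and re-throw the last captured error instead of swallowing it. Class and field names here are illustrative, not the exporter's actual internals:

```typescript
// Hedged sketch: serialize flushes on a promise chain, make forceFlush()
// wait for all in-flight work, and surface the last flush error so export
// callbacks can report failure instead of silent success.
class FlushQueue {
  private chain: Promise<void> = Promise.resolve();
  private lastError: Error | null = null;

  enqueue(work: () => Promise<void>): void {
    // Each flush runs only after the previous one settles.
    this.chain = this.chain
      .then(work)
      .catch((err) => {
        // Capture instead of swallow; forceFlush() re-throws it.
        this.lastError = err instanceof Error ? err : new Error(String(err));
      });
  }

  async forceFlush(): Promise<void> {
    // Resolves only after every previously enqueued flush has settled,
    // so shutdown cannot close DuckDB while a write is pending.
    await this.chain;
    if (this.lastError) {
      const e = this.lastError;
      this.lastError = null;
      throw e;
    }
  }
}
```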

  };
}

private async queryTimeseriesGrouped(

cubic-dev-ai bot commented Mar 6, 2026

P1: queryTimeseriesGrouped silently discards the groupByExpr and limit parameters, falling back to ungrouped timeseries, so callers requesting grouped time-series aggregations receive incorrect (ungrouped) results with no error or warning. At minimum, document this with a TODO, or throw an unsupported-operation error so callers know the feature isn't implemented rather than receiving misleading data. (apps/mesh/src/monitoring/duckdb-monitoring-storage.ts, line 522)

  retentionDays: number = 30,
): Promise<number> {
  const cutoff = new Date();
  cutoff.setDate(cutoff.getDate() - retentionDays);

cubic-dev-ai bot commented Mar 6, 2026

P1: Retention comparison mixes a local-time cutoff with UTC partition dates, which can delete data too early or too late depending on server timezone. (apps/mesh/src/monitoring/retention.ts, line 20)
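A timezone-safe variant would compute the cutoff entirely in UTC so it compares cleanly against UTC-named partition directories. A sketch, assuming partitions are named from UTC dates:

```typescript
// Sketch: build the retention cutoff from UTC components so local server
// timezone cannot shift which YYYY/MM/DD partitions are considered expired.
function utcCutoff(now: Date, retentionDays: number): Date {
  // Date.UTC normalizes a negative/overflowed day-of-month automatically.
  return new Date(Date.UTC(
    now.getUTCFullYear(),
    now.getUTCMonth(),
    now.getUTCDate() - retentionDays,
  ));
}
```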

expect(result).toBe(ExportResultCode.SUCCESS);

// Wait a tick for async flush
await new Promise((r) => setTimeout(r, 200));

cubic-dev-ai bot commented Mar 6, 2026

P2: The test relies on a fixed 200ms delay, which makes the async flush assertion timing-sensitive and flaky. (apps/mesh/src/monitoring/parquet-span-exporter.test.ts, line 142)

  return await this.duckdb.all<T>(sql, ...params);
} catch (err: unknown) {
  const msg = err instanceof Error ? err.message : String(err);
  if (msg.includes("No files found") || msg.includes("read_parquet")) {

cubic-dev-ai bot commented Mar 6, 2026

P2: The error catch in safeQuery is overly broad: msg.includes("read_parquet") will swallow any DuckDB error mentioning read_parquet, not just the "no files found" case, silently masking real query errors (type mismatches, permission issues, schema problems). Consider removing the read_parquet fallback and catching only the specific "No files found" message. (apps/mesh/src/monitoring/duckdb-monitoring-storage.ts, line 51)
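A narrower predicate along these lines would let only the missing-files case fall through to an empty result; the exact DuckDB message text matched here is an assumption based on the snippet above:

```typescript
// Sketch: treat only DuckDB's "No files found" error as an empty result.
// Anything else (binder errors, type mismatches, permissions) propagates.
function isNoParquetFilesError(err: unknown): boolean {
  const msg = err instanceof Error ? err.message : String(err);
  return msg.includes("No files found");
}
```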

private intervalToBucket(interval: string): string {
  const map: Record<string, string> = {
    "1m": "minute",
    "5m": "minute",

cubic-dev-ai bot commented Mar 6, 2026

P2: "5m" maps to date_trunc('minute', ...), producing 1-minute buckets instead of 5-minute buckets. Consider DuckDB's time_bucket(INTERVAL '5 minutes', "timestamp") for correct 5-minute bucketing. (apps/mesh/src/monitoring/duckdb-monitoring-storage.ts, line 544)
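The suggested fix could take a shape like this, mapping each interval to a time_bucket() SQL expression instead of a date_trunc unit. The "timestamp" column name is assumed from the review comment:

```typescript
// Sketch: emit DuckDB time_bucket() expressions so "5m" yields true
// 5-minute buckets rather than collapsing to 1-minute date_trunc buckets.
function intervalToBucketExpr(interval: string): string {
  const map: Record<string, string> = {
    "1m": "INTERVAL '1 minute'",
    "5m": "INTERVAL '5 minutes'",
    "1h": "INTERVAL '1 hour'",
    "1d": "INTERVAL '1 day'",
  };
  const iv = map[interval];
  if (!iv) throw new Error(`unsupported interval: ${interval}`);
  return `time_bucket(${iv}, "timestamp")`;
}
```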

await db.schema
  .dropIndex("monitoring_logs_org_timestamp")
  .ifExists()
  .execute();

cubic-dev-ai bot commented Mar 6, 2026

P2: The down migration is incomplete: three indexes dropped in up() are not recreated in down(), so rollback does not restore the original schema. (apps/mesh/migrations/036-drop-monitoring-logs.ts, line 8)

expect(event.output).toEqual({ error: "nope" });
// Verify new fields are logged
expect(event.userAgent).toBe("test-client/1.0");
await new Promise((r) => setTimeout(r, 50));

cubic-dev-ai bot commented Mar 6, 2026

P3: Avoid fixed-time sleeps in async tests; they cause flaky failures and unnecessary test latency. (apps/mesh/src/api/routes/proxy-monitoring.test.ts, line 116)
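Both fixed-sleep findings (the 200ms and 50ms delays) could be replaced by a small poll-until helper along these lines; the helper name and defaults are illustrative:

```typescript
// Sketch: retry a condition until it holds or a deadline passes, instead
// of sleeping a fixed amount and hoping the async work has finished.
async function waitFor(
  cond: () => boolean,
  timeoutMs = 1000,
  stepMs = 10,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (!cond()) {
    if (Date.now() > deadline) {
      throw new Error("waitFor: condition not met within timeout");
    }
    await new Promise((r) => setTimeout(r, stepMs));
  }
}
```

Tests then assert as soon as the condition becomes true (fast path) while still bounding worst-case latency.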
