Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3165,8 +3165,12 @@ impl Database {
// whole day (the crash-loop backlog made EVERY project probe-positive,
// so the probe alone still ballooned tens of GB per sweep).
let chunks: Vec<(String, String)> = if schema.dedup_keys.iter().any(|k| k == "timestamp") {
// 10-minute bins (not hours): one HOUR of the largest project is
// >2.1GB of string data — past Arrow's i32 offset limit ("Offset
// overflow error: 2222394106" in prod) and tens of GB materialized.
// 10 minutes matches the flush-bucket granularity.
let probe = format!(
"SELECT CAST(date_trunc('hour', \"timestamp\") AS VARCHAR) FROM \
"SELECT CAST(date_bin(INTERVAL '10 minutes', \"timestamp\", TIMESTAMP '1970-01-01T00:00:00') AS VARCHAR) FROM \
(SELECT \"timestamp\", count(*) AS c FROM {scan_name} WHERE {filter} GROUP BY {keys_csv}) AS g \
WHERE c > 1 GROUP BY 1 ORDER BY 1"
);
Expand Down Expand Up @@ -3196,7 +3200,7 @@ impl Database {
let start = chrono::NaiveDateTime::parse_from_str(h19, "%Y-%m-%dT%H:%M:%S")
.or_else(|_| chrono::NaiveDateTime::parse_from_str(h19, "%Y-%m-%d %H:%M:%S"))
.ok()?;
let end = start + chrono::Duration::hours(1);
let end = start + chrono::Duration::minutes(10);
if end > sealed_before {
debug!("dedup: skipping unsealed chunk starting {start} (cleared on a later sweep)");
return None;
Expand Down
Loading