perf(composio/gmail): cut redundant fetches on incremental sync (#1404) #1474
Conversation
Replaces the day-level `after:YYYY/MM/DD` cursor with a second-precision `after:<unix>` filter so same-day re-ticks stop re-fetching today's window. Adds a first-message head-unchanged early-stop keyed on a new `SyncState.last_seen_id`, plus an adaptive page cap that drops the 20-page ceiling to 2 when the previous successful sync landed inside a 5-minute window. Trigger-driven and connection-created syncs now bump the periodic scheduler's last-sync map, so concurrent paths coalesce. Adds per-sync metrics to the completion log and outcome details (`requests`, `messages_total`, `messages_new`, `dup_ratio`, `stop_reason`, `adaptive_cap`). Closes tinyhumansai#1404.
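For orientation, a minimal sketch of what the two cursor helpers plausibly look like. Only the names `parse_cursor_to_epoch_secs` and `cursor_to_gmail_after_epoch_filter` come from the PR; the bodies are illustrative (the milliseconds-only parse mirrors the pre-guard helper shown in the review below):

```rust
// Hypothetical reconstruction of the helpers in gmail/sync.rs.
pub(crate) fn parse_cursor_to_epoch_secs(cursor: &str) -> Option<i64> {
    let cursor = cursor.trim();
    // The only writer today stores Gmail's internalDate (epoch milliseconds).
    if let Ok(millis) = cursor.parse::<i64>() {
        return Some(millis / 1000);
    }
    None
}

/// Second-precision Gmail query term, e.g. "after:1700000000".
pub(crate) fn cursor_to_gmail_after_epoch_filter(cursor: &str) -> Option<String> {
    parse_cursor_to_epoch_secs(cursor).map(|secs| format!("after:{secs}"))
}
```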
📝 Walkthrough

This PR optimizes Gmail incremental sync to reduce redundant Composio API calls by introducing epoch-second precision cursor filtering, adaptive pagination caps based on recent sync timing, early-stop optimizations on unchanged inbox heads, and detailed per-sync metrics. State model extensions enable tracking the freshest message ID and last successful sync timestamp for more intelligent fetch decisions.

Changes: Gmail Sync Efficiency Optimization
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ Passed checks (5 passed)
🧹 Nitpick comments (3)
src/openhuman/composio/providers/sync_state.rs (1)
256-265: 💤 Low value: Consider logging the new state fields on save/load for observability.
`save()` and `load()` debug logs already include cursor/synced_ids_count/budget but omit the new `last_seen_id` and `last_sync_at_ms` values. Including them would make it easier to debug head-unchanged early-stop behavior in the field without changing semantics.

📝 Suggested addition
```diff
 tracing::debug!(
     toolkit = %self.toolkit,
     connection_id = %self.connection_id,
     cursor = ?self.cursor,
     synced_ids_count = self.synced_ids.len(),
     budget_used = self.daily_budget.requests_used,
+    last_seen_id = ?self.last_seen_id,
+    last_sync_at_ms = ?self.last_sync_at_ms,
     "[sync_state] saved"
 );
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/composio/providers/sync_state.rs` around lines 256 - 265, Update the debug logs in the SyncState save() and load() methods to also include the new fields last_seen_id and last_sync_at_ms for observability; specifically, in the tracing::debug! call inside save() (and the analogous debug in load()) add last_seen_id = ?self.last_seen_id and last_sync_at_ms = self.last_sync_at_ms so the logs show those values alongside toolkit, connection_id, cursor, synced_ids_count and budget_used without changing any logic.

src/openhuman/composio/providers/gmail/sync.rs (1)
86-90: 💤 Low value: Numeric cursors are unconditionally treated as milliseconds.
`parse::<i64>()` succeeds on any bare integer, so a cursor like `"1700000000"` (10-digit unix seconds) gets divided by 1000 and silently lands in 1970. Today the only caller writes `internalDate` (Gmail-side millis), so this is an internal contract — but a magnitude check (e.g. treat ≥ 10¹² as ms, otherwise seconds) would make the helper safer against future call sites and accidental cursor pollution.

🔧 Suggested guard
```diff
 pub(crate) fn parse_cursor_to_epoch_secs(cursor: &str) -> Option<i64> {
     let cursor = cursor.trim();
-    if let Ok(millis) = cursor.parse::<i64>() {
-        return Some(millis / 1000);
+    if let Ok(n) = cursor.parse::<i64>() {
+        // Heuristic: values ≥ 10^12 (year 2001+ in ms) are millis;
+        // anything smaller is already in seconds.
+        return Some(if n.abs() >= 1_000_000_000_000 { n / 1000 } else { n });
     }
```

🤖 Prompt for AI Agents
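A quick unit-test sketch of the guarded behavior, assuming the heuristic above is adopted as written (the test module and name are hypothetical):

```rust
#[cfg(test)]
mod magnitude_guard_tests {
    use super::parse_cursor_to_epoch_secs;

    #[test]
    fn seconds_and_millis_both_normalize_to_seconds() {
        // 10-digit unix seconds pass through unchanged...
        assert_eq!(parse_cursor_to_epoch_secs("1700000000"), Some(1_700_000_000));
        // ...while 13-digit millis are scaled down by 1000.
        assert_eq!(parse_cursor_to_epoch_secs("1700000000000"), Some(1_700_000_000));
    }
}
```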
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/composio/providers/gmail/sync.rs` around lines 86 - 90, The helper parse_cursor_to_epoch_secs currently treats any integer cursor as milliseconds; change it to detect magnitude so 64-bit integers >= 1_000_000_000_000 are interpreted as milliseconds (divide by 1000) and smaller integers are interpreted as seconds (use as-is), ensuring values like "1700000000" are not wrongly converted; update parse_cursor_to_epoch_secs to apply this magnitude check after parsing and return None on parse failure as before.

src/openhuman/composio/providers/gmail/provider.rs (1)
419-426: 💤 Low value: Simplify the `newest_id` capture — the loop guard already pins it to page 0, index 0.

Inside the `(page_num == 0, msg_index == 0)` branch, `msg_id` is the same value `extract_item_id(messages.first(), ...)` already computed in the head-unchanged block above (lines 373–375). You can pull that into a single binding at the top of the page-0 block and reuse it, avoiding the duplicate extraction and the nested `if let Some(ref id) = msg_id` deeper in the per-message loop.

♻️ Suggested consolidation
```diff
-                if page_num == 0 {
-                    let first_id = messages
-                        .first()
-                        .and_then(|m| extract_item_id(m, MESSAGE_ID_PATHS));
-                    if let (Some(seen), Some(first)) =
-                        (state.last_seen_id.as_deref(), first_id.as_deref())
-                    {
-                        if seen == first {
+                if page_num == 0 {
+                    let first_id = messages
+                        .first()
+                        .and_then(|m| extract_item_id(m, MESSAGE_ID_PATHS));
+                    if let Some(ref id) = first_id {
+                        newest_id = Some(id.clone());
+                    }
+                    if let (Some(seen), Some(first)) =
+                        (state.last_seen_id.as_deref(), first_id.as_deref())
+                    {
+                        if seen == first {
                             tracing::debug!(
                                 connection_id = %connection_id,
                                 first_id = %first,
                                 "[composio:gmail] first page head matches last_seen_id — no new mail"
                             );
                             stop_reason = "head_unchanged";
-                            newest_id = Some(first.to_string());
                             break;
                         }
                     }
                 }
@@
-                    let msg_id = extract_item_id(msg, MESSAGE_ID_PATHS);
-                    // Capture the very first id of page 0 as the
-                    // freshest-id-on-server marker for next-sync's
-                    // head-unchanged shortcut, regardless of dedup status.
-                    if page_num == 0 && msg_index == 0 {
-                        if let Some(ref id) = msg_id {
-                            newest_id = Some(id.clone());
-                        }
-                    }
+                    let msg_id = extract_item_id(msg, MESSAGE_ID_PATHS);
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/composio/providers/gmail/provider.rs` around lines 419 - 426, The code duplicates extraction of the first message id for newest_id inside the per-message loop even though the page-0 guard already identifies the first message; instead, in the page_num == 0 branch obtain a single binding (use extract_item_id(messages.first(), ...) or the existing head-unchanged binding) and assign newest_id = Some(id.clone()) once, then remove the nested if-let Some(ref id) = msg_id inside the loop in the function handling pages so the loop reuses that top-level binding; refer to symbols page_num, msg_index, messages.first(), extract_item_id, msg_id and newest_id to locate and consolidate the extraction.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@src/openhuman/composio/providers/gmail/provider.rs`:
- Around line 419-426: The code duplicates extraction of the first message id
for newest_id inside the per-message loop even though the page-0 guard already
identifies the first message; instead, in the page_num == 0 branch obtain a
single binding (use extract_item_id(messages.first(), ...) or the existing
head-unchanged binding) and assign newest_id = Some(id.clone()) once, then
remove the nested if-let Some(ref id) = msg_id inside the loop in the function
handling pages so the loop reuses that top-level binding; refer to symbols
page_num, msg_index, messages.first(), extract_item_id, msg_id and newest_id to
locate and consolidate the extraction.
In `@src/openhuman/composio/providers/gmail/sync.rs`:
- Around line 86-90: The helper parse_cursor_to_epoch_secs currently treats any
integer cursor as milliseconds; change it to detect magnitude so 64-bit integers
>= 1_000_000_000_000 are interpreted as milliseconds (divide by 1000) and
smaller integers are interpreted as seconds (use as-is), ensuring values like
"1700000000" are not wrongly converted; update parse_cursor_to_epoch_secs to
apply this magnitude check after parsing and return None on parse failure as
before.
In `@src/openhuman/composio/providers/sync_state.rs`:
- Around line 256-265: Update the debug logs in the SyncState save() and load()
methods to also include the new fields last_seen_id and last_sync_at_ms for
observability; specifically, in the tracing::debug! call inside save() (and the
analogous debug in load()) add last_seen_id = ?self.last_seen_id and
last_sync_at_ms = self.last_sync_at_ms so the logs show those values alongside
toolkit, connection_id, cursor, synced_ids_count and budget_used without
changing any logic.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0f15e5de-82c6-48cf-a42a-2645ab95f4c5
📒 Files selected for processing (4)
src/openhuman/composio/providers/gmail/provider.rs
src/openhuman/composio/providers/gmail/sync.rs
src/openhuman/composio/providers/gmail/tests.rs
src/openhuman/composio/providers/sync_state.rs
Summary
- Replaces the day-level `after:YYYY/MM/DD` Gmail cursor with second-precision `after:<unix>` so a same-day re-tick does not re-fetch every message Gmail has filed today.
- Records `SyncState.last_seen_id` and bails out after page 1 when the server's head still matches it. Cuts up to `MAX_PAGES_PER_SYNC - 1` redundant requests on quiet inboxes.
- Trigger-driven and connection-created syncs now bump the periodic scheduler's `LAST_SYNC_AT` map (the provider bumps it on success), so concurrent paths coalesce instead of double-fetching.
- Adds per-sync metrics to `SyncOutcome.details`: `requests`, `messages_total`, `messages_new`, `dup_ratio`, `stop_reason`, `adaptive_cap`.

Problem
`src/openhuman/composio/providers/gmail/provider.rs` paginated up to `MAX_PAGES_PER_SYNC = 20` per sync and filtered against a day-level `after:YYYY/MM/DD` cursor (`gmail/sync.rs::cursor_to_gmail_after_filter`). On a high-volume Gmail account that filter still returned the whole current day every tick, so the sync chewed through Composio quota even when nothing had actually changed since the previous run. Dedup (`SyncState::synced_ids`, content-hash ingest) only kicks in after the network round-trip, so by the time we knew a page was redundant the bandwidth was already gone. Three trigger paths (`periodic.rs`, connection-created, `on_trigger`) only coordinated via a coarse `last_sync_at` map populated by the periodic scheduler alone, so trigger-driven syncs and the periodic tick could fire back-to-back.

Concrete pressure points:
- `cursor_to_gmail_after_filter` emitted a day filter only, so any sync within the same calendar day refetched the whole day window (see the sketch after this list).
- Up to `MAX_PAGES_PER_SYNC` (20) pages × `PAGE_SIZE` (25) = 500 messages fetched per pass even when 0 messages would be new.
- Redundant pages were only discarded after the round-trip, against `synced_ids`.
- `on_trigger` never touched the scheduler's last-sync map, so trigger syncs and periodic syncs raced.
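To make the first pressure point concrete, here is a minimal sketch of the day-filter failure mode, assuming a chrono-style formatter (the real `cursor_to_gmail_after_filter` may differ in detail):

```rust
use chrono::{TimeZone, Utc};

// Illustrative day-level filter: any two syncs on the same UTC day build the
// exact same query, so Gmail re-returns the whole day's messages each tick.
fn day_filter(cursor_epoch_ms: i64) -> Option<String> {
    let dt = Utc.timestamp_millis_opt(cursor_epoch_ms).single()?;
    Some(format!("after:{}", dt.format("%Y/%m/%d")))
}

fn main() {
    let earlier = day_filter(1_700_000_000_000).unwrap(); // 2023-11-14 22:13 UTC
    let later = day_filter(1_700_005_000_000).unwrap();   // ~83 minutes later
    assert_eq!(earlier, later); // identical "after:2023/11/14", no narrowing
}
```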
Solution

Pure Rust-core change, no FE / schema migration. Files touched:
- `composio/providers/gmail/sync.rs` — new `cursor_to_gmail_after_epoch_filter` plus a shared `parse_cursor_to_epoch_secs` helper. The legacy day filter stays as a parse fallback for non-numeric cursors. 11 new unit tests on the helpers (including a round-trip check between the epoch filter and the recency parser).
- `composio/providers/sync_state.rs` — `SyncState` gains optional `last_seen_id` and `last_sync_at_ms` fields, both `#[serde(default)]` so existing on-disk state blobs deserialize cleanly. New `set_last_seen_id` / `set_last_sync_at_ms` setters, plus tests for the legacy-blob path, the setters, and an extended serialization round-trip.
- `composio/providers/gmail/provider.rs`:
  - Switches the fetch query to `after:<epoch>`; falls back to the day filter only if the cursor cannot be parsed as a timestamp.
  - Adaptive `max_pages` — `RECENT_SYNC_MAX_PAGES = 2` when `now - last_sync_at_ms < RECENT_SYNC_WINDOW_MS` (5 min), full `MAX_PAGES_PER_SYNC = 20` otherwise. Initial connection-created syncs always get the full ceiling.
  - Head-unchanged early stop on `last_seen_id`. Captures the freshest server-side id (`newest_id`) on page 0, message 0, regardless of dedup status so the next sync has a stable head-marker.
  - Stop reasons (`max_pages`, `budget_exhausted`, `empty_page`, `head_unchanged`, `page_all_synced`, `no_more_pages`) folded into the structured completion log and the `SyncOutcome.details` JSON. Adds `requests`, `messages_total`, `messages_new`, `dup_ratio`, `adaptive_cap`.
  - Calls `crate::openhuman::composio::periodic::record_sync_success` for all trigger paths so periodic ticks respect a trigger/connection-created sync that just landed.
- `composio/providers/gmail/tests.rs` — locks in that both `cursor_to_gmail_after_epoch_filter` and `cursor_to_gmail_after_filter` accept the same internalDate input (so the epoch path is genuinely the preferred path and the day filter is a fallback, not a divergence) and sanity-bounds the epoch filter inside [2020, 2100].
End-to-end ingest correctness is preserved: `synced_ids`, content-hash ingest, and the per-page deferred-mark pattern are unchanged. The optimisations only trim how much Composio quota we burn before reaching the existing dedup layer. A sketch of the adaptive cap decision follows.
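The constant names and values below come from the PR text; the standalone helper shape is an assumption, since the real logic lives inline in `provider.rs`'s `sync()`:

```rust
const MAX_PAGES_PER_SYNC: usize = 20;
const RECENT_SYNC_MAX_PAGES: usize = 2;
const RECENT_SYNC_WINDOW_MS: i64 = 5 * 60 * 1000;

// Hypothetical distillation of the adaptive page-cap decision.
fn effective_max_pages(last_sync_at_ms: Option<i64>, now_ms: i64, initial_sync: bool) -> usize {
    // Connection-created (initial) syncs always get the full ceiling.
    if initial_sync {
        return MAX_PAGES_PER_SYNC;
    }
    match last_sync_at_ms {
        // A successful sync landed inside the 5-minute window: cap at 2 pages.
        Some(last) if now_ms.saturating_sub(last) < RECENT_SYNC_WINDOW_MS => {
            RECENT_SYNC_MAX_PAGES
        }
        _ => MAX_PAGES_PER_SYNC,
    }
}
```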
Submission Checklist

- The new branches in `sync()` (adaptive cap + head-unchanged) reuse `parse_cursor_to_epoch_secs` and `last_seen_id`, which are unit-tested.
- Telemetry stays on the existing event (`composio.gmail.sync`); no new user-visible feature row.
- Docs (`## Related`, `docs/RELEASE-MANUAL-SMOKE.md`) — N/A: provider-internal optimisation, no surface change.
- `Closes #NNN` in the `## Related` section.
Impact

- No migration needed (`SyncState` keeps backward-compatible defaults). The new metrics log replaces the existing one, so log volume is unchanged.
- Relies on Gmail search accepting `after:<unix_seconds>` (documented). If Composio's `GMAIL_FETCH_EMAILS` ever rejects the bare-int form, `cursor_to_gmail_after_epoch_filter` returns `None` for that cursor and we fall back to the day filter, preserving behaviour parity with main. A sketch of that fallback order follows.
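The two helper names below are the PR's; the wrapper function itself is hypothetical, and both helpers are assumed to return `Option<String>`:

```rust
// Prefer the second-precision epoch filter; fall back to the legacy
// day-level filter only when the cursor is not a parseable timestamp.
fn build_after_query(cursor: &str) -> Option<String> {
    cursor_to_gmail_after_epoch_filter(cursor)
        .or_else(|| cursor_to_gmail_after_filter(cursor))
}
```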
Related

- `composio.gmail.sync` — no catalog rows added or removed.
- Follow-up: tune `RECENT_SYNC_MAX_PAGES` / `RECENT_SYNC_WINDOW_MS` once we have production metrics from the new completion log.