|
| 1 | +--- |
| 2 | +name: classify-stream-types |
| 3 | +description: Classify API endpoints into stream types (webhook, incremental, backfill, snapshot) and generate resource definitions for estuary-cdk connectors. Use after scaffolding a connector or when adding new streams. |
| 4 | +argument-hint: "[connector-name]" |
| 5 | +allowed-tools: Bash Read Write Edit Glob Grep WebFetch WebSearch |
| 6 | +--- |
| 7 | + |
| 8 | +Classify each API endpoint of `source-$ARGUMENTS` into the appropriate stream type and generate resource definitions. Read the connector's `api.py` and `resources.py` first, then research the provider's API docs. |
| 9 | + |
| 10 | +## Decision flowchart |
| 11 | + |
| 12 | +Evaluate each endpoint in order: |
| 13 | + |
| 14 | +1. **Does the provider push events via HTTP?** → **Webhook** stream (via `WebhookCaptureSpec`). See `/create-webhook-connector` for setup. |
| 15 | +2. **Does the endpoint support date-range or cursor-based filtering with a cursor that persists over time?** (e.g., `updated_at`, monotonic ID, event timestamp, sequence number — not just `created_at`) → **Incremental + Backfill**. Prefer endpoints that also support sorting, but filtering without document sorting is fine. |
| 16 | +3. **No filtering, but supports sorting in reverse chronological order?** → **Incremental only**. Use `fetch_changes` to walk from the latest document backward to the cursor (initially `start_date`). Only yield a cursor checkpoint once the walk reaches the cursor — if interrupted mid-walk, the next invocation restarts from the top. After the initial catch-up, subsequent invocations only walk back to the last checkpoint. |
| 17 | +4. **Is the dataset small with no change tracking?** → **Snapshot**. |
| 18 | +5. **Large dataset, no filtering or sorting?** → Look for a different endpoint or ask the user. |
| 19 | + |
| 20 | +**Always ask the user for confirmation before committing to an incremental-only approach (no backfill).** If there's any usable cursor, prefer incremental + backfill. Only fall back to incremental-only if the user explicitly confirms backfill isn't needed. |
| 21 | + |
| 22 | +## Code patterns |
| 23 | + |
| 24 | +### Incremental + Backfill |
| 25 | + |
| 26 | +Reference: `source-sentry/source_sentry/resources.py` — `open_issue_binding` function. |
| 27 | + |
| 28 | +```python |
| 29 | +def open_binding_fn( |
| 30 | + binding: CaptureBinding[ResourceConfig], |
| 31 | + binding_index: int, |
| 32 | + state: ResourceState, |
| 33 | + task: Task, |
| 34 | + all_bindings, |
| 35 | +): |
| 36 | + common.open_binding( |
| 37 | + binding, |
| 38 | + binding_index, |
| 39 | + state, |
| 40 | + task, |
| 41 | + fetch_changes=functools.partial(fetch_incremental, http, ...), |
| 42 | + fetch_page=functools.partial(backfill_historical, http, ...), |
| 43 | + ) |
| 44 | + |
| 45 | +# Initial state must set both inc and backfill: |
| 46 | +# Convention: next_page=None signals "beginning of backfill". |
| 47 | +# The fetch_page function handles None by falling back to start_date. |
| 48 | +cutoff = datetime.now(tz=UTC) |
| 49 | +initial_state = ResourceState( |
| 50 | + inc=ResourceState.Incremental(cursor=cutoff), |
| 51 | + backfill=ResourceState.Backfill(cutoff=cutoff, next_page=None), |
| 52 | +) |
| 53 | +``` |
| 54 | + |
| 55 | +### Incremental only |
| 56 | + |
| 57 | +Reference: `source-front/source_front/resources.py` — `incremental_resources_with_cursor_fields` function. |
| 58 | + |
| 59 | +```python |
| 60 | +common.open_binding( |
| 61 | + binding, binding_index, state, task, |
| 62 | + fetch_changes=functools.partial(fetch_changes_fn, http, ...), |
| 63 | +) |
| 64 | + |
| 65 | +initial_state = ResourceState( |
| 66 | + inc=ResourceState.Incremental(cursor=config.start_date), |
| 67 | +) |
| 68 | +``` |
| 69 | + |
| 70 | +### Snapshot |
| 71 | + |
| 72 | +Reference: `source-sentry/source_sentry/resources.py` — `open_full_refresh_bindings` function, and `source-front/source_front/resources.py` — `full_refresh_resources` function. |
| 73 | + |
| 74 | +```python |
| 75 | +common.open_binding( |
| 76 | + binding, binding_index, state, task, |
| 77 | + fetch_snapshot=functools.partial(list_all, http, ...), |
| 78 | + tombstone=MyDocument(_meta=MyDocument.Meta(op="d")), |
| 79 | +) |
| 80 | + |
| 81 | +initial_state = ResourceState() # No cursor needed |
| 82 | +``` |
| 83 | + |
| 84 | +### Webhook |
| 85 | + |
| 86 | +Defer to `/create-webhook-connector` skill for webhook stream setup. |
| 87 | + |
| 88 | +## Backfill design guidance |
| 89 | + |
| 90 | +- `fetch_page(log, page_cursor, cutoff)` walks historical data from oldest to cutoff |
| 91 | +- Page cursor tracks progress across the CDK's 24-hour periodic restart |
| 92 | +- Each invocation fetches one page or time window, then yields a `PageCursor` for the next |
| 93 | +- **Checkpoint every N wall-clock minutes** (e.g., 5 min): track elapsed time within `fetch_page` and yield a `PageCursor` checkpoint when the time budget is reached, even mid-sequence. This ensures progress is saved if the connector restarts. |
| 94 | +- Return without yielding a `PageCursor` to signal completion |
| 95 | +- The `cutoff` LogCursor marks where incremental replication takes over — suppress documents at or after the cutoff |
| 96 | + |
| 97 | +## Workflow |
| 98 | + |
| 99 | +1. Read the connector's `api.py` and `resources.py` |
| 100 | +2. Research the provider's API docs (via WebFetch/WebSearch) to understand each endpoint's filtering, pagination, and cursor capabilities |
| 101 | +3. For each endpoint, apply the decision flowchart above |
| 102 | +4. Present the classification to the user for confirmation before generating code |
| 103 | +5. Generate or modify resource definitions in `resources.py` and fetch functions in `api.py` |
0 commit comments