Skip to content

Conversation

maxi297
Copy link
Contributor

@maxi297 maxi297 commented Oct 1, 2025

What

https://docs.google.com/document/d/14Rk-eziI78h4bMYyvZLmuWY2Fd4SwAglRElR3XR2NiU/edit?tab=t.0

How

Add a ResponseAction to trigger an exception that needs to be handled by those doing pagination (in this case, it is the SimpleRetriever). When this is done, call the cursor to try to regenerate the stream slice.

TODO:

  • Allow to use cursor or just restart pagination even though there is a cursor. This could be useful if we can't return the data in a chronological order
  • ConcurrentPerPartition which should be straightforward as it just need to get the underlying cursor, call reduce_slice_range on it and build the StreamSlice accordingly
  • Add the ResponseAction type to the model
  • Find a stop condition
  • Clean the model_to_component_factory stuff as this is horrible right now with the number of cursors being passed
  • Tests

To test, I'm using the following manifest for stream tickets from source-freshdesk without the custom components. Note that I've changed the pagination start_from to 300.

I'm having this output:

{"type": "DEBUG", "message": "Making outbound API request", "data": {"url": "https://newaccount1603334233301.freshdesk.com/api/v2/tickets?include=description%2Crequester%2Cstats&order_by=updated_at&order_type=asc&per_page=100&updated_since=2015-10-04T19%3A40%3A22Z", "request_body": "None", "headers": "{'User-Agent': 'python-requests/2.32.3', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Authorization': 'Basic <redacted>'}"}}
<...>
{"type": "DEBUG", "message": "Making outbound API request", "data": {"url": "https://newaccount1603334233301.freshdesk.com/api/v2/tickets?include=description%2Crequester%2Cstats&order_by=updated_at&order_type=asc&page=301&per_page=100&updated_since=2015-10-04T19%3A40%3A22Z", "request_body": "None", "headers": "{'User-Agent': 'python-requests/2.32.3', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Authorization': 'Basic <redacted>', 'Cookie': '_x_w=3; __cf_bm=oYvEBxovzVx94rgbr2BII4d82MXtQpWuN2XoH3vckqc-1759347623-1.0.1.1-KvW2zYZjMjllgQZSY2v3nlt.NvSGQO6uM6ZDPDBL2bs8XBTSo.KL7sqEIEjNu7.ApC0O2I_VaAoNck8wA4onQvhz0pWynoeD9vuV37mpb3g'}"}}
<...>
{"type":"LOG","log":{"level":"INFO","message":"Hitting PaginationReset event. StreamSlice used will go from {'start_time': '2015-10-04T19:40:22Z', 'end_time': '2025-10-01T19:40:22Z'} to {'start_time': '2020-08-16T09:59:21Z', 'end_time': '2025-10-01T19:40:22Z'}"}}
<...>
{"type": "DEBUG", "message": "Making outbound API request", "data": {"url": "https://newaccount1603334233301.freshdesk.com/api/v2/tickets?include=description%2Crequester%2Cstats&order_by=updated_at&order_type=asc&per_page=100&updated_since=2020-08-16T09%3A59%3A21Z", "request_body": "None", "headers": "{'User-Agent': 'python-requests/2.32.3', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Authorization': 'Basic <redacted>', 'Cookie': '_x_w=3; __cf_bm=oYvEBxovzVx94rgbr2BII4d82MXtQpWuN2XoH3vckqc-1759347623-1.0.1.1-KvW2zYZjMjllgQZSY2v3nlt.NvSGQO6uM6ZDPDBL2bs8XBTSo.KL7sqEIEjNu7.ApC0O2I_VaAoNck8wA4onQvhz0pWynoeD9vuV37mpb3g'}"}}

Manifest:

version: 6.36.2

type: DeclarativeSource

check:
  type: CheckStream
  stream_names:
    - tickets

definitions:
  streams:
    tickets:
      type: DeclarativeStream
      name: tickets
      primary_key:
        - id
      retriever:
        type: SimpleRetriever
        requester:
          $ref: "#/definitions/base_requester"
          type: HttpRequester
          path: tickets
          http_method: GET
          request_parameters:
            include: description,requester,stats
            order_by: updated_at
            order_type: asc
          error_handler:
            type: CompositeErrorHandler
            error_handlers:
              - type: DefaultErrorHandler
                response_filters:
                  - type: HttpResponseFilter
                    action: FAIL
                    http_codes:
                      - 401
                    error_message: >-
                      The endpoint to access stream tickets returned 401: Unauthorized. This is most likely due to wrong credentials.
              - type: DefaultErrorHandler
                backoff_strategies:
                  - type: WaitTimeFromHeader
                    header: Retry-After
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: []
        paginator:
          type: DefaultPaginator
          page_size_option:
            type: RequestOption
            field_name: per_page
            inject_into: request_parameter
          page_token_option:
            type: RequestOption
            field_name: page
            inject_into: request_parameter
          pagination_strategy:
            type: PageIncrement
            page_size: 100
            start_from_page: 300
      incremental_sync:
        type: DatetimeBasedCursor
        cursor_field: updated_at
        start_datetime:
          type: MinMaxDatetime
          datetime: >-
            {{ config.get('start_date') or day_delta(-3650,
            '%Y-%m-%dT%H:%M:%SZ') }}
        datetime_format: "%Y-%m-%dT%H:%M:%SZ"
        start_time_option:
          type: RequestOption
          field_name: updated_since
          inject_into: request_parameter
        cursor_datetime_formats:
          - "%Y-%m-%dT%H:%M:%SZ"
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: "#/schemas/tickets"
  base_requester:
    type: HttpRequester
    url_base: https://{{ config['domain'] }}/api/v2/
    authenticator:
      type: BasicHttpAuthenticator
      username: "{{ config.get('api_key')}}"

streams:
  - $ref: "#/definitions/streams/tickets"

spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required:
      - api_key
      - domain
    properties:
      api_key:
        type: string
        description: >-
          Freshdesk API Key. See the <a
          href="https://docs.airbyte.com/integrations/sources/freshdesk">docs</a>
          for more information on how to obtain this key.
        order: 0
        title: API Key
        airbyte_secret: true
      domain:
        type: string
        description: Freshdesk domain
        order: 1
        title: Domain
        pattern: ^[a-zA-Z0-9._-]*\.freshdesk\.com$
        examples:
          - myaccount.freshdesk.com
      requests_per_minute:
        type: integer
        description: >-
          The number of requests per minute that this source allowed to use.
          There is a rate limit of 50 requests per minute per app per account.
        order: 2
        title: Requests per minute
      rate_limit_plan:
        type: object
        description: Rate Limit Plan for API Budget
        title: Rate Limit Plan
        oneOf:
          - type: object
            title: Free Plan
            properties:
              plan_type:
                type: string
                title: Plan
                const: free
              general_rate_limit:
                type: integer
                title: General Rate
                description: General Maximum Rate in Limit/minute for other endpoints in Free Plan
                const: 50
              tickets_rate_limit:
                type: integer
                title: Tickets Rate
                description: Maximum Rate in Limit/minute for tickets list endpoint in Free Plan
                const: 50
              contacts_rate_limit:
                type: integer
                title: Contacts Rate
                description: Maximum Rate in Limit/minute for contacts list endpoint in Free Plan
                const: 50
          - type: object
            title: Growth Plan
            properties:
              plan_type:
                type: string
                title: Plan
                const: growth
              general_rate_limit:
                type: integer
                title: General Rate
                description: General Maximum Rate in Limit/minute for other endpoints in Growth Plan
                const: 200
              tickets_rate_limit:
                type: integer
                title: Tickets Rate
                description: Maximum Rate in Limit/minute for tickets list endpoint in Growth Plan
                const: 20
              contacts_rate_limit:
                type: integer
                title: Contacts Rate
                description: Maximum Rate in Limit/minute for contacts list endpoint in Growth Plan
                const: 20
          - type: object
            title: Pro Plan
            properties:
              plan_type:
                type: string
                title: Plan
                const: pro
              general_rate_limit:
                type: integer
                title: General Rate
                description: General Maximum Rate in Limit/minute for other endpoints in Pro Plan
                const: 400
              tickets_rate_limit:
                type: integer
                title: Tickets Rate
                description: Maximum Rate in Limit/minute for tickets list endpoint in Pro Plan
                const: 100
              contacts_rate_limit:
                type: integer
                title: Contacts Rate
                description: Maximum Rate in Limit/minute for contacts list endpoint in Pro Plan
                const: 100
          - type: object
            title: Enterprise Plan
            properties:
              plan_type:
                type: string
                title: Plan
                const: enterprise
              general_rate_limit:
                type: integer
                title: General Rate
                description: General Maximum Rate in Limit/minute for other endpoints in Enterprise Plan
                const: 700
              tickets_rate_limit:
                type: integer
                title: Tickets Rate
                description: Maximum Rate in Limit/minute for tickets list endpoint in Enterprise Plan
                const: 200
              contacts_rate_limit:
                type: integer
                title: Contacts Rate
                description: Maximum Rate in Limit/minute for contacts list endpoint in Enterprise Plan
                const: 200
          - type: object
            title: Custom Plan
            properties:
              plan_type:
                type: string
                title: Plan
                const: custom
              general_rate_limit:
                type: integer
                title: General Rate
                description: General Maximum Rate in Limit/minute for other endpoints in Custom Plan
              tickets_rate_limit:
                type: integer
                title: Tickets Rate
                description: Maximum Rate in Limit/minute for tickets list endpoint in Custom Plan
              contacts_rate_limit:
                type: integer
                title: Contacts Rate
                description: Maximum Rate in Limit/minute for contacts list endpoint in Custom Plan
      start_date:
        type: string
        description: >-
          UTC date and time. Any data created after this date will be
          replicated. If this parameter is not set, all data will be replicated.
        order: 3
        title: Start Date
        format: date-time
        pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$
        examples:
          - "2020-12-01T00:00:00Z"
      lookback_window_in_days:
        type: integer
        description: Number of days for lookback window for the stream Satisfaction Ratings
        order: 4
        title: Lookback Window
        default: 14
    additionalProperties: true

schemas:
  tickets:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties:
      type:
        type:
          - string
          - "null"
      description:
        type:
          - string
          - "null"
      cc_emails:
        type:
          - array
          - "null"
      fwd_emails:
        type:
          - array
          - "null"
      reply_cc_emails:
        type:
          - array
          - "null"
      ticket_cc_emails:
        type:
          - array
          - "null"
      fr_escalated:
        type:
          - "null"
          - boolean
      spam:
        type:
          - "null"
          - boolean
      email_config_id:
        type:
          - integer
          - "null"
      group_id:
        type:
          - integer
          - "null"
      priority:
        type:
          - integer
          - "null"
      requester_id:
        type:
          - integer
          - "null"
      responder_id:
        type:
          - integer
          - "null"
      source:
        type:
          - integer
          - "null"
      company_id:
        type:
          - integer
          - "null"
      status:
        type:
          - integer
          - "null"
      subject:
        type:
          - string
          - "null"
      association_type:
        type:
          - integer
          - "null"
      to_emails:
        type:
          - array
          - "null"
        items:
          type:
            - string
            - "null"
      product_id:
        type:
          - integer
          - "null"
      id:
        type:
          - integer
          - "null"
      due_by:
        type:
          - string
          - "null"
      fr_due_by:
        type:
          - string
          - "null"
      is_escalated:
        type:
          - boolean
          - "null"
      custom_fields:
        type:
          - object
          - "null"
      created_at:
        type:
          - string
          - "null"
      updated_at:
        type:
          - string
          - "null"
      associated_tickets_count:
        type:
          - integer
          - "null"
      tags:
        type:
          - array
          - "null"
      nr_due_by:
        type:
          - string
          - "null"
      nr_escalated:
        type:
          - boolean
          - "null"
      description_text:
        type:
          - string
          - "null"
      requester:
        type:
          - object
          - "null"
      stats:
        type:
          - object
          - "null"
    additionalProperties: true

api_budget:
  type: HTTPAPIBudget
  policies:
    - type: FixedWindowCallRatePolicy
      period: "PT1M"
      # due to lack for support for interpolated string for the call_limit field, this has been
      # hardcoded to 50 which is the default rate for free plan
      # in the near term use "{{ config.get('requests_per_minute')}}"
      # long term is to utilize the `rate_limit_plan` field in config and specifying for each endpoint
      call_limit: 50 
      matchers:
        - method: GET
          url_base: "https://{{ config['domain'] }}/api/v2/"
  ratelimit_reset_header: Retry-After
  ratelimit_remaining_header: X-RateLimit-Remaining
  status_codes_for_ratelimit_hit: [429]

Copy link

github-actions bot commented Oct 1, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/poc_pagination_reset#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/poc_pagination_reset

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

📝 Edit this welcome message.

Copy link

github-actions bot commented Oct 1, 2025

PyTest Results (Fast)

1 tests   - 3 777   0 ✅  - 3 766   3s ⏱️ - 6m 31s
1 suites ±    0   0 💤  -    12 
1 files   ±    0   0 ❌ ±    0   1 🔥 +1 

For more details on these errors, see this check.

Results for commit 7dcbc04. ± Comparison against base commit 7ab013d.

This pull request removes 3778 and adds 1 tests. Note that renamed tests count towards both.
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config0-expected_calls0]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config1-expected_calls1]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config2-expected_calls2]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config3-expected_calls3]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config4-expected_calls4]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config5-expected_calls5]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config6-expected_calls6]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config7-expected_calls7]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config8-expected_calls8]
unit_tests.cli.airbyte_cdk.test_secrets.TestFetch ‑ test_fetch_with_all_failed_secrets
…
unit_tests.legacy.sources.declarative.test_manifest_declarative_source

Copy link

github-actions bot commented Oct 1, 2025

PyTest Results (Full)

2 tests   - 3 779   0 ✅  - 3 769   14s ⏱️ - 10m 50s
1 suites ±    0   0 💤  -    12 
1 files   ±    0   0 ❌ ±    0   2 🔥 +2 

For more details on these errors, see this check.

Results for commit 7dcbc04. ± Comparison against base commit 7ab013d.

This pull request removes 3781 and adds 2 tests. Note that renamed tests count towards both.
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config0-expected_calls0]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config1-expected_calls1]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config2-expected_calls2]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config3-expected_calls3]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config4-expected_calls4]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config5-expected_calls5]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config6-expected_calls6]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config7-expected_calls7]
unit_tests.cli.airbyte_cdk.test_secret_masks ‑ test_print_ci_secrets_masks_for_config[config8-expected_calls8]
unit_tests.cli.airbyte_cdk.test_secrets.TestFetch ‑ test_fetch_with_all_failed_secrets
…
unit_tests.legacy.sources.declarative.test_manifest_declarative_source
unit_tests.sources.declarative.parsers.test_model_to_component_factory

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant