Skip to content

fix(source-intercom): use state for parent stream requests and use global state #57524

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion airbyte-integrations/connectors/source-intercom/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from dataclasses import dataclass
from functools import wraps
from time import sleep
from typing import Mapping, Optional, Union
from typing import Any, Mapping, Optional, Union

import requests

from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution

Expand Down Expand Up @@ -152,3 +153,51 @@ class ErrorHandlerWithRateLimiter(DefaultErrorHandler):
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
# Check for response.headers to define the backoff time before the next api call
return super().interpret_response(response_or_exception)


class SubstreamStateMigration(StateMigration):
"""
We require a custom state migration to move from the custom substream state that was generated via the legacy
cursor custom components. State was not written back to the platform in a way that is compatible with concurrent cursors.

The old state roughly had the following shape:
{
"updated_at": 1744153060,
"prior_state": {
"updated_at": 1744066660
}
"conversations": {
"updated_at": 1744153060
}
}

However, this was incompatible when we removed the custom cursors with the concurrent substream partition cursor
components that were configured with use global_substream_cursor and incremental_dependency. They rely on passing the value
of parent_state when getting parent records for the conversations/companies parent stream. The migration results in state:
{
"updated_at": 1744153060,
"prior_state": {
"updated_at": 1744066660
# There are a lot of nested elements here, but are not used or relevant to syncs
}
"conversations": {
"updated_at": 1744153060
}
"parent_state": {
"conversations": {
"updated_at": 1744153060
}
}
}
"""

def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
return "parent_state" not in stream_state and ("conversations" in stream_state or "companies" in stream_state)

def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
migrated_parent_state = {}
if stream_state.get("conversations"):
migrated_parent_state["conversations"] = stream_state.get("conversations")
if stream_state.get("companies"):
migrated_parent_state["companies"] = stream_state.get("companies")
return {**stream_state, "parent_state": migrated_parent_state}
9 changes: 9 additions & 0 deletions airbyte-integrations/connectors/source-intercom/manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ definitions:
$ref: "#/definitions/streams/conversations"
parent_key: id
partition_field: id
incremental_dependency: true
incremental_sync:
type: DatetimeBasedCursor
cursor_field: updated_at
Expand All @@ -661,6 +662,7 @@ definitions:
cursor_datetime_formats:
- "%s"
is_client_side_incremental: true
global_substream_cursor: true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirming that this is done cause there are usually so many conversations that we assume we will hit the per partition limit and therefore we're fine with having a global state.

Benefits:

  • Easier to understand state

Drawback:

  • Duplication that would not have occurred for people with very small syncs where we don't this the per partition limit

transformations:
- type: AddFields
fields:
Expand All @@ -671,6 +673,9 @@ definitions:
type: InlineSchemaLoader
schema:
$ref: "#/schemas/conversation_parts"
state_migrations:
- type: CustomStateMigration
class_name: source_declarative_manifest.components.SubstreamStateMigration
company_segments:
type: DeclarativeStream
name: company_segments
Expand Down Expand Up @@ -724,10 +729,14 @@ definitions:
cursor_datetime_formats:
- "%s"
is_client_side_incremental: true
global_substream_cursor: true
schema_loader:
type: InlineSchemaLoader
schema:
$ref: "#/schemas/company_segments"
state_migrations:
- type: CustomStateMigration
class_name: source_declarative_manifest.components.SubstreamStateMigration
tickets:
type: DeclarativeStream
name: tickets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
dockerImageTag: 0.13.0-rc.1
dockerImageTag: 0.13.0-rc.2
dockerRepository: airbyte/source-intercom
documentationUrl: https://docs.airbyte.com/integrations/sources/intercom
githubIssueLabel: source-intercom
Expand Down
Loading
Loading