Skip to content

Commit 2b072af

Browse files
feat: ✨ Add replication lookback window setting (#91)
* set `is_sorted` to true so that progress can be resumed in incremental syncs. * add boto3 as explicit requirement * set Intercom API version to 2.5 and add sorting to conversations payload * add timestamp normalization methods to IntercomStream and update conversation post-processing * add replication lookback window setting to configuration options * set `is_sorted` to false for ConversationsStream and remove unused request payload preparation * update Intercom API version to 2.15 and enhance request payload preparation with lookback window logic * remove sorting from request payload in prepare_request_payload method * remove logging of prepared request payload in prepare_request_payload method * remove timestamp handling methods and simplify request payload preparation * remove unused datetime import from client.py * Apply suggested changes * Add Unix timestamp signpost defaulting to sync start time for incremental Intercom streams in get_replication_key_signpost method * Add logging to replication key signpost logic * Apply suggestions from code review Co-authored-by: Tobias Cadée <tobias.cadee@ticketswap.com> * Simplify post_process method in ConversationPartsStream to return modified row directly * Add missing parenthesis --------- Co-authored-by: Tobias Cadee <tobias.cadee@ticketswap.com>
1 parent 6e016b8 commit 2b072af

File tree

6 files changed

+63
-17
lines changed

6 files changed

+63
-17
lines changed

README.md

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,16 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).
4242

4343
## Settings
4444

45-
| Setting | Required | Default | Description |
46-
|:--------------------|:--------:|:-------:|:------------|
47-
| access_token | True | None | The token to authenticate against the API service |
48-
| start_date | False | None | The earliest record date to sync |
49-
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
50-
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
51-
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
52-
| flattening_max_depth| False | None | The max depth to flatten schemas. |
53-
| batch_config | False | None | |
45+
| Setting | Required | Default | Description |
46+
|:-------------------------------------|:--------:|:-------:|:-------------------------------------------------------------------------------------------------------------------------------------|
47+
| access_token | True | None | The token to authenticate against the API service |
48+
| start_date | False | None | The earliest record date to sync |
49+
| replication_lookback_window_seconds | False | 0 | Overlap window in seconds for incremental replication to replay recent records and reduce misses near bookmark boundaries |
50+
| stream_maps | False | None | Config object for stream maps capability. |
51+
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
52+
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
53+
| flattening_max_depth | False | None | The max depth to flatten schemas. |
54+
| batch_config | False | None | |
5455

5556
A full list of supported settings and capabilities for this
5657
tap is available by running:

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ dependencies = [
2525
"singer-sdk>=0.42.1",
2626
"fs-s3fs",
2727
"requests",
28+
"boto3>=1.40.18",
2829
]
2930

3031
[project.optional-dependencies]

tap_intercom/client.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from __future__ import annotations
44

5+
import time
56
import typing as t
67

78
from singer_sdk.authenticators import BearerTokenAuthenticator
@@ -46,7 +47,7 @@ def http_headers(self) -> dict:
4647
if user_agent:
4748
result["User-Agent"] = user_agent
4849
result["Content-Type"] = "application/json"
49-
result["Intercom-Version"] = "2.14"
50+
result["Intercom-Version"] = "2.15"
5051
return result
5152

5253
def get_url_params(self, context: dict | None, next_page_token: object) -> dict: # noqa: ARG002
@@ -83,7 +84,7 @@ def prepare_request_payload(
8384
next page of data.
8485
"""
8586
if self.http_method == "POST":
86-
body = {"sort": {"field": self.replication_key, "order": "ascending"}}
87+
body = {}
8788
start_date = self.get_starting_replication_key_value(context)
8889
if start_date or self.config.get("filters", {}).get(self.name):
8990
body["query"] = {
@@ -94,6 +95,8 @@ def prepare_request_payload(
9495
],
9596
}
9697
if start_date:
98+
if start_date != self.config.get("start_date"):
99+
start_date -= int(self.config["replication_lookback_window_seconds"])
97100
body["query"]["value"].append(
98101
{
99102
"field": self.replication_key,
@@ -118,6 +121,28 @@ def compare_start_date(self, value: str, start_date_value: str) -> str:
118121
"""
119122
return max(value, start_date_value)
120123

124+
def get_replication_key_signpost(
125+
self,
126+
context: dict | None, # noqa: ARG002
127+
) -> int | None:
128+
"""Overrides the signpost to be the Unix integer at sync start for incremental streams.
129+
130+
This enables the SDK to finalize state as the lower of max(replication_key_value)
131+
seen during the run and the run start time.
132+
133+
Args:
134+
context: Stream partition or context dictionary.
135+
136+
Returns:
137+
Current Unix timestamp (whole seconds) when the stream has a replication key, else None.
138+
"""
139+
if not self.replication_key:
140+
return None
141+
signpost = int(time.time())
142+
self.logger.info("Setting replication key signpost to current Unix timestamp at sync start.")
143+
self.logger.info("Signpost value: %s", signpost)
144+
return signpost
145+
121146
def post_process(
122147
self,
123148
row: dict,

tap_intercom/streams.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class ConversationsStream(IntercomStream):
3131
records_jsonpath = "$.conversations[*]"
3232
http_method = "POST"
3333
schema = conversations_schema
34+
is_sorted = False
3435

3536
def get_child_context(self, record: dict, context: dict | None) -> dict: # noqa: ARG002
3637
"""Return a context dictionary for child streams."""

tap_intercom/tap.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,16 @@ class TapIntercom(Tap):
2626
th.IntegerType,
2727
description="The earliest record date to sync, in unix timestamp format",
2828
),
29+
th.Property(
30+
"replication_lookback_window_seconds",
31+
th.IntegerType,
32+
default=0,
33+
description=(
34+
"Optional overlap window in seconds applied to incremental bookmarks. "
35+
"Use this to replay recent records and reduce misses caused by API pagination drift "
36+
"or near-boundary updates."
37+
),
38+
),
2939
th.Property(
3040
"filters",
3141
th.ObjectType(

0 commit comments

Comments
 (0)