Skip to content

Commit 85603c7

Browse files
Jaideep-QlikRahul GogawaleRushiT0122rdeshmukh15
authored
Library upgrades, Extraction failures handling. (#11)
* Libraries updated. * Timeout and testcases added for timeout issue. * Added unit test & fixed connection error. * Added circleci changes. * As per PR comments required changes are done. * Changes added for test case. * Update README.md Co-authored-by: Rushikesh Todkar <98420315+RushiT0122@users.noreply.github.com> * Test case added in parameterized format. * Requested changes done. * Test cases updated. * Added changes as per the comments. * Fixing build.. * Updated test client for build failuer. * Adjusted path in circle ci. * Fixing Build... * Code refeactor. * build fix... * Test updated. * Test removed. * pip command fix --------- Co-authored-by: Rahul Gogawale <rahul.gogawale@globallogic.com> Co-authored-by: Rushikesh Todkar <98420315+RushiT0122@users.noreply.github.com> Co-authored-by: Rutuja Deshmukh <107538720+rdeshmukh15@users.noreply.github.com> Co-authored-by: Rutuja Deshmukh <rutuja.deshmukh@qlik.com>
1 parent 028a5dd commit 85603c7

File tree

9 files changed

+193
-506
lines changed

9 files changed

+193
-506
lines changed

.circleci/config.yml

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,43 @@ jobs:
1111
uv venv --python 3.9 /usr/local/share/virtualenvs/tap-mailshake
1212
source /usr/local/share/virtualenvs/tap-mailshake/bin/activate
1313
uv pip install .
14+
- run:
15+
name: 'JSON Validator'
16+
command: |
17+
source /usr/local/share/virtualenvs/tap-tester/bin/activate
18+
stitch-validate-json tap_mailshake/schemas/*.json
1419
- run:
1520
name: 'pylint'
1621
command: |
1722
source /usr/local/share/virtualenvs/tap-mailshake/bin/activate
1823
uv pip install pylint
19-
pylint tap_mailshake --disable missing-docstring,logging-format-interpolation,no-member,broad-except,too-many-branches,consider-using-f-string,raise-missing-from,broad-exception-raised,too-many-arguments,too-many-positional-arguments,too-many-locals,too-many-nested-blocks,too-many-statements
20-
24+
pylint tap_mailshake -d C,R,W
25+
- add_ssh_keys
26+
- run:
27+
name: 'Unit Tests'
28+
command: |
29+
source /usr/local/share/virtualenvs/tap-mailshake/bin/activate
30+
uv pip install nose coverage parameterized
31+
nosetests --with-coverage --cover-erase --cover-package=tap_mailshake --cover-html-dir=htmlcov tap_mailshake/tests/unittests
32+
coverage html
33+
- store_test_results:
34+
path: test_output/report.xml
35+
- store_artifacts:
36+
path: htmlcov
2137
workflows:
2238
version: 2
2339
commit:
2440
jobs:
2541
- build:
2642
context: circleci-user
43+
build_daily:
44+
triggers:
45+
- schedule:
46+
cron: "0 0 * * *"
47+
filters:
48+
branches:
49+
only:
50+
- master
51+
jobs:
52+
- build:
53+
context: circleci-user

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,13 @@ This tap uses [simple authentication](https://api-docs.mailshake.com/#Simple) as
133133
"api_key": "YourApiKeyFromMailshake",
134134
"domain": "api.mailshake.com",
135135
"start_date": "2019-01-01T00:00:00Z",
136-
"user_agent": "tap-mailshake <api_user_email@your_company.com>"
136+
"user_agent": "tap-mailshake <api_user_email@your_company.com>",
137+
"request_timeout": 300
137138
}
138139
```
139-
140+
**request_timeout (optional)**: Timeout in seconds for API requests. Default request_timeout is 300.
140141
Optionally, also create a `state.json` file. `currently_syncing` is an optional attribute used for identifying the last object to be synced in case the job is interrupted mid-stream. The next run would begin where the last job left off.
141-
142+
Reference link for API documentation: [Mailshake API Docs](https://api-docs.mailshake.com/)
142143
```json
143144
{
144145
"currently_syncing": "campaigns",

setup.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
classifiers=['Programming Language :: Python :: 3 :: Only'],
1010
py_modules=['tap_mailshake'],
1111
install_requires=[
12-
'backoff==1.10.0',
13-
'requests==2.32.4',
14-
'singer-python==5.13.2'
12+
'backoff==2.2.1',
13+
'requests==2.32.5',
14+
'singer-python==6.1.1'
1515
],
1616
entry_points='''
1717
[console_scripts]

tap_mailshake/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,13 @@ def do_discover():
2828
@singer.utils.handle_top_exception(LOGGER)
2929
def main():
3030
parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
31+
request_timeout = parsed_args.config.get('request_timeout')
3132

32-
with MailshakeClient(parsed_args.config['api_key'],
33-
parsed_args.config['user_agent']) as client:
33+
with MailshakeClient(
34+
parsed_args.config['api_key'],
35+
parsed_args.config['user_agent'],
36+
request_timeout=request_timeout
37+
) as client:
3438

3539
state = {}
3640
if parsed_args.state:

tap_mailshake/client.py

Lines changed: 50 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33
import singer
44
from singer import metrics, utils
55
from requests.auth import HTTPBasicAuth
6+
from requests.exceptions import ConnectionError, ChunkedEncodingError
7+
import http.client
8+
from requests.exceptions import Timeout
69

710
LOGGER = singer.get_logger()
8-
API_VERSION = '2017-04-01'
11+
API_VERSION = "2017-04-01"
12+
REQUEST_TIMEOUT = 300
913

1014

1115
class Server5xxError(Exception):
@@ -103,7 +107,7 @@ def get_exception_for_error_code(error_code):
103107

104108

105109
def raise_for_error(response):
106-
LOGGER.error('ERROR {}: {}, REASON: {}'.format(response.status_code,
110+
LOGGER.error("ERROR {}: {}, REASON: {}".format(response.status_code,
107111
response.text, response.reason))
108112
try:
109113
response.raise_for_status()
@@ -115,10 +119,10 @@ def raise_for_error(response):
115119
# us a 2xx response nor a response content.
116120
return
117121
response = response.json()
118-
if ('error' in response) or ('errorCode' in response):
119-
message = '%s: %s' % (response.get('error', str(error)),
120-
response.get('message', 'Unknown Error'))
121-
error_code = response.get('code')
122+
if ("error" in response) or ("errorCode" in response):
123+
message = "%s: %s" % (response.get("error", str(error)),
124+
response.get("message", "Unknown Error"))
125+
error_code = response.get("code")
122126
ex = get_exception_for_error_code(error_code)
123127
raise ex(message)
124128
raise MailshakeError(error)
@@ -134,12 +138,15 @@ class MailshakeClient:
134138

135139
def __init__(self,
136140
api_key,
137-
user_agent=None):
141+
user_agent=None,
142+
request_timeout=REQUEST_TIMEOUT):
138143
self.__api_key = api_key
139144
self.base_url = "https://api.mailshake.com/{}".format(
140145
API_VERSION)
141146
self.__user_agent = user_agent
142147
self.__session = requests.Session()
148+
self.__verified = False
149+
self.request_timeout = request_timeout or REQUEST_TIMEOUT
143150

144151
def __enter__(self):
145152
self.check_access()
@@ -155,25 +162,31 @@ def __exit__(self, exception_type, exception_value, traceback):
155162
@utils.ratelimit(1, 1.2)
156163
def check_access(self):
157164
if self.__api_key is None:
158-
raise Exception('Error: Missing api_key in tap_config.json.')
165+
raise Exception("Error: Missing api_key in tap_config.json.")
159166
headers = {}
160-
endpoint = 'me'
161-
url = '{}/{}'.format(self.base_url, endpoint)
167+
endpoint = "me"
168+
url = "{}/{}".format(self.base_url, endpoint)
162169
if self.__user_agent:
163-
headers['User-Agent'] = self.__user_agent
164-
headers['Accept'] = 'application/json'
170+
headers["User-Agent"] = self.__user_agent
171+
headers["Accept"] = "application/json"
165172
response = self.__session.get(
166173
url=url,
167174
headers=headers,
168-
auth=HTTPBasicAuth(self.__api_key, ''))
175+
auth=HTTPBasicAuth(self.__api_key, ""))
169176
if response.status_code != 200:
170-
LOGGER.error('Error status_code = {}'.format(response.status_code))
177+
LOGGER.error("Error status_code = {}".format(response.status_code))
171178
return False
172179
return True
173180

174-
@backoff.on_exception(backoff.expo,
175-
(Server5xxError, ConnectionError, MailshakeAPILimitReachedError),
176-
factor=3)
181+
@backoff.on_exception(
182+
backoff.expo,
183+
(Server5xxError, ConnectionError, ChunkedEncodingError, http.client.IncompleteRead,
184+
MailshakeAPILimitReachedError, Timeout),
185+
factor=3,
186+
max_tries=5,
187+
on_backoff=lambda details: LOGGER.warning(
188+
f"Retrying {details['target'].__name__} after: {details['exception']}")
189+
)
177190
@utils.ratelimit(1, 3)
178191
def request(self, method, path=None, url=None, json=None, **kwargs):
179192
"""Perform HTTP request"""
@@ -185,50 +198,55 @@ def request(self, method, path=None, url=None, json=None, **kwargs):
185198
# self.__verified = self.check_access()
186199

187200
if not url and path:
188-
url = '{}/{}'.format(self.base_url, path)
201+
url = "{}/{}".format(self.base_url, path)
189202

190-
if 'endpoint' in kwargs:
191-
endpoint = kwargs['endpoint']
192-
del kwargs['endpoint']
203+
if "endpoint" in kwargs:
204+
endpoint = kwargs["endpoint"]
205+
del kwargs["endpoint"]
193206
else:
194207
endpoint = None
195208

196-
if 'headers' not in kwargs:
197-
kwargs['headers'] = {}
209+
if "headers" not in kwargs:
210+
kwargs["headers"] = {}
198211

199-
kwargs['headers']['Accept'] = 'application/json'
212+
kwargs["headers"]["Accept"] = "application/json"
200213

201214
if self.__user_agent:
202-
kwargs['headers']['User-Agent'] = self.__user_agent
215+
kwargs["headers"]["User-Agent"] = self.__user_agent
203216

204-
if method == 'POST':
205-
kwargs['headers']['Content-Type'] = 'application/json'
217+
if method == "POST":
218+
kwargs["headers"]["Content-Type"] = "application/json"
206219

207220
with metrics.http_request_timer(endpoint) as timer:
221+
request_timeout = kwargs.pop("request_timeout", self.request_timeout)
208222
response = self.__session.request(
209223
method=method,
210224
url=url,
211225
json=json,
212-
auth=HTTPBasicAuth(self.__api_key, ''),
226+
auth=HTTPBasicAuth(self.__api_key, ""),
227+
timeout=request_timeout,
213228
**kwargs)
214229
timer.tags[metrics.Tag.http_status_code] = response.status_code
215230

216231
if response.status_code >= 500:
217232
raise Server5xxError()
218233

234+
if response.status_code == 429:
235+
raise MailshakeAPILimitReachedError("Rate limit exceeded")
236+
219237
if response.status_code != 200:
220238
raise_for_error(response)
221239

222240
# pagination details (nextToken) are returned in the body
223241
next_token = None
224242
response_body = response.json()
225-
if response_body.get('nextToken') != "":
226-
next_token = response_body.get('nextToken')
243+
if response_body.get("nextToken") != "":
244+
next_token = response_body.get("nextToken")
227245

228246
return response_body, next_token
229247

230248
def get(self, path, **kwargs):
231-
return self.request('GET', path=path, **kwargs)
249+
return self.request("GET", path=path, **kwargs)
232250

233251
def post(self, path, **kwargs):
234-
return self.request('POST', path=path, **kwargs)
252+
return self.request("POST", path=path, **kwargs)

tap_mailshake/sync.py

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import json
2-
2+
from tap_mailshake.client import REQUEST_TIMEOUT
33
import singer
44
from singer import (UNIX_SECONDS_INTEGER_DATETIME_PARSING, Transformer,
55
metadata, metrics, utils)
@@ -8,7 +8,6 @@
88

99
LOGGER = singer.get_logger()
1010

11-
1211
def write_schema(catalog, stream_name):
1312
stream = catalog.get_stream(stream_name)
1413
schema = stream.schema.to_dict()
@@ -124,8 +123,12 @@ def sync_endpoint(client, # pylint: disable=too-many-branches,too-many-nested-b
124123
id_fields=None,
125124
selected_streams=None,
126125
parent=None,
127-
parent_id=None):
126+
parent_id=None,
127+
request_timeout= REQUEST_TIMEOUT):
128128
# Get the latest bookmark for the stream and set the last_integer/datetime
129+
if not start_date:
130+
LOGGER.warning(f"No start_date provided for stream '{stream_name}', using default.")
131+
start_date = "2019-01-01T00:00:00Z"
129132
last_datetime = None
130133
last_integer = None
131134
data_key = endpoint_config.get('data_key')
@@ -181,7 +184,9 @@ def sync_endpoint(client, # pylint: disable=too-many-branches,too-many-nested-b
181184
url=url,
182185
path=path,
183186
params=querystring,
184-
endpoint=stream_name)
187+
endpoint=stream_name,
188+
request_timeout=request_timeout,
189+
)
185190

186191
# time_extracted: datetime when the data was extracted from the API
187192
time_extracted = utils.now()
@@ -227,14 +232,14 @@ def sync_endpoint(client, # pylint: disable=too-many-branches,too-many-nested-b
227232
for record in transformed_data:
228233
i = 0
229234
# Set parent_id
230-
for id_field in id_fields:
231-
parent_id_field = None
232-
if i == 0:
233-
parent_id_field = id_field
234-
if id_field == 'id':
235-
parent_id_field = id_field
236-
i = i + 1
237-
parent_id = record.get(parent_id_field, None)
235+
# Initialize parent_id_field to None in case id_fields is missing or empty
236+
parent_id_field = None
237+
if id_fields:
238+
parent_id_field = 'id' if 'id' in id_fields else id_fields[0]
239+
else:
240+
LOGGER.warning(f"No id_fields provided for stream '{stream_name}'")
241+
# Safely extract parent_id only if parent_id_field is set
242+
parent_id = record.get(parent_id_field) if parent_id_field else None
238243

239244
# sync_endpoint for child
240245
LOGGER.info(
@@ -322,7 +327,9 @@ def get_selected_fields(catalog, stream_name):
322327

323328

324329
def sync(client, config, catalog, state):
325-
start_date = config.get('start_date', None)
330+
start_date = "2019-01-01T00:00:00Z"
331+
if 'start_date' in config:
332+
start_date = config['start_date']
326333

327334
# Get selected_streams from catalog, based on state last_stream
328335
# last_stream = Previous currently synced stream, if the load was interrupted

0 commit comments

Comments
 (0)