-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathclient.py
More file actions
205 lines (165 loc) · 7.28 KB
/
client.py
File metadata and controls
205 lines (165 loc) · 7.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""REST client handling, including ExactStream base class."""
from __future__ import annotations
import json
import sys
import typing
from pathlib import Path
from typing import Any, Callable, Iterable
import requests
import xmltodict
from lxml import etree
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseOffsetPaginator
from singer_sdk.streams import RESTStream
from tap_exact.auth import ExactAuthenticator
if typing.TYPE_CHECKING:
    # Imported for annotations only; this branch is skipped at runtime.
    from requests import Response

# Type variable used in the paginator's page-token annotations.
TPageToken = typing.TypeVar("TPageToken")

# Use the stdlib ``cached_property`` on Python 3.8+, otherwise the
# ``cached_property`` package.
if sys.version_info >= (3, 8):
    from functools import cached_property
else:
    from cached_property import cached_property

# Annotation alias: a callable that takes and returns a PreparedRequest.
_Auth = Callable[[requests.PreparedRequest], requests.PreparedRequest]

# The "schemas" directory located next to this module.
SCHEMAS_DIR = Path(__file__).parent.joinpath("schemas")
class ExactPaginator(BaseOffsetPaginator):
    """Paginator driven by the ``rel="next"`` link of an Exact XML feed.

    The feed is converted with ``stream.xml_to_dict`` and its ``link`` entries
    are inspected; the next page token is taken from the ``@href`` of the entry
    whose ``@rel`` is ``"next"``.
    """

    def __init__(self, stream: ExactStream, start_value: int, page_size: int) -> None:  # noqa: D107
        super().__init__(start_value=start_value, page_size=page_size)
        # Kept so responses can be parsed with the stream's own XML conversion.
        self.stream = stream

    def _feed_links(self, response: Response) -> list[dict]:
        """Return the feed's ``link`` entries as a list of dicts.

        Args:
            response: API response object.

        Returns:
            A (possibly empty) list with one dict per ``link`` entry.
        """
        data = self.stream.xml_to_dict(response)
        link = (data.get("feed") or {}).get("link") or []
        # xmltodict gives a bare dict instead of a list when an element occurs
        # only once, so normalise both shapes to a list.
        if isinstance(link, dict):
            return [link]
        if isinstance(link, list):
            return [item for item in link if isinstance(item, dict)]
        return []

    def has_more(self, response: Response) -> bool:
        """Check whether the feed advertises a further page.

        Args:
            response: API response object.

        Returns:
            ``True`` if any ``link`` entry has ``@rel == "next"``, else ``False``.
        """
        return any(item.get("@rel") == "next" for item in self._feed_links(response))

    def get_next(self, response: Response) -> TPageToken | None:
        """Get the next pagination token from the API response.

        Args:
            response: API response object.

        Returns:
            The text after the last ``=`` in the last ``&``-separated part of
            the next link's ``@href``, or ``None`` when there is no usable
            next link (which ends pagination).
        """
        next_href = next(
            (
                item.get("@href")
                for item in self._feed_links(response)
                if item.get("@rel") == "next"
            ),
            None,
        )
        if not next_href:
            return None
        # NOTE(review): assumes the skip token is the last query parameter of
        # the href - confirm against the API's next-link format.
        return next_href.split("&")[-1].split("=")[-1]
class ExactStream(RESTStream):
    """Base stream for Exact XML feeds, partitioned by configured division."""

    @property
    def partitions(self) -> list[dict] | None:
        """Return one ``{"division": ...}`` partition per entry in ``config["divisions"]``."""
        return [{"division": division} for division in self.config["divisions"]]

    @property
    def url_base(self) -> str:
        """Return the API URL root (a fixed value)."""
        return "https://start.exactonline.nl/api/v1"

    def get_url(self, context: dict | None) -> str:
        """Return the request URL: URL root, the context's division, then ``self.path``.

        Args:
            context: The stream context; must contain a ``"division"`` key.

        Returns:
            The full URL for the request.
        """
        return f"{self.url_base}/{context['division']}{self.path}"

    @cached_property
    def authenticator(self) -> _Auth:
        """Return the authenticator for this stream.

        Returns:
            An ``ExactAuthenticator`` instance, created on first access and
            cached on the stream instance.
        """
        return ExactAuthenticator(self)

    def get_url_params(self, context: dict | None, next_page_token: str | None) -> dict[str, Any]:
        """Return a dictionary of parameters to use in the API request.

        Args:
            context: The stream context.
            next_page_token: Skip token for the page to fetch, or a falsy value
                for the first page.

        Returns:
            A dict that may contain ``$select`` (the schema's property names),
            ``$filter`` (``Modified gt datetime'...'`` when a starting
            timestamp is available) and ``$skiptoken``.
        """
        params: dict = {}
        if self.select:
            params["$select"] = self.select
        start_date = self.get_starting_timestamp(context)
        if start_date is not None:
            date_filter = f"Modified gt datetime'{start_date.strftime('%Y-%m-%dT%H:%M:%S')}'"
            params["$filter"] = date_filter
        if next_page_token:
            params["$skiptoken"] = next_page_token
        return params

    def get_new_paginator(self) -> BaseOffsetPaginator:
        """Create a new pagination helper instance with a page size of 60."""
        return ExactPaginator(self, start_value=None, page_size=60)

    def xml_to_dict(self, response: requests.Response) -> dict:
        """Convert an XML response body to a plain dict.

        The body is first passed through an lxml parser in recovery mode and
        re-serialised before being handed to ``xmltodict``. If any step of
        that fails, the raw body is parsed directly after stripping a UTF-8
        byte-order mark.

        Args:
            response: The HTTP ``requests.Response`` object.

        Returns:
            The parsed document, round-tripped through JSON so that only plain
            JSON-compatible containers remain.
        """
        try:
            # clean invalid xml characters
            recovering_parser = etree.XMLParser(recover=True)
            root = etree.fromstring(response.content, parser=recovering_parser)
            parsed = xmltodict.parse(etree.tostring(root))
        except Exception:  # noqa: BLE001
            # Deliberate best-effort fallback; unlike a bare ``except`` this
            # lets KeyboardInterrupt and SystemExit propagate.
            parsed = xmltodict.parse(response.content.decode("utf-8-sig").encode("utf-8"))
        return json.loads(json.dumps(parsed))

    def parse_response(self, response: requests.Response) -> Iterable[dict]:
        """Parse the response and return an iterator of result records.

        Args:
            response: The HTTP ``requests.Response`` object.

        Yields:
            Each match of ``self.records_jsonpath`` within the feed's ``entry`` data.
        """
        data = self.xml_to_dict(response).get("feed", {}).get("entry", [])
        yield from extract_jsonpath(self.records_jsonpath, input=data)

    @staticmethod
    def _convert_property(value: Any) -> Any:  # noqa: ANN401
        """Convert one parsed ``m:properties`` value to a Python value.

        Args:
            value: A string, an empty value, or a dict carrying ``@m:type``,
                ``@m:null`` and/or ``#text`` keys.

        Returns:
            The string itself; ``None`` for empty or ``@m:null == "true"``
            values; a bool for ``Edm.Boolean``; an int or float when the type
            name contains ``Int`` or ``Double``; otherwise the ``#text`` value.
        """
        if isinstance(value, str):
            return value
        if not value or value.get("@m:null") == "true":
            return None
        edm_type = value.get("@m:type", "")
        text = value.get("#text")
        if edm_type == "Edm.Boolean":
            # Anything other than the literal "true"/"false" maps to None.
            return {"true": True, "false": False}.get(text)
        if text is None:
            # A typed element without text cannot be converted; previously
            # int(None) / float(None) raised TypeError here.
            return None
        if "Int" in edm_type:
            return int(text)
        if "Double" in edm_type:
            return float(text)
        return text

    def post_process(
        self,
        row: dict,
        context: dict | None = None,  # noqa: ARG002
    ) -> dict | None:
        """Flatten a feed entry into a record of converted property values.

        Args:
            row: An individual record from the stream; its
                ``row["content"]["m:properties"]`` mapping is read.
            context: The stream context (unused).

        Returns:
            A new dict keyed by each property name minus its first two
            characters, with values converted by ``_convert_property``.
        """
        content = row["content"]["m:properties"]
        # NOTE(review): key[2:] presumably strips a two-character namespace
        # prefix such as "d:" - confirm against the feed format.
        return {key[2:]: self._convert_property(value) for key, value in content.items()}

    @property
    def select(self) -> str:
        """Return the select query parameter: the schema's property names, comma-joined."""
        return ",".join(self.schema["properties"])
class ExactBulkStream(ExactStream):
    """``ExactStream`` variant whose paginator is configured with ``page_size=1000``."""

    def get_new_paginator(self) -> BaseOffsetPaginator:
        """Build a fresh ``ExactPaginator`` for this stream with a page size of 1000."""
        paginator = ExactPaginator(self, start_value=None, page_size=1000)
        return paginator
class ExactSyncStream(ExactBulkStream):
    """Bulk stream variant that filters requests on the ``Timestamp`` field."""

    def get_url_params(self, context: dict | None, next_page_token: str) -> dict[str, Any]:
        """Assemble the query parameters for a request.

        Args:
            context: The stream context.
            next_page_token: Skip token for the page to fetch; ignored when falsy.

        Returns:
            A dict with ``$select`` (when the select string is non-empty),
            ``$filter`` (always) and ``$skiptoken`` (when a token is given).
        """
        params: dict = {}

        selected = self.select
        if selected:
            params["$select"] = selected

        bookmark = self.get_starting_replication_key_value(context)
        # Only an exact ``int`` bookmark is used; anything else falls back to 1.
        if type(bookmark) is int:
            lower_bound = bookmark
        else:
            lower_bound = 1
        params["$filter"] = f"Timestamp gt {lower_bound}"

        if next_page_token:
            params["$skiptoken"] = next_page_token
        return params