22
33from __future__ import annotations
44
5+ import json
56import sys
6- from pathlib import Path
7- from typing import Any , Callable , Iterable , Optional , Dict
87import typing
9- from datetime import datetime
8+ from pathlib import Path
9+ from typing import Any , Callable , Iterable
1010
1111import requests
12- from singer_sdk .helpers .jsonpath import extract_jsonpath
13- from singer_sdk .pagination import BaseOffsetPaginator # noqa: TCH002
14- from singer_sdk .streams import RESTStream
15- from lxml import etree
16- import json
1712import xmltodict
13+ from lxml import etree
1814from pendulum import parse
15+ from singer_sdk .helpers .jsonpath import extract_jsonpath
16+ from singer_sdk .pagination import BaseOffsetPaginator
17+ from singer_sdk .streams import RESTStream
1918
2019from tap_exact .auth import ExactAuthenticator
2120
2221if typing .TYPE_CHECKING :
22+ from datetime import datetime
23+
2324 from requests import Response
2425
2526TPageToken = typing .TypeVar ("TPageToken" )
3435
3536
3637class ExactPaginator (BaseOffsetPaginator ):
37- def __init__ (self , stream , start_value , page_size ) -> None :
38+ """Exact pagination helper class."""
39+ def __init__ (self , stream :ExactStream , start_value :int , page_size :int ) -> None : # noqa: D107
3840 super ().__init__ (start_value = start_value , page_size = page_size )
3941 self .stream = stream
4042
41- def has_more (self , response : Response ) -> bool : # noqa: ARG002
43+ def has_more (self , response : Response ) -> bool :
4244 """Override this method to check if the endpoint has any pages left.
4345
4446 Args:
@@ -49,8 +51,9 @@ def has_more(self, response: Response) -> bool: # noqa: ARG002
4951 """
5052 data = self .stream .xml_to_dict (response )
5153 link = data .get ("feed" , {}).get ("link" , [])
52- if type (link ) == list :
54+ if type (link ) is list :
5355 return "next" in [item .get ("@rel" , "" ) for item in link ]
56+ return None
5457
5558 def get_next (self , response : Response ) -> TPageToken | None :
5659 """Get the next pagination token or index from the API response.
@@ -65,24 +68,26 @@ def get_next(self, response: Response) -> TPageToken | None:
6568 data = self .stream .xml_to_dict (response )
6669 link = data .get ("feed" , {}).get ("link" , [])
6770 if "next" in [item .get ("@rel" , "" ) for item in link ]:
68- next_link = [ item ["@href" ] for item in link if item ["@rel" ] == "next" ][ 0 ]
69- next_page_token = next_link .split ("&" )[- 1 ].split ("=" )[- 1 ]
70- return next_page_token
71+ next_link = next ( item ["@href" ] for item in link if item ["@rel" ] == "next" )
72+ return next_link .split ("&" )[- 1 ].split ("=" )[- 1 ]
73+ return None
7174
7275
7376class ExactStream (RESTStream ):
7477 """Exact stream class."""
7578
7679 @property
7780 def partitions (self ) -> list [dict ] | None :
81+ """Return a list of partitions, or None if the stream is not partitioned."""
7882 return [{"division" : division } for division in self .config ["divisions" ]]
7983
8084 @property
8185 def url_base (self ) -> str :
8286 """Return the API URL root, configurable via tap settings."""
83- return f "https://start.exactonline.nl/api/v1"
87+ return "https://start.exactonline.nl/api/v1"
8488
8589 def get_url (self , context : dict | None ) -> str :
90+ """Return the URL for the API request."""
8691 return f"{ self .url_base } /{ context ['division' ]} { self .path } "
8792
8893 @cached_property
@@ -94,14 +99,16 @@ def authenticator(self) -> _Auth:
9499 """
95100 return ExactAuthenticator (self )
96101
97- def get_starting_time (self , context ):
102+ def get_starting_time (self , context :dict ) -> datetime :
103+ """Return the starting timestamp for the stream."""
98104 start_date = self .config .get ("start_date" )
99105 if start_date :
100106 start_date = parse (self .config .get ("start_date" ))
101107 replication_key = self .get_starting_timestamp (context )
102108 return replication_key or start_date
103109
104- def get_url_params (self , context : Optional [dict ], next_page_token ) -> Dict [str , Any ]:
110+ def get_url_params (self , context : dict | None , next_page_token :str ) -> dict [str , Any ]:
111+ """Return a dictionary of parameters to use in the API request."""
105112 params : dict = {}
106113 if self .select :
107114 params ["$select" ] = self .select
@@ -117,15 +124,16 @@ def get_new_paginator(self) -> BaseOffsetPaginator:
117124 """Create a new pagination helper instance."""
118125 return ExactPaginator (self , start_value = None , page_size = 60 )
119126
120- def xml_to_dict (self , response ):
127+ def xml_to_dict (self , response : requests .Response ) -> dict :
128+ """Convert xml response to dict."""
121129 try :
122130 # clean invalid xml characters
123131 my_parser = etree .XMLParser (recover = True )
124- xml = etree .fromstring (response .content , parser = my_parser )
132+ xml = etree .fromstring (response .content , parser = my_parser ) # noqa: S320
125133 cleaned_xml_string = etree .tostring (xml )
126134 # parse xml to dict
127135 data = json .loads (json .dumps (xmltodict .parse (cleaned_xml_string )))
128- except :
136+ except : # noqa: E722
129137 data = json .loads (json .dumps (xmltodict .parse (response .content .decode ("utf-8-sig" ).encode ("utf-8" ))))
130138 return data
131139
@@ -159,7 +167,7 @@ def post_process(
159167 new_content = {}
160168 for key , value in content .items ():
161169 new_key = key [2 :]
162- if type (value ) == str :
170+ if type (value ) is str :
163171 new_content [new_key ] = value
164172 elif not value or value .get ("@m:null" ) == "true" :
165173 new_content [new_key ] = None
@@ -176,11 +184,11 @@ def post_process(
176184 new_content [new_key ] = float (value .get ("#text" ))
177185 else :
178186 new_content [new_key ] = value .get ("#text" , None )
179- row = new_content
180- return row
187+ return new_content
181188
182189 @property
183- def select (self ):
190+ def select (self ) -> str :
191+ """Return the select query parameter."""
184192 return "," .join (self .schema ["properties" ].keys ())
185193
186194
@@ -191,22 +199,21 @@ def get_new_paginator(self) -> BaseOffsetPaginator:
191199 """Create a new pagination helper instance."""
192200 return ExactPaginator (self , start_value = None , page_size = 1000 )
193201
194- def get_starting_time (self , context ):
202+ def get_starting_time (self , context :str ) -> int :
203+ """Return the starting timestamp for the stream."""
195204 state = self .get_context_state (context )
196205 rep_key = None
197- if "replication_key_value" in state . keys () :
206+ if "replication_key_value" in state :
198207 rep_key = state ["replication_key_value" ]
199208 return rep_key or 1
200209
201- def get_url_params (self , context : Optional [dict ], next_page_token ) -> Dict [str , Any ]:
210+ def get_url_params (self , context : dict | None , next_page_token :int ) -> dict [str , Any ]:
211+ """Return a dictionary of parameters to use in the API request."""
202212 params : dict = {}
203213 if self .select :
204214 params ["$select" ] = self .select
205215 start_timestamp = self .get_starting_time (context )
206- if start_timestamp == 1 :
207- date_filter = f"Timestamp gt { start_timestamp } "
208- else :
209- date_filter = f"Timestamp gt { start_timestamp } L"
216+ date_filter = f"Timestamp gt { start_timestamp } " if start_timestamp == 1 else f"Timestamp gt { start_timestamp } L"
210217 params ["$filter" ] = date_filter
211218 if next_page_token :
212219 params ["$skiptoken" ] = next_page_token
0 commit comments