13
13
from time import sleep
14
14
import requests as rq
15
15
import traceback
16
- import datetime
17
16
import time
18
17
import os
19
18
import json
31
30
cert_path = "SSL.crt"
32
31
key_path = "SSL_auth.key"
33
32
33
+ retry_wait_seconds = 300
34
+ max_retries = 3
35
+
34
36
def schema (configuration : dict ):
35
37
"""
36
38
# Define the schema function which lets you configure the schema your connector delivers.
@@ -51,11 +53,8 @@ def update(configuration: dict, state: dict):
51
53
try :
52
54
token_header = make_headers (configuration )
53
55
54
- for e in data_extracts :
55
- yield from sync_items (configuration , token_header , e )
56
-
57
- # Yield a checkpoint operation to save the new state.
58
- yield op .checkpoint ({})
56
+ for extract in data_extracts :
57
+ yield from sync_items (configuration , token_header , extract )
59
58
60
59
except Exception as e :
61
60
# Return error response
@@ -107,11 +106,11 @@ def sync_items(configuration: dict, headers: dict, extract: dict):
107
106
if tag .get ("tagCode" ) == "LAYOUT_NAME" ),
108
107
None # Default if not found
109
108
)
110
- matching_files = [fn for fn in os .listdir (extract_path ) if layout_name in fn ]
109
+ matching_files = [filename for filename in os .listdir (extract_path ) if layout_name in filename ]
111
110
112
- for m in matching_files :
113
- log .fine (f"processing { m } " )
114
- yield from upsert_rows (f"{ extract_path } { m } " , layout_name )
111
+ for file in matching_files :
112
+ log .fine (f"processing { file } " )
113
+ yield from upsert_rows (f"{ extract_path } { file } " , layout_name )
115
114
116
115
yield op .checkpoint ({})
117
116
@@ -123,13 +122,13 @@ def upsert_rows(filename: str, layout_name: str):
123
122
:return:
124
123
"""
125
124
126
- colnames = column_names [layout_name ]
125
+ layout_column_names = column_names [layout_name ]
127
126
log .fine (f"upserting rows for { filename } " )
128
127
with open (filename , "r" , newline = "" , encoding = "utf-8" ) as file :
129
128
reader = csv .reader (file , delimiter = "\t " ) # Tab-delimited
130
129
131
130
for row in reader :
132
- yield op .upsert (table = layout_name , data = dict (zip (colnames , row )))
131
+ yield op .upsert (table = layout_name , data = dict (zip (layout_column_names , row )))
133
132
134
133
def submit_process (url : str , headers : dict , payload : dict ):
135
134
"""
@@ -139,9 +138,6 @@ def submit_process(url: str, headers: dict, payload: dict):
139
138
:param payload: A dictionary with parameters for the extract to submit
140
139
:return: status and resource_id
141
140
"""
142
- retry_wait_seconds = 300
143
- max_retries = 3
144
-
145
141
for attempt in range (max_retries + 1 ):
146
142
response = rq .post (url , headers = headers , data = json .dumps (payload ), cert = (cert_path , key_path ))
147
143
0 commit comments