Skip to content

Commit 14f067c

Browse files
authored
Merge pull request #3 from okfn/changed_columns
Detect changed columns
2 parents cb39878 + ba05b06 commit 14f067c

File tree

3 files changed

+40
-2
lines changed

3 files changed

+40
-2
lines changed

ckanext/datapusher_plus/interfaces.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,25 @@ def after_upload(
5353
the resource that was uploaded
5454
"""
5555
pass
56+
57+
def datastore_before_update(self, resource_id, existing_info, new_headers):
58+
""" We are about to update the datastore
59+
60+
:param existing_info: The existing information in the datastore.
61+
Empty if the resource is new.
62+
Something like:
63+
{
64+
'Header 1': {'label': 'Header 1'},
65+
'Header 2': {'label': 'Header 2'},
66+
...
67+
}
68+
69+
:param new_headers: The new headers that are to be added to the datastore.
70+
Something like:
71+
[
72+
{'id': 'Header 1', 'type': 'text', 'info': {'label': 'Header 1'}},
73+
{'id': 'Header 2', 'type': 'numeric', 'info': {'label': 'Header 2'}},
74+
...
75+
]
76+
"""
77+
pass

ckanext/datapusher_plus/jobs.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,16 @@
4646
from dateutil.parser import parse as parsedate
4747

4848
from rq import get_current_job
49+
from ckan import plugins
4950
import ckan.plugins.toolkit as tk
5051

5152

5253
import ckanext.datapusher_plus.utils as utils
5354
import ckanext.datapusher_plus.helpers as dph
5455
# from ckanext.datapusher_plus.config import config
56+
from ckanext.datapusher_plus import interfaces
57+
58+
log = logging.getLogger(__name__)
5559

5660

5761
if locale.getdefaultlocale()[0]:
@@ -149,6 +153,7 @@ def send_resource_to_datastore(
149153
"""
150154
Stores records in CKAN datastore
151155
"""
156+
log.info(f"Sending data to datastore {resource_id}")
152157

153158
if resource_id:
154159
# used to create the "main" resource
@@ -338,6 +343,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
338343
logger.addHandler(logging.StreamHandler())
339344
logger.setLevel(logging.DEBUG)
340345

346+
logger.info(f"_push_to_datastore :: {task_id}")
341347
# check if QSV_BIN and FILE_BIN exists
342348
qsv_bin = tk.config.get("ckanext.datapusher_plus.qsv_bin")
343349
qsv_path = Path(qsv_bin)
@@ -441,6 +447,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
441447
if USE_PROXY:
442448
kwargs["proxies"] = {"http": DOWNLOAD_PROXY, "https": DOWNLOAD_PROXY}
443449
with requests.get(resource_url, **kwargs) as response:
450+
logger.info(f"Response status {response.status_code} for {resource_url}")
444451
response.raise_for_status()
445452

446453
cl = response.headers.get("content-length")
@@ -1008,6 +1015,14 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
10081015
info_dict = dict(label=original_header_dict.get(idx, "Unnamed Column"))
10091016
headers_dicts.append(dict(id=header["id"], type=header_type, info=info_dict))
10101017

1018+
# Notify plugins the datastore will be updated
1019+
for plugin in plugins.PluginImplementations(interfaces.IDataPusher):
1020+
plugin.datastore_before_update(
1021+
resource_id=resource_id,
1022+
existing_info=existing_info,
1023+
new_headers=headers_dicts,
1024+
)
1025+
10111026
# Maintain data dictionaries from matching column names
10121027
# if data dictionary already exists for this resource as
10131028
# we want to preserve the user's data dictionary curations

ckanext/datapusher_plus/utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,9 @@ def get_dp_plus_user_apitoken():
8484
if api_token:
8585
return api_token
8686

87-
site_user = tk.get_action("get_site_user")({"ignore_auth": True}, {})
88-
return site_user["apikey"]
87+
raise Exception(
88+
"No API token found. Please set the ckanext.datapusher_plus.api_token config option."
89+
)
8990

9091

9192
def check_response(

0 commit comments

Comments
 (0)