-
Notifications
You must be signed in to change notification settings - Fork 62
Expand file tree
/
Copy pathcommand.py
More file actions
171 lines (152 loc) · 6.98 KB
/
command.py
File metadata and controls
171 lines (152 loc) · 6.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# -*- coding: utf-8 -*-
import sys
import logging
import ckan.plugins.toolkit as tk
from ckanext.xloader.jobs import xloader_data_into_datastore_
from ckanext.xloader.utils import XLoaderFormats, get_xloader_user_apitoken
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
class XloaderCmd:
    '''Implementation of the xloader command line operations.

    Each ``_submit_*`` method prints its progress to stdout. When a
    submission fails, ``error_occured`` is set to True so the caller can
    decide on the process exit status once the run has finished.

    dry_run: when True, resources are listed but nothing is submitted.
    '''

    # Name given to the console handler installed by
    # _setup_xloader_logger, so that repeated calls can detect it.
    _CLI_HANDLER_NAME = 'ckanext.xloader.cli-console'

    # Number of datasets requested per package_search call.
    _SEARCH_PAGE_SIZE = 1000

    def __init__(self, dry_run=False):
        self.dry_run = dry_run
        # NB: the (misspelt) attribute name is kept as-is because code
        # outside this class may read it.
        self.error_occured = False

    def _setup_xloader_logger(self):
        '''Send 'ckanext.xloader' DEBUG (and above) messages to the console.

        Safe to call more than once: the console handler is only added the
        first time, so messages are never printed twice.
        '''
        # whilst the development.ini's loggers are setup now, because this is
        # cli, let's ensure the xloader debug messages are printed for the
        # user
        logger = logging.getLogger('ckanext.xloader')
        already_installed = any(
            existing.get_name() == self._CLI_HANDLER_NAME
            for existing in logger.handlers)
        if not already_installed:
            handler = logging.StreamHandler()
            handler.set_name(self._CLI_HANDLER_NAME)
            formatter = logging.Formatter(
                ' %(name)-12s %(levelname)-5s %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
        # Do not hand records on to ancestor loggers, in case the config
        # has set up handlers there too (that would print them twice).
        logger.propagate = False

    def _submit_all_existing(self, sync=False, queue=None):
        '''Submit every resource that already has data in the datastore.

        sync: load each resource in this process instead of queueing a job.
        queue: optional queue name passed on to the 'xloader_submit' action.

        Resources known to the datastore but missing from the metadata are
        skipped. A failure on one resource is reported, recorded in
        ``error_occured`` and does not stop the remaining resources.
        '''
        from ckanext.datastore.backend \
            import get_all_resources_ids_in_datastore
        resource_ids = get_all_resources_ids_in_datastore()
        print('Processing %d resources' % len(resource_ids))
        user = tk.get_action('get_site_user')(
            {'ignore_auth': True}, {})
        for resource_id in resource_ids:
            try:
                resource_dict = tk.get_action('resource_show')(
                    {'ignore_auth': True}, {'id': resource_id})
            except tk.ObjectNotFound:
                print(' Skipping resource {} found in datastore but not in '
                      'metadata'.format(resource_id))
                continue
            try:
                self._submit_resource(
                    resource_dict, user, indent=2, sync=sync, queue=queue)
            except Exception as e:
                # Same policy as _submit_package: report, remember that
                # something failed, and carry on with the next resource.
                self.error_occured = True
                print(str(e))
                print('  ERROR submitting resource "{}" '.format(
                    resource_id))

    def _submit_all(self, sync=False, queue=None):
        '''Submit the resources of every dataset, private ones included.

        sync: load each resource in this process instead of queueing a job.
        queue: optional queue name passed on to the 'xloader_submit' action.

        The dataset ids are collected with 'package_search', one page at a
        time, so sites with more datasets than fit in a single search
        response are processed completely.
        '''
        package_ids = []
        while True:
            response = tk.get_action('package_search')(
                {'ignore_auth': True},
                {'include_private': True,
                 'rows': self._SEARCH_PAGE_SIZE,
                 'start': len(package_ids)})
            results = response['results']
            if not results:
                break
            package_ids.extend(pkg['id'] for pkg in results)
            # 'count' is the total number of matches; stop once they have
            # all been collected (a missing count means "one page only").
            if len(package_ids) >= response.get('count', 0):
                break

        print('Processing %d datasets' % len(package_ids))
        user = tk.get_action('get_site_user')(
            {'ignore_auth': True}, {})
        for p_id in package_ids:
            self._submit_package(p_id, user, indent=2, sync=sync, queue=queue)

    def _submit_package(self, pkg_id, user=None, indent=0, sync=False, queue=None):
        '''Submit every resource of one dataset.

        pkg_id: id or name of the dataset (surrounding whitespace ignored).
        user: site user dictionary; looked up when not given.
        indent: number of spaces to prefix the printed messages with.
        sync: load each resource in this process instead of queueing a job.
        queue: optional queue name passed on to the 'xloader_submit' action.

        Exits the process with status 1 when the dataset cannot be shown.
        A failure on one resource is reported, recorded in
        ``error_occured`` and does not stop the remaining resources.
        '''
        indentation = ' ' * indent
        if not user:
            user = tk.get_action('get_site_user')(
                {'ignore_auth': True}, {})

        try:
            pkg = tk.get_action('package_show')(
                {'ignore_auth': True},
                {'id': pkg_id.strip()})
        except Exception as e:
            print(e)
            print(indentation + 'Dataset "{}" was not found'.format(pkg_id))
            sys.exit(1)

        print(indentation + 'Processing dataset {} with {} resources'.format(
            pkg['name'], len(pkg['resources'])))
        for resource in pkg['resources']:
            try:
                resource['package_name'] = pkg['name']  # for debug output
                self._submit_resource(
                    resource, user, indent=indent + 2, sync=sync, queue=queue)
            except Exception as e:
                self.error_occured = True
                print(str(e))
                print(indentation + 'ERROR submitting resource "{}" '.format(
                    resource['id']))
                continue

    def _submit_resource(self, resource, user, indent=0, sync=False, queue=None):
        '''Submit (or, with sync, directly load) a single resource.

        resource: resource dictionary
        user: user dictionary; its 'name' is used when calling the
            'xloader_submit' action.
        indent: number of spaces to prefix the printed messages with.
        sync: when True, call the load job function in this process instead
            of submitting it through the 'xloader_submit' action.
        queue: optional queue name, only used when not sync.

        Nothing is submitted when the resource format is not an xloader
        format, when its url_type is 'datapusher' or 'xloader', or when
        this command runs in dry-run mode.
        '''
        indentation = ' ' * indent
        if not XLoaderFormats.is_it_an_xloader_format(resource['format']):
            print(indentation
                  + 'Skipping resource {r[id]} because format "{r[format]}" is '
                    'not configured to be xloadered'.format(r=resource))
            return
        if resource['url_type'] in ('datapusher', 'xloader'):
            print(indentation
                  + 'Skipping resource {r[id]} because url_type "{r[url_type]}" '
                    'means resource.url points to the datastore '
                    'already, so loading would be circular.'.format(
                        r=resource))
            return

        # Prefer the dataset name (set by _submit_package) for readability.
        dataset_ref = resource.get('package_name', resource['package_id'])
        print('{indent}{sync_style} /dataset/{dataset}/resource/{r[id]}\n'
              '{indent} url={r[url]}\n'
              '{indent} format={r[format]}'
              .format(sync_style='Processing' if sync else 'Submitting',
                      dataset=dataset_ref, r=resource, indent=indentation))
        if self.dry_run:
            print(indentation + '(not submitted - dry-run)')
            return

        data_dict = {
            'resource_id': resource['id'],
            # NOTE(review): presumably makes xloader reload the data even
            # if the file is unchanged - confirm against xloader_submit.
            'ignore_hash': True,
        }
        if sync:
            data_dict['ckan_url'] = tk.config.get('ckan.site_url')
            input_dict = {
                'metadata': data_dict,
                'api_key': get_xloader_user_apitoken()
            }
            logger = logging.getLogger('ckanext.xloader.cli')
            xloader_data_into_datastore_(input_dict, None, logger)
        else:
            if queue:
                data_dict['queue'] = queue
            success = tk.get_action('xloader_submit')(
                {'user': user['name']}, data_dict)
            if success:
                print(indentation + '...ok')
            else:
                print(indentation + 'ERROR submitting resource')
                self.error_occured = True

    @staticmethod
    def _job_metadata(job):
        '''Return the 'metadata' dict of a queued job's payload, or {}.

        The payload is taken from the first positional argument of the job
        or, when the job has no positional arguments, from its 'data'
        keyword argument (falling back to the keyword arguments as a
        whole).
        '''
        # Do not use job.description: it is not a stable data channel and
        # can be truncated. The real parameters are in job.args/job.kwargs.
        metadata = {}
        try:
            if getattr(job, 'args', None) and len(job.args) >= 1:
                payload = job.args[0] or {}
                if isinstance(payload, dict):
                    metadata = payload.get('metadata') or {}
            elif getattr(job, 'kwargs', None) and isinstance(job.kwargs, dict):
                payload = job.kwargs.get('data') or job.kwargs
                if isinstance(payload, dict):
                    metadata = payload.get('metadata') or {}
        except Exception:
            # Best effort: a job whose arguments cannot be read is still
            # listed, just without resource details.
            metadata = {}
        if not isinstance(metadata, dict):
            metadata = {}
        return metadata

    @classmethod
    def _format_job(cls, job):
        '''Return the one-line status summary printed for a queued job.'''
        metadata = cls._job_metadata(job)
        res_id = metadata.get('resource_id', 'N/A')
        url = metadata.get('original_url') or metadata.get('url') or 'N/A'
        enqueued_at = getattr(job, 'enqueued_at', None)
        if enqueued_at is None:
            # A datetime format spec cannot be applied to None.
            enqueued = 'N/A'
        else:
            enqueued = format(enqueued_at, '%Y-%m-%d %H:%M')
        return ('{id} Enqueued={enqueued} res_id={res_id} '
                'url={url}'.format(
                    id=getattr(job, '_id', None),
                    enqueued=enqueued,
                    res_id=res_id,
                    url=url,
                ))

    def print_status(self):
        '''Print one line per job in the queue from ckan.lib.jobs.get_queue().

        Prints a short notice instead when that queue has no jobs.
        '''
        import ckan.lib.jobs as rq_jobs
        jobs = rq_jobs.get_queue().jobs
        if not jobs:
            print('No jobs currently queued')
            return
        for job in jobs:
            print(self._format_job(job))