# gen_json.py
import argparse
import json
import logging
import os
# noinspection PyCompatibility
import urllib.parse as parse
# noinspection PyCompatibility
from urllib.request import url2pathname
from collections import OrderedDict
import re
import ontospy
import requests
import unicodecsv as csv

# global namespace lookup
with open('context.json', 'r') as c:
    namespaces = json.load(c)['@context']


def get_uri(uri):
    """
    Get text from a URI.
    :param uri: URI (can be file:// or http(s)://)
    :return: text or None
    """
    if uri.startswith('file://'):
        f = url2pathname(parse.urlparse(uri).path)
        if os.access(f, os.R_OK):
            with open(f) as file_f:
                return file_f.read()
        else:
            logging.warning('Cannot read: %s', uri)
            return None
    elif uri.startswith(('http://', 'https://')):
        r = requests.get(uri)
        if r.status_code == requests.codes.ok:
            return r.text
        else:
            logging.warning('Cannot retrieve: %s', uri)
            return None
    else:
        logging.warning('Unrecognised URI scheme: %s', uri)
        return None
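# Illustrative behaviour: local files and remote resources both resolve to text;
# anything else logs a warning and returns None.
#   get_uri('file:///tmp/model.csv')       -> contents of /tmp/model.csv
#   get_uri('https://example.org/x.json')  -> response body as text
#   get_uri('ftp://example.org/x.json')    -> None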


def qname_to_id(val):
    """
    Turns a qname into a URI.
    Crude string matching to check if it looks like a qname, then looks up the prefix in the
    global dict of namespaces.
    :param val: string, e.g. of form prefix:local
    :return: string of form http://www.example.com/namespace#local
    """
    if val:
        # Check that the colon-containing string isn't a date time (as might be used as a
        # default value for a calendar control).
        pattern = re.compile(r"\d\d\d\d[-]\d\d[-]\d\d[T]\d\d[:]\d\d[:]\d\d[+]\d\d[:]\d\d")
        if ':' in val and 'http://' not in val and not pattern.match(val):
            # maxsplit=1 guards against extra colons in the local part
            ns, value = val.split(':', 1)
            ns_uri = namespaces.get(ns)
            if ns_uri and value:
                return ''.join([ns_uri, value])
    return None
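# Example (assumes context.json maps 'dcterms' to 'http://purl.org/dc/terms/'):
#   qname_to_id('dcterms:title')              -> 'http://purl.org/dc/terms/title'
#   qname_to_id('2017-06-28T09:21:30+00:00')  -> None (a date, not a qname)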


def generate_expanded(value):
    """
    Generate a full URI for a value, if it can be identified in the namespaces.
    :param value: string, e.g. a qname
    :return: URI or None
    """
    return qname_to_id(value)


def expand_dct(dct, sanitise=True, pair=False):
    """
    Expand an entire dict with id and label pairs.
    :param dct: dictionary
    :param sanitise: convert : to _ in key names for Jinja
    :param pair: convert fields into id/label pairs
    :return: expanded dictionary
    """
    # Iterate over a copy, as the loop may add new *_label keys to dct.
    for k, v in list(dct.items()):
        expanded = generate_expanded(value=v)
        if k == 'crowds:uiInputOptions':
            if isinstance(v, str) and ';' in v:
                # Handle the case where it's a hand-edited CSV and the list is delimited by ;
                dct[k] = [opt.strip() for opt in v.split(';')]
            elif isinstance(v, str) and '[' in v and ']' in v:
                # Handle the case where the CSV has been generated by code from the JSON.
                # This is crude string handling to work around the fact that the list is
                # output as a string in the JSON, and not as a list.
                mutable_str = ''.join([x for x in v if x not in ['[', ']', "'"]])
                dct[k] = [c.strip() for c in mutable_str.split(',')]
        if expanded:
            if pair:
                # temp fix for anno studio issue
                if k in ['crowds:uiGroup', 'crowds:uiInputType', 'crowds:uiSelectorType']:
                    dct[k] = {'@id': v, 'o:label': v}
                else:
                    dct[k] = {'@id': expanded, 'o:label': v}
            else:
                dct[k] = expanded
                dct[k + '_label'] = v
    dct = {k: v for k, v in dct.items() if v}
    if sanitise:
        return sanitise_keys(dct)
    return dct
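# Example (assumes context.json maps 'oa' to 'http://www.w3.org/ns/oa#'):
#   expand_dct({'crowds:derivedAnnoBodyType': 'oa:TextualBody'})
#   -> {'crowds_derivedAnnoBodyType': 'http://www.w3.org/ns/oa#TextualBody',
#       'crowds_derivedAnnoBodyType_label': 'oa:TextualBody'}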


def template_element(dct, url, elem_t, irc_t, u_t):
    """
    Create data suitable for embedding in JSON from a capture model element row in the CSV.
    :param dct: dictionary to process
    :param url: base URL for the server
    :param elem_t: Omeka ID for the Crowd Source Element resource template
    :param irc_t: Omeka ID for the Interactive Resource class
    :param u_t: Omeka User ID
    :return: parsed and expanded data as a Python object
    """
    dct['@id'] = url + '/api/items/' + dct['dcterms:identifier']
    dct['o:id'] = dct['dcterms:identifier']
    dct['@type'] = ['o:Item', 'dctype:InteractiveResource']
    dct['o:is_public'] = 'true'
    dct['o:item_set'] = []
    dct['o:media'] = []
    dct['o:modified'] = {
        "@type": "http://www.w3.org/2001/XMLSchema#dateTime",
        "@value": "2017-06-28T09:21:30+00:00"
    }
    dct['o:owner'] = {'@id': url + '/api/users/' + u_t, 'o:id': u_t}
    dct['o:resource_class'] = {'@id': url + '/api/resource_classes/' + irc_t,
                               'o:id': irc_t}
    dct['o:resource_template'] = {'@id': url + '/api/resource_templates/' + elem_t,
                                  'o:id': elem_t}
    # Normalise Boolean values to upper case, falling back to the default when the
    # CSV value isn't a recognisable Boolean.
    default_booleans = {
        'crowds:uiHidden': 'FALSE'
    }
    for k in default_booleans:
        if k in dct:
            if dct[k].upper() not in ['FALSE', 'TRUE']:
                dct[k] = default_booleans[k]
            dct[k] = dct[k].upper()
    # Fill in default values for empty fields.
    default_values = {'crowds:derivedAnnoBodyPurpose': 'oa:tagging',
                      'crowds:derivedAnnoBodyType': 'oa:TextualBody',
                      'crowds:derivedAnnoBodyFormat': 'text/plain',
                      'crowds:uiInputType': 'madoc:textbox'
                      }
    for k in default_values:
        if k in dct:
            if not dct[k]:
                dct[k] = default_values[k]
    return expand_dct(dct, sanitise=False, pair=True)
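# Illustrative result shape (abridged): an element row such as
#   {'dcterms:identifier': '12', 'dcterms:title': 'Forename', ...}
# comes back wrapped in the Omeka envelope ('@id', 'o:owner', 'o:resource_template',
# and so on) with qname values expanded into {'@id': ..., 'o:label': ...} pairs.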


def template_group(dct, url, grp_t, irc_t, u_t, nlw_c, ida_c):
    """
    Create data suitable for embedding in JSON from a capture model group row in the CSV.
    :param dct: dictionary to process
    :param url: base URL for the server
    :param grp_t: Omeka ID for the Crowd Source Group resource template
    :param irc_t: Omeka ID for the Interactive Resource class
    :param u_t: Omeka User ID
    :param nlw_c: use the NLW @context
    :param ida_c: use the IDA @context
    :return: parsed and expanded data as a Python object
    """
    dct['@id'] = url + '/api/item_sets/' + dct['dcterms:identifier']
    dct['o:id'] = dct['dcterms:identifier']
    dct['@type'] = ['o:ItemSet', 'dctype:InteractiveResource']
    dct['o:is_public'] = 'true'
    dct['o:item_set'] = []
    dct['o:media'] = []
    dct['o:modified'] = {
        "@type": "http://www.w3.org/2001/XMLSchema#dateTime",
        "@value": "2017-06-28T09:21:30+00:00"
    }
    dct['o:owner'] = {'@id': url + '/api/users/' + u_t, 'o:id': u_t}
    dct['o:resource_class'] = {'@id': url + '/api/resource_classes/' + irc_t,
                               'o:id': irc_t}
    dct['o:resource_template'] = {'@id': url + '/api/resource_templates/' + grp_t,
                                  'o:id': grp_t}
    # Normalise Boolean values to upper case, falling back to the default when the
    # CSV value isn't a recognisable Boolean.
    default_booleans = {'crowds:uiChoice': 'FALSE',
                        'crowds:uiMultiple': 'TRUE',
                        'crowds:derivedAnnoCombine': 'TRUE',
                        'crowds:derivedAnnoExternalize': 'FALSE',
                        'crowds:derivedAnnoHumanReadable': 'FALSE',
                        'crowds:derivedAnnoSerialize': 'TRUE',
                        'crowds:uiHidden': 'FALSE'
                        }
    for k in default_booleans:
        if k in dct:
            if dct[k].upper() not in ['FALSE', 'TRUE']:
                dct[k] = default_booleans[k]
            dct[k] = dct[k].upper()
    # Fill in default values for empty fields.
    default_values = {'crowds:derivedAnnoBodyPurpose': 'oa:tagging',
                      'crowds:derivedAnnoBodyType': 'oa:TextualBody',
                      'crowds:derivedAnnoMotivatedBy': 'oa:tagging',
                      'crowds:uiComponent': 'resource',
                      'crowds:uiGroup': 'madoc:form',
                      'crowds:derivedAnnoBodyFormat': 'text/plain'
                      }
    for k in default_values:
        if k in dct:
            if not dct[k]:
                dct[k] = default_values[k]
    # The NLW context is the default; the IDA context is used only when explicitly requested.
    if ida_c and not nlw_c:
        context_path = 'ida_context.json'
    else:
        context_path = 'nlw_context.json'
    with open(context_path, 'r') as context_file:
        dct['@context'] = json.load(context_file)['@context']
    return expand_dct(dct, sanitise=False, pair=True)


def sanitise_keys(d):
    """
    Replace the colons in key names with underscores (so the keys are usable in Jinja).
    :param d: dictionary
    :return: dict with sanitised key names
    """
    return {k.replace(':', '_'): v for k, v in d.items()}
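# Example:
#   sanitise_keys({'dcterms:title': 'Example'})  -> {'dcterms_title': 'Example'}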


def process_group(top_level, groupss, elemss, url_b, group_t, element_t, ir_c, u):
    """
    Recursively process a capture model group.
    :param top_level: top level group
    :param groupss: group level rows
    :param elemss: element level rows
    :param url_b: base URL for the server
    :param group_t: ID for the group resource template
    :param element_t: ID for the element resource template
    :param ir_c: ID for the Interactive Resource class
    :param u: Omeka User ID
    :return: top_level row with parts
    """
    parts = top_level['dcterms:hasPart'].split(';')
    group_parts = [x for x in groupss if x['dcterms:identifier'] in parts]
    element_parts = [x for x in elemss if x['dcterms:identifier'] in parts]
    if group_parts:
        top_level['dcterms:hasPart'] = [
            template_group(process_group(top_level=g, groupss=groupss, elemss=elemss, url_b=url_b,
                                         group_t=group_t, element_t=element_t, ir_c=ir_c, u=u),
                           url=url_b, grp_t=group_t, irc_t=ir_c, u_t=u,
                           ida_c=False, nlw_c=False)
            for g in group_parts]
    elif element_parts:
        top_level['dcterms:hasPart'] = [
            template_element(item, url=url_b, elem_t=element_t, irc_t=ir_c, u_t=u)
            for item in element_parts]
    return top_level
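# Illustrative CSV shape (pipe-delimited) that this recursion walks: each group
# lists its children in a ;-separated dcterms:hasPart column, and a group's
# children are either all groups or all elements (groups are checked first).
#   dcterms:identifier | dcterms:type  | dcterms:hasPart
#   1                  | madoc:group   | 2
#   2                  | madoc:group   | 3;4
#   3                  | madoc:element |
#   4                  | madoc:element |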


def csv_load(csv_file, url_base, group, element, irclass, user, top_index='1', delimiter='|', ida_context=False):
    """
    Load a CSV file and return formatted JSON. Defaults to assuming a pipe-delimited CSV file.
    top_index sets the numbered row in the CSV (using the running_no column) to treat as the top-level group.
    N.B. does no validation of the input.
    :param csv_file: the CSV file to open
    :param url_base: the base URL for the Omeka server, e.g. 'http://nlw-omeka.digtest.co.uk'
    :param group: the Omeka ID number for the Crowd Source Group resource template
    :param element: the Omeka ID number for the Crowd Source Element resource template
    :param irclass: Omeka ID for the Interactive Resource class
    :param user: Omeka User ID
    :param top_index: numbered row to treat as the top level group in the capture model
    :param delimiter: the delimiter for the CSV, defaults to pipe '|'
    :param ida_context: use the IDA @context rather than the (default) NLW one
    :return: JSON suitable for import into Omeka via the capture model importer module
    """
    nlw_context = not ida_context
    with open(csv_file, 'rb') as csv_in:
        rows = list(csv.DictReader(csv_in, delimiter=delimiter))
    groups = [row for row in rows if row['dcterms:type'] == 'madoc:group']
    elements = [row for row in rows if row['dcterms:type'] == 'madoc:element']
    top = [t for t in groups if t['dcterms:identifier'] == top_index][0]
    group_dict = template_group(process_group(top_level=top, groupss=groups, elemss=elements, url_b=url_base,
                                              group_t=group, element_t=element, ir_c=irclass, u=user),
                                nlw_c=nlw_context, ida_c=ida_context,
                                url=url_base, grp_t=group, irc_t=irclass, u_t=user)
    # fix case on Booleans via crude string replace in the JSON
    return json.loads(json.dumps(group_dict).replace('TRUE', 'True').replace('FALSE', 'False'))
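# Illustrative use (the filename and IDs are examples; the IDs match the
# defaults set in main()):
#   model = csv_load('ww1_model.csv', url_base='https://crowd.library.wales',
#                    group='5', element='4', irclass='27', user='2')
# 'model' is then a JSON-ready dict: the top-level group with its nested parts
# and the chosen @context attached.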


def csv_gen_vocab(csv_file, delimiter='|'):
    """
    Generate an empty CSV file for creating a capture model using the Crowds RDF source.
    Parses the Crowds vocabulary and appends a column for every property it finds there.
    :param csv_file: filename to write to
    :param delimiter: delimiter for the CSV, defaults to pipe '|'
    """
    all_fields = OrderedDict([
        ('dcterms:identifier', None),
        ('dcterms:type', None),
        ('dcterms:hasPart', None),
        ('dcterms:title', None),
        ('rdfs:label', None),
        ('dcterms:description', None),
        ('dcterms:conformsTo', None),
        ('rdfs:range', None)],
    )
    crowds = ontospy.Ontospy(
        "https://raw.githubusercontent.com/digirati-co-uk/annotation-vocab/master/crowds.rdf")
    for p in crowds.all_properties:
        all_fields[p.qname] = None
    # unicodecsv writes encoded bytes, so the output file is opened in binary mode
    with open(csv_file, 'wb') as csv_out:
        dw = csv.DictWriter(
            csv_out, delimiter=delimiter, fieldnames=all_fields)
        dw.writeheader()
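# Illustrative use: write an empty pipe-delimited template with one column per
# property in the Crowds vocabulary, ready to fill in by hand.
#   csv_gen_vocab('blank_model.csv')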


def main():
    """
    Initialise logging, parse args, and write the JSON.
    To generate the WW1 capture model as JSON (defaults are currently set to the values
    on crowd.library.wales):
    Simple:
        python gen_json.py -i exported_crowd_library_wales.csv -o ww1_crowd_wales.json
    Advanced (Gwilym Livingston Evans, for the NLW _dev_ site):
        python gen_json.py -i gle.csv -o gle.json -b https://nlw-omeka.digtest.co.uk -u 2 -t 1 -c 27 -g 5 -e 4 -u 3
    IDA model:
        python gen_json.py -i ida_master.csv -o ida_master.json -u 2 -t 1 -c 27 -g 3 -e 1
    :return: None
    """
    logging.basicConfig(filename='capture_model.log', level=logging.DEBUG)
    parser = argparse.ArgumentParser(description='Simple CSV to JSON tool for annotation studio capture models.')
    parser.add_argument('-i', '--input', help='Input CSV file name', required=True)
    parser.add_argument('-o', '--output', help='Output JSON file name', required=True)
    parser.add_argument('-b', '--url_base', help='Base URL for the Omeka instance', required=False)
    parser.add_argument('-t', '--top_index', help='Numbered element to treat as the top level group', required=False)
    parser.add_argument('-g', '--group_id', help='ID for the Crowd Source Group resource template', required=False)
    parser.add_argument('-e', '--element_id', help='ID for the Crowd Source Element resource template', required=False)
    parser.add_argument('-c', '--irclass', help='ID for the Interactive Resource class', required=False)
    parser.add_argument('-u', '--user', help='Omeka User ID for the Owner', required=False)
    parser.add_argument('-x', '--context', help='IDA Context', required=False)
    args = parser.parse_args()
    # Defaults match the values on crowd.library.wales.
    if not args.url_base:
        args.url_base = 'https://crowd.library.wales'
    if not args.irclass:
        args.irclass = str(27)
    if not args.group_id:
        args.group_id = str(5)
    if not args.element_id:
        args.element_id = str(4)
    if not args.user:
        args.user = str(2)
    if not args.context:
        args.context = False
    if not args.top_index:
        args.top_index = str(1)
    # top_index always has a value by this point, so it is passed unconditionally.
    js = csv_load(csv_file=args.input, url_base=args.url_base, top_index=args.top_index, group=args.group_id,
                  element=args.element_id, irclass=args.irclass, user=args.user, ida_context=args.context)
    if js:
        with open(args.output, 'w') as o:
            json.dump(js, o, indent=4, sort_keys=True)


if __name__ == "__main__":
    main()