-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathrelease_prep.py
150 lines (111 loc) · 4.43 KB
/
release_prep.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import json
import os
from optparse import OptionParser
class ReleasePreparation:
    """Rewrite relative schema ids and ``$ref`` values as versioned absolute URLs.

    ``schema_url`` is the prefix that is concatenated (with no separator added)
    in front of every generated path, so it is expected to end with ``/``.
    ``version_map`` is a nested mapping whose ``"version_numbers"`` entry
    mirrors the slash-separated schema hierarchy and whose leaves are the
    version values.
    """

    def __init__(self, schema_url, version_map):
        self.schema_url = schema_url
        self.version_map = version_map

    @staticmethod
    def _id_key(file_data):
        """Return ``"id"`` for draft-04 schemas and ``"$id"`` for any other draft.

        Raises ``KeyError`` when ``file_data`` has no ``"$schema"`` entry.
        """
        return "id" if "draft-04" in file_data["$schema"] else "$id"

    def _find_schema_version(self, schema):
        """Look up the version recorded for the slash-separated path ``schema``.

        Walks ``version_map["version_numbers"]`` one path segment at a time and
        returns the first non-dict value reached. Returns ``None`` when every
        segment resolves to a dict (the path stops above a leaf) and raises
        ``KeyError`` when a segment is missing from the map.
        """
        node = self.version_map["version_numbers"]
        for segment in schema.split("/"):
            node = node[segment]
            if not isinstance(node, dict):
                return node
        return None

    def _find_value(self, key, dictionary):
        """Yield every value stored under ``key`` anywhere inside ``dictionary``.

        Recurses into nested dicts and into dicts that are direct elements of
        lists. A value found under ``key`` is yielded as-is and not searched
        any further.
        """
        for k, v in dictionary.items():
            if k == key:
                yield v
            elif isinstance(v, dict):
                yield from self._find_value(key, v)
            elif isinstance(v, list):
                for element in v:
                    if isinstance(element, dict):
                        yield from self._find_value(key, element)

    def _replace_value(self, key, dictionary, old, new):
        """Replace, in place, every ``key`` entry equal to ``old`` with ``new``.

        Traverses the same nested dicts / lists of dicts as ``_find_value``.
        Only values of existing keys are reassigned, so the dict never changes
        size while it is being iterated.
        """
        for k, v in dictionary.items():
            if k == key and v == old:
                dictionary[k] = new
            elif isinstance(v, dict):
                self._replace_value(key, v, old, new)
            elif isinstance(v, list):
                for element in v:
                    if isinstance(element, dict):
                        self._replace_value(key, element, old, new)

    def _insert_into_dict(self, dict, obj, pos):
        """Return a new dict with the items of ``obj`` inserted at index ``pos``.

        The result is a shallow copy: nested containers are shared with the
        input. If a key of ``obj`` already exists in ``dict``, the key keeps
        the position of its first occurrence and takes the value that comes
        last in the combined item sequence.
        """
        # NOTE: the parameter name shadows the builtin ``dict``; it is kept so
        # that existing keyword callers continue to work.
        items = list(dict.items())
        return {k: v for k, v in items[:pos] + list(obj.items()) + items[pos:]}

    def expand_urls(self, relative_path, file_data):
        """Return a copy of ``file_data`` with an absolute id and absolute ``$ref`` values.

        ``relative_path`` is the slash-separated schema path without the
        ``.json`` extension. The schema's version is inserted before the last
        path segment and the result, prefixed with ``schema_url``, is stored
        under ``id`` / ``$id`` as the second key of the returned dict.

        Every ``$ref`` that does not already contain ``schema_url`` is
        rewritten: ``.json`` is removed, a version is inserted, and
        ``schema_url`` is prepended. References containing ``#`` receive the
        version of *this* schema, inserted before the first segment that
        contains ``#``; all other references receive the version looked up for
        the referenced path, inserted before the last segment.

        Only the top level of ``file_data`` is copied, so nested ``$ref``
        values are also rewritten inside the caller's ``file_data``.
        """
        version = self._find_schema_version(relative_path)
        id_parts = relative_path.split("/")
        id_parts.insert(len(id_parts) - 1, version)
        id_url = self.schema_url + "/".join(id_parts)

        id_entry = {self._id_key(file_data): id_url}
        new_json = self._insert_into_dict(file_data, id_entry, 1)

        # Snapshot the references first so the tree is not being walked by the
        # generator while _replace_value rewrites it.
        for ref in list(self._find_value("$ref", new_json)):
            if self.schema_url in ref:
                continue  # already absolute
            target = ref.replace(".json", "")
            parts = target.split("/")
            if "#" in target:
                for index, part in enumerate(parts):
                    if "#" in part:
                        parts.insert(index, version)
                        break
            else:
                parts.insert(len(parts) - 1, self._find_schema_version(target))
            expanded = self.schema_url + "/".join(parts)
            self._replace_value("$ref", new_json, ref, expanded)
        return new_json

    def get_schema_key(self, file_data):
        """Return the schema id with ``.json`` and ``schema_url`` stripped out.

        Returns ``None`` when ``file_data`` has no ``id`` (draft-04) or ``$id``
        (other drafts) entry.
        """
        schema_id_key = self._id_key(file_data)
        if schema_id_key not in file_data:
            return None
        key = file_data[schema_id_key].replace(".json", "")
        return key.replace(self.schema_url, "")
def _get_json(path):
    """Read the file at ``path`` and return its parsed JSON content.

    The file is opened as UTF-8 and closed before returning. Raises
    ``OSError`` if it cannot be opened and ``json.JSONDecodeError`` if its
    content is not valid JSON.
    """
    # A context manager guarantees the handle is released even if parsing fails.
    with open(path, 'r', encoding='utf-8') as infile:
        return json.load(infile)
def _save_json(path, data):
    """Write ``data`` to ``path`` as JSON with 4-space indent and sorted keys.

    Raises ``TypeError`` (or ``ValueError``) if ``data`` cannot be serialised;
    in that case ``path`` is left untouched.
    """
    # Serialise before opening: opening with 'w' truncates the file, so a
    # failure during encoding would otherwise destroy the existing content.
    serialized = json.dumps(data, indent=4, sort_keys=True)
    with open(path, 'w') as outfile:
        outfile.write(serialized)
def _get_schema_paths(path):
    """Collect the paths of all schema files found under ``path``.

    Walks the directory tree and returns, in ``os.walk`` order, the joined
    path of every file whose name ends with ``.json``, skipping any file whose
    name ends with ``versions.json``.
    """
    collected = []
    for current_dir, _, filenames in os.walk(path):
        for filename in filenames:
            if not filename.endswith('.json') or filename.endswith('versions.json'):
                continue
            collected.append(os.path.join(current_dir, filename))
    return collected
if __name__ == '__main__':
    # Command-line entry point: rewrite every schema file under --path in
    # place so that its id and $ref values are versioned absolute URLs.
    parser = OptionParser()
    parser.add_option("-p", "--path", dest="path",
                      help="Base path to the HCA metadata schemas", metavar="FILE")
    parser.add_option("-c", "--context", dest="context",
                      help="Release context")
    # NOTE(review): ReleasePreparation cannot be constructed without a base
    # URL and no existing option supplied one, so it is taken from a new
    # option. Confirm whether it should instead be derived from --context.
    parser.add_option("-u", "--url", dest="schema_url",
                      help="Base URL prepended to every schema id and $ref")
    (options, args) = parser.parse_args()

    if not options.path:
        print("You must supply the path to the metadata schema directory")
        raise SystemExit(2)
    if not options.schema_url:
        print("You must supply the base URL of the metadata schemas")
        raise SystemExit(2)

    # expanduser() leaves paths that do not start with "~" unchanged.
    path = os.path.expanduser(options.path)

    # expand_urls() concatenates the URL and the schema path with no
    # separator, so make sure there is exactly one trailing slash.
    schema_url = options.schema_url.rstrip("/") + "/"

    versions = _get_json(os.path.join(path, "versions.json"))
    release_prep = ReleasePreparation(schema_url, versions)

    for schema_file in _get_schema_paths(path):
        json_schema = _get_json(schema_file)
        # expand_urls() expects the slash-separated schema path relative to
        # the base directory, without the ".json" extension.
        relative = os.path.splitext(os.path.relpath(schema_file, path))[0]
        relative_path = relative.replace(os.sep, "/")
        new_json = release_prep.expand_urls(relative_path, json_schema)
        _save_json(schema_file, new_json)