Skip to content

Commit b29350a

Browse files
tools: Adding an update feature to check-snapshots
using `./tools/check-snapshots -u .` will update the test repo files with any NEWER updates Signed-off-by: John Castranio <[email protected]>
1 parent 2cfebfe commit b29350a

File tree

1 file changed

+194
-0
lines changed

1 file changed

+194
-0
lines changed

tools/check-snapshots

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
#!/usr/bin/python3
2+
"""check-snapshots greps the directory tree for rpmrepo urls and checks them
3+
against the current snapshot list"""
4+
5+
import argparse
6+
import json
7+
import os
8+
import sys
9+
import subprocess
10+
import time
11+
import requests
12+
import re
13+
from urllib.parse import urlparse
14+
from pathlib import Path
15+
16+
SNAPSHOTS_URL="https://rpmrepo.osbuild.org/v2/enumerate"
17+
SNAPSHOTS_TIMEOUT = 2 * 60
18+
SNAPSHOT_GREP = ["grep", "--color=never", "-or", r"http.*rpmrepo.osbuild.org.*-20[0-9]\+"]
19+
20+
def fetch_snapshots_api(url, timeout=SNAPSHOTS_TIMEOUT):
21+
"""Get the list of snapshots from the rpmrepo API"""
22+
print(f"Fetching list of snapshots from {url}")
23+
start = time.time()
24+
try:
25+
r = requests.get(url, timeout=timeout)
26+
except:
27+
return None
28+
elapsed = time.time() - start
29+
if r.status_code != 200:
30+
print(f"HTTP Response {r.status_code} from {url} after {elapsed:0.0f}s: {r.text}")
31+
return None
32+
print(f"Received snapshot list in {elapsed:0.0f}s")
33+
return r.json()
34+
35+
36+
def find_snapshot_urls(directory):
37+
"""grep the directory for rpmrepo snapshot urls
38+
39+
Returns a map of urls to the files they are used in.
40+
"""
41+
urls = {}
42+
try:
43+
grep_out = subprocess.run(SNAPSHOT_GREP + [directory],
44+
check=True,
45+
capture_output=True,
46+
env={"LANG": "C"})
47+
except subprocess.CalledProcessError as e:
48+
print("ERROR: " + e.stderr.decode("utf-8"))
49+
sys.exit(1)
50+
51+
for line in grep_out.stdout.decode("utf-8").splitlines():
52+
try:
53+
file, url = line.split(":", 1)
54+
except ValueError:
55+
print(f"Problem parsing {line}")
56+
continue
57+
url = url.strip()
58+
if url not in urls:
59+
urls[url] = [file]
60+
else:
61+
urls[url].append(file)
62+
63+
return urls
64+
65+
66+
def check_baseurl(repo, snapshots):
67+
"""Check the baseurl to see if it is a valid snapshot, and if there is a newer one
68+
available.
69+
"""
70+
invalid = None
71+
newer = None
72+
url = urlparse(repo)
73+
snapshot = os.path.basename(url.path)
74+
# Is this snapshot valid?
75+
if snapshot not in snapshots:
76+
invalid = f"{snapshot} is not a valid snapshot name"
77+
# is this snapshot old?
78+
base = snapshot.rsplit("-", 1)[0]
79+
newest = snapshot
80+
for s in snapshots:
81+
if s.rsplit("-", 1)[0] != base:
82+
continue
83+
if s > newest:
84+
newest = s
85+
if newest != snapshot:
86+
newer = f"{snapshot} has a newer version - {newest}"
87+
return invalid, newer
88+
89+
90+
def check_snapshot_urls(urls, snapshots, skip=["test/data/manifests"], errors_only=False, update=False):
91+
"""check the urls against the current list of snapshots
92+
Returns:
93+
0 if all were valid and no newer snapshots are available
94+
2 if there were invalid snapshots
95+
3 if there were newer snapshots
96+
6 if there were invalid and newer snapshots
97+
"""
98+
# Gather up the messages for each file
99+
messages = {}
100+
ret = 0
101+
for url in urls:
102+
invalid, newer = check_baseurl(url, snapshots)
103+
if invalid:
104+
# Add this to each file's invalid message list
105+
for f in urls[url]:
106+
if any(bool(s in f) for s in skip):
107+
continue
108+
ret |= 2
109+
if f in messages:
110+
if invalid not in messages[f]["invalid"]:
111+
messages[f]["invalid"].append(invalid)
112+
else:
113+
messages[f] = {"invalid": [invalid], "newer": []}
114+
if errors_only:
115+
continue
116+
if newer:
117+
# Add this to each file's newer message list
118+
for f in urls[url]:
119+
if any(bool(s in f) for s in skip):
120+
continue
121+
ret |= 4
122+
if f in messages:
123+
if newer not in messages[f]["newer"]:
124+
messages[f]["newer"].append(newer)
125+
else:
126+
messages[f] = {"newer": [newer], "invalid": []}
127+
# Print the messages for each file
128+
if not update:
129+
for f in messages:
130+
print(f"{f}:")
131+
for msg in messages[f]["invalid"]:
132+
print(f" ERROR: {msg}")
133+
for msg in messages[f]["newer"]:
134+
print(f" NEWER: {msg}")
135+
return ret, messages
136+
137+
def update_snapshots(messages):
138+
for target_file in list(messages.keys()):
139+
if Path(target_file).exists():
140+
new_entry = messages[target_file].get("newer")
141+
if new_entry:
142+
for line in new_entry:
143+
match = re.search(r'(.*) has a newer version - (.*)', line)
144+
if match:
145+
old, new = match.groups()
146+
command = f"sed -i 's/{old}/{new}/g' {target_file}"
147+
os.system(command)
148+
print(f'{target_file} was updated')
149+
elif not Path(target_file).exists():
150+
print(f'{target_file} not found')
151+
else:
152+
print("No newer changes found")
153+
154+
# parse cmdline args
155+
def parse_args():
156+
parser =argparse.ArgumentParser(description="Check snapshot urls")
157+
parser.add_argument("--verbose")
158+
parser.add_argument("--timeout", type=int, default=SNAPSHOTS_TIMEOUT,
159+
help="How long to wait for rpmrepo snapshot list")
160+
parser.add_argument("--cache", help="Use a cached file for the list of rpmrepo snapshots")
161+
parser.add_argument("--url", default=SNAPSHOTS_URL,
162+
help="URL to use for the list of rpmrepo snapshots")
163+
parser.add_argument("--errors-only", action="store_true",
164+
help="Only return errors")
165+
parser.add_argument("-u", "--update", help="Update the files with the new snapshot address", action='store_true')
166+
parser.add_argument("directory")
167+
return parser.parse_args()
168+
169+
def main():
170+
args = parse_args()
171+
urls = find_snapshot_urls(args.directory)
172+
snapshots = None
173+
if args.cache:
174+
try:
175+
with open(args.cache, encoding="utf8") as f:
176+
snapshots = json.load(f)
177+
except:
178+
print(f"No snapshots cache found at {args.cache}")
179+
sys.exit(1)
180+
else:
181+
snapshots = fetch_snapshots_api(args.url, args.timeout)
182+
if not snapshots:
183+
print(f"Cannot download snapshots from {args.url}")
184+
sys.exit(1)
185+
186+
ret, messages = check_snapshot_urls(urls, snapshots, errors_only=args.errors_only, update=args.update)
187+
if args.update:
188+
update_snapshots(messages)
189+
else:
190+
return ret
191+
192+
193+
if __name__=='__main__':
194+
sys.exit(main())

0 commit comments

Comments
 (0)