-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathcommandutils.py
165 lines (148 loc) · 5.51 KB
/
commandutils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#/*
# * Copyright © 2020 VMware, Inc.
# * SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-only
# */
#pylint: disable=invalid-name,missing-docstring
import subprocess
import os
import crypt
import string
import random
import shutil
import ssl
import requests
import copy
from urllib.parse import urlparse
from OpenSSL.crypto import load_certificate, FILETYPE_PEM
class CommandUtils(object):
def __init__(self, logger):
self.logger = logger
self.hostRpmIsNotUsable = -1
def run(self, cmd, update_env=False):
"""if FileNotFoundError raised by subprocess,
error code 127 report up on the stack"""
retval = 127
try:
use_shell = not isinstance(cmd, list)
process = subprocess.Popen(
cmd, shell=use_shell,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
out, err = process.communicate()
retval = process.returncode
if out != b'':
self.logger.info(out.decode())
if update_env:
os.environ.clear()
os.environ.update(
dict(line.partition('=')[::2]
for line in out.decode('utf8').split('\0')
if line)
)
if retval != 0:
self.logger.info("Command failed: {}".format(cmd))
self.logger.info("Error code: {}".format(retval))
self.logger.error(err.decode())
except FileNotFoundError as _:
self.logger.info(f"Command {cmd} not found")
finally:
return retval
def run_in_chroot(self, chroot_path, cmd, update_env = False):
# Use short command here. Initial version was:
# chroot "${BUILDROOT}" \
# /usr/bin/env -i \
# HOME=/root \
# TERM="$TERM" \
# PS1='\u:\w\$ ' \
# PATH=/bin:/usr/bin:/sbin:/usr/sbin \
# /usr/bin/bash --login +h -c "cd installer;$*"
return self.run(['chroot', chroot_path, '/bin/bash', '-c', cmd], update_env)
@staticmethod
def is_vmware_virtualization():
"""Detect vmware vm"""
process = subprocess.Popen(['systemd-detect-virt'], stdout=subprocess.PIPE)
out, err = process.communicate()
if err is not None and err != 0:
return False
return out.decode() == 'vmware\n'
@staticmethod
def generate_password_hash(password):
"""Generate hash for the password"""
return crypt.crypt(password)
@staticmethod
def _requests_get(url, verify):
try:
r = requests.get(url, verify=verify, stream=True, timeout=5.0)
except:
return None
return r
@staticmethod
def wget(url, out, enforce_https=True, ask_fn=None, fingerprint=None):
# Check URL
try:
u = urlparse(url)
except:
return False, "Failed to parse URL"
if not all([ u.scheme, u.netloc ]):
return False, 'Invalid URL'
if enforce_https:
if u.scheme != 'https':
return False, 'URL must be of secure origin (HTTPS)'
r = CommandUtils._requests_get(url, True)
if r is None:
if fingerprint is None and ask_fn is None:
return False, "Unable to verify server certificate"
port = u.port
if port is None:
port = 443
try:
pem = ssl.get_server_certificate((u.netloc, port))
cert = load_certificate(FILETYPE_PEM, pem)
fp = cert.digest('sha1').decode()
except:
return False, "Failed to get server certificate"
if ask_fn is not None:
if not ask_fn(fp):
return False, "Aborted on user request"
else:
if fingerprint != fp:
return False, "Server fingerprint did not match provided. Got: " + fp
# Download file without validation
r = CommandUtils._requests_get(url, False)
if r is None:
return False, "Failed to download file"
r.raw.decode_content = True
with open(out, 'wb') as f:
shutil.copyfileobj(r.raw, f)
return True, None
def checkIfHostRpmNotUsable(self):
if self.hostRpmIsNotUsable >= 0:
return self.hostRpmIsNotUsable
# if rpm doesn't have zstd support
# if host rpm doesn't support sqlite backend db
cmds = [
"rpm --showrc | grep -qw 'rpmlib(PayloadIsZstd)'",
"rpm -E %{_db_backend} | grep -qw 'sqlite'",
]
for cmd in cmds:
if self.run(cmd):
self.hostRpmIsNotUsable = 1
break
if self.hostRpmIsNotUsable < 0:
self.hostRpmIsNotUsable = 0
return self.hostRpmIsNotUsable
def convertToBytes(self, size):
if not isinstance(size, str):
return int(size)
if not size[-1].isalpha():
return int(size)
conv = {'k': 1024, 'm':1024**2, 'g':1024**3, 't':1024**4}
return int(float(size[:-1]) * conv[size.lower()[-1]])
@staticmethod
def get_disk_size_bytes(disk):
cmd = ['blockdev', '--getsize64', disk]
process = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out,err = process.communicate()
retval = process.returncode
return retval, copy.copy(out.decode())