-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathrun-single-cron
More file actions
executable file
·140 lines (118 loc) · 4.52 KB
/
run-single-cron
File metadata and controls
executable file
·140 lines (118 loc) · 4.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/python
#
# Runs a single cronjob the way cron runs it
import docopt
import getpass
from whelk import shell
import sys, os, re
def main():
    """Command-line entry point: show and (unless -n is given) run one cronjob.

    Parses the command line with docopt, builds the job environment with
    load_environment(), picks the job with load_cronjob(), switches to the
    target user when that differs from the invoking user, prints the
    environment/stdin/command and finally runs the command through
    ``env['SHELL'] -c``.  The process exits with the job's return code
    (or 0 in -n mode).

    Raises:
        RuntimeError: when another user's job is requested without an
            effective uid of 0.
    """
    usage = """Run a single cronjob the way crond would run it

Usage:
  run-single-cron [-n] [-u=<user>] [-c=<crontab>] <linenumber>

Options:
  -h --help      Show this help message and exit
  -n             Noop. Just show what the command is.
  -u=<user>      Run a cronjob from this users crontab instead of the
                 current user
  -c=<crontab>   Run a command from this crontab file instead of the
                 users crontab
"""
    opts = docopt.docopt(usage)
    curuser = getpass.getuser()
    user = opts['-u'] or curuser
    crontab = opts['-c']
    lineno = int(opts['<linenumber>'])
    if crontab:
        # Jobs taken from an explicit crontab file are always run as root.
        user = 'root'
    if user != curuser and os.geteuid() != 0:
        raise RuntimeError("Only root can run other peoples cronjobs")

    env = load_environment(user)
    job, stdin_data = load_cronjob(env, user, crontab, lineno)

    if user != curuser:
        uid, gid = shell.getent('passwd', user).stdout.split(':')[2:4]
        # Reset the supplementary groups before giving up root; setgid/setuid
        # alone would leave the job running with root's group memberships.
        os.initgroups(user, int(gid))
        os.setgid(int(gid))
        os.setuid(int(uid))

    def logger(shell, sp, fd, data):
        # Output callback: relay the job's stdout to our fd 1 and anything
        # else to our fd 2.
        if data is None:
            return
        os.write(1 if sp.stdout.fileno() == fd else 2, data)

    # Every print below takes a single argument, so the parenthesised form
    # produces the same output under the Python 2 print statement.
    print(wrap("Environment:", attr.faint))
    kl = max(len(name) for name in env) + 2
    for key in sorted(env):
        print(wrap("  %-*s %s" % (kl, key, env[key]), attr.faint))
    if stdin_data:
        print(wrap("Input:\n%s" % stdin_data, attr.faint))

    if opts['-n']:
        print("Not running cronjob: %s" % job)
        sys.exit(0)

    print(wrap("Running cronjob: %s" % job, attr.faint))
    ret = shell[env['SHELL']]('-c', job, output_callback=logger, env=env,
                              input=stdin_data)
    print(wrap("Exit code: %d" % ret.returncode, attr.faint))
    sys.exit(ret.returncode)
def load_environment(user):
    """Load an environment as specified by crontab(5).

    Args:
        user: login name of the user the job will run as.

    Returns:
        A dict of environment variables.  On Debian systems (detected via
        /etc/debian_version) it starts from the environment list of the
        'cron' PAM service; PATH, HOME, USER, LOGNAME and SHELL are then
        always set to the fixed values below.
    """
    env = {}
    if os.path.exists('/etc/debian_version'):
        # Debian special: cron reads environment via pam
        import PAM
        auth = PAM.pam()
        auth.start('cron')
        auth.set_item(PAM.PAM_USER, user)
        for item in auth.getenvlist():
            key, val = item.split('=', 1)
            env[key] = val

    env['PATH'] = '/usr/bin:/bin'
    # The home directory is the second-to-last field of the passwd entry.
    env['HOME'] = shell.getent('passwd', user).stdout.strip().split(':')[-2]
    env['USER'] = env['LOGNAME'] = user
    env['SHELL'] = '/bin/sh'
    return env
def load_cronjob(env, user, crontab, lineno):
    """Return the command and stdin data for one line of a crontab.

    Environment assignments found on lines 1..lineno are merged into ``env``
    (USER and LOGNAME are never overridden), then line ``lineno`` is parsed
    as a cron entry.

    Args:
        env: dict of environment variables; updated in place.
        user: user whose crontab is listed with ``crontab -u <user> -l``
            when no crontab file is given.
        crontab: path of a crontab file to read, or a false value.
        lineno: 1-based number of the line holding the job.

    Returns:
        A ``(job, input)`` tuple: the command to run and the text to feed
        to its stdin ('' when the command has no unescaped '%').

    Raises:
        ValueError: if ``lineno`` is out of range or the selected line is
            not a cron entry.
    """
    if lineno < 1:
        raise ValueError("Line numbers start at 1, got %d" % lineno)

    if crontab:
        with open(crontab) as fd:
            lines = fd.readlines()
    else:
        # No strip() before splitting: removing leading blank lines would
        # shift every line number.
        lines = shell.crontab('-u', user, '-l', stderr=None).stdout.splitlines()
    if len(lines) < lineno:
        raise ValueError("Crontab is only %d lines long" % len(lines))
    lines = lines[:lineno]

    for line in lines:
        if re.match(r'^\s*[a-zA-Z][_a-zA-Z0-9]*\s*=', line):
            key, val = [part.strip() for part in line.split('=', 1)]
            # Compare the stripped name, so "USER = x" is rejected as well.
            if key in ('USER', 'LOGNAME'):
                continue
            # crontab(5): the value may be wrapped in matching quotes.
            if len(val) >= 2 and val[0] == val[-1] and val[0] in ('"', "'"):
                val = val[1:-1]
            env[key] = val

    job = lines[-1].strip()
    if re.match(r'^@(reboot|(hour|dai|week|month|annual|year)ly|midnight)\s', job):
        # "@daily command": everything after the nickname is the command.
        command = job.split(None, 1)[1].strip()
    else:
        items = job.split(None, 5)
        if len(items) != 6 or job.startswith('#'):
            raise ValueError("Not a cronjob: %s" % job)
        # NOTE(review): only the first three time fields are checked, and the
        # pattern is not anchored at the end; kept as in the original.
        for item in items[:3]:
            if not re.match(r'((?:\d+(?:-\d+)?|\*)(/\d+)?)(,\1)*', item):
                raise ValueError("Not a cronjob: %s" % job)
        command = items[-1]

    # The first unescaped '%' ends the command; the rest becomes stdin, with
    # further unescaped '%' turned into newlines.
    stdin_data = ''
    if re.search(r'(?<!\\)%', command):
        command, stdin_data = re.split(r'(?<!\\)%', command, maxsplit=1)
        stdin_data = re.sub(r'(?<!\\)%', '\n', stdin_data)
    return command, stdin_data
class Attr(object):
    """Named set of values, reachable as attributes and by reverse lookup.

    Each keyword argument becomes an instance attribute; ``attr`` keeps the
    name -> value mapping and ``rev_attr`` the value -> name mapping.
    """

    def __init__(self, **attr):
        self.attr = attr
        self.rev_attr = {value: key for key, value in attr.items()}
        # Expose every name directly on the instance (e.g. ``obj.red``).
        vars(self).update(attr)

    def name(self, val):
        """Return the name that was registered for ``val``."""
        return self.rev_attr[val]
# Numeric codes for terminal foreground/background colours and text attributes.
fgcolor = Attr(black=30, red=31, green=32, yellow=33, blue=34, magenta=35, cyan=36, white=37, none=None)
bgcolor = Attr(black=40, red=41, green=42, yellow=43, blue=44, magenta=45, cyan=46, white=47, none=None)
attr = Attr(normal=0, bright=1, faint=2, underline=4, negative=7, conceal=8, crossed=9, none=None)
esc = '\033'


def mode(*args):
    """Build the escape sequence ESC[<codes>m from the non-None arguments."""
    codes = [str(code) for code in args if code is not None]
    return "%s[%sm" % (esc, ';'.join(codes))


reset = mode(attr.normal)


def wrap(text, *args):
    """Return ``text`` preceded by mode(*args) and followed by the reset sequence."""
    return "%s%s%s" % (mode(*args), text, reset)


if __name__ == '__main__':
    main()