Skip to content

Commit 917c182

Browse files
author
Jaap Roes
committed
Added CrontabManager.
1 parent af3574c commit 917c182

File tree

4 files changed

+244
-2
lines changed

4 files changed

+244
-2
lines changed

fabtools/cron.py

Lines changed: 124 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Fabric tools for managing crontab tasks
33
"""
44
from __future__ import with_statement
5+
from contextlib import closing
56

67
from tempfile import NamedTemporaryFile
78

@@ -13,9 +14,11 @@ def add_task(name, timespec, user, command):
1314
"""
1415
Add a cron task
1516
"""
17+
entry = '%(timespec)s %(user)s %(command)s\n' % locals()
1618
with NamedTemporaryFile() as script:
17-
script.write('%(timespec)s %(user)s %(command)s\n' % locals())
18-
script.flush()
19+
cm = CrontabManager(script.name)
20+
cm.add_task(entry, entry)
21+
cm.write_crontab()
1922
upload_template('/etc/cron.d/%(name)s' % locals(),
2023
script.name,
2124
context={},
@@ -28,3 +31,122 @@ def add_daily(name, user, command):
2831
Add a cron task to run daily
2932
"""
3033
add_task(name, '@daily', user, command)
34+
35+
36+
def remove_task(name, timespec, user, command):
37+
"""
38+
Add a cron task
39+
"""
40+
entry = '%(timespec)s %(user)s %(command)s\n' % locals()
41+
# FIXME: This is wrong, the remote cron file should be retrieved
42+
# instead of writing to a temp file.
43+
with NamedTemporaryFile() as script:
44+
cm = CrontabManager(script.name)
45+
cm.remove_task(entry, entry)
46+
cm.write_crontab()
47+
upload_template('/etc/cron.d/%(name)s' % locals(),
48+
script.name,
49+
context={},
50+
chown=True,
51+
use_sudo=True)
52+
53+
54+
class CrontabManager(object):
55+
"""
56+
Helper class to edit entries in user crontabs (see man 5 crontab)
57+
58+
Based heavily on the UserCrontabManager from z3c.recipe.usercrontab.
59+
"""
60+
prepend = '# START %s (generated by fabtools)'
61+
append = '# END %s (generated by fabtools)'
62+
63+
def __init__(self, path):
64+
self.crontab = []
65+
self._path = path
66+
67+
def read_crontab(self):
68+
with closing(open(self._path, 'r')) as f:
69+
self.crontab = [l.strip("\n") for l in f]
70+
71+
def write_crontab(self):
72+
with closing(open(self._path, 'w')) as f:
73+
for l in self.crontab:
74+
f.write("%s\n" % l)
75+
f.flush()
76+
77+
def __repr__(self):
78+
return "\n".join(self.crontab)
79+
80+
def find_boundaries(self, name):
81+
start = None
82+
end = None
83+
for line_number, line in enumerate(self.crontab):
84+
if line.strip() == self.prepend % name:
85+
if start is not None:
86+
raise RuntimeError("%s found twice in the same crontab. "
87+
"Fix by hand." % (self.prepend % name))
88+
start = line_number
89+
if line.strip() == self.append % name:
90+
if end is not None:
91+
raise RuntimeError("%s found twice in the same crontab. "
92+
"Fix by hand." % (self.append % name))
93+
end = line_number + 1
94+
# ^^^ +1 as we want the range boundary and that is behind the
95+
# element.
96+
return start, end
97+
98+
def has_entry(self, name):
99+
"""
100+
Check if the crontab has an entry
101+
"""
102+
start, end = self.find_boundaries(name)
103+
return start is not None and end is not None
104+
105+
def add_entry(self, name, entry):
106+
"""
107+
Add an entry to the crontab.
108+
109+
Find lines enclosed by APPEND/PREPEND, zap and re-add.
110+
"""
111+
start, end = self.find_boundaries(name)
112+
inject_at = -1 # By default at the end of the file.
113+
if start is not None and end is not None:
114+
# But preferably in our existing location.
115+
self.crontab[start:end] = []
116+
inject_at = start
117+
# There is some white space already, so we only insert
118+
# entry and markers.
119+
to_inject = [self.prepend % name, entry, self.append % name]
120+
else:
121+
# Insert entry, markers, and white space
122+
to_inject = ['', self.prepend % name, entry, self.append % name]
123+
124+
if inject_at == -1:
125+
# [-1:-1] would inject before the last item...
126+
self.crontab += to_inject
127+
else:
128+
self.crontab[inject_at:inject_at] = to_inject
129+
130+
def del_entry(self, name):
131+
"""
132+
Remove an entry from a crontab.
133+
"""
134+
start, end = self.find_boundaries(name)
135+
if start is not None and end is not None:
136+
if start > 0:
137+
if not self.crontab[start - 1].strip():
138+
# Also strip empty line in front.
139+
start = start - 1
140+
if end < len(self.crontab):
141+
if not self.crontab[end].strip():
142+
# Also strip empty line after end marker.
143+
# Note: not self.crontab[end + 1] as end is the location
144+
# AFTER the end marker to selected it with [start:end].
145+
end = end + 1
146+
if end == len(self.crontab):
147+
end = None # Otherwise the last line stays in place
148+
self.crontab[start:end] = []
149+
return 1 # Number of entries that are removed.
150+
151+
# Nothing removed.
152+
return 0

fabtools/tests/cron.d/broken_cron

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
2+
# START test (generated by fabtools)
3+
0 */2 * * * username /home/username/test.pl
4+
# END test (generated by fabtools)
5+
6+
7+
# START test (generated by fabtools)
8+
0 */2 * * * username /home/username/test.pl
9+
# END test (generated by fabtools)
10+
11+
12+
# START test2 (generated by fabtools)
13+
0 */2 * * * username /home/username/test.pl
14+
# END test2 (generated by fabtools)
15+
16+
17+
# START test3 (generated by fabtools)
18+
0 */2 * * * username /home/username/test.pl
19+
20+
# END test2 (generated by fabtools)

fabtools/tests/cron.d/test_cron

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
2+
# START test (generated by fabtools)
3+
0 */2 * * * username /home/username/test.pl
4+
# END test (generated by fabtools)

fabtools/tests/test_cron.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
from __future__ import with_statement
2+
from contextlib import closing
3+
4+
import unittest
5+
import os
6+
from fabtools.cron import CrontabManager
7+
8+
CRON_D = os.path.join(os.path.dirname(__file__), 'cron.d')
9+
10+
11+
class TestEmptyCrontabManager(unittest.TestCase):
12+
13+
def setUp(self):
14+
with open(os.path.join(CRON_D, 'empty_cron'), 'w') as f:
15+
f.write('')
16+
17+
def test_read(self):
18+
cm = CrontabManager(os.path.join(CRON_D, 'empty_cron'))
19+
cm.read_crontab()
20+
self.assertEquals([], cm.crontab)
21+
22+
def test_find_boundaries(self):
23+
cm = CrontabManager(os.path.join(CRON_D, 'empty_cron'))
24+
cm.read_crontab()
25+
self.assertEquals((None, None), cm.find_boundaries('test'))
26+
27+
def test_has_entry(self):
28+
cm = CrontabManager(os.path.join(CRON_D, 'empty_cron'))
29+
cm.read_crontab()
30+
self.assertFalse(cm.has_entry('test'))
31+
32+
def test_del_entry(self):
33+
'''
34+
Test that removing a non existing entry is harmless.
35+
'''
36+
cm = CrontabManager(os.path.join(CRON_D, 'empty_cron'))
37+
cm.read_crontab()
38+
self.assertEquals(0, cm.del_entry('test'))
39+
40+
def test_add_write(self):
41+
name = 'test'
42+
entry = '0 */2 * * * username /home/username/test.pl'
43+
cm = CrontabManager(os.path.join(CRON_D, 'empty_cron'))
44+
cm.read_crontab()
45+
cm.add_entry(name, entry)
46+
crontab = ['', cm.prepend % name, entry, cm.append % name]
47+
self.assertEquals(cm.crontab, crontab)
48+
cm.write_crontab()
49+
with closing(open(cm._path)) as f:
50+
self.assertEquals('\n'.join(crontab) + '\n', f.read())
51+
52+
def tearDown(self):
53+
os.unlink(os.path.join(CRON_D, 'empty_cron'))
54+
55+
56+
class TestExistingCrontabManager(unittest.TestCase):
57+
58+
def test_has_entry(self):
59+
cm = CrontabManager(os.path.join(CRON_D, 'test_cron'))
60+
cm.read_crontab()
61+
self.assertTrue(cm.has_entry('test'))
62+
63+
def test_del_entry(self):
64+
cm = CrontabManager(os.path.join(CRON_D, 'test_cron'))
65+
cm.read_crontab()
66+
self.assertEquals(1, cm.del_entry('test'))
67+
68+
def test_find_boundaries(self):
69+
cm = CrontabManager(os.path.join(CRON_D, 'test_cron'))
70+
cm.read_crontab()
71+
self.assertEquals((1, 4), cm.find_boundaries('test'))
72+
73+
def test_add(self):
74+
'''
75+
Test that adding the same item is harmless.
76+
'''
77+
name = 'test'
78+
entry = '0 */2 * * * username /home/username/test.pl'
79+
cm = CrontabManager(os.path.join(CRON_D, 'test_cron'))
80+
cm.read_crontab()
81+
cm.add_entry(name, entry)
82+
crontab = ['', cm.prepend % name, entry, cm.append % name]
83+
self.assertEquals(cm.crontab, crontab)
84+
85+
86+
class TestBrokenCrontabManager(unittest.TestCase):
87+
88+
def test_find_test(self):
89+
cm = CrontabManager(os.path.join(CRON_D, 'broken_cron'))
90+
cm.read_crontab()
91+
self.assertRaises(RuntimeError, cm.find_boundaries, 'test')
92+
93+
def test_find_test2(self):
94+
cm = CrontabManager(os.path.join(CRON_D, 'broken_cron'))
95+
cm.read_crontab()
96+
self.assertRaises(RuntimeError, cm.find_boundaries, 'test2')

0 commit comments

Comments
 (0)