-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathZFSBackup.py
More file actions
2937 lines (2628 loc) · 118 KB
/
ZFSBackup.py
File metadata and controls
2937 lines (2628 loc) · 118 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import print_function
import os, sys
import json
import subprocess
import time
import tempfile
import threading
from select import select
from io import BytesIO
import errno
import boto3
import botocore
import socket
import fcntl
from enum import Enum
# The shell-quoting helper lives in "pipes" on Python 2 and in "shlex" on
# Python 3; bind whichever one is available to the single name SHELL_QUOTE.
if sys.version_info[0] == 2:
    from pipes import quote as SHELL_QUOTE
elif sys.version_info[0] == 3:
    from shlex import quote as SHELL_QUOTE

# Module-wide diagnostic switches, consulted by the helpers in this module.
# debug: when true, subprocess invocations and filter setup are traced.
debug = True
# verbose: when true, failed commands are reported on stderr.
verbose = False
def SetNonBlock(f):
    """
    Switch the descriptor behind the file-like object f to non-blocking
    mode, leaving every other file status flag as it was.
    """
    fd = f.fileno()
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
def _find_snapshot_index(name, snapshots):
    """
    Look up a snapshot by name.

    snapshots is a sequence of snapshot dictionaries, each carrying a
    "Name" entry.  The position of the first dictionary whose "Name"
    equals name is returned; KeyError(name) is raised when there is none.
    """
    position = next(
        (pos for pos, entry in enumerate(snapshots) if entry["Name"] == name),
        None,
    )
    if position is None:
        raise KeyError(name)
    return position
def _last_common_snapshot(source, target):
    """
    Find the most recent snapshot of source that also exists in target.

    Both arguments are lists of snapshot dictionaries, ordered by
    creation date.  Snapshots are matched by their "Name" entry.

    Returns None when the lists share no snapshot.  Otherwise returns a
    new dictionary containing:
    - Name:  (str) the name of the snapshot.
    - CreationTime:  (int) the creation time, taken from the source entry.
    - incremental:  (bool) taken from the target entry; False if the
    target entry does not say.
    - parent:  (str) taken from the target entry, only if present there.
    - ResumeToken:  (str) only if both entries have one and the two
    values are equal.
    """
    # Index the target by name so each source entry is a single lookup.
    target_by_name = {entry["Name"]: entry for entry in target}

    # Newest source snapshot first; the first hit is the answer.
    for candidate in reversed(source):
        snap_name = candidate["Name"]
        if snap_name not in target_by_name:
            continue
        counterpart = target_by_name[snap_name]

        common = {
            "Name": snap_name,
            "CreationTime": int(candidate["CreationTime"]),
        }
        common["incremental"] = counterpart.get("incremental", False)
        if "parent" in counterpart:
            common["parent"] = counterpart["parent"]
        tokens_present = "ResumeToken" in candidate and "ResumeToken" in counterpart
        if tokens_present and counterpart["ResumeToken"] == candidate["ResumeToken"]:
            common["ResumeToken"] = candidate['ResumeToken']
        return common

    return None
def _merge_snapshots(list1, list2):
    """
    Return the names of the snapshots that appear in both lists.

    The result is a list of names, in the order they occur in list1.
    An empty (or None) list2 yields an empty result.
    N.B.: Snapshots are assumed to be the same if
    they have the same name!
    """
    if not list2:
        return []
    names_in_second = {entry["Name"] for entry in list2}
    names_in_first = [entry["Name"] for entry in list1]
    return [snapname for snapname in names_in_first if snapname in names_in_second]
def CHECK_OUTPUT(*args, **kwargs):
    """
    Thin wrapper around subprocess.check_output().

    When the module-level debug flag is set, the arguments are traced
    on stderr before the call is made.
    """
    if debug:
        trace = "CHECK_OUTPUT({}, {})".format(args, kwargs)
        print(trace, file=sys.stderr)
    return subprocess.check_output(*args, **kwargs)
def CALL(*args, **kwargs):
    """
    Thin wrapper around subprocess.call().

    When the module-level debug flag is set, the arguments are traced
    on stderr before the call is made.  Returns whatever
    subprocess.call() returns (the exit status of the command).
    """
    if debug:
        # file= belongs to print(), not to str.format(); passing it to
        # format() silently sent the trace to stdout.
        print("CALL({}, {})".format(args, kwargs), file=sys.stderr)
    return subprocess.call(*args, **kwargs)
def CHECK_CALL(*args, **kwargs):
    """
    Thin wrapper around subprocess.check_call().

    When the module-level debug flag is set, the arguments are traced
    on stderr before the call is made.
    """
    if debug:
        trace = "CHECK_CALL({}, {})".format(args, kwargs)
        print(trace, file=sys.stderr)
    return subprocess.check_call(*args, **kwargs)
def POPEN(*args, **kwargs):
    """
    Thin wrapper around subprocess.Popen().

    When the module-level debug flag is set, the arguments are traced
    on stderr before the process object is created.
    """
    if debug:
        trace = "POPEN({}, {})".format(args, kwargs)
        print(trace, file=sys.stderr)
    return subprocess.Popen(*args, **kwargs)
def _get_snapshot_size_estimate(ds, toname, fromname=None, recursive=False):
    """
    Ask "zfs send -nPv" how large the stream for ds@toname would be.

    If fromname is given the estimate is for an incremental stream
    starting at ds@fromname; recursive adds -R to the command.

    Returns the integer from the "size" line of the command output, or 0
    when the output has no such line.  A failing command re-raises
    subprocess.CalledProcessError (after reporting it on stderr when the
    module-level verbose flag is set).
    """
    command = ["/sbin/zfs", "send", "-nPv"]
    if recursive:
        command += ["-R"]
    if fromname:
        command += ["-i", "{}@{}".format(ds, fromname)]
    command += ["{}@{}".format(ds, toname)]

    try:
        raw = CHECK_OUTPUT(command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        if verbose:
            print("`{}` got exception {}".format(" ".join(command), str(e)), file=sys.stderr)
        raise

    for line in raw.decode("utf-8").split("\n"):
        if not line.startswith("size"):
            continue
        (label, value) = line.split()
        if label == "size":
            return int(value)
    return 0
def _get_snapshots(ds):
    """
    List the snapshots of the local dataset ds.

    The ordering is whatever "/sbin/zfs list -s creation" produces; no
    sorting is done here.  Each element of the returned list is a
    dictionary with:
    - Name:  the part of the snapshot name after the '@'.
    - CreationTime:  (int) the creation column of the listing.
    - ResumeToken:  the receive_resume_token column, only when it is
    something other than "-".

    If the zfs command fails, an empty list is returned.
    """
    command = [
        "/sbin/zfs", "list", "-H", "-p",
        "-o", "name,creation,receive_resume_token",
        "-r", "-d", "1", "-t", "snapshot", "-s", "creation",
        ds,
    ]
    if debug:
        print("get_snapshots: {}".format(" ".join(command)), file=sys.stderr)

    try:
        listing = CHECK_OUTPUT(command).decode('utf-8')
    except subprocess.CalledProcessError:
        # We'll assume this is because there are no snapshots
        return []

    found = []
    for raw_line in listing.split("\n"):
        line = raw_line.rstrip()
        if not line:
            continue
        if debug:
            print("Output line: {}".format(line), file=sys.stderr)
        (full_name, ctime, resume_token) = line.split("\t")
        entry = {"Name": full_name.split('@')[1], "CreationTime": int(ctime)}
        if resume_token != "-":
            entry["ResumeToken"] = resume_token
        found.append(entry)
    return found
class ChunkRestorePriority(Enum):
    """
    Priority to request when restoring a chunk from glacier.

    Each member's value is the string used for that priority.
    """

    High = "Expedited"
    Medium = "Standard"
    Low = "Bulk"
class ChunkStatus(Enum):
    """
    Readiness of a chunk, for classes that store backups as chunks
    (e.g., ZFSBackupDirectory).

    Offline and Transferring are S3-specific: the chunk is in glacier,
    or is being transferred from glacier, respectively.
    """

    Available = "Available"
    Missing = "Missing"
    Error = "Error"
    Offline = "Offline"
    Transferring = "Transferring"
class ZFSBackupError(ValueError):
    """
    Base class for the exceptions defined in this module.

    The text passed to the constructor is kept in the message attribute
    as well as being handed to ValueError.
    """

    def __init__(self, message):
        super(ZFSBackupError, self).__init__(message)
        self.message = message
class ZFSBackupNotImplementedError(ZFSBackupError):
    """
    Raised for an operation that a particular backup class does not
    provide.  The caller supplies the message.
    """

    def __init__(self, message):
        super(ZFSBackupNotImplementedError, self).__init__(message)
class ZFSBackupMissingFullBackupError(ZFSBackupError):
    """
    Raised when no full backup is available.  Takes no arguments; the
    message is fixed.
    """

    def __init__(self):
        fixed_message = "No full backup available"
        super(ZFSBackupMissingFullBackupError, self).__init__(fixed_message)
class ZFSBackupSnapshotNotFoundError(ZFSBackupError):
    """
    Raised when a named snapshot does not exist.

    The offending name is available as the snapshot_name attribute.
    """

    def __init__(self, snapname):
        self.snapshot_name = snapname
        # This used to call the non-existent attribute "__" on the super
        # object, so merely constructing the exception raised
        # AttributeError instead.
        super(ZFSBackupSnapshotNotFoundError, self).__init__(
            "Specified snapshot {} does not exist".format(snapname))
class ZFSBackupChunkError(ZFSBackupError):
    """
    Base class for exceptions related to chunks.

    Attributes:
    snapshot_name - the snapshot the chunk belongs to.
    chunk_name - the chunk in question.
    message - msg if one was given, otherwise a generic description.
    """

    def __init__(self, snapname, chunkname, msg=None):
        self.snapshot_name = snapname
        self.chunk_name = chunkname
        # The default text used to reference an undefined name
        # ("snapshot"), which raised NameError whenever msg was omitted.
        self.message = msg or "Error with chunk file {} for snapshot {}".format(chunkname, snapname)
        super(ZFSBackupChunkError, self).__init__(self.message)

    def __str__(self):
        return "<{} snapshot={}, chunkname={}, message={}>".format(self.__class__.__name__,
                                                                    self.snapshot_name,
                                                                    self.chunk_name,
                                                                    self.message)

    def __repr__(self):
        return "{}(snapname={}, chunkname={}, msg={})".format(self.__class__.__name__,
                                                               self.snapshot_name,
                                                               self.chunk_name,
                                                               self.message)
class ZFSBackupChunkMissingError(ZFSBackupChunkError):
    """
    Raised when the specified chunk is missing.
    """

    def __init__(self, snapname, chunkname):
        text = "Chunk file {} is missing for snapshot {}".format(chunkname, snapname)
        super(ZFSBackupChunkMissingError, self).__init__(snapname, chunkname, text)
class ZFSBackupChunkOfflineError(ZFSBackupChunkError):
    """
    Currently only used with S3: the storage class won't allow the chunk
    data to be accessed.  I.e., Glacier storage, with no restore pending.
    """

    def __init__(self, snapname, chunkname):
        text = "Chunk file {} for snapshot {} is offline".format(chunkname, snapname)
        super(ZFSBackupChunkOfflineError, self).__init__(snapname, chunkname, text)
class ZFSBackupChunkPendingError(ZFSBackupChunkError):
    """
    Like ZFSBackupChunkOfflineError, but used when the restore of the
    chunk has not completed yet.  This means, "Try again later," really.
    """

    def __init__(self, snapname, chunkname):
        text = "Chunk file {} for snapshot {} has not completed transferring".format(chunkname, snapname)
        super(ZFSBackupChunkPendingError, self).__init__(snapname, chunkname, text)
class ZFSHelper(object):
    """
    This is the base class for running subprocesses as part of the
    backup or restore process.  It works with the ZFSBackup
    object, for coordination, error reporting, and termination.

    A ZFSHelper may be implemented using a subprocess (using Popen,
    since it is expected to be part of the pipeline), or via threads.

    It must implement start, stop, and wait methods.  (wait must be
    invokable multiple times; if the process has finished, it will
    return or raise an exception if it finished with error.  Being
    forcibly stopped does not count as an error.  stop will do its
    best to wait for it take effect, but wait() is the only method to
    guarantee it.)

    After calling start, the stdin, stdout, and stderr objects must be
    accessible as file-like objects.  They may be None.

    It must inform the ZFSBackup object of exit, including an exception
    if that occurred, via zfsbackup.HelperFinished(self, exc).  (Setting
    exc to None if there was no error.)

    Keyword arguments consumed by the constructor:
    name - label for the helper (used in error messages).
    handler - object to be notified on exit; required.
    stdin, stdout, stderr - initial streams; each may be None.
    Any remaining arguments are passed up the constructor chain.
    """

    def __init__(self, *args, **kwargs):
        name = kwargs.pop("name", None)
        handler = kwargs.pop("handler", None)
        stdin = kwargs.pop("stdin", None)
        stdout = kwargs.pop("stdout", None)
        stderr = kwargs.pop("stderr", None)

        # Quick sanity checks
        if handler is None:
            raise ValueError("handler must be set")

        self._handler = handler
        self._name = name
        self._stdin = stdin
        self._stdout = stdout
        self._stderr = stderr

        # Set up various control-related instance variables
        self._thread = None
        self._started = threading.Event()
        self._exited = threading.Event()
        self._exception = None
        self._stop = False
        super(ZFSHelper, self).__init__(*args, **kwargs)

    @property
    def name(self):
        return self._name

    @property
    def handler(self):
        return self._handler

    @property
    def stdin(self):
        return self._stdin

    @stdin.setter
    def stdin(self, stream):
        self._stdin = stream

    @property
    def stdout(self):
        return self._stdout

    @stdout.setter
    def stdout(self, stream):
        self._stdout = stream

    @property
    def stderr(self):
        return self._stderr

    # This setter must be derived from the stderr property.  Deriving it
    # from stdout (as was done before) rebinds "stderr" to a property
    # whose getter is stdout's, so reading stderr returned _stdout.
    @stderr.setter
    def stderr(self, stream):
        self._stderr = stream

    def _run(self):
        raise NotImplementedError("Base class does not implement run method")

    def start(self):
        """
        Common code to start the thread; subclasses need to implement _run, and
        do any setup in their implementation of start().

        Raises ValueError when neither stdin nor stdout is set.  Does not
        return until _run has signalled the started event.
        """
        if self.stdin is None and self.stdout is None:
            raise ValueError("{}.start: At least one of stdin and stdout must be defined".format(self.name))
        self._started.clear()
        self._exited.clear()
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True
        self._thread.start()
        self._started.wait()
        self._started.clear()

    def stop(self):
        raise NotImplementedError("Base class does not implement stop method")

    def wait(self):
        raise NotImplementedError("Base class does not implement wait method")
class ZFSHelperThread(ZFSHelper):
    """
    Implement a ZFSHelper as a thread -- instead of fork/execing, we'll
    just create a thread, which will do the work.  All of the constraints and
    requirements mentioned in the base class apply.

    One additional requirement:  since threads cannot be sent a signal,
    the processing code MUST check for a stop request within a reasonable time
    frame.  This means using select for reading and writing, with a small timeout
    to check for the stop signal.

    The processing code can be passed in as target, or a subclass can implement
    its own _process method.  Either one is called with each buffer read
    from the input and must return the buffer to write to the output.
    """

    def __init__(self, *args, **kwargs):
        target = kwargs.pop("target", None)
        super(ZFSHelperThread, self).__init__(*args, **kwargs)
        if target and not callable(target):
            raise ValueError("Thread target must be callable")
        self._target = target
        self._to_close = []

    def _process(self, buffer):
        """
        The base class version of _process simply returns buffer.
        Subclasses should override this.
        """
        # Returning nothing here made the default helper fail as soon as
        # it tried to write the (None) result.
        return buffer

    @staticmethod
    def _cloexec_pipe():
        """
        Create a pipe with close-on-exec set on both ends; returns
        (read_fd, write_fd).
        """
        read_side, write_side = os.pipe()
        for fd in (read_side, write_side):
            # FD_CLOEXEC is a descriptor flag, so it has to go through
            # F_GETFD/F_SETFD.  F_SETFL only changes file status flags
            # and would leave close-on-exec untouched.
            flags = fcntl.fcntl(fd, fcntl.F_GETFD)
            fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
        return read_side, write_side

    def _write_all(self, write_to, buffer):
        """
        Write the whole of buffer to write_to, giving up early if a stop
        has been requested.  Raises OSError for any write failure other
        than "try again".
        """
        nwritten = 0
        while nwritten < len(buffer):
            _, writable, _ = select([], [write_to], [], 0.1)
            if self._stop:
                return
            if not writable:
                continue
            # We use os.write() because it will tell us how many
            # bytes were written, which is important if there's
            # a full pipe.
            try:
                nwritten += os.write(write_to.fileno(), buffer[nwritten:])
            except OSError as e:
                # Only transient conditions are worth retrying.  Anything
                # else (e.g. the reader went away) will fail again on
                # every attempt, so let it propagate instead of spinning.
                if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR):
                    raise
            if self._stop:
                return

    def _run(self):
        """
        start() will have errored out if both stdin and stdout
        are not set.  So we can create pipes here for them.  When we
        create a pipe, we need to set both ends to non-blocking and
        close-on-exec; we also need to keep track of them so we can
        close the appropriate end (write-side for stdin, read-side
        for stdout) when we are finished.

        If stderr is None, we'll set it to /dev/null, and add it to
        the list for closing.
        """
        if self.stdin is None:
            # Okay, we need to make a pipe for this.  We want to set
            # self.stdin to an fdopen of the write side, and set the
            # read side to be closed when we're done.
            read_side, write_side = self._cloexec_pipe()
            self.stdin = os.fdopen(write_side, "wb")
            read_from = os.fdopen(read_side, "rb")
            self._to_close.append(read_from)
        else:
            read_from = self.stdin

        if self.stdout is None:
            # We make a pipe for this one.  We want to set
            # self.stdout to an fdopen of the read side, and set the
            # write side to be closed when we're done.
            read_side, write_side = self._cloexec_pipe()
            self.stdout = os.fdopen(read_side, "rb")
            write_to = os.fdopen(write_side, "wb")
            self._to_close.append(write_to)
        else:
            write_to = self.stdout

        if self.stderr is None:
            self.stderr = open("/dev/null", "wb")
            self._to_close.append(self.stderr)

        # Inform the main thread we've started.
        self._started.set()

        chunk_size = 1024 * 1024
        # We want to set read_from and write_to to be non-blocking
        for f in (read_from, write_to):
            SetNonBlock(f)

        try:
            while True:
                readable, _, _ = select([read_from], [], [], 0.1)
                if self._stop:
                    break
                if not readable:
                    continue
                # Great, we have input ready.  Or eof.
                data = read_from.read(chunk_size)
                if not data:
                    # EOF, so we're done
                    break
                worker = self._target if callable(self._target) else self._process
                self._write_all(write_to, worker(data))
                if self._stop:
                    break
            self._exception = None
        except BaseException as e:
            # Deliberately catching all exceptions; a failure caused by
            # being stopped is not reported as an error.
            self._exception = None if self._stop else e

        self.handler.HelperFinished(self, exc=self._exception)

        # Now close the files in _to_close:
        for f in self._to_close:
            try:
                if isinstance(f, int):
                    os.close(f)
                else:
                    f.close()
            except OSError:
                pass
        self.stdin = None
        self.stdout = None
        self.stderr = None
        self._to_close = []
        self._exited.set()

    def stop(self):
        """Request a stop and block until the thread has exited."""
        self._stop = True
        self._exited.wait()

    def wait(self):
        """
        Block until the thread has exited, then re-raise the exception it
        finished with, if any.
        """
        # Event.wait() returns immediately once the event is set.
        self._exited.wait()
        if self._exception:
            raise self._exception
class ZFSHelperCommand(ZFSHelper):
    """
    Implement a ZFSHelper as a command -- that is, it will use a subprocess,
    specifically Popen.  The process is run in a separate thread.  The command
    is not started until the start method is invoked.  When the command is
    launched, any of stdin, stdout, stderr that is None is replaced by
    subprocess.PIPE, and the corresponding pipe of the process is then
    exposed through the property of the same name.

    Keyword arguments consumed by the constructor:
    command - the argument list to run (copied); defaults to [].
    """

    def __init__(self, *args, **kwargs):
        command = kwargs.pop("command", [])
        super(ZFSHelperCommand, self).__init__(*args, **kwargs)
        # Copy it
        self._command = command[:]
        # Create the attribute up front so that stop() and wait() are
        # safe to call on a helper that was never started.
        self._proc = None

    @property
    def command(self):
        return self._command

    def _run(self):
        """
        Invoked as part of the thread handler.

        NOTE(review): the handler is only notified (HelperFinished) on the
        failure/stop path here, not after a clean exit -- confirm that this
        is what the handler expects.
        """
        try:
            # Should I actually just use CHECK_CALL instead?
            # I'd have to set up the pipes for that; but that
            # could also be done in shared code at that point.
            self._proc = POPEN(self.command,
                               bufsize=1024*1024,
                               close_fds=True,
                               stdin=self.stdin or subprocess.PIPE,
                               stdout=self.stdout or subprocess.PIPE,
                               stderr=self.stderr or subprocess.PIPE)
            self.stdin = self.stdin or self._proc.stdin
            self.stdout = self.stdout or self._proc.stdout
            self.stderr = self.stderr or self._proc.stderr
            self._started.set()
            self._proc.wait()
            if self._proc.returncode != 0:
                raise subprocess.CalledProcessError(self._proc.returncode, " ".join(self.command))
        except (OSError, ValueError, subprocess.CalledProcessError) as e:
            self._started.set()
            # Being stopped deliberately is not an error.
            self._exception = None if self._stop else e
            self.handler.HelperFinished(self, exc=self._exception)
        finally:
            # start() blocks on _started; make sure it is released even
            # if launching failed with an exception type not handled above.
            self._started.set()
            self._exited.set()

    def start(self):
        """
        Starts the command in a separate thread.
        We do setup here, and let the base class go from there.
        """
        self._proc = None
        super(ZFSHelperCommand, self).start()

    def stop(self):
        """Terminate the subprocess, if there is one."""
        if self._proc:
            self._stop = True
            self._proc.terminate()

    def wait(self):
        """
        Wait for the runner thread and the subprocess to finish, then
        re-raise the exception the run ended with, if any.
        """
        if self._thread:
            self._thread.join()
            self._thread = None
        if self._proc:
            self._proc.wait()
        if self._exception:
            raise self._exception
class ZFSBackupFilterBase(object):
    """
    Base class for ZFS backup filters.

    Filters have several properties, and start_backup() and start_restore()
    methods.  The start_* methods take a source, which should be a pipe.
    In general, the filters should use a subprocess or thread, unless
    they are the terminus of the pipeline.  (Doing otherwise risks deadlock.)

    The transformative property indicates that the filter transforms
    the data as it processes it.  Some filters don't -- the counter
    filter, for example.  This is important for some ZFSBackups subclasses,
    such as ZFSBackupSSH, which need to apply transformative filters on
    the other end as part of the backup and restore.  By default, it's
    false; subclasses can change it, and the object can alter it.

    This base class exists only so it can be incorporated into subclasses;
    start_backup() and start_restore() call self.start(), which is not
    defined here.
    """

    def __init__(self, *args, **kwargs):
        filter_name = kwargs.pop("name", "Null Filter")
        self.transformative = kwargs.pop("transformative", False)
        super(ZFSBackupFilterBase, self).__init__(*args, **kwargs)
        self.mode = None
        self._name = filter_name

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, n):
        self._name = n

    @property
    def transformative(self):
        return self._transformative

    @transformative.setter
    def transformative(self, t):
        self._transformative = t

    @property
    def mode(self):
        return self._mode

    @mode.setter
    def mode(self, mode):
        # Only "backup", "restore" and None (idle) are accepted.
        if mode not in ("backup", "restore", None):
            raise ValueError("Invalid mode {}".format(mode))
        self._mode = mode

    @property
    def backup_command(self):
        return []

    @property
    def restore_command(self):
        return []

    # Actually this needs to go in the derived class
    @property
    def command(self):
        current = self.mode
        if current is None:
            return None
        if current == "backup":
            return self.backup_command
        if current == "restore":
            return self.restore_command
        raise ValueError("Filter mode is not set")

    # Backups are done by creating a pipeline that goes:
    #     zfs send | filter | filter | ZFSBackup<class>
    # Restores are done the opposite way:
    #     ZFSBackup<class> | filter | filter | zfs recv
    # So the start_backup and start_restore methods set
    # self.stdin and self.stdout, respectively.

    def start_backup(self, source):
        """Run the filter in backup mode reading from source; returns its output stream."""
        self.mode = "backup"
        self.stdin = source
        self.stdout = None
        self.start()
        return self.stdout

    def start_restore(self, source):
        """Run the filter in restore mode writing to source; returns its input stream."""
        self.mode = "restore"
        self.stdout = source
        self.start()
        return self.stdin

    def finish(self):
        """
        Any cleanup work required for the filter.
        This is to be called _after_ the filter has
        completed.
        """
        self.mode = None
        self.stdin = self.stdout = self.stderr = None
class ZFSBackupFilterCommand(ZFSBackupFilterBase, ZFSHelperCommand):
    """
    Implement a filter as a command -- that is, it creates a thread,
    and runs a subprocess in that thread.  Note the multiple inheritance.

    Keyword arguments consumed by the constructor:
    backup_command - argument list used in backup mode; defaults to
    ["/bin/cat"].
    restore_command - argument list used in restore mode; when not given
    (or empty) the backup command is used for restores as well.
    """

    def __init__(self, *args, **kwargs):
        backup_cmd = kwargs.pop("backup_command", ["/bin/cat"])
        self.backup_command = backup_cmd
        restore_cmd = kwargs.pop("restore_command", None)
        self.restore_command = restore_cmd
        super(ZFSBackupFilterCommand, self).__init__(*args, **kwargs)

    @property
    def backup_command(self):
        return self._backup_command

    @backup_command.setter
    def backup_command(self, cmd):
        # Keep a private copy of the list.
        self._backup_command = cmd[:]

    @property
    def restore_command(self):
        return self._restore_command or self.backup_command

    @restore_command.setter
    def restore_command(self, cmd):
        self._restore_command = cmd[:] if cmd else None

    @property
    def command(self):
        current = self.mode
        if current is None:
            return None
        if current == "backup":
            return self.backup_command
        if current == "restore":
            return self.restore_command
        raise ValueError("Unknown mode")
class ZFSBackupFilterThread(ZFSBackupFilterBase, ZFSHelperThread):
    """
    Implement a filter as a thread -- that is, it creates a thread,
    rather than running a subprocess.  Note the multiple inheritance.

    There is no external command involved, so all of the *command
    properties are None.
    """

    def __init__(self, *args, **kwargs):
        super(ZFSBackupFilterThread, self).__init__(*args, **kwargs)

    @property
    def backup_command(self):
        return None

    @property
    def restore_command(self):
        return None

    @property
    def command(self):
        return None

    def finish(self):
        """Wait for the filter to complete, then reap its thread."""
        self.wait()
        worker = self._thread
        if worker:
            worker.join()
        self._thread = None
class ZFSBackupFilterCounter(ZFSBackupFilterThread):
    """
    A simple thread filter; all this does is count the bytes
    that come in to be processed, passing them through unchanged.
    """

    def __init__(self, *args, **kwargs):
        super(ZFSBackupFilterCounter, self).__init__(*args, **kwargs)
        self._count = 0

    @property
    def count(self):
        # Reading the total waits for the filter to finish first.
        self.wait()
        return self._count

    def _process(self, buffer):
        self._count = self._count + len(buffer)
        return buffer
class ZFSBackupFilterEncrypted(ZFSBackupFilterCommand):
    """
    A filter to encrypt and decrypt a stream.
    The openssl command can do a lot more than we're asking
    of it here.
    We require a password file (for now, anyway).

    Keyword arguments consumed by the constructor:
    cipher - openssl cipher command name; defaults to "aes-256-cbc".
    password_file - path handed to openssl as "-pass file:<path>"; required.

    Raises ValueError if password_file is missing or the cipher is not
    one that /usr/bin/openssl lists.
    """

    def __init__(self, *args, **kwargs):
        cipher = kwargs.pop("cipher", "aes-256-cbc")
        password_file = kwargs.pop("password_file", None)

        if password_file is None:
            raise ValueError("Password file must be set for encryption filter")
        if not self._validate_cipher(cipher):
            raise ValueError("Invalid cipher {}".format(cipher))

        self.cipher = cipher
        self.password_file = password_file
        kwargs["backup_command"] = ["/usr/bin/openssl",
                                    "enc", "-{}".format(cipher),
                                    "-e",
                                    "-salt",
                                    "-pass", "file:{}".format(password_file)]
        kwargs["restore_command"] = ["/usr/bin/openssl",
                                     "enc", "-{}".format(cipher),
                                     "-d",
                                     "-salt",
                                     "-pass", "file:{}".format(password_file)]
        kwargs["name"] = '{} encryption filter'.format(self.cipher)
        kwargs["transformative"] = True
        super(ZFSBackupFilterEncrypted, self).__init__(*args, **kwargs)

    @staticmethod
    def _validate_cipher(cipher):
        """Return True if openssl reports cipher as a cipher command."""
        if cipher is None:
            return False
        # The listing sub-command is spelled differently by older
        # ("list-cipher-commands") and newer ("list -cipher-commands")
        # openssl releases; try the historical form first.
        listings = (
            ["/usr/bin/openssl", "list-cipher-commands"],
            ["/usr/bin/openssl", "list", "-cipher-commands"],
        )
        for listing in listings:
            try:
                output = CHECK_OUTPUT(listing)
            except subprocess.CalledProcessError:
                continue
            # check_output() returns bytes on Python 3; decode before
            # comparing, otherwise a str cipher name can never match.
            if cipher in output.decode("utf-8").split():
                return True
        return False
class ZFSBackupFilterCompressed(ZFSBackupFilterCommand):
    """
    A sample command filter, for compressing.

    One optional keyword parameter: pigz.  When true the stream is
    compressed with /usr/local/bin/pigz and restored with
    /usr/local/bin/unpigz; otherwise /usr/bin/gzip and /usr/bin/gunzip
    are used.
    """

    def __init__(self, *args, **kwargs):
        use_pigz = kwargs.pop("pigz", False)
        if use_pigz:
            compress, expand = ["/usr/local/bin/pigz"], ["/usr/local/bin/unpigz"]
            label = 'pigz compressor filter'
        else:
            compress, expand = ["/usr/bin/gzip"], ["/usr/bin/gunzip"]
            label = 'gzip compressor filter'
        kwargs["backup_command"] = compress
        kwargs["restore_command"] = expand
        kwargs["name"] = label
        kwargs["transformative"] = True
        super(ZFSBackupFilterCompressed, self).__init__(*args, **kwargs)
        self.pigz = use_pigz

    @property
    def name(self):
        # This read-only property takes precedence over the name passed
        # to the base class constructor above.
        if self.pigz:
            return "pigz compress filter"
        return "gzip compress filter"
class ZFSBackup(object):
"""
Base class for doing ZFS backups.
Backups are done using snapshots -- zfs send is used -- not using files.
Every backup must have a source and a target, although subclasses
can change how they are interpreted. Backups can be recursive.
One ZFSBackup object should be created for each <source, target>, but
not for each snapshot. That is, you would use
backup = ZFSBackup("/tank/Media", "/backup/tank/Media", recursive=True)
<do backup>
backup = ZFSBackup("/tank/Documents", "/backup/tank/Documents")
<do backup>
instead of creating a ZFSBackup object for each snapshot.
In general, backups and restores are simply inverses of each other.
In order to perform backups, it is necessary to get a list of snapshots
on both the source and target. An empty list on the target will mean
a full backup is being done; an empty list on the source is a failure.
Backups can have filters applied to them. This is not used in the base
class (since it only implements ZFS->ZFS), but subclasses may wish to
add filters for compression, encryption, or accounting. Some sample
filter classes are provided.
Some notes on how replication works:
* source is the full path to the dataset. *Or* it can be the entire pool.
* target is the dataset to which the replication should go.
* If source is the full pool, then the target will have all of the files
at the root of the source pool.
* If source is NOT the full pool, then the target will end up with only the
dataset(s) being replicated -- but any intervening datasets will be created.
What this means:
* tank -> backup/tank means we end up with backup/tank as a copy of tank.
* tank/usr/home -> backup/home means we end up with backup/home/usr/home.
* When getting snapshots for the destination, we need to add the path for
source, *minus* the pool name.
* UNLESS we are replicating the full pool.
What *that* means:
* tank -> backup/tank means getting snapshots from backup/tank
* tank/usr/home -> backup/home means getting snapshots from backup/home/usr/home
"""
def __init__(self, source, target, recursive=False):
    """
    Parameters:
    source - (str) a ZFS pool or dataset to be backed up.
    target - (str) a ZFS dataset to back up to.
    recursive - (bool) Indicate whether the backup is to be recursive or not.

    The base class stores these, starts with empty snapshot caches and
    an empty filter list, calls self.validate(), and then creates the
    state used to collect helper completion reports.
    """
    self.target = target
    self.source = source
    self.recursive = recursive

    # Snapshot lists are fetched on demand and cached here.
    self._source_snapshots = None
    self._target_snapshots = None
    self._filters = []

    self.validate()

    # Filled in by HelperFinished().
    self._helper_status = []
    self._helper_lock = threading.Lock()
    self._helper_done = threading.Event()
@property
def target(self):
    """The backup target, as given to the constructor or assigned later."""
    return self._dest

@target.setter
def target(self, t):
    # Stored under the private name _dest.
    self._dest = t
@property
def source(self):
    """The backup source, as given to the constructor or assigned later."""
    return self._source

@source.setter
def source(self, s):
    # Stored under the private name _source.
    self._source = s
@property
def filters(self):
    """The list of filters attached to this backup (read-only property)."""
    return self._filters
@property
def recursive(self):
    """Whether the backup is recursive."""
    return self._recursive

@recursive.setter
def recursive(self, b):
    # Stored under the private name _recursive.
    self._recursive = b
def delete(self, *args, **kwargs):
    """
    Delete a snapshot, or set of snapshots (in *args).

    This isn't implemented in the base class, since snapshots
    can be automatically deleted as part of the replication process.
    Classes which do backups some other method (e.g., ZFSBackupDirectory)
    may need this.

    In general, this method is to be used to clean up old snapshots;
    it'll need to handle dependencies (that is, ensure that any incrementals
    still have their parents, or that they're all deleted).

    The base class version always raises ZFSBackupNotImplementedError.
    """
    class_name = self.__class__.__name__
    raise ZFSBackupNotImplementedError("delete not implemented in class {}".format(class_name))
def HelperFinished(self, which, exc=None):
    """
    Record that the helper "which" has finished, with exception exc
    (None when there was no error), and signal the helper-done event.
    """
    # Append the helper object / exception pair to the status list.
    with self._helper_lock:
        self._helper_status.append((which, exc))
    # Notify the main thread that we've updated it.
    self._helper_done.set()
def AddFilter(self, filter):
    """
    Add a filter.  The filter is set up during the backup and
    restore methods.  The filter needs to be an instance of
    ZFSFilter -- at least, it needs to have the start_backup and
    start_restore methods.

    Raises ValueError if either of those methods is missing or is not
    callable.
    """
    # Both methods are required.  The previous test joined the two
    # checks with "and", which only rejected objects lacking both.
    has_backup = callable(getattr(filter, "start_backup", None))
    has_restore = callable(getattr(filter, "start_restore", None))
    if not has_backup or not has_restore:
        raise ValueError("Incorrect type passed for filter")
    self._filters.append(filter)
def _finish_filters(self, reason="backup"):
    """
    Call finish() on every filter: in list order when reason is
    "backup", in reverse order for any other reason.
    """
    if reason == "backup":
        ordered = self.filters
    else:
        ordered = reversed(self.filters)
    for each_filter in ordered:
        each_filter.finish()
def _filter_backup(self, source, error=sys.stderr):
    """
    Private method, to stitch the backup filters together.

    Each filter in turn gets error as its stderr and is started with
    the previous stage's output as its input.  Returns the output of
    the last filter (or source itself when there are no filters).

    Raises ValueError if source is None, or if a filter's start_backup()
    returns None.
    """
    if source is None:
        raise ValueError("{}._filter_backup: source is None".format(self))

    stream = source
    for stage in self.filters:
        stage.stderr = error
        if debug:
            print("Starting filter {} ({}), input = {}".format(stage.name, stage.backup_command, stream), file=sys.stderr)
        stream = stage.start_backup(stream)
        if stream is None:
            raise ValueError("Filter {} returned None for stream".format(stage.name))
    return stream
def _filter_restore(self, destination, error=None, use_filters=None):
    """
    Private method, to stitch the restore filters together.
    Note that they are started in reverse order.

    destination - the stream the last stage of the pipeline writes to.
    error - stream for the filters' error output; when None each
    filter's current stderr setting is left alone.
    use_filters - filter list to use instead of self.filters.

    Returns the stream to feed the restore data into.
    """
    chain = self.filters if use_filters is None else use_filters
    output = destination
    for f in reversed(chain):
        # The helpers take their error stream from "stderr" (as
        # _filter_backup sets it); "error_output" alone is read by
        # nothing visible here, so a supplied stream had no effect.
        if error is not None:
            f.stderr = error
        # Kept for any external code that inspects this attribute.
        f.error_output = error
        if debug:
            print("Starting restore filter {} ({})".format(f.name, f.restore_command), file=sys.stderr)
        output = f.start_restore(output)
        if debug:
            print("\tFilter output = {}".format(output), file=sys.stderr)
    return output
def __repr__(self):
    """Show the class name together with the source and target."""
    return "{}(source={}, target={})".format(
        self.__class__.__name__,
        self.source,
        self.target,
    )
@property
def source_snapshots(self):
    """
    Return a list of snapshots on the source.  The return value is
    an array of dictionaries; the dictionaries have, at minimum, two
    elements:
    Name -- (str) Snapshot name.  The part that goes after the '@'
    CreationTime -- (int) Time (in unix epoch seconds) the snapshot was created.

    Even if the recursive is true, this _only_ lists the snapshots for the
    source (recursive requires that the same snapshot exist on the descendents,
    or it doesn't get backed up).

    The list is cached, so the listing is not repeated once a non-empty
    result has been obtained.
    """
    cached = self._source_snapshots
    if cached:
        return cached
    self._source_snapshots = _get_snapshots(self.source)
    return self._source_snapshots
@property
def target_snapshots(self):
"""
Return a list of snapshots on the target. The return value is
an array of dictionaries; the dictionaries have, at minimum, two
elements:
Name -- (str) Snapshot name. The part that goes after the '@'
CreationTime -- (int) Time (in unix epoch seconds) the snapshot was created.
Even if the recursive is true, this _only_ lists the snapshots for the