-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathcopy-files-in-parallel
executable file
·96 lines (77 loc) · 3.61 KB
/
copy-files-in-parallel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/env python
import argparse
import os
import subprocess
import sys
import uuid
parser = argparse.ArgumentParser(description='Copy files in parallel.')
parser.add_argument('source', help='source path')
parser.add_argument('destination', help='destination path')
parser.add_argument('-j', default=8, type=int, help='number of threads to use')
parser.add_argument('-c', default=1000, type=int, help='number of files in one chunk')
parser.add_argument('--resume', default=False, help='resume previous job (skip finding files)')
parser.add_argument('--dry', action='store_true', help='perform a trial run with no changes made')
parser.add_argument('--lfs', action='store_true', help='use lustre utilities')
parser.add_argument('--arcfour', action='store_true', help='use arcfour as compression algorithm')
parser.add_argument('--tmp', default='/tmp/', help='path to store temporary data')
args = parser.parse_args()
# check if GNU parallel is installed
if not any(os.access(os.path.join(path, 'parallel'), os.X_OK) for path in os.environ["PATH"].split(os.pathsep)):
raise Exception('GNU parallel is not installed.')
# remove trailing slashes
source = args.source.rstrip('/')
destination = args.destination.rstrip('/')
# prepare find exe
if args.lfs:
find = 'lfs find'
else:
find = 'find'
if args.resume:
# prepare files/directories
tmp_dir = os.path.join(args.resume)
if not os.path.isdir(tmp_dir):
sys.exit('Error: Can not find jobs tmp directory.')
files_file = os.path.join(tmp_dir, 'files')
jobs_file = os.path.join(tmp_dir, 'jobs')
else:
# prepare files/directories
tmp_dir = os.path.join(args.tmp, 'copy-files-in-parallel', str(uuid.uuid4()))
os.makedirs(tmp_dir)
files_file = os.path.join(tmp_dir, 'files')
jobs_file = os.path.join(tmp_dir, 'jobs')
# find files
if ':' in source:
# Hacky, but it works. If source is a remote machine (so it's
# given with user@machine:/path/to/dir) we just split it at the ":"
# This way we can use both parts of the string with the remote find
# command.
source_um, source_path = source.split(':')
subprocess.call('ssh %s "cd %s; %s . -type f" > %s' % (source_um, source_path, find, files_file), shell=True)
else:
# Source is a local directory.
subprocess.call('%s . -type f > %s' % (find, files_file), shell=True, cwd=source)
# split the files file in chunks of 1000
chunk_file = os.path.join(tmp_dir, 'chunk')
subprocess.call('split -l %i -d -a 8 %s %s' % (args.c, files_file, chunk_file), shell=True)
# create a file containing all the copy jobs to perform
jobs_file_handler = open(jobs_file, 'w')
for filename in sorted(os.listdir(tmp_dir)):
if filename.startswith('chunk'):
chunk_file = os.path.join(tmp_dir, filename)
chunk_num = filename.split('chunk')[1]
log_file = os.path.join(tmp_dir, 'log' + chunk_num)
job = 'rsync -aH --no-l'
if args.arcfour:
job += ' -e \'ssh -c arcfour\''
job += ' --files-from=%s --log-file=%s %s %s\n' % (chunk_file, log_file, source, destination)
jobs_file_handler.write(job)
jobs_file_handler.close()
try:
n_files = subprocess.check_output('wc -l %s' % files_file, shell=True).split()[0]
n_jobs = subprocess.check_output('wc -l %s' % jobs_file, shell=True).split()[0]
print 'Copying %s files using %s jobs!' % (n_files, n_jobs)
print 'tmp_dir is', tmp_dir
except AttributeError:
pass
# run parallel with the jobs file
subprocess.call('cat %s | parallel -P%i --eta' % (jobs_file, args.j), shell=True)