
Commit 3ce7df9

add s3-mp-copy.py
1 parent 36afe6f commit 3ce7df9

File tree

1 file changed: 150 additions, 0 deletions


s3-mp-copy.py

Lines changed: 150 additions & 0 deletions
@@ -0,0 +1,150 @@
#!/usr/bin/env python
import argparse
from cStringIO import StringIO
import logging
from math import ceil
from multiprocessing import Pool
import sys
import time
import urlparse

import boto

parser = argparse.ArgumentParser(description="Copy large files within S3",
                                 prog="s3-mp-copy")
parser.add_argument("src", help="The S3 source object")
parser.add_argument("dest", help="The S3 destination object")
parser.add_argument("-np", "--num-processes", help="Number of processes to use",
                    type=int, default=2)
parser.add_argument("-f", "--force", help="Overwrite an existing S3 key",
                    action="store_true")
parser.add_argument("-s", "--split", help="Split size, in MB", type=int, default=50)
parser.add_argument("-rrs", "--reduced-redundancy", help="Use reduced redundancy storage. Default is standard.",
                    default=False, action="store_true")
parser.add_argument("-v", "--verbose", help="Be more verbose", default=False, action="store_true")

logger = logging.getLogger("s3-mp-copy")

def do_part_copy(args):
    """
    Copy a part of a MultiPartUpload

    Copy a single chunk between S3 objects. Since we can't pickle
    S3Connection or MultiPartUpload objects, we have to reconnect and look up
    the MPU object with each part copy.

    :type args: tuple of (string, string, string, int, int, int, int)
    :param args: The actual arguments of this method. Due to lameness of
                 multiprocessing, we have to extract these outside of the
                 function definition.

                 The arguments are: S3 src bucket name, S3 src key name, S3 dest
                 bucket name, MultiPartUpload id, the part number,
                 part start position, part stop position
    """
    # Multiprocessing args lameness
    src_bucket_name, src_key_name, dest_bucket_name, mpu_id, part_num, start_pos, end_pos = args
    logger.debug("do_part_copy got args: %s" % (args,))

    # Connect to S3, get the MultiPartUpload
    s3 = boto.connect_s3()
    dest_bucket = s3.lookup(dest_bucket_name)
    mpu = None
    for mp in dest_bucket.list_multipart_uploads():
        if mp.id == mpu_id:
            mpu = mp
            break
    if mpu is None:
        raise Exception("Could not find MultiPartUpload %s" % mpu_id)

    # Make sure we have a valid source key
    src_bucket = s3.lookup(src_bucket_name)
    src_key = src_bucket.get_key(src_key_name)
    # Do the copy
    t1 = time.time()
    mpu.copy_part_from_key(src_bucket_name, src_key_name, part_num, start_pos, end_pos)

    # Print some timings
    t2 = time.time() - t1
    s = (end_pos - start_pos)/1024./1024.
    logger.info("Copied part %s (%0.2fM) in %0.2fs at %0.2fMB/s" % (part_num, s, t2, s/t2))

def validate_url(url):
    split = urlparse.urlsplit(url)
    if split.scheme != "s3":
        raise ValueError("'%s' is not an S3 url" % url)
    return split.netloc, split.path[1:]

def main(src, dest, num_processes=2, split=50, force=False, reduced_redundancy=False, verbose=False):
    dest_bucket_name, dest_key_name = validate_url(dest)
    src_bucket_name, src_key_name = validate_url(src)

    s3 = boto.connect_s3()
    dest_bucket = s3.lookup(dest_bucket_name)
    dest_key = dest_bucket.get_key(dest_key_name)

    # See if we're overwriting an existing key
    if dest_key is not None:
        if not force:
            raise ValueError("'%s' already exists. Specify -f to overwrite it" % dest)

    # Determine the total size and calculate byte ranges
    src_bucket = s3.lookup(src_bucket_name)
    src_key = src_bucket.get_key(src_key_name)
    size = src_key.size
    part_size = max(5*1024*1024, 1024*1024*split)
    num_parts = int(ceil(size / float(part_size)))
    logger.info("Source object is %d bytes, splitting into %d parts of size %d" % (size, num_parts, part_size))

    # If the file is less than 5G, copy it directly in a single request
    if size < 5*1024*1024*1024:
        t1 = time.time()
        src_key.copy(dest_bucket_name, dest_key_name, reduced_redundancy=reduced_redundancy)
        t2 = time.time() - t1
        s = size/1024./1024.
        logger.info("Finished copying %0.2fM in %0.2fs (%0.2fMB/s)" % (s, t2, s/t2))
        return

    # Create the multi-part upload object
    mpu = dest_bucket.initiate_multipart_upload(dest_key_name, reduced_redundancy=reduced_redundancy)
    logger.info("Initialized copy: %s" % mpu.id)

    # Generate arguments for invocations of do_part_copy
    def gen_args(num_parts):
        cur_pos = 0
        for i in range(num_parts):
            part_start = cur_pos
            cur_pos = cur_pos + part_size
            part_end = min(cur_pos - 1, size - 1)
            part_num = i + 1
            yield (src_bucket_name, src_key_name, dest_bucket_name, mpu.id, part_num, part_start, part_end)

    # Do the thing
    try:
        # Create a pool of workers
        pool = Pool(processes=num_processes)
        t1 = time.time()
        pool.map_async(do_part_copy, gen_args(num_parts)).get(9999999)
        # Print out some timings
        t2 = time.time() - t1
        s = size/1024./1024.
        # Finalize
        mpu.complete_upload()
        logger.info("Finished copying %0.2fM in %0.2fs (%0.2fMB/s)" % (s, t2, s/t2))
    except KeyboardInterrupt:
        logger.warn("Received KeyboardInterrupt, canceling copy")
        pool.terminate()
        mpu.cancel_upload()
    except Exception as err:
        logger.error("Encountered an error, canceling copy")
        logger.error(err)
        mpu.cancel_upload()

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    args = parser.parse_args()
    arg_dict = vars(args)
    if arg_dict['verbose']:
        logger.setLevel(logging.DEBUG)
    logger.debug("CLI args: %s" % args)
    main(**arg_dict)
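
A minimal usage sketch for the script above (the bucket and key names are hypothetical; the flags are the ones defined by the argparse parser in this file):

    python s3-mp-copy.py s3://source-bucket/big-file.tar s3://dest-bucket/big-file.tar -np 8 -s 100 -f

This copies the source object using 8 worker processes and 100 MB parts, overwriting the destination key if it already exists; objects smaller than 5 GB are copied in a single request rather than as a multipart copy.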
