Skip to content

Commit

Permalink
Sync 2 (#131)
Browse files Browse the repository at this point in the history
* Delete duplicated code

* fix copy encode

* Add upload -s

* fix test

* fix README
  • Loading branch information
lewzylu authored May 7, 2018
1 parent 002840f commit 86e319d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 64 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ coscmd upload -r <localpath> <cospath> //命令格式
coscmd upload -r /home/aaa/ bbb/aaa //操作示例
coscmd upload -r /home/aaa/ bbb/ //操作示例
coscmd upload -r /home/aaa/ / //上传到bucket根目录
coscmd upload -rs /home/aaa/ /home/aaa //同步上传,跳过md5相同的文件
```

请将 "<>" 中的参数替换为您需要上传的本地文件路径(localpath),以及 COS 上存储的路径(cospath)。
Expand All @@ -163,6 +164,7 @@ coscmd upload -r /home/aaa/ / //上传到bucket根目录
* COSCMD 支持大文件断点上传功能。当分片上传大文件失败时,重新上传该文件只会上传失败的分块,而不会从头开始(请保证重新上传的文件的目录以及内容和上传的目录保持一致)。
* COSCMD 分块上传时会对每一块进行 MD5 校验。
* COSCMD 上传默认会携带 `x-cos-meta-md5` 的头部,值为该文件的 `md5`
* 使用 `-s` 参数可以进行同步上传,跳过 md5 一致的文件(COS 上的原文件必须是由 1.8.3.2 之后版本的 COSCMD 上传的,这样才会默认带有 `x-cos-meta-md5` 的 header)
* 使用-H参数设置HTTP header时,请务必保证格式为json,这里是个例子:`coscmd upload -H '{"Cache-Control":"max-age=31536000","Content-Language":"zh-CN"}' <localpath> <cospath>`

### 下载文件或文件夹
Expand Down
102 changes: 55 additions & 47 deletions coscmd/cos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,6 @@ def query_yes_no(question, default="no"):
sys.stdout.write("Please respond with 'yes' or 'no' "
"(or 'y' or 'n').\n")

def get_file_md5(local_path):
md5_value = md5()
with open(local_path, "rb") as f:
while True:
data = f.read(2048)
if not data:
break
md5_value.update(data)
return md5_value.hexdigest()

def get_file_md5(local_path):
md5_value = md5()
Expand Down Expand Up @@ -268,29 +259,38 @@ def upload_folder(self, local_path, cos_path, _http_headers='', **kwargs):
return ret_code

def upload_file(self, local_path, cos_path, _http_headers='{}', **kwargs):

_md5 = ""
_http_header = yaml.safe_load(_http_headers)

def check_file_md5(_local_path, _cos_path):
url = self._conf.uri(path=_cos_path)
rt = self._session.head(url=url, auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key), stream=True)
if rt.status_code != 200:
return False
tmp = os.stat(_local_path)
if tmp.st_size != int(rt.headers['Content-Length']):
return False
else:
if 'x-cos-meta-md5' not in rt.headers or get_file_md5(_local_path) != rt.headers['x-cos-meta-md5']:
return False
else:
return True

def single_upload():

if len(local_path) == 0:
data = ""
else:
with open(local_path, 'rb') as File:
data = File.read()
url = self._conf.uri(path=cos_path)
url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))
for j in range(self._retry):
try:
http_header = _http_header
http_header['x-cos-meta-md5'] = md5(data).hexdigest()
http_header['x-cos-meta-md5'] = _md5
rt = self._session.put(url=url,
auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key), data=data, headers=http_header)
if rt.status_code == 200:
if local_path != '':
logger.info("Upload {local_path} => cos://{bucket}/{cos_path} [100%]".format(
bucket=self._conf._bucket,
local_path=to_printable_str(local_path),
cos_path=to_printable_str(cos_path)))
return True
else:
time.sleep(2**j)
Expand All @@ -304,7 +304,7 @@ def single_upload():
return False

def init_multiupload():
url = self._conf.uri(path=cos_path)
url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))
self._md5 = {}
self.c = 0
self._have_uploaded = []
Expand All @@ -318,7 +318,7 @@ def init_multiupload():
logger.info("Continue uploading from last breakpoint")
return True
http_header = _http_header
http_header['x-cos-meta-md5'] = get_file_md5(local_path)
http_header['x-cos-meta-md5'] = _md5
rt = self._session.post(url=url+"?uploads", auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key), headers=http_header)
logger.debug("Init resp, status code: {code}, headers: {headers}, text: {text}".format(
code=rt.status_code,
Expand All @@ -344,7 +344,7 @@ def multiupload_parts_data(local_path, offset, length, parts_size, idx):
with open(local_path, 'rb') as File:
File.seek(offset, 0)
data = File.read(length)
url = self._conf.uri(path=cos_path)+"?partNumber={partnum}&uploadId={uploadid}".format(partnum=idx, uploadid=self._upload_id)
url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))+"?partNumber={partnum}&uploadId={uploadid}".format(partnum=idx, uploadid=self._upload_id)
for j in range(self._retry):
http_header = _http_header
rt = self._session.put(url=url,
Expand Down Expand Up @@ -392,10 +392,6 @@ def multiupload_parts_data(local_path, offset, length, parts_size, idx):

logger.debug("chunk_size: " + str(chunk_size))
logger.debug('Upload file concurrently')
logger.info("Upload {local_path} => cos://{bucket}/{cos_path}".format(
bucket=self._conf._bucket,
local_path=to_printable_str(local_path),
cos_path=to_printable_str(cos_path)))
self._pbar = tqdm(total=file_size, unit='B', unit_scale=True)
if chunk_size >= file_size:
pool.add_task(multiupload_parts_data, local_path, offset, file_size, 1, 0)
Expand Down Expand Up @@ -433,7 +429,7 @@ def complete_multiupload():
t.appendChild(t2)
root.appendChild(t)
data = root.toxml()
url = self._conf.uri(path=cos_path)+"?uploadId={uploadid}".format(uploadid=self._upload_id)
url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))+"?uploadId={uploadid}".format(uploadid=self._upload_id)
logger.debug('complete url: ' + url)
logger.debug("complete data: " + data)
try:
Expand All @@ -449,36 +445,46 @@ def complete_multiupload():
except Exception:
return False
return True

if local_path == "":
file_size = 0
else:
file_size = os.path.getsize(local_path)
_md5 = get_file_md5(local_path)
logger.info(to_printable_str("Upload {local_path} => cos://{bucket}/{cos_path} ").format(
bucket=self._conf._bucket,
local_path=to_printable_str(local_path),
cos_path=to_printable_str(cos_path)))
if kwargs['sync'] is True:
if check_file_md5(local_path, cos_path):
logger.info("The file on cos is the same as the local file, skip upload")
return True

if file_size <= self._conf._part_size * 1024 * 1024 + 1024:
for _ in range(self._retry):
if single_upload() is True:
return True
return False
else:
for _ in range(self._retry):
rt = init_multiupload()
if rt:
break
rt = init_multiupload()
if rt:
logger.debug("Init multipart upload ok")
else:
logger.debug("Init multipart upload failed")
return False
logger.debug("Init multipart upload ok")

rt = multiupload_parts()
if rt is False:
if rt:
logger.debug("Multipart upload ok")
else:
logger.warn("Some partial upload failed. Please retry the last command to continue.")
return False
logger.debug("Multipart upload ok")
for _ in range(self._retry):
rt = complete_multiupload()
if rt:
logger.debug("Complete multipart upload ok")
return True
logger.warn("Complete multipart upload failed")
return False
rt = complete_multiupload()
if rt:
logger.debug("Complete multipart upload ok")
else:
logger.warn("Complete multipart upload failed")
return False
return True

def copy_folder(self, source_path, cos_path):

Expand Down Expand Up @@ -534,11 +540,11 @@ def copy_folder(self, source_path, cos_path):
def copy_file(self, source_path, cos_path):

def single_copy():
url = self._conf.uri(path=cos_path)
url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))
for j in range(self._retry):
try:
http_header = dict()
http_header['x-cos-copy-source'] = to_printable_str(source_path)
http_header['x-cos-copy-source'] = source_path
rt = self._session.put(url=url,
auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key), headers=http_header)
if rt.status_code == 200:
Expand All @@ -559,7 +565,7 @@ def single_copy():
return False

def init_multiupload():
url = self._conf.uri(path=cos_path)
url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))
self._md5 = {}
self._have_finished = 0
self._upload_id = None
Expand Down Expand Up @@ -595,7 +601,7 @@ def source_path_parser():
return source_bucket, source_appid, source_region, source_cospath

def copy_parts_data(source_path, offset, length, parts_size, idx):
url = self._conf.uri(path=cos_path)+"?partNumber={partnum}&uploadId={uploadid}".format(partnum=idx, uploadid=self._upload_id)
url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))+"?partNumber={partnum}&uploadId={uploadid}".format(partnum=idx, uploadid=self._upload_id)
http_header = dict()
http_header['x-cos-copy-source'] = source_path
http_header['x-cos-copy-source-range'] = "bytes="+str(offset)+"-"+str(offset+length-1)
Expand Down Expand Up @@ -676,7 +682,7 @@ def complete_multiupload():
t.appendChild(t2)
root.appendChild(t)
data = root.toxml()
url = self._conf.uri(path=cos_path)+"?uploadId={uploadid}".format(uploadid=self._upload_id)
url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))+"?uploadId={uploadid}".format(uploadid=self._upload_id)
logger.debug('Complete url: ' + url)
logger.debug("Complete data: " + data)
try:
Expand All @@ -691,7 +697,7 @@ def complete_multiupload():
except Exception:
return False
return True

source_path = urllib.quote(to_printable_str(source_path))
rt = self._session.head(url="http://"+source_path, auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key))
if rt.status_code != 200:
logger.warn("Replication sources do not exist")
Expand Down Expand Up @@ -881,7 +887,7 @@ def delete_file(self, cos_path, _force=False):
if _force is False:
if query_yes_no("WARN: you are deleting the file in the '{cos_path}' cos_path, please make sure".format(cos_path=to_printable_str(cos_path))) is False:
return False
url = self._conf.uri(path=cos_path)
url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))
try:
rt = self._session.delete(url=url, auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key))
logger.debug("init resp, status code: {code}, headers: {headers}".format(
Expand Down Expand Up @@ -1074,6 +1080,8 @@ def download_file(self, cos_path, local_path, **kwargs):
def check_file_md5(_local_path, _cos_path):
url = self._conf.uri(path=_cos_path)
rt = self._session.head(url=url, auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key), stream=True)
if rt.status_code != 200:
return False
tmp = os.stat(_local_path)
if tmp.st_size != int(rt.headers['Content-Length']):
return False
Expand Down
9 changes: 6 additions & 3 deletions coscmd/cos_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,13 @@ def upload(args):
logger.warn('local_path %s is not readable!' % to_printable_str(args.local_path))
return -1
args.local_path, args.cos_path = concat_path(args.local_path, args.cos_path)
kwargs = {}
kwargs['sync'] = args.sync
if args.recursive:
if os.path.isfile(args.local_path) is True:
rt = Interface.upload_file(args.local_path, args.cos_path, args.headers)
rt = Interface.upload_file(args.local_path, args.cos_path, args.headers, **kwargs)
elif os.path.isdir(args.local_path):
rt = Interface.upload_folder(args.local_path, args.cos_path, args.headers)
rt = Interface.upload_folder(args.local_path, args.cos_path, args.headers, **kwargs)
logger.info("{folders} folders, {files} files successful, {fail_files} files failed"
.format(folders=Interface._folder_num, files=Interface._file_num, fail_files=Interface._fail_num))
if rt:
Expand All @@ -172,7 +174,7 @@ def upload(args):
if os.path.isfile(args.local_path) is False:
logger.warn("cannot stat '%s': No such file or directory" % to_printable_str(args.local_path))
return -1
if Interface.upload_file(args.local_path, args.cos_path, args.headers) is True:
if Interface.upload_file(args.local_path, args.cos_path, args.headers, **kwargs) is True:
return 0
else:
return -1
Expand Down Expand Up @@ -453,6 +455,7 @@ def command_thread():
parser_upload.add_argument("cos_path", help="cos_path as a/b.txt", type=str)
parser_upload.add_argument('-r', '--recursive', help="upload recursively when upload directory", action="store_true", default=False)
parser_upload.add_argument('-H', '--headers', help="set HTTP headers", type=str, default='{}')
parser_upload.add_argument('-s', '--sync', help="Upload and skip the same file", action="store_true", default=False)
parser_upload.set_defaults(func=Op.upload)

parser_download = sub_parser.add_parser("download", help="download file from COS to local.")
Expand Down
28 changes: 14 additions & 14 deletions coscmd/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,31 +41,31 @@ def tearDown():


def gen_file(filePath, fileSize):
ds=0
ds = 0
with open(filePath, "w") as f:
while ds<fileSize:
f.write(str(round(random.uniform(-1000, 1000),2)))
while ds < fileSize:
f.write(str(round(random.uniform(-1000, 1000), 2)))
f.write("\n")
ds=os.path.getsize(filePath)
ds = os.path.getsize(filePath)
# print(os.path.getsize(filePath))


def test_upload_small_file():
"""test upload small file"""
gen_file("tmp", 1.1)
rt = op_int.upload_file("tmp", "tmp")
assert rt
os.remove("tmp")


def test_upload_big_file():
"""test upload big file"""
gen_file("tmp", 5.1)
rt = op_int.upload_file("tmp", "tmp")
assert rt
os.remove("tmp")


def test_download_file():
"""test download file"""
gen_file("tmp", 7.1)
Expand All @@ -85,15 +85,15 @@ def test_delete_file():
file_name = "tmp" + file_id + "_Bigfile"
rt = op_int.delete_file(file_name, _force=True)
assert rt


def test_bucketacl():
"""test bucketacl"""
op_int.put_bucket_acl("anyone", "anyone", "327874225")
rt = op_int.get_bucket_acl()
assert rt


def test_objectacl():
"""test objectacl"""
file_name = "tmp" + file_id + "_Smallfile"
Expand Down

0 comments on commit 86e319d

Please sign in to comment.