diff --git a/README.md b/README.md
index a0ed39a..41a47d7 100644
--- a/README.md
+++ b/README.md
@@ -155,6 +155,7 @@ coscmd upload -r <localpath> <cospath>  // command format
 coscmd upload -r /home/aaa/ bbb/aaa     // example
 coscmd upload -r /home/aaa/ bbb/        // example
 coscmd upload -r /home/aaa/ /           // upload to the bucket root directory
+coscmd upload -rs /home/aaa/ /home/aaa  // sync upload, skip files whose md5 is unchanged
 ```
 Replace the parameters in "<>" with the local file path to upload (localpath) and the storage path on COS (cospath).
 
@@ -163,6 +164,7 @@ coscmd upload -r /home/aaa/ /           // upload to the bucket root directory
 * COSCMD supports resumable upload of large files. If a multipart upload of a large file fails, re-uploading it only uploads the failed parts instead of starting over (make sure the path and content of the re-uploaded file are identical to the original upload).
 * COSCMD verifies the MD5 of every part during a multipart upload.
 * By default, COSCMD uploads carry an `x-cos-meta-md5` header whose value is the file's `md5`.
+* The -s option enables sync upload, which skips files whose md5 matches the object already on COS (the existing object must have been uploaded by COSCMD 1.8.3.2 or later, so that it carries the `x-cos-meta-md5` header by default).
 * When setting HTTP headers with the -H option, make sure the value is valid JSON, for example: `coscmd upload -H '{"Cache-Control":"max-age=31536000","Content-Language":"zh-CN"}' `
 
 ### Download a file or folder
diff --git a/coscmd/cos_client.py b/coscmd/cos_client.py
index 769d648..f52ceb0 100644
--- a/coscmd/cos_client.py
+++ b/coscmd/cos_client.py
@@ -75,15 +75,6 @@ def query_yes_no(question, default="no"):
             sys.stdout.write("Please respond with 'yes' or 'no' "
                              "(or 'y' or 'n').\n")
 
-def get_file_md5(local_path):
-    md5_value = md5()
-    with open(local_path, "rb") as f:
-        while True:
-            data = f.read(2048)
-            if not data:
-                break
-            md5_value.update(data)
-    return md5_value.hexdigest()
 
 def get_file_md5(local_path):
     md5_value = md5()
@@ -268,9 +259,23 @@ def upload_folder(self, local_path, cos_path, _http_headers='', **kwargs):
         return ret_code
 
     def upload_file(self, local_path, cos_path, _http_headers='{}', **kwargs):
-
+        _md5 = ""
         _http_header = yaml.safe_load(_http_headers)
 
+        def check_file_md5(_local_path, _cos_path):
+            url = self._conf.uri(path=_cos_path)
+            rt = self._session.head(url=url, auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key), stream=True)
+            if rt.status_code != 200:
+                return False
+            tmp = os.stat(_local_path)
+            if tmp.st_size != int(rt.headers['Content-Length']):
+                return False
+            else:
+                if 'x-cos-meta-md5' not in rt.headers or get_file_md5(_local_path) != rt.headers['x-cos-meta-md5']:
+                    return False
+                else:
+                    return True
+
         def single_upload():
             if len(local_path) == 0:
@@ -278,19 +283,14 @@ def single_upload():
             else:
                 with open(local_path, 'rb') as File:
                     data = File.read()
-            url = self._conf.uri(path=cos_path)
+            url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))
             for j in range(self._retry):
                 try:
                     http_header = _http_header
-                    http_header['x-cos-meta-md5'] = md5(data).hexdigest()
+                    http_header['x-cos-meta-md5'] = _md5
                     rt = self._session.put(url=url, auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key), data=data, headers=http_header)
                     if rt.status_code == 200:
-                        if local_path != '':
-                            logger.info("Upload {local_path} => cos://{bucket}/{cos_path} [100%]".format(
-                                bucket=self._conf._bucket,
-                                local_path=to_printable_str(local_path),
-                                cos_path=to_printable_str(cos_path)))
                         return True
                     else:
                         time.sleep(2**j)
@@ -304,7 +304,7 @@ def single_upload():
             return False
 
         def init_multiupload():
-            url = self._conf.uri(path=cos_path)
+            url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))
             self._md5 = {}
             self.c = 0
             self._have_uploaded = []
@@ -318,7 +318,7 @@ def init_multiupload():
                 logger.info("Continue uploading from last breakpoint")
                 return True
             http_header = _http_header
-            http_header['x-cos-meta-md5'] = get_file_md5(local_path)
+            http_header['x-cos-meta-md5'] = _md5
             rt = self._session.post(url=url+"?uploads", auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key), headers=http_header)
logger.debug("Init resp, status code: {code}, headers: {headers}, text: {text}".format( code=rt.status_code, @@ -344,7 +344,7 @@ def multiupload_parts_data(local_path, offset, length, parts_size, idx): with open(local_path, 'rb') as File: File.seek(offset, 0) data = File.read(length) - url = self._conf.uri(path=cos_path)+"?partNumber={partnum}&uploadId={uploadid}".format(partnum=idx, uploadid=self._upload_id) + url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))+"?partNumber={partnum}&uploadId={uploadid}".format(partnum=idx, uploadid=self._upload_id) for j in range(self._retry): http_header = _http_header rt = self._session.put(url=url, @@ -392,10 +392,6 @@ def multiupload_parts_data(local_path, offset, length, parts_size, idx): logger.debug("chunk_size: " + str(chunk_size)) logger.debug('Upload file concurrently') - logger.info("Upload {local_path} => cos://{bucket}/{cos_path}".format( - bucket=self._conf._bucket, - local_path=to_printable_str(local_path), - cos_path=to_printable_str(cos_path))) self._pbar = tqdm(total=file_size, unit='B', unit_scale=True) if chunk_size >= file_size: pool.add_task(multiupload_parts_data, local_path, offset, file_size, 1, 0) @@ -433,7 +429,7 @@ def complete_multiupload(): t.appendChild(t2) root.appendChild(t) data = root.toxml() - url = self._conf.uri(path=cos_path)+"?uploadId={uploadid}".format(uploadid=self._upload_id) + url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))+"?uploadId={uploadid}".format(uploadid=self._upload_id) logger.debug('complete url: ' + url) logger.debug("complete data: " + data) try: @@ -449,36 +445,46 @@ def complete_multiupload(): except Exception: return False return True + if local_path == "": file_size = 0 else: file_size = os.path.getsize(local_path) + _md5 = get_file_md5(local_path) + logger.info(to_printable_str("Upload {local_path} => cos://{bucket}/{cos_path} ").format( + bucket=self._conf._bucket, + local_path=to_printable_str(local_path), + cos_path=to_printable_str(cos_path))) + if kwargs['sync'] is True: + if check_file_md5(local_path, cos_path): + logger.info("The file on cos is the same as the local file, skip upload") + return True + if file_size <= self._conf._part_size * 1024 * 1024 + 1024: for _ in range(self._retry): if single_upload() is True: return True return False else: - for _ in range(self._retry): - rt = init_multiupload() - if rt: - break + rt = init_multiupload() + if rt: + logger.debug("Init multipart upload ok") else: + logger.debug("Init multipart upload failed") return False - logger.debug("Init multipart upload ok") - rt = multiupload_parts() - if rt is False: + if rt: + logger.debug("Multipart upload ok") + else: logger.warn("Some partial upload failed. 
                 return False
-            logger.debug("Multipart upload ok")
-            for _ in range(self._retry):
-                rt = complete_multiupload()
-                if rt:
-                    logger.debug("Complete multipart upload ok")
-                    return True
-            logger.warn("Complete multipart upload failed")
-            return False
+            rt = complete_multiupload()
+            if rt:
+                logger.debug("Complete multipart upload ok")
+            else:
+                logger.warn("Complete multipart upload failed")
+                return False
+            return True
 
 
     def copy_folder(self, source_path, cos_path):
@@ -534,11 +540,11 @@ def copy_folder(self, source_path, cos_path):
     def copy_file(self, source_path, cos_path):
         def single_copy():
-            url = self._conf.uri(path=cos_path)
+            url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))
             for j in range(self._retry):
                 try:
                     http_header = dict()
-                    http_header['x-cos-copy-source'] = to_printable_str(source_path)
+                    http_header['x-cos-copy-source'] = source_path
                     rt = self._session.put(url=url,
                                            auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key),
                                            headers=http_header)
                     if rt.status_code == 200:
@@ -559,7 +565,7 @@ def single_copy():
             return False
 
         def init_multiupload():
-            url = self._conf.uri(path=cos_path)
+            url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))
             self._md5 = {}
             self._have_finished = 0
             self._upload_id = None
@@ -595,7 +601,7 @@ def source_path_parser():
             return source_bucket, source_appid, source_region, source_cospath
 
         def copy_parts_data(source_path, offset, length, parts_size, idx):
-            url = self._conf.uri(path=cos_path)+"?partNumber={partnum}&uploadId={uploadid}".format(partnum=idx, uploadid=self._upload_id)
+            url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))+"?partNumber={partnum}&uploadId={uploadid}".format(partnum=idx, uploadid=self._upload_id)
             http_header = dict()
             http_header['x-cos-copy-source'] = source_path
             http_header['x-cos-copy-source-range'] = "bytes="+str(offset)+"-"+str(offset+length-1)
@@ -676,7 +682,7 @@ def complete_multiupload():
                 t.appendChild(t2)
                 root.appendChild(t)
             data = root.toxml()
-            url = self._conf.uri(path=cos_path)+"?uploadId={uploadid}".format(uploadid=self._upload_id)
+            url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))+"?uploadId={uploadid}".format(uploadid=self._upload_id)
             logger.debug('Complete url: ' + url)
             logger.debug("Complete data: " + data)
             try:
@@ -691,7 +697,7 @@ def complete_multiupload():
             except Exception:
                 return False
             return True
-
+        source_path = urllib.quote(to_printable_str(source_path))
         rt = self._session.head(url="http://"+source_path, auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key))
         if rt.status_code != 200:
             logger.warn("Replication sources do not exist")
@@ -881,7 +887,7 @@ def delete_file(self, cos_path, _force=False):
         if _force is False:
             if query_yes_no("WARN: you are deleting the file in the '{cos_path}' cos_path, please make sure".format(cos_path=to_printable_str(cos_path))) is False:
                 return False
-        url = self._conf.uri(path=cos_path)
+        url = self._conf.uri(path=urllib.quote(to_printable_str(cos_path)))
         try:
             rt = self._session.delete(url=url, auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key))
             logger.debug("init resp, status code: {code}, headers: {headers}".format(
@@ -1074,6 +1080,8 @@ def download_file(self, cos_path, local_path, **kwargs):
         def check_file_md5(_local_path, _cos_path):
             url = self._conf.uri(path=_cos_path)
             rt = self._session.head(url=url, auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key), stream=True)
+            if rt.status_code != 200:
+                return False
             tmp = os.stat(_local_path)
             if tmp.st_size != int(rt.headers['Content-Length']):
                 return False
diff --git a/coscmd/cos_cmd.py b/coscmd/cos_cmd.py
index 95b19d1..68293ef 100644
--- a/coscmd/cos_cmd.py
+++ b/coscmd/cos_cmd.py
@@ -152,11 +152,13 @@ def upload(args):
         logger.warn('local_path %s is not readable!' % to_printable_str(args.local_path))
         return -1
     args.local_path, args.cos_path = concat_path(args.local_path, args.cos_path)
+    kwargs = {}
+    kwargs['sync'] = args.sync
     if args.recursive:
         if os.path.isfile(args.local_path) is True:
-            rt = Interface.upload_file(args.local_path, args.cos_path, args.headers)
+            rt = Interface.upload_file(args.local_path, args.cos_path, args.headers, **kwargs)
         elif os.path.isdir(args.local_path):
-            rt = Interface.upload_folder(args.local_path, args.cos_path, args.headers)
+            rt = Interface.upload_folder(args.local_path, args.cos_path, args.headers, **kwargs)
             logger.info("{folders} folders, {files} files successful, {fail_files} files failed"
                         .format(folders=Interface._folder_num, files=Interface._file_num, fail_files=Interface._fail_num))
         if rt:
@@ -172,7 +174,7 @@ def upload(args):
         if os.path.isfile(args.local_path) is False:
             logger.warn("cannot stat '%s': No such file or directory" % to_printable_str(args.local_path))
             return -1
-        if Interface.upload_file(args.local_path, args.cos_path, args.headers) is True:
+        if Interface.upload_file(args.local_path, args.cos_path, args.headers, **kwargs) is True:
             return 0
         else:
             return -1
@@ -453,6 +455,7 @@ def command_thread():
     parser_upload.add_argument("cos_path", help="cos_path as a/b.txt", type=str)
     parser_upload.add_argument('-r', '--recursive', help="upload recursively when upload directory", action="store_true", default=False)
     parser_upload.add_argument('-H', '--headers', help="set HTTP headers", type=str, default='{}')
+    parser_upload.add_argument('-s', '--sync', help="skip uploading files whose md5 already matches the object on COS", action="store_true", default=False)
     parser_upload.set_defaults(func=Op.upload)
 
     parser_download = sub_parser.add_parser("download", help="download file from COS to local.")
diff --git a/coscmd/test.py b/coscmd/test.py
index a585e55..87b9cd1 100644
--- a/coscmd/test.py
+++ b/coscmd/test.py
@@ -41,31 +41,31 @@ def tearDown():
 
 
 def gen_file(filePath, fileSize):
-    ds=0
+    ds = 0
    with open(filePath, "w") as f:
-        while ds
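For readers reviewing the new `-s` behaviour, the skip decision can be reproduced outside coscmd. The sketch below is an illustration only, not part of this patch: the object URL, the plain `requests` session, the `auth` argument and the helper names are assumptions, while the size check and the `x-cos-meta-md5` comparison mirror what `check_file_md5` in `upload_file` does above.

# Illustrative sketch only (not part of the patch): mirrors the sync-skip check
# that `coscmd upload -s` performs before uploading a file.
import hashlib
import os
import requests  # assumed here; coscmd uses its own authenticated session


def local_md5(path, block_size=2048):
    # Stream the file in chunks, same idea as get_file_md5() in cos_client.py.
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(block_size), b""):
            h.update(chunk)
    return h.hexdigest()


def same_as_remote(local_path, object_url, auth=None):
    # HEAD the object; any non-200 answer means "do the upload".
    rt = requests.head(object_url, auth=auth)
    if rt.status_code != 200:
        return False
    # Sizes must match first, then the x-cos-meta-md5 header must match the
    # local MD5; objects uploaded by COSCMD >= 1.8.3.2 carry that header.
    if os.path.getsize(local_path) != int(rt.headers.get("Content-Length", -1)):
        return False
    remote_md5 = rt.headers.get("x-cos-meta-md5")
    return remote_md5 is not None and remote_md5 == local_md5(local_path)


# Hypothetical usage, mirroring `coscmd upload -s /home/aaa/file.bin cospath`:
# if same_as_remote("/home/aaa/file.bin", object_url, auth=cos_auth):
#     print("object unchanged, skip upload")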