Skip to content

Commit

Permalink
1. Add traffic limit funciton & tests & exmaples
Browse files Browse the repository at this point in the history
2. Fix resumable_download and resumable_upload  api to support request payment
3. Fix resumable_download versioning tests
  • Loading branch information
LYZ authored and huiguangjun committed Jul 12, 2019
1 parent ec40d70 commit 9eb0f47
Show file tree
Hide file tree
Showing 11 changed files with 604 additions and 56 deletions.
63 changes: 63 additions & 0 deletions examples/traffic_limit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-

import os
import oss2
from oss2.models import OSS_TRAFFIC_LIMIT

# 以下代码展示了限速上传下载文件的设置方法

# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等信息。
# 通过环境变量获取,或者把诸如“<你的AccessKeyId>”替换成真实的AccessKeyId等。
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你要请求的Bucket名称>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的访问域名>')

# 确认上面的参数都填写正确了
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<' not in param, '请设置参数:' + param

# 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)

OBJECT_SIZE_1MB = (1 * 1024 * 1024)
LIMIT_100KB = (100 * 1024 * 8)

headers = dict()
headers[OSS_TRAFFIC_LIMIT] = str(LIMIT_100KB);

key = 'traffic-limit-test-put-object'
content = b'a' * OBJECT_SIZE_1MB

# 限速上传文件
result = bucket.put_object(key, content, headers=headers)
print('http response status:', result.status)

# 限速下载文件到本地
file_name = key + '.txt'
result = bucket.get_object_to_file(key, file_name, headers=headers)
print('http response status:', result.status)

os.remove(file_name)
bucket.delete_object(key)

# 使用签名url方式限速上传文件
params = dict()
params[OSS_TRAFFIC_LIMIT] = str(LIMIT_100KB);
local_file_name = "example.jpg"

# 创建限速上传文件的签名url, 有效期60s
url = bucket.sign_url('PUT', key, 60, params=params)
# 限速上传
result = bucket.put_object_with_url_from_file(url, local_file_name)
print('http response status:', result.status)

# 创建限速下载文件的签名url, 有效期60s
down_file_name = key + '.tmp'
url = bucket.sign_url('GET', key, 60, params=params)
# 限速下载
result = bucket.get_object_with_url_to_file(url, down_file_name)
print('http response status:', result.status)

os.remove(down_file_name)
bucket.delete_object(key)
9 changes: 8 additions & 1 deletion oss2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,8 @@ def upload_part(self, key, upload_id, part_number, data, progress_callback=None,
:return: :class:`PutObjectResult <oss2.models.PutObjectResult>`
"""
headers = http.CaseInsensitiveDict(headers)

if progress_callback:
data = utils.make_progress_adapter(data, progress_callback)

Expand Down Expand Up @@ -1258,6 +1260,7 @@ def complete_multipart_upload(self, key, upload_id, parts, headers=None):
:return: :class:`PutObjectResult <oss2.models.PutObjectResult>`
"""
headers = http.CaseInsensitiveDict(headers)
parts = sorted(parts, key=lambda p: p.part_number);
data = xml_utils.to_complete_upload_request(parts);

Expand Down Expand Up @@ -1658,8 +1661,12 @@ def put_bucket_website(self, input):
:param input: :class:`BucketWebsite <oss2.models.BucketWebsite>`
"""
data = self.__convert_data(BucketWebsite, xml_utils.to_put_bucket_website, input)

headers = http.CaseInsensitiveDict()
headers['Content-MD5'] = utils.content_md5(data)

logger.debug("Start to put bucket website, bucket: {0}, website: {1}".format(self.bucket_name, to_string(data)))
resp = self.__do_bucket('PUT', data=data, params={Bucket.WEBSITE: ''})
resp = self.__do_bucket('PUT', data=data, params={Bucket.WEBSITE: ''}, headers=headers)
logger.debug("Put bucket website done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return RequestResult(resp)

Expand Down
2 changes: 1 addition & 1 deletion oss2/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class Auth(AuthBase):
'restore', 'qos', 'referer', 'stat', 'bucketInfo', 'append', 'position', 'security-token',
'live', 'comp', 'status', 'vod', 'startTime', 'endTime', 'x-oss-process',
'symlink', 'callback', 'callback-var', 'tagging', 'encryption', 'versions',
'versioning', 'versionId', 'policy', 'requestPayment']
'versioning', 'versionId', 'policy', 'requestPayment', 'x-oss-traffic-limit']
)

def _sign_request(self, req, bucket_name, key):
Expand Down
2 changes: 2 additions & 0 deletions oss2/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

OSS_REQUEST_PAYER = 'x-oss-request-payer'

OSS_TRAFFIC_LIMIT = 'x-oss-traffic-limit'

class RequestHeader(dict):
def __init__(self, *arg, **kw):
super(RequestHeader, self).__init__(*arg, **kw)
Expand Down
8 changes: 4 additions & 4 deletions oss2/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,21 +677,21 @@ class RedirectMirrorHeaders(object):
def __init__(self,pass_all=None, pass_list=None, remove_list=None, set_list=None):
if pass_list is not None:
if not isinstance(pass_list, list):
raise ClientError('The class of pass_list should be list.')
raise ClientError('The type of pass_list should be list.')

if len(pass_list) > 10:
raise ClientError('The capacity of pass_list should not > 10!')

if remove_list is not None:
if not isinstance(remove_list, list):
raise ClientError('The class of remove_list should be list.')
raise ClientError('The type of remove_list should be list.')

if len(remove_list) > 10:
raise ClientError('The capacity of remove_list should not > 10!')

if set_list is not None:
if not isinstance(set_list, list):
raise ClientError('The class of set_list should be list.')
raise ClientError('The type of set_list should be list.')

if len(set_list) > 10:
raise ClientError('The capacity of set_list should not > 10!')
Expand Down Expand Up @@ -751,7 +751,7 @@ class BucketWebsite(object):
def __init__(self, index_file, error_file, rules=None):
if rules is not None:
if not isinstance(rules, list):
raise ClientError('rules class should be list.')
raise ClientError('rules type should be list.')
if len(rules) > 5:
raise ClientError('capacity of rules should not be > 5.')

Expand Down
75 changes: 62 additions & 13 deletions oss2/resumable.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from . import iterators
from . import exceptions
from . import defaults
from . import http
from .api import Bucket

from .models import PartInfo
Expand All @@ -32,7 +33,6 @@
_MAX_PART_COUNT = 10000
_MIN_PART_SIZE = 100 * 1024


def resumable_upload(bucket, key, filename,
store=None,
headers=None,
Expand All @@ -56,7 +56,13 @@ def resumable_upload(bucket, key, filename,
:param key: 上传到用户空间的文件名
:param filename: 待上传本地文件名
:param store: 用来保存断点信息的持久存储,参见 :class:`ResumableStore` 的接口。如不指定,则使用 `ResumableStore` 。
:param headers: 传给 `put_object` 或 `init_multipart_upload` 的HTTP头部
:param headers: HTTP头部
# 调用外部函数put_object 或 init_multipart_upload传递完整headers
# 调用外部函数uplpad_part目前只传递OSS_REQUEST_PAYER, OSS_TRAFFIC_LIMIT
# 调用外部函数complete_multipart_upload目前只传递OSS_REQUEST_PAYER
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:param multipart_threshold: 文件长度大于该值时,则用分片上传。
:param part_size: 指定分片上传的每个分片的大小。如不指定,则自动计算。
:param progress_callback: 上传进度回调函数。参见 :ref:`progress_callback` 。
Expand Down Expand Up @@ -90,7 +96,8 @@ def resumable_download(bucket, key, filename,
progress_callback=None,
num_threads=None,
store=None,
params=None):
params=None,
headers=None):
"""断点下载。
实现的方法是:
Expand Down Expand Up @@ -123,16 +130,21 @@ def resumable_download(bucket, key, filename,
:param dict params: 指定下载参数,可以传入versionId下载指定版本文件
:param headers: HTTP头部,
# 调用外部函数head_object目前只传递OSS_REQUEST_PAYER
# 调用外部函数get_object_to_file, get_object目前需要向下传递的值有OSS_REQUEST_PAYER, OSS_TRAFFIC_LIMIT
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:raises: 如果OSS文件不存在,则抛出 :class:`NotFound <oss2.exceptions.NotFound>` ;也有可能抛出其他因下载文件而产生的异常。
"""

logger.debug("Start to resumable download, bucket: {0}, key: {1}, filename: {2}, multiget_threshold: {3}, "
"part_size: {4}, num_threads: {5}".format(bucket.bucket_name, to_string(key), filename,
multiget_threshold, part_size, num_threads))
multiget_threshold = defaults.get(multiget_threshold, defaults.multiget_threshold)

if isinstance(bucket, Bucket):
result = bucket.head_object(key, params=params)
valid_headers = _populate_valid_headers(headers, [OSS_REQUEST_PAYER])
result = bucket.head_object(key, params=params, headers=valid_headers)
logger.debug("The size of object to download is: {0}, multiget_threshold: {1}".format(result.content_length,
multiget_threshold))
if result.content_length >= multiget_threshold:
Expand All @@ -141,12 +153,15 @@ def resumable_download(bucket, key, filename,
progress_callback=progress_callback,
num_threads=num_threads,
store=store,
params=params)
params=params,
headers=headers)
downloader.download(result.server_crc)
else:
bucket.get_object_to_file(key, filename, progress_callback=progress_callback, params=params)
valid_headers = _populate_valid_headers(headers, [OSS_REQUEST_PAYER, OSS_TRAFFIC_LIMIT])
bucket.get_object_to_file(key, filename, progress_callback=progress_callback, params=params, headers=valid_headers)
else:
bucket.get_object_to_file(key, filename, progress_callback=progress_callback, params=params)
valid_headers = _populate_valid_headers(headers, [OSS_REQUEST_PAYER, OSS_TRAFFIC_LIMIT])
bucket.get_object_to_file(key, filename, progress_callback=progress_callback, params=params, headers=valid_headers)


_MAX_MULTIGET_PART_COUNT = 100
Expand Down Expand Up @@ -197,6 +212,31 @@ def _split_to_parts(total_size, part_size):

return parts

def _populate_valid_headers(headers=None, valid_keys=None):
"""构建只包含有效keys的http header
:param headers: 需要过滤的header
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:param valid_keys: 有效的关键key列表
:type valid_keys: list
:return: 只包含有效keys的http header, type: oss2.CaseInsensitiveDict
"""
if headers is None or valid_keys is None:
return None

headers = http.CaseInsensitiveDict(headers)
valid_headers = http.CaseInsensitiveDict()

for key in valid_keys:
if headers.get(key) is not None:
valid_headers[key] = headers[key]

if len(valid_headers) == 0:
valid_headers = None

return valid_headers

class _ResumableOperation(object):
def __init__(self, bucket, key, filename, size, store,
Expand Down Expand Up @@ -253,7 +293,8 @@ def __init__(self, bucket, key, filename, objectInfo,
store=None,
progress_callback=None,
num_threads=None,
params=None):
params=None,
headers=None):
super(_ResumableDownloader, self).__init__(bucket, key, filename, objectInfo.size,
store or ResumableDownloadStore(),
progress_callback=progress_callback)
Expand All @@ -267,6 +308,7 @@ def __init__(self, bucket, key, filename, objectInfo,
self.__finished_parts = None
self.__finished_size = None
self.__params = params
self.__headers = headers

# protect record
self.__lock = threading.Lock()
Expand Down Expand Up @@ -315,8 +357,12 @@ def __download_part(self, part):
with open(self.__tmp_file, 'rb+') as f:
f.seek(part.start, os.SEEK_SET)

headers = {IF_MATCH : self.objectInfo.etag,
IF_UNMODIFIED_SINCE : utils.http_date(self.objectInfo.mtime)}
headers = _populate_valid_headers(self.__headers, [OSS_REQUEST_PAYER, OSS_TRAFFIC_LIMIT])
if headers is None:
headers = http.CaseInsensitiveDict()
headers[IF_MATCH] = self.objectInfo.etag
headers[IF_UNMODIFIED_SINCE] = utils.http_date(self.objectInfo.mtime)

result = self.bucket.get_object(self.key, byte_range=(part.start, part.end - 1), headers=headers, params=self.__params)
utils.copyfileobj_and_verify(result, f, part.end - part.start, request_id=result.request_id)

Expand Down Expand Up @@ -436,6 +482,7 @@ def __init__(self, bucket, key, filename, size,
progress_callback=progress_callback)

self.__headers = headers

self.__part_size = defaults.get(part_size, defaults.part_size)

self.__mtime = os.path.getmtime(filename)
Expand Down Expand Up @@ -465,7 +512,8 @@ def upload(self):

self._report_progress(self.size)

result = self.bucket.complete_multipart_upload(self.key, self.__upload_id, self.__finished_parts)
headers = _populate_valid_headers(self.__headers, [OSS_REQUEST_PAYER])
result = self.bucket.complete_multipart_upload(self.key, self.__upload_id, self.__finished_parts, headers=headers)
self._del_record()

return result
Expand All @@ -487,8 +535,9 @@ def __upload_part(self, part):
self._report_progress(self.__finished_size)

f.seek(part.start, os.SEEK_SET)
headers = _populate_valid_headers(self.__headers, [OSS_REQUEST_PAYER, OSS_TRAFFIC_LIMIT])
result = self.bucket.upload_part(self.key, self.__upload_id, part.part_number,
utils.SizedFileAdapter(f, part.size))
utils.SizedFileAdapter(f, part.size), headers=headers)

logger.debug("Upload part success, add part info to record, part_number: {0}, etag: {1}, size: {2}".format(
part.part_number, result.etag, part.size))
Expand Down
Loading

0 comments on commit 9eb0f47

Please sign in to comment.