From 2c69d0f48f3547563ffd4d6a3bdb737f6563834c Mon Sep 17 00:00:00 2001 From: RussTorres Date: Tue, 4 Sep 2018 13:22:03 -0700 Subject: [PATCH] feat: support for wasabi s3 as wasabis3 --- cloudvolume/connectionpools.py | 30 +++++---- cloudvolume/lib.py | 64 +++++++++---------- cloudvolume/storage.py | 108 ++++++++++++++++----------------- 3 files changed, 105 insertions(+), 97 deletions(-) diff --git a/cloudvolume/connectionpools.py b/cloudvolume/connectionpools.py index 1f6e2ca6b..e6420bfb1 100644 --- a/cloudvolume/connectionpools.py +++ b/cloudvolume/connectionpools.py @@ -4,7 +4,7 @@ from functools import partial from google.cloud.storage import Client -import boto3 +import boto3 from .secrets import google_credentials, aws_credentials @@ -14,18 +14,18 @@ class ServiceUnknownException(Exception): class ConnectionPool(object): """ This class is intended to be subclassed. See below. - + Creating fresh client or connection objects for Google or Amazon eventually starts causing breakdowns when too many connections open. - + To promote efficient resource use and prevent containers from dying, we create a ConnectionPool that allows for the reuse of connections. - - Storage interfaces may acquire and release connections - when they need or finish using them. - + + Storage interfaces may acquire and release connections + when they need or finish using them. + If the limit is reached, additional requests for acquiring connections will block until they can be serviced. @@ -41,9 +41,9 @@ def total_connections(self): def _create_connection(self): raise NotImplementedError - def get_connection(self): + def get_connection(self): with self._lock: - try: + try: conn = self.pool.get(block=False) self.pool.task_done() except Queue.Empty: @@ -62,7 +62,7 @@ def release_connection(self, conn): self.outstanding -= 1 def close(self, conn): - return + return def reset_pool(self): while True: @@ -103,9 +103,17 @@ def _create_connection(self): aws_secret_access_key=self.credentials['AWS_SECRET_ACCESS_KEY'], endpoint_url='http://s3-hpcrc.rc.princeton.edu', ) + elif self.service == 'wasabis3': + return boto3.client( + 's3', + aws_access_key_id=self.credentials['AWS_ACCESS_KEY_ID'], + aws_secret_access_key=self.credentials['AWS_SECRET_ACCESS_KEY'], + endpoint_url='https://s3.wasabisys.com', + region_name='us-east-1', + ) else: raise ServiceUnknownException("{} unknown. Choose from 's3' or 'matrix'.") - + def close(self, conn): try: return conn.close() diff --git a/cloudvolume/lib.py b/cloudvolume/lib.py index f2eb476d3..e4379bb52 100644 --- a/cloudvolume/lib.py +++ b/cloudvolume/lib.py @@ -5,7 +5,7 @@ import json import os import io -import re +import re import sys import math import shutil @@ -52,13 +52,13 @@ def colorize(color, text): color = color.upper() return COLORS[color] + text + COLORS['RESET'] -ExtractedPath = namedtuple('ExtractedPath', +ExtractedPath = namedtuple('ExtractedPath', ('protocol', 'intermediate_path', 'bucket', 'dataset','layer') ) def extract_path(cloudpath): """cloudpath: e.g. gs://neuroglancer/DATASET/LAYER/info or s3://...""" - protocol_re = r'^(gs|file|s3|boss|matrix|https?)://' + protocol_re = r'^(gs|file|s3|boss|matrix|wasabis3|https?)://' bucket_re = r'^(/?[~\d\w_\.\-]+)/' tail_re = r'([\d\w_\.\-]+)/([\d\w_\.\-]+)/?$' @@ -138,9 +138,9 @@ def find_closest_divisor(to_divide, closest_to): This is used to find the right chunk size for importing a neuroglancer dataset that has a chunk import size that's not evenly divisible by - 64,64,64. + 64,64,64. - e.g. + e.g. neuroglancer_chunk_size = find_closest_divisor(build_chunk_size, closest_to=[64,64,64]) Required: @@ -152,13 +152,13 @@ def find_closest_divisor(to_divide, closest_to): def find_closest(td, ct): min_distance = td best = td - + for divisor in divisors(td): if abs(divisor - ct) < min_distance: min_distance = abs(divisor - ct) best = divisor return best - + return [ find_closest(td, ct) for td, ct in zip(to_divide, closest_to) ] def divisors(n): @@ -179,7 +179,7 @@ def xyzrange(start_vec, end_vec=None, stride_vec=(1,1,1)): rangeargs = ( (start, end, stride) for start, end, stride in zip(start_vec, end_vec, stride_vec) ) xyzranges = [ range(*arg) for arg in rangeargs ] - + # iterate then x first, then y, then z # this way you process in the xy plane slice by slice # but you don't create process lots of prefix-adjacent keys @@ -302,12 +302,12 @@ def intersection(cls, bbx1, bbx2): @classmethod def intersects(cls, bbx1, bbx2): return ( - bbx1.minpt.x < bbx2.maxpt.x - and bbx1.maxpt.x > bbx2.minpt.x + bbx1.minpt.x < bbx2.maxpt.x + and bbx1.maxpt.x > bbx2.minpt.x and bbx1.minpt.y < bbx2.maxpt.y and bbx1.maxpt.y > bbx2.minpt.y and bbx1.minpt.z < bbx2.maxpt.z - and bbx1.maxpt.z > bbx2.minpt.z + and bbx1.maxpt.z > bbx2.minpt.z ) @classmethod @@ -341,8 +341,8 @@ def from_filename(cls, filename): @classmethod def from_slices(cls, slices3): return Bbox( - (slices3[0].start, slices3[1].start, slices3[2].start), - (slices3[0].stop, slices3[1].stop, slices3[2].stop) + (slices3[0].start, slices3[1].start, slices3[2].start), + (slices3[0].stop, slices3[1].stop, slices3[2].stop) ) @classmethod @@ -352,7 +352,7 @@ def from_list(cls, lst): @property def dtype(self): return self.minpt.dtype - + def to_filename(self): return '{}-{}_{}-{}_{}-{}'.format( self.minpt.x, self.maxpt.x, @@ -416,7 +416,7 @@ def expand_to_chunk_size(self, chunk_size, offset=Vec(0,0,0, dtype=int)): to the nearest grid lines. Required: - chunk_size: arraylike (x,y,z), the size of chunks in the + chunk_size: arraylike (x,y,z), the size of chunks in the dataset e.g. (64,64,64) Optional: offset: arraylike (x,y,z), the starting coordinate of the dataset @@ -425,7 +425,7 @@ def expand_to_chunk_size(self, chunk_size, offset=Vec(0,0,0, dtype=int)): result = self.clone() result = result - offset result.minpt = np.floor(result.minpt / chunk_size) * chunk_size - result.maxpt = np.ceil(result.maxpt / chunk_size) * chunk_size + result.maxpt = np.ceil(result.maxpt / chunk_size) * chunk_size return result + offset def shrink_to_chunk_size(self, chunk_size, offset=Vec(0,0,0, dtype=int)): @@ -434,7 +434,7 @@ def shrink_to_chunk_size(self, chunk_size, offset=Vec(0,0,0, dtype=int)): to the nearest grid lines. Required: - chunk_size: arraylike (x,y,z), the size of chunks in the + chunk_size: arraylike (x,y,z), the size of chunks in the dataset e.g. (64,64,64) Optional: offset: arraylike (x,y,z), the starting coordinate of the dataset @@ -443,7 +443,7 @@ def shrink_to_chunk_size(self, chunk_size, offset=Vec(0,0,0, dtype=int)): result = self.clone() result = result - offset result.minpt = np.ceil(result.minpt / chunk_size) * chunk_size - result.maxpt = np.floor(result.maxpt / chunk_size) * chunk_size + result.maxpt = np.floor(result.maxpt / chunk_size) * chunk_size # If we are inside a single chunk, the ends # can invert, which tells us we should collapse @@ -459,7 +459,7 @@ def round_to_chunk_size(self, chunk_size, offset=Vec(0,0,0, dtype=int)): to the nearest grid lines. Required: - chunk_size: arraylike (x,y,z), the size of chunks in the + chunk_size: arraylike (x,y,z), the size of chunks in the dataset e.g. (64,64,64) Optional: offset: arraylike (x,y,z), the starting coordinate of the dataset @@ -473,10 +473,10 @@ def round_to_chunk_size(self, chunk_size, offset=Vec(0,0,0, dtype=int)): def contains(self, point): return ( - point[0] >= self.minpt[0] + point[0] >= self.minpt[0] and point[1] >= self.minpt[1] - and point[2] >= self.minpt[2] - and point[0] <= self.maxpt[0] + and point[2] >= self.minpt[2] + and point[0] <= self.maxpt[0] and point[1] <= self.maxpt[1] and point[2] <= self.maxpt[2] ) @@ -500,11 +500,11 @@ def astype(self, dtype): def transpose(self): return Bbox(self.minpt[::-1], self.maxpt[::-1]) - # note that operand can be a vector + # note that operand can be a vector # or a scalar thanks to numpy - def __sub__(self, operand): + def __sub__(self, operand): tmp = self.clone() - + if isinstance(operand, Bbox): tmp.minpt -= operand.minpt tmp.maxpt -= operand.maxpt @@ -584,14 +584,14 @@ def generate_slices(slices, minsize, maxsize, bounded=True): while len(slices) < len(maxsize): slices.append( slice(None, None, None) ) - # First three slices are x,y,z, last is channel. + # First three slices are x,y,z, last is channel. # Handle only x,y,z here, channel seperately for index, slc in enumerate(slices): if isinstance(slc, integer_types) or isinstance(slc, float): slices[index] = slice(int(slc), int(slc)+1, 1) else: start = minsize[index] if slc.start is None else slc.start - end = maxsize[index] if slc.stop is None else slc.stop + end = maxsize[index] if slc.stop is None else slc.stop step = 1 if slc.step is None else slc.step if step < 0: @@ -603,7 +603,7 @@ def generate_slices(slices, minsize, maxsize, bounded=True): # marching cubes. if bounded: # if start < 0: # this is support for negative indicies - # start = maxsize[index] + start + # start = maxsize[index] + start check_bounds(start, minsize[index], maxsize[index]) # if end < 0: # this is support for negative indicies # end = maxsize[index] + end @@ -626,7 +626,7 @@ def save_images(image, axis='z', channel=None, directory=None, global_norm=True, """ if directory is None: directory = os.path.join('./saved_images', 'default', 'default', '0', Bbox( (0,0,0), image.shape[:3] ).to_filename()) - + mkdir(directory) print("Saving to {}".format(directory)) @@ -653,7 +653,7 @@ def normalize_float(img): return img.astype(np.uint8) if global_norm and image.dtype in (np.float32, np.float64): - image = normalize_float(image) + image = normalize_float(image) for level in tqdm(range(image.shape[index]), desc="Saving Images"): if index == 0: @@ -677,7 +677,7 @@ def normalize_float(img): # it requires a 90deg counterclockwise rotation on xy plane (leaving z alone) # followed by a flip on Y if axis == 'z': - img2d = np.flipud(np.rot90(img2d, 1)) + img2d = np.flipud(np.rot90(img2d, 1)) if img2d.dtype == np.uint8: img2d = Image.fromarray(img2d, 'L') @@ -695,4 +695,4 @@ def normalize_float(img): filename = '{}-{}'.format(channel_index, filename) path = os.path.join(directory, filename) - img2d.save(path, image_format) \ No newline at end of file + img2d.save(path, image_format) diff --git a/cloudvolume/storage.py b/cloudvolume/storage.py index 6af0475d4..b53792046 100644 --- a/cloudvolume/storage.py +++ b/cloudvolume/storage.py @@ -40,8 +40,8 @@ def reset_connection_pools(): reset_connection_pools() retry = tenacity.retry( - reraise=True, - stop=tenacity.stop_after_attempt(7), + reraise=True, + stop=tenacity.stop_after_attempt(7), wait=tenacity.wait_full_jitter(0.5, 60.0), ) @@ -52,7 +52,7 @@ class UnsupportedProtocol(Exception): class SimpleStorage(object): """ - Access files stored in Google Storage (gs), Amazon S3 (s3), + Access files stored in Google Storage (gs), Amazon S3 (s3), or the local Filesystem (file). e.g. with Storage('gs://bucket/dataset/layer') as stor: @@ -65,7 +65,7 @@ class SimpleStorage(object): Optional: n_threads (int:20): number of threads to use downloading and uplaoding. If 0, execution will be on the main python thread. - progress (bool:false): Show a tqdm progress bar for multiple + progress (bool:false): Show a tqdm progress bar for multiple uploads and downloads. """ def __init__(self, layer_path, progress=False): @@ -74,12 +74,12 @@ def __init__(self, layer_path, progress=False): self._layer_path = layer_path self._path = extract_path(layer_path) - + if self._path.protocol == 'file': self._interface_cls = FileInterface elif self._path.protocol == 'gs': self._interface_cls = GoogleCloudStorageInterface - elif self._path.protocol in ('s3', 'matrix'): + elif self._path.protocol in ('s3', 'matrix', 'wasabis3'): self._interface_cls = S3Interface elif self._path.protocol in ('http', 'https'): self._interface_cls = HttpInterface @@ -99,17 +99,17 @@ def put_json(self, file_path, content, content_type='application/json', *args, * if type(content) != str: content = json.dumps(content) return self.put_file(file_path, content, content_type=content_type, *args, **kwargs) - + def put_file(self, file_path, content, content_type=None, compress=None, cache_control=None): - """ + """ Args: filename (string): it can contains folders content (string): binary data to save """ - return self.put_files([ (file_path, content) ], - content_type=content_type, - compress=compress, - cache_control=cache_control, + return self.put_files([ (file_path, content) ], + content_type=content_type, + compress=compress, + cache_control=cache_control, block=False ) @@ -132,7 +132,7 @@ def exists(self, file_path): def files_exist(self, file_paths): """ - Threaded exists for all file paths. + Threaded exists for all file paths. file_paths: (list) file paths to test for existence @@ -164,10 +164,10 @@ def delete_files(self, file_paths): def list_files(self, prefix="", flat=False): """ - List the files in the layer with the given prefix. + List the files in the layer with the given prefix. flat means only generate one level of a directory, - while non-flat means generate all file paths with that + while non-flat means generate all file paths with that prefix. Here's how flat=True handles different senarios: @@ -179,7 +179,7 @@ def list_files(self, prefix="", flat=False): - Lists the 'bigarray' directory 4. partial file name prefix = 'bigarray/chunk_' - Lists the 'bigarray/' directory and filters on 'chunk_' - + Return: generated sequence of file paths relative to layer_path """ @@ -197,7 +197,7 @@ def __exit__(self, exception_type, exception_value, traceback): class Storage(ThreadedQueue): """ - Access files stored in Google Storage (gs), Amazon S3 (s3), + Access files stored in Google Storage (gs), Amazon S3 (s3), or the local Filesystem (file). e.g. with Storage('gs://bucket/dataset/layer') as stor: @@ -210,7 +210,7 @@ class Storage(ThreadedQueue): Optional: n_threads (int:20): number of threads to use downloading and uplaoding. If 0, execution will be on the main python thread. - progress (bool:false): Show a tqdm progress bar for multiple + progress (bool:false): Show a tqdm progress bar for multiple uploads and downloads. """ def __init__(self, layer_path, n_threads=20, progress=False): @@ -219,12 +219,12 @@ def __init__(self, layer_path, n_threads=20, progress=False): self._layer_path = layer_path self._path = extract_path(layer_path) - + if self._path.protocol == 'file': self._interface_cls = FileInterface elif self._path.protocol == 'gs': self._interface_cls = GoogleCloudStorageInterface - elif self._path.protocol in ('s3', 'matrix'): + elif self._path.protocol in ('s3', 'matrix', 'wasabis3'): self._interface_cls = S3Interface elif self._path.protocol in ('http', 'https'): self._interface_cls = HttpInterface @@ -256,17 +256,17 @@ def put_json(self, file_path, content, content_type='application/json', *args, * if type(content) != str: content = json.dumps(content) return self.put_file(file_path, content, content_type=content_type, *args, **kwargs) - + def put_file(self, file_path, content, content_type=None, compress=None, cache_control=None): - """ + """ Args: filename (string): it can contains folders content (string): binary data to save """ - return self.put_files([ (file_path, content) ], - content_type=content_type, - compress=compress, - cache_control=cache_control, + return self.put_files([ (file_path, content) ], + content_type=content_type, + compress=compress, + cache_control=cache_control, block=False ) @@ -302,7 +302,7 @@ def exists(self, file_path): def files_exist(self, file_paths): """ - Threaded exists for all file paths. + Threaded exists for all file paths. file_paths: (list) file paths to test for existence @@ -343,16 +343,16 @@ def get_files(self, file_paths): results = [] def get_file_thunk(path, interface): - result = error = None + result = error = None try: result = interface.get_file(path) except Exception as err: error = err - # important to print immediately because + # important to print immediately because # errors are collected at the end - print(err) - + print(err) + content, encoding = result content = compression.decompress(content, encoding) @@ -403,10 +403,10 @@ def thunk_delete(path, interface): def list_files(self, prefix="", flat=False): """ - List the files in the layer with the given prefix. + List the files in the layer with the given prefix. flat means only generate one level of a directory, - while non-flat means generate all file paths with that + while non-flat means generate all file paths with that prefix. Here's how flat=True handles different senarios: @@ -418,7 +418,7 @@ def list_files(self, prefix="", flat=False): - Lists the 'bigarray' directory 4. partial file name prefix = 'bigarray/chunk_' - Lists the 'bigarray/' directory and filters on 'chunk_' - + Return: generated sequence of file paths relative to layer_path """ @@ -438,10 +438,10 @@ def __init__(self, path): self._path = path def get_path_to_file(self, file_path): - + clean = filter(None,[ - self._path.bucket, - self._path.intermediate_path, + self._path.bucket, + self._path.intermediate_path, self._path.dataset, self._path.layer, file_path @@ -474,12 +474,12 @@ def get_file(self, file_path): path = self.get_path_to_file(file_path) compressed = os.path.exists(path + '.gz') - + if compressed: path += '.gz' encoding = 'gzip' if compressed else None - + try: with open(path, 'rb') as f: data = f.read() @@ -500,14 +500,14 @@ def delete_file(self, file_path): def list_files(self, prefix, flat): """ - List the files in the layer with the given prefix. + List the files in the layer with the given prefix. flat means only generate one level of a directory, - while non-flat means generate all file paths with that + while non-flat means generate all file paths with that prefix. """ - layer_path = self.get_path_to_file("") + layer_path = self.get_path_to_file("") path = os.path.join(layer_path, prefix) + '*' filenames = [] @@ -525,10 +525,10 @@ def list_files(self, prefix, flat): files = [ os.path.join(root, f) for f in files ] files = [ f.replace(remove, '') for f in files ] files = [ f for f in files if f[:len(prefix)] == prefix ] - + for filename in files: filenames.append(filename) - + def stripgz(fname): (base, ext) = os.path.splitext(fname) if ext == '.gz': @@ -586,7 +586,7 @@ def exists(self, file_path): @retry def delete_file(self, file_path): key = self.get_path_to_file(file_path) - + try: self._bucket.delete_blob( key ) except google.cloud.exceptions.NotFound: @@ -594,13 +594,13 @@ def delete_file(self, file_path): def list_files(self, prefix, flat=False): """ - List the files in the layer with the given prefix. + List the files in the layer with the given prefix. flat means only generate one level of a directory, - while non-flat means generate all file paths with that + while non-flat means generate all file paths with that prefix. """ - layer_path = self.get_path_to_file("") + layer_path = self.get_path_to_file("") path = os.path.join(layer_path, prefix) for blob in self._bucket.list_blobs(prefix=path): filename = blob.name.replace(layer_path + '/', '') @@ -703,7 +703,7 @@ def get_file(self, file_path): encoding = resp['ContentEncoding'] return resp['Body'].read(), encoding - except botocore.exceptions.ClientError as err: + except botocore.exceptions.ClientError as err: if err.response['Error']['Code'] == 'NoSuchKey': return None, None else: @@ -721,7 +721,7 @@ def exists(self, file_path): exists = False else: raise - + return exists @retry @@ -733,14 +733,14 @@ def delete_file(self, file_path): def list_files(self, prefix, flat=False): """ - List the files in the layer with the given prefix. + List the files in the layer with the given prefix. flat means only generate one level of a directory, - while non-flat means generate all file paths with that + while non-flat means generate all file paths with that prefix. """ - layer_path = self.get_path_to_file("") + layer_path = self.get_path_to_file("") path = os.path.join(layer_path, prefix) resp = self._conn.list_objects_v2( @@ -782,7 +782,7 @@ def _radix_sort(L, i=0): """ Most significant char radix sort """ - if len(L) <= 1: + if len(L) <= 1: return L done_bucket = [] buckets = [ [] for x in range(255) ]