
Commit 7fea0f5

Concurrency in pipe() (#901)
1 parent dd75a1a commit 7fea0f5

File tree: 3 files changed (+39, -25 lines)

s3fs/core.py  (+28, -20)

@@ -273,7 +273,7 @@ class S3FileSystem(AsyncFileSystem):
     connect_timeout = 5
     retries = 5
     read_timeout = 15
-    default_block_size = 5 * 2**20
+    default_block_size = 50 * 2**20
     protocol = ("s3", "s3a")
     _extra_tokenize_attributes = ("default_block_size",)
 
@@ -299,7 +299,7 @@ def __init__(
         cache_regions=False,
         asynchronous=False,
         loop=None,
-        max_concurrency=1,
+        max_concurrency=10,
         fixed_upload_size: bool = False,
         **kwargs,
     ):
@@ -1133,8 +1133,11 @@ async def _call_and_read():
 
         return await _error_wrapper(_call_and_read, retries=self.retries)
 
-    async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs):
+    async def _pipe_file(
+        self, path, data, chunksize=50 * 2**20, max_concurrency=None, **kwargs
+    ):
         bucket, key, _ = self.split_path(path)
+        concurrency = max_concurrency or self.max_concurrency
         size = len(data)
         # 5 GB is the limit for an S3 PUT
         if size < min(5 * 2**30, 2 * chunksize):
@@ -1146,23 +1149,27 @@ async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs):
             mpu = await self._call_s3(
                 "create_multipart_upload", Bucket=bucket, Key=key, **kwargs
             )
-
-            # TODO: cancel MPU if the following fails
-            out = [
-                await self._call_s3(
-                    "upload_part",
-                    Bucket=bucket,
-                    PartNumber=i + 1,
-                    UploadId=mpu["UploadId"],
-                    Body=data[off : off + chunksize],
-                    Key=key,
+            ranges = list(range(0, len(data), chunksize))
+            inds = list(range(0, len(ranges), concurrency)) + [len(ranges)]
+            parts = []
+            for start, stop in zip(inds[:-1], inds[1:]):
+                out = await asyncio.gather(
+                    *[
+                        self._call_s3(
+                            "upload_part",
+                            Bucket=bucket,
+                            PartNumber=i + 1,
+                            UploadId=mpu["UploadId"],
+                            Body=data[ranges[i] : ranges[i] + chunksize],
+                            Key=key,
+                        )
+                        for i in range(start, stop)
+                    ]
+                )
+                parts.extend(
+                    {"PartNumber": i + 1, "ETag": o["ETag"]}
+                    for i, o in zip(range(start, stop), out)
                 )
-                for i, off in enumerate(range(0, len(data), chunksize))
-            ]
-
-            parts = [
-                {"PartNumber": i + 1, "ETag": o["ETag"]} for i, o in enumerate(out)
-            ]
             await self._call_s3(
                 "complete_multipart_upload",
                 Bucket=bucket,
@@ -2145,7 +2152,7 @@ def __init__(
         s3,
         path,
         mode="rb",
-        block_size=5 * 2**20,
+        block_size=50 * 2**20,
         acl=False,
         version_id=None,
         fill_cache=True,
@@ -2365,6 +2372,7 @@ def n_bytes_left() -> int:
             return len(self.buffer.getbuffer()) - self.buffer.tell()
 
         min_chunk = 1 if final else self.blocksize
+        # TODO: concurrent here
        if self.fs.fixed_upload_size:
            # all chunks have fixed size, exception: last one can be smaller
            while n_bytes_left() >= min_chunk:
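
The rewritten _pipe_file issues its upload_part calls in windows of max_concurrency coroutines, awaiting each window with asyncio.gather before starting the next, so at most that many parts are in flight at once. A minimal sketch of the same batching pattern in isolation (the names gather_in_batches, items, upload_one and batch_size are hypothetical, not part of s3fs):

import asyncio

async def gather_in_batches(items, upload_one, batch_size=10):
    # Run at most `batch_size` coroutines at a time; each window is
    # awaited in full before the next window is started.
    results = []
    for start in range(0, len(items), batch_size):
        window = items[start : start + batch_size]
        results.extend(await asyncio.gather(*(upload_one(x) for x in window)))
    return results

Per the diff, the per-call limit falls back to the instance setting (concurrency = max_concurrency or self.max_concurrency), whose default moves from 1 to 10.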

s3fs/tests/derived/s3fs_fixtures.py  (+4, -1)

@@ -11,7 +11,7 @@
 test_bucket_name = "test"
 secure_bucket_name = "test-secure"
 versioned_bucket_name = "test-versioned"
-port = 5555
+port = 5556
 endpoint_uri = "http://127.0.0.1:%s/" % port
 
 
@@ -109,6 +109,9 @@ def _s3_base(self):
                 pass
             timeout -= 0.1
             time.sleep(0.1)
+        if proc.poll() is not None:
+            proc.terminate()
+            raise RuntimeError("Starting moto server failed")
         print("server up")
         yield
         print("moto done")

s3fs/tests/test_s3fs.py  (+7, -4)

@@ -90,7 +90,10 @@ def s3_base():
 def reset_s3_fixture():
     # We reuse the MotoServer for all tests
     # But we do want a clean state for every test
-    requests.post(f"{endpoint_uri}/moto-api/reset")
+    try:
+        requests.post(f"{endpoint_uri}/moto-api/reset")
+    except:
+        pass
 
 
 def get_boto3_client():
@@ -1253,7 +1256,7 @@ def test_write_fails(s3):
 
 
 def test_write_blocks(s3):
-    with s3.open(test_bucket_name + "/temp", "wb") as f:
+    with s3.open(test_bucket_name + "/temp", "wb", block_size=5 * 2**20) as f:
         f.write(b"a" * 2 * 2**20)
         assert f.buffer.tell() == 2 * 2**20
         assert not (f.parts)
@@ -1787,7 +1790,7 @@ def test_change_defaults_only_subsequent():
         S3FileSystem.cachable = False  # don't reuse instances with same pars
 
         fs_default = S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri})
-        assert fs_default.default_block_size == 5 * (1024**2)
+        assert fs_default.default_block_size == 50 * (1024**2)
 
         fs_overridden = S3FileSystem(
             default_block_size=64 * (1024**2),
@@ -1804,7 +1807,7 @@ def test_change_defaults_only_subsequent():
 
         # Test the other file systems created to see if their block sizes changed
         assert fs_overridden.default_block_size == 64 * (1024**2)
-        assert fs_default.default_block_size == 5 * (1024**2)
+        assert fs_default.default_block_size == 50 * (1024**2)
     finally:
         S3FileSystem.default_block_size = 5 * (1024**2)
         S3FileSystem.cachable = True
