put_file: support concurrent multipart uploads with max_concurrency #848
martindurant merged 2 commits into fsspec:main
Conversation
For reference, I live in Seoul and am physically close to the datacenter, so routing is already good for me there. However, for any other region, increasing the concurrency makes a noticeable difference for me. Given this script and a bucket in a remote region:

```python
import s3fs
from fsspec.callbacks import TqdmCallback

BUCKET = "..."  # set to your bucket; name elided in the original

s3 = s3fs.S3FileSystem(cache_regions=True)
for i in (None, 32):
    s3.put_file(
        "ubuntu-22.04.2-live-server-arm64.iso",
        f"s3://{BUCKET}/s3fs-test/ubuntu-22.04.2-live-server-arm64.iso",
        callback=TqdmCallback(tqdm_kwargs={"desc": f"put file max_concurrency={i}", "unit": "B", "unit_scale": True}),
        max_concurrency=i,
    )
```

```
$ python test.py
put file max_concurrency=None: 100%|███████████████████████████| 1.94G/1.94G [01:53<00:00, 17.1MB/s]
put file max_concurrency=32: 100%|█████████████████████████████| 1.94G/1.94G [00:31<00:00, 62.1MB/s]
```

For reference, the AWS CLI on the same file:

```
$ time aws s3 cp ubuntu-22.04.2-live-server-arm64.iso s3://{BUCKET}/s3fs-test/ubuntu-22.04.2-live-server-arm64.iso
upload: ./ubuntu-22.04.2-live-server-arm64.iso to s3://{BUCKET}/s3fs-test/ubuntu-22.04.2-live-server-arm64.iso
aws s3 cp ubuntu-22.04.2-live-server-arm64.iso  7.53s user 4.03s system 26% cpu 43.334 total
```
The branch was force-pushed from b2a8eb4 to 8f0f85f.
Any idea why this happens? The upload should in theory saturate the bandwidth, whether on a single call for the whole massive file (after one count of latency) or on many concurrent calls (that all wait ~1 count of latency at the same time).
I'm not sure, and as I mentioned, I do get the max theoretical bandwidth when I'm guaranteed to have good routing to the datacenter, whether or not it's a concurrent upload. But we have seen users with the same issue with other storage providers as well, so this isn't limited to AWS; see the related adlfs/azure issue. It's probably worth noting that boto3/s3transfer also do multipart uploads in concurrent threads rather than sequentially.
OK, let's assume there is some component of per-packet latency too, then. Perhaps it's an SSL thing. Thanks for digging.
martindurant left a comment:
I have a couple of thoughts on naming, and when/if we can apply the same strategy to the other high-bandwidth operations.
The only substantial comment is about the batching strategy. It may not matter, and we can keep this approach for now, since it already produces an improvement.
s3fs/core.py (outdated):

```python
        self.invalidate_cache(rpath)
        rpath = self._parent(rpath)

    async def _upload_part_concurrent(
```
Can we please rename this to indicate it is uploading from a file, as opposed to bytes? Or can it be generalised to support pipe() too?
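A minimal sketch of one way such a generalisation could look (the helper name and signatures are assumptions, not s3fs's actual API): drive the batching loop from any source of chunks, so `put_file` can feed it file reads and `pipe` can feed it slices of an in-memory bytes object.

```python
import asyncio
from typing import Awaitable, Callable, Iterator


async def _upload_chunks_concurrent(
    upload_part: Callable[[int, bytes], Awaitable[dict]],  # hypothetical coroutine wrapping S3 UploadPart
    chunks: Iterator[bytes],                                # file reads or bytes slices
    max_concurrency: int,
) -> list[dict]:
    """Upload chunks in batches of ``max_concurrency`` concurrent part uploads."""
    parts = []
    part_number = 1
    while True:
        batch = []
        for _ in range(max_concurrency):
            chunk = next(chunks, None)
            if not chunk:  # exhausted (None) or empty read (b"")
                break
            batch.append((part_number, chunk))
            part_number += 1
        if not batch:
            break
        # Upload one batch of parts concurrently, preserving part order.
        parts.extend(await asyncio.gather(*(upload_part(n, c) for n, c in batch)))
    return parts
```

`put_file` could then pass `iter(lambda: f0.read(chunksize), b"")` and `pipe` could pass an iterator over slices of its bytes object.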
```python
        while True:
            chunks = []
            for i in range(max_concurrency):
                chunk = f0.read(chunksize)
```
Somewhere we need a caveat that increasing concurrency will lead to high memory use.
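A rough illustration of why (the 50 MiB part size is an assumption for this example, not taken from the PR): the batching loop above holds up to `max_concurrency` chunks in memory at once, so peak buffering is roughly `max_concurrency * chunksize`.

```python
chunksize = 50 * 2**20                      # assumed 50 MiB part size, for illustration
max_concurrency = 32
peak_buffer = max_concurrency * chunksize   # bytes held in memory at once
print(f"~{peak_buffer / 2**30:.2f} GiB buffered")  # ~1.56 GiB
```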
s3fs/core.py (outdated):

```python
        If given, the maximum number of concurrent transfers to use for a
        multipart upload. Defaults to 1 (multipart uploads will be done sequentially).
        Note that when used in conjunction with ``S3FileSystem.put(batch_size=...)``
        the result will be a maximum of ``max_concurrency * batch_size`` concurrent
        transfers.
```

Suggested change:

```python
        The maximum number of concurrent transfers to use per file for
        multipart upload (``put()``) operations. Defaults to 1 (sequential).
        When used in conjunction with ``S3FileSystem.put(batch_size=...)``
        the maximum number of simultaneous connections is ``max_concurrency * batch_size``.
        We may extend this parameter to affect ``pipe()``, ``cat()`` and ``get()``.
```
s3fs/core.py (outdated):

```python
    ):
        max_concurrency = max_concurrency or self.max_concurrency
        if max_concurrency is None or max_concurrency < 1:
            max_concurrency = 1
```
Why not `max_concurrency=1` as the default in `__init__`?
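A minimal sketch of that alternative (class and method names mirror the PR, but the bodies are assumed): validate once in `__init__`, after which the per-call normalisation collapses to a single fallback.

```python
class S3FileSystem:  # sketch only, not the real s3fs class
    def __init__(self, max_concurrency=1):
        if max_concurrency < 1:
            raise ValueError("max_concurrency must be >= 1")
        self.max_concurrency = max_concurrency

    async def _put_file(self, lpath, rpath, max_concurrency=None):
        # Per-call override falls back to the instance default;
        # no None/minimum-value juggling needed here.
        max_concurrency = max_concurrency or self.max_concurrency
        ...
```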
fsspec/s3fs#848 added a `max_concurrency` kwarg, released in s3fs 2024.3, which seems to break something in `dvc pull`: https://github.com/hudcostreets/nj-crashes/actions/runs/8316430240/job/22755877957#step:11:31
- Adds a `max_concurrency` parameter which can be used to increase the concurrency for multipart uploads during `S3FileSystem._put_file()` (behaves the same as `max_concurrency` for uploads in adlfs)
- `max_concurrency` can be set at the fs instance level or as a parameter passed to `_put_file()` (usage sketch below)
- Works in conjunction with `fs.put(batch_size=...)`, so you will end up with a maximum of `max_concurrency * batch_size` parts being transferred at once
- The same approach could later be applied to `_get_file()`
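A usage sketch of the two ways to set it, per the description above (the bucket name is illustrative, and it assumes the sync `put_file` wrapper forwards the kwarg to `_put_file`):

```python
import s3fs

fs = s3fs.S3FileSystem(max_concurrency=8)           # instance-level default
fs.put_file("big.iso", "s3://my-bucket/big.iso")    # parts upload 8 at a time
fs.put_file("big.iso", "s3://my-bucket/big.iso",
            max_concurrency=32)                     # per-call override
```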