From c16ba5a4626b866be81243ac4f7d71fc238cbe01 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Tue, 5 May 2026 02:21:37 +0400 Subject: [PATCH 1/3] Add concurrent chunked uploads to JS SDKs --- package-lock.json | 6 - templates/node/src/client.ts.twig | 200 ++++++++++++++---- .../src/services/template.ts.twig | 112 ++++++++-- templates/web/src/client.ts.twig | 113 ++++++++-- 4 files changed, 341 insertions(+), 90 deletions(-) delete mode 100644 package-lock.json diff --git a/package-lock.json b/package-lock.json deleted file mode 100644 index d0fe0087a3..0000000000 --- a/package-lock.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "name": "sdk-generator", - "lockfileVersion": 2, - "requires": true, - "packages": {} -} diff --git a/templates/node/src/client.ts.twig b/templates/node/src/client.ts.twig index 8a3e3ba684..23d579cb67 100644 --- a/templates/node/src/client.ts.twig +++ b/templates/node/src/client.ts.twig @@ -275,82 +275,194 @@ class Client { return await this.call(method, url, headers, payload); } - let start = 0; - let response = null; + const totalChunks = Math.ceil(size / Client.CHUNK_SIZE); - while (start < size) { - let end = start + Client.CHUNK_SIZE; - if (end >= size) { - end = size; - } + // Upload first chunk alone to get the upload ID + const firstChunkEnd = Math.min(Client.CHUNK_SIZE, size); + const firstChunkHeaders = { ...headers, 'content-range': `bytes 0-${firstChunkEnd - 1}/${size}` }; + const firstChunk = await file.slice(0, firstChunkEnd); + const firstPayload = { ...originalPayload }; + firstPayload[fileParam] = new File([firstChunk], file.filename); - headers['content-range'] = `bytes ${start}-${end - 1}/${size}`; - const chunk = await file.slice(start, end); + let response = await this.call(method, url, firstChunkHeaders, firstPayload); + const uploadId = response?.$id; - const payload = { ...originalPayload }; - payload[fileParam] = new File([chunk], file.filename); + if (onProgress && typeof onProgress === 'function') { + onProgress({ + $id: uploadId, + progress: Math.round((firstChunkEnd / size) * 100), + sizeUploaded: firstChunkEnd, + chunksTotal: totalChunks, + chunksUploaded: 1 + }); + } + + if (totalChunks === 1) { + return response; + } - response = await this.call(method, url, headers, payload); + // Prepare remaining chunks + const chunks: { index: number; start: number; end: number }[] = []; + for (let i = 1; i < totalChunks; i++) { + const start = i * Client.CHUNK_SIZE; + const end = Math.min(start + Client.CHUNK_SIZE, size); + chunks.push({ index: i, start, end }); + } + + // Upload remaining chunks with max concurrency of 8 + const CONCURRENCY = 8; + let completedCount = 1; + let uploadedBytes = firstChunkEnd; + let lastResponse = response; + + const uploadChunk = async (chunk: typeof chunks[0]) => { + const chunkHeaders = { ...headers }; + if (uploadId) { + chunkHeaders['x-{{spec.title | caseLower }}-id'] = uploadId; + } + chunkHeaders['content-range'] = `bytes ${chunk.start}-${chunk.end - 1}/${size}`; + + const chunkBlob = await file.slice(chunk.start, chunk.end); + const chunkPayload = { ...originalPayload }; + chunkPayload[fileParam] = new File([chunkBlob], file.filename); + + const chunkResponse = await this.call(method, url, chunkHeaders, chunkPayload); + + completedCount++; + uploadedBytes += (chunk.end - chunk.start); + + if (chunk.index === totalChunks - 1) { + lastResponse = chunkResponse; + } if (onProgress && typeof onProgress === 'function') { onProgress({ - $id: response.$id, - progress: Math.round((end / size) * 100), - sizeUploaded: end, - chunksTotal: Math.ceil(size / Client.CHUNK_SIZE), - chunksUploaded: Math.ceil(end / Client.CHUNK_SIZE) + $id: uploadId, + progress: Math.round((uploadedBytes / size) * 100), + sizeUploaded: uploadedBytes, + chunksTotal: totalChunks, + chunksUploaded: completedCount }); } - if (response && response.$id) { - headers['x-{{spec.title | caseLower }}-id'] = response.$id; - } + return chunkResponse; + }; + + // Process with limited concurrency using a worker pool + const queue = [...chunks]; + const workers: Promise[] = []; - start = end; + for (let i = 0; i < Math.min(CONCURRENCY, queue.length); i++) { + workers.push( + (async () => { + while (queue.length > 0) { + const chunk = queue.shift()!; + await uploadChunk(chunk); + } + })() + ); } - return response; + await Promise.all(workers); + + return lastResponse; } if (file.size <= Client.CHUNK_SIZE) { return await this.call(method, url, headers, originalPayload); } - let start = 0; - let response = null; + const totalChunks = Math.ceil(file.size / Client.CHUNK_SIZE); + + // Upload first chunk alone to get the upload ID + const firstChunkEnd = Math.min(Client.CHUNK_SIZE, file.size); + const firstChunkHeaders = { ...headers, 'content-range': `bytes 0-${firstChunkEnd - 1}/${file.size}` }; + const firstChunk = file.slice(0, firstChunkEnd); + const firstPayload = { ...originalPayload }; + firstPayload[fileParam] = new File([firstChunk], file.name); + + let response = await this.call(method, url, firstChunkHeaders, firstPayload); + const uploadId = response?.$id; + + if (onProgress && typeof onProgress === 'function') { + onProgress({ + $id: uploadId, + progress: Math.round((firstChunkEnd / file.size) * 100), + sizeUploaded: firstChunkEnd, + chunksTotal: totalChunks, + chunksUploaded: 1 + }); + } - while (start < file.size) { - let end = start + Client.CHUNK_SIZE; // Prepare end for the next chunk - if (end >= file.size) { - end = file.size; // Adjust for the last chunk to include the last byte - } + if (totalChunks === 1) { + return response; + } - headers['content-range'] = `bytes ${start}-${end-1}/${file.size}`; - const chunk = file.slice(start, end); + // Prepare remaining chunks + const chunks: { index: number; start: number; end: number }[] = []; + for (let i = 1; i < totalChunks; i++) { + const start = i * Client.CHUNK_SIZE; + const end = Math.min(start + Client.CHUNK_SIZE, file.size); + chunks.push({ index: i, start, end }); + } - let payload = { ...originalPayload }; - payload[fileParam] = new File([chunk], file.name); + // Upload remaining chunks with max concurrency of 8 + const CONCURRENCY = 8; + let completedCount = 1; + let uploadedBytes = firstChunkEnd; + let lastResponse = response; - response = await this.call(method, url, headers, payload); + const uploadChunk = async (chunk: typeof chunks[0]) => { + const chunkHeaders = { ...headers }; + if (uploadId) { + chunkHeaders['x-{{spec.title | caseLower }}-id'] = uploadId; + } + chunkHeaders['content-range'] = `bytes ${chunk.start}-${chunk.end - 1}/${file.size}`; + + const chunkBlob = file.slice(chunk.start, chunk.end); + const chunkPayload = { ...originalPayload }; + chunkPayload[fileParam] = new File([chunkBlob], file.name); + + const chunkResponse = await this.call(method, url, chunkHeaders, chunkPayload); + + completedCount++; + uploadedBytes += (chunk.end - chunk.start); + + if (chunk.index === totalChunks - 1) { + lastResponse = chunkResponse; + } if (onProgress && typeof onProgress === 'function') { onProgress({ - $id: response.$id, - progress: Math.round((end / file.size) * 100), - sizeUploaded: end, - chunksTotal: Math.ceil(file.size / Client.CHUNK_SIZE), - chunksUploaded: Math.ceil(end / Client.CHUNK_SIZE) + $id: uploadId, + progress: Math.round((uploadedBytes / file.size) * 100), + sizeUploaded: uploadedBytes, + chunksTotal: totalChunks, + chunksUploaded: completedCount }); } - if (response && response.$id) { - headers['x-{{spec.title | caseLower }}-id'] = response.$id; - } + return chunkResponse; + }; - start = end; + // Process with limited concurrency using a worker pool + const queue = [...chunks]; + const workers: Promise[] = []; + + for (let i = 0; i < Math.min(CONCURRENCY, queue.length); i++) { + workers.push( + (async () => { + while (queue.length > 0) { + const chunk = queue.shift()!; + await uploadChunk(chunk); + } + })() + ); } - return response; + await Promise.all(workers); + + return lastResponse; } async ping(): Promise { diff --git a/templates/react-native/src/services/template.ts.twig b/templates/react-native/src/services/template.ts.twig index 40e3be45ab..9e75c64d5e 100644 --- a/templates/react-native/src/services/template.ts.twig +++ b/templates/react-native/src/services/template.ts.twig @@ -181,41 +181,119 @@ export class {{ service.name | caseUcfirst }} extends Service { {% endif %} {% endfor %} - let timestamp = new Date().getTime(); - while (offset < size) { - let end = Math.min(offset + Service.CHUNK_SIZE - 1, size - 1); + const totalChunks = Math.ceil(size / Service.CHUNK_SIZE); - apiHeaders['content-range'] = 'bytes ' + offset + '-' + end + '/' + size; - if (response && response.$id) { - apiHeaders['x-{{spec.title | caseLower }}-id'] = response.$id; - } + // Upload first chunk alone to get the upload ID + if (offset === 0) { + const firstChunkEnd = Math.min(Service.CHUNK_SIZE, size); + const firstChunkHeaders = { ...apiHeaders, 'content-range': 'bytes 0-' + (firstChunkEnd - 1) + '/' + size }; - let chunk = await FileSystem.readAsStringAsync({{ parameter.name | caseCamel | escapeKeyword }}.uri, { + let firstChunk = await FileSystem.readAsStringAsync({{ parameter.name | caseCamel | escapeKeyword }}.uri, { encoding: FileSystem.EncodingType.Base64, - position: offset, + position: 0, length: Service.CHUNK_SIZE }); - var path = `data:${{'{'}}{{ parameter.name | caseCamel | escapeKeyword }}.type{{'}'}};base64,${{'{'}}chunk{{'}'}}`; + var firstPath = `data:${{'{'}}{{ parameter.name | caseCamel | escapeKeyword }}.type{{'}'}};base64,${{'{'}}firstChunk{{'}'}}`; if (RNPlatform.OS.toLowerCase() === 'android') { - path = FileSystem.cacheDirectory + '/tmp_chunk_' + timestamp; - await FileSystem.writeAsStringAsync(path, chunk, {encoding: FileSystem.EncodingType.Base64}); + firstPath = FileSystem.cacheDirectory + '/tmp_chunk_' + new Date().getTime(); + await FileSystem.writeAsStringAsync(firstPath, firstChunk, {encoding: FileSystem.EncodingType.Base64}); } - payload['{{ parameter.name }}'] = {{ '{' }} uri: path, name: {{ parameter.name | caseCamel | escapeKeyword }}.name, type: {{ parameter.name | caseCamel | escapeKeyword }}.type {{ '}' }}; + payload['{{ parameter.name }}'] = {{ '{' }} uri: firstPath, name: {{ parameter.name | caseCamel | escapeKeyword }}.name, type: {{ parameter.name | caseCamel | escapeKeyword }}.type {{ '}' }}; - response = await this.client.call('{{ method.method | caseLower }}', uri, apiHeaders, payload); + response = await this.client.call('{{ method.method | caseLower }}', uri, firstChunkHeaders, payload); + offset = firstChunkEnd; if (onProgress) { onProgress({ $id: response.$id, progress: (offset / size) * 100, sizeUploaded: offset, - chunksTotal: response.chunksTotal, - chunksUploaded: response.chunksUploaded + chunksTotal: totalChunks, + chunksUploaded: 1 + }); + } + } + + if (offset >= size) { + return response; + } + + const uploadId = response?.$id; + const chunks: { index: number; start: number; end: number }[] = []; + const startChunkIndex = Math.ceil(offset / Service.CHUNK_SIZE); + for (let i = startChunkIndex; i < totalChunks; i++) { + const start = i * Service.CHUNK_SIZE; + const end = Math.min(start + Service.CHUNK_SIZE, size); + chunks.push({ index: i, start, end }); + } + + // Upload remaining chunks with max concurrency of 8 + const CONCURRENCY = 8; + let completedCount = startChunkIndex; + let uploadedBytes = offset; + + const uploadChunk = async (chunk: typeof chunks[0]) => { + const chunkHeaders = { ...apiHeaders }; + if (uploadId) { + chunkHeaders['x-{{spec.title | caseLower }}-id'] = uploadId; + } + chunkHeaders['content-range'] = 'bytes ' + chunk.start + '-' + (chunk.end - 1) + '/' + size; + + const chunkData = await FileSystem.readAsStringAsync({{ parameter.name | caseCamel | escapeKeyword }}.uri, { + encoding: FileSystem.EncodingType.Base64, + position: chunk.start, + length: chunk.end - chunk.start + }); + + let chunkPath = `data:${{'{'}}{{ parameter.name | caseCamel | escapeKeyword }}.type{{'}'}};base64,${{'{'}}chunkData{{'}'}}`; + if (RNPlatform.OS.toLowerCase() === 'android') { + chunkPath = FileSystem.cacheDirectory + '/tmp_chunk_' + new Date().getTime() + '_' + chunk.index; + await FileSystem.writeAsStringAsync(chunkPath, chunkData, {encoding: FileSystem.EncodingType.Base64}); + } + + const chunkPayload = { ...payload }; + chunkPayload['{{ parameter.name }}'] = {{ '{' }} uri: chunkPath, name: {{ parameter.name | caseCamel | escapeKeyword }}.name, type: {{ parameter.name | caseCamel | escapeKeyword }}.type {{ '}' }}; + + const chunkResponse = await this.client.call('{{ method.method | caseLower }}', uri, chunkHeaders, chunkPayload); + + completedCount++; + uploadedBytes += (chunk.end - chunk.start); + + if (chunk.index === totalChunks - 1) { + response = chunkResponse; + } + + if (onProgress) { + onProgress({ + $id: uploadId, + progress: (uploadedBytes / size) * 100, + sizeUploaded: uploadedBytes, + chunksTotal: totalChunks, + chunksUploaded: completedCount }); } - offset += Service.CHUNK_SIZE; + + return chunkResponse; + }; + + // Process with limited concurrency using a worker pool + const queue = [...chunks]; + const workers: Promise[] = []; + + for (let i = 0; i < Math.min(CONCURRENCY, queue.length); i++) { + workers.push( + (async () => { + while (queue.length > 0) { + const chunk = queue.shift()!; + await uploadChunk(chunk); + } + })() + ); } + + await Promise.all(workers); + return response; {% endif %} {% endfor %} diff --git a/templates/web/src/client.ts.twig b/templates/web/src/client.ts.twig index cff84095f6..99d573d8ad 100644 --- a/templates/web/src/client.ts.twig +++ b/templates/web/src/client.ts.twig @@ -824,41 +824,108 @@ class Client { return await this.call(method, url, headers, originalPayload); } - let start = 0; - let response = null; + const totalChunks = Math.ceil(file.size / Client.CHUNK_SIZE); + + // Upload first chunk alone to get the upload ID + const firstChunkEnd = Math.min(Client.CHUNK_SIZE, file.size); + const firstChunkHeaders = { ...headers, 'content-range': `bytes 0-${firstChunkEnd - 1}/${file.size}` }; + const firstChunk = file.slice(0, firstChunkEnd); + const firstPayload = { ...originalPayload }; + firstPayload[fileParam] = new File([firstChunk], file.name); + + let response = await this.call(method, url, firstChunkHeaders, firstPayload); + const uploadId = response?.$id; + + if (onProgress && typeof onProgress === 'function') { + onProgress({ + $id: uploadId, + progress: Math.round((firstChunkEnd / file.size) * 100), + sizeUploaded: firstChunkEnd, + chunksTotal: totalChunks, + chunksUploaded: 1 + }); + } - while (start < file.size) { - let end = start + Client.CHUNK_SIZE; // Prepare end for the next chunk - if (end >= file.size) { - end = file.size; // Adjust for the last chunk to include the last byte - } + if (totalChunks === 1) { + return response; + } - headers['content-range'] = `bytes ${start}-${end-1}/${file.size}`; - const chunk = file.slice(start, end); + // Prepare remaining chunks + const chunks: { index: number; start: number; end: number }[] = []; + for (let i = 1; i < totalChunks; i++) { + const start = i * Client.CHUNK_SIZE; + const end = Math.min(start + Client.CHUNK_SIZE, file.size); + chunks.push({ index: i, start, end }); + } - let payload = { ...originalPayload }; - payload[fileParam] = new File([chunk], file.name); + // Upload remaining chunks with max concurrency of 8 + const CONCURRENCY = 8; + let completedCount = 1; + let uploadedBytes = firstChunkEnd; + let lastResponse = response; - response = await this.call(method, url, headers, payload); + const uploadChunk = async (chunk: typeof chunks[0]) => { + const chunkHeaders = { ...headers }; + if (uploadId) { + chunkHeaders['x-{{spec.title | caseLower }}-id'] = uploadId; + } + chunkHeaders['content-range'] = `bytes ${chunk.start}-${chunk.end - 1}/${file.size}`; + + const chunkBlob = file.slice(chunk.start, chunk.end); + const chunkPayload = { ...originalPayload }; + chunkPayload[fileParam] = new File([chunkBlob], file.name); + + const chunkResponse = await this.call(method, url, chunkHeaders, chunkPayload); + + completedCount++; + uploadedBytes += (chunk.end - chunk.start); + + if (chunk.index === totalChunks - 1) { + lastResponse = chunkResponse; + } if (onProgress && typeof onProgress === 'function') { onProgress({ - $id: response.$id, - progress: Math.round((end / file.size) * 100), - sizeUploaded: end, - chunksTotal: Math.ceil(file.size / Client.CHUNK_SIZE), - chunksUploaded: Math.ceil(end / Client.CHUNK_SIZE) + $id: uploadId, + progress: Math.round((uploadedBytes / file.size) * 100), + sizeUploaded: uploadedBytes, + chunksTotal: totalChunks, + chunksUploaded: completedCount }); } - if (response && response.$id) { - headers['x-{{spec.title | caseLower }}-id'] = response.$id; - } + return chunkResponse; + }; - start = end; - } + await new Promise((resolve, reject) => { + let nextChunk = 0; + let inFlight = 0; + let completed = 0; + + const uploadNext = () => { + if (completed === chunks.length) { + resolve(); + return; + } + + while (inFlight < CONCURRENCY && nextChunk < chunks.length) { + const chunk = chunks[nextChunk++]; + inFlight++; + + uploadChunk(chunk) + .then(() => { + inFlight--; + completed++; + uploadNext(); + }) + .catch(reject); + } + }; + + uploadNext(); + }); - return response; + return lastResponse; } async ping(): Promise { From 2509dabb4b1291101e1a2ff6964b9db578469c4f Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Tue, 5 May 2026 14:17:17 +0400 Subject: [PATCH 2/3] Add concurrent chunked uploads to all SDKs --- .../src/main/java/io/package/Client.kt.twig | 126 ++++++--- templates/apple/Sources/Client.swift.twig | 90 +++++-- .../dart/lib/src/client_browser.dart.twig | 103 ++++++-- templates/dart/lib/src/client_io.dart.twig | 115 +++++++-- templates/deno/src/services/service.ts.twig | 144 +++++++---- templates/dotnet/Package/Client.cs.twig | 182 ++++++++++--- .../flutter/lib/src/client_browser.dart.twig | 103 ++++++-- templates/flutter/lib/src/client_io.dart.twig | 115 +++++++-- templates/go/client.go.twig | 120 +++++++-- .../main/kotlin/io/appwrite/Client.kt.twig | 122 ++++++--- templates/php/base/requests/file.twig | 179 +++++++++++-- templates/python/package/client.py.twig | 116 ++++++--- templates/ruby/lib/container/client.rb.twig | 92 +++++-- templates/rust/src/client.rs.twig | 244 +++++++++++++----- templates/swift/Sources/Client.swift.twig | 90 +++++-- 15 files changed, 1514 insertions(+), 427 deletions(-) diff --git a/templates/android/library/src/main/java/io/package/Client.kt.twig b/templates/android/library/src/main/java/io/package/Client.kt.twig index 8eb0016a23..03549ae09b 100644 --- a/templates/android/library/src/main/java/io/package/Client.kt.twig +++ b/templates/android/library/src/main/java/io/package/Client.kt.twig @@ -13,6 +13,9 @@ import {{ sdk.namespace | caseDot }}.models.UploadProgress import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine import okhttp3.* import okhttp3.Headers.Companion.toHeaders @@ -30,6 +33,8 @@ import java.net.CookieManager import java.net.CookiePolicy import java.security.SecureRandom import java.security.cert.X509Certificate +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong import javax.net.ssl.SSLContext import javax.net.ssl.SSLSocketFactory import javax.net.ssl.TrustManager @@ -49,6 +54,7 @@ class Client @JvmOverloads constructor( * The size for chunked uploads in bytes. */ internal const val CHUNK_SIZE = 5*1024*1024; // 5MB + internal const val MAX_CONCURRENT_UPLOADS = 8 internal const val GLOBAL_PREFS = "{{ sdk.namespace | caseDot }}" internal const val COOKIE_PREFS = "myCookie" } @@ -374,12 +380,10 @@ class Client @JvmOverloads constructor( idParamName: String? = null, onProgress: ((UploadProgress) -> Unit)? = null, ): T { - var file: RandomAccessFile? = null val input = params[paramName] as InputFile val size: Long = when(input.sourceType) { "path", "file" -> { - file = RandomAccessFile(input.path, "r") - file.length() + File(input.path).length() } "bytes" -> { (input.data as ByteArray).size.toLong() @@ -408,9 +412,9 @@ class Client @JvmOverloads constructor( ) } - val buffer = ByteArray(CHUNK_SIZE) var offset = 0L var result: Map<*, *>? = null + var uploadId: String? = null if (idParamName?.isNotEmpty() == true) { // Make a request to check if a file already exists @@ -423,59 +427,119 @@ class Client @JvmOverloads constructor( ) val chunksUploaded = current["chunksUploaded"] as Long offset = chunksUploaded * CHUNK_SIZE + uploadId = params[idParamName]?.toString() } - while (offset < size) { - when(input.sourceType) { + fun readChunk(start: Long, end: Long): ByteArray { + val length = (end - start).toInt() + return when(input.sourceType) { "file", "path" -> { - file!!.seek(offset) - file!!.read(buffer) + RandomAccessFile(input.path, "r").use { chunkFile -> + val chunk = ByteArray(length) + chunkFile.seek(start) + chunkFile.readFully(chunk) + chunk + } } "bytes" -> { - val end = if (offset + CHUNK_SIZE < size) { - offset + CHUNK_SIZE - 1 - } else { - size - 1 - } - (input.data as ByteArray).copyInto( - buffer, - startIndex = offset.toInt(), - endIndex = end.toInt() - ) + (input.data as ByteArray).copyOfRange(start.toInt(), end.toInt()) } else -> throw UnsupportedOperationException() } + } - params[paramName] = MultipartBody.Part.createFormData( + suspend fun uploadChunk(index: Int, start: Long, end: Long, includeUploadId: Boolean): Map<*, *> { + val chunkParams = params.toMutableMap() + val chunkHeaders = headers.toMutableMap() + + if (includeUploadId && uploadId != null) { + chunkHeaders["x-{{ spec.title | caseLower }}-id"] = uploadId!! + } + + chunkHeaders["Content-Range"] = "bytes $start-${end - 1}/$size" + chunkParams[paramName] = MultipartBody.Part.createFormData( paramName, input.filename, - buffer.toRequestBody() + readChunk(start, end).toRequestBody() ) - headers["Content-Range"] = - "bytes $offset-${((offset + CHUNK_SIZE) - 1).coerceAtMost(size - 1)}/$size" - - result = call( + val chunkResult = call( method = "POST", path, - headers, - params, + chunkHeaders, + chunkParams, responseType = Map::class.java ) - offset += CHUNK_SIZE - headers["x-{{ spec.title | caseLower }}-id"] = result["\$id"].toString() + if (index == 0 || uploadId == null) { + uploadId = chunkResult["\$id"].toString() + } + + return chunkResult + } + + if (offset == 0L) { + val firstChunkEnd = CHUNK_SIZE.toLong().coerceAtMost(size) + result = uploadChunk(0, 0, firstChunkEnd, false) + offset = firstChunkEnd onProgress?.invoke( UploadProgress( - id = result["\$id"].toString(), + id = uploadId ?: result!!["\$id"].toString(), progress = offset.coerceAtMost(size).toDouble() / size * 100, sizeUploaded = offset.coerceAtMost(size), - chunksTotal = result["chunksTotal"].toString().toInt(), - chunksUploaded = result["chunksUploaded"].toString().toInt(), + chunksTotal = result!!["chunksTotal"].toString().toInt(), + chunksUploaded = result!!["chunksUploaded"].toString().toInt(), ) ) } + val chunks = mutableListOf>() + var chunkOffset = offset + while (chunkOffset < size) { + val end = (chunkOffset + CHUNK_SIZE).coerceAtMost(size) + chunks.add(Triple((chunkOffset / CHUNK_SIZE).toInt(), chunkOffset, end)) + chunkOffset = end + } + + if (chunks.isNotEmpty()) { + val nextChunk = AtomicInteger(0) + val completedChunks = AtomicInteger((offset / CHUNK_SIZE).toInt()) + val uploadedBytes = AtomicLong(offset.coerceAtMost(size)) + + coroutineScope { + List(MAX_CONCURRENT_UPLOADS.coerceAtMost(chunks.size)) { + async { + while (true) { + val chunkIndex = nextChunk.getAndIncrement() + if (chunkIndex >= chunks.size) { + break + } + + val (index, start, end) = chunks[chunkIndex] + val chunkResult = uploadChunk(index, start, end, true) + + val chunksUploaded = completedChunks.incrementAndGet() + val sizeUploaded = uploadedBytes.addAndGet(end - start) + + if (index == ((size - 1) / CHUNK_SIZE).toInt()) { + result = chunkResult + } + + onProgress?.invoke( + UploadProgress( + id = uploadId ?: chunkResult["\$id"].toString(), + progress = sizeUploaded.coerceAtMost(size).toDouble() / size * 100, + sizeUploaded = sizeUploaded.coerceAtMost(size), + chunksTotal = chunkResult["chunksTotal"].toString().toInt(), + chunksUploaded = chunksUploaded, + ) + ) + } + } + }.awaitAll() + } + } + return converter(result as Map) } diff --git a/templates/apple/Sources/Client.swift.twig b/templates/apple/Sources/Client.swift.twig index 27a76f9704..7922d4939f 100644 --- a/templates/apple/Sources/Client.swift.twig +++ b/templates/apple/Sources/Client.swift.twig @@ -441,6 +441,7 @@ open class Client { var offset = 0 var result = [String:Any]() + var uploadId = idParamName != nil ? params[idParamName!] as? String : nil if idParamName != nil { // Make a request to check if a file already exists @@ -459,32 +460,89 @@ open class Client { } } - while offset < size { - let slice = (input.data as! ByteBuffer).getSlice(at: offset, length: Client.chunkSize) - ?? (input.data as! ByteBuffer).getSlice(at: offset, length: Int(size - offset)) - - params[paramName] = InputFile.fromBuffer(slice!, filename: input.filename, mimeType: input.mimeType) - headers["content-range"] = "bytes \(offset)-\(min((offset + Client.chunkSize) - 1, size - 1))/\(size)" + let totalChunks = Int(ceil(Double(size) / Double(Client.chunkSize))) + var nextChunk = offset / Client.chunkSize + var completedChunks = nextChunk + var uploadedBytes = min(offset, size) + + func uploadChunk(index: Int, uploadId: String?) async throws -> (Int, Int, [String: Any]) { + let chunkOffset = index * Client.chunkSize + let chunkLength = min(Client.chunkSize, size - chunkOffset) + let slice = (input.data as! ByteBuffer).getSlice(at: chunkOffset, length: chunkLength)! + var chunkParams = params + var chunkHeaders = headers + chunkParams[paramName] = InputFile.fromBuffer(slice, filename: input.filename, mimeType: input.mimeType) + chunkHeaders["content-range"] = "bytes \(chunkOffset)-\(chunkOffset + chunkLength - 1)/\(size)" + if let uploadId = uploadId { + chunkHeaders["x-{{ spec.title | caseLower }}-id"] = uploadId + } - result = try await call( + let chunkResult = try await call( method: "POST", path: path, - headers: headers, - params: params, + headers: chunkHeaders, + params: chunkParams, converter: { return $0 as! [String: Any] } ) - offset += Client.chunkSize - headers["x-{{ spec.title | caseLower }}-id"] = result["$id"] as? String + return (index, chunkLength, chunkResult) + } + + if nextChunk == 0 { + let first = try await uploadChunk(index: 0, uploadId: uploadId) + result = first.2 + uploadId = result["$id"] as? String + nextChunk = 1 + completedChunks = 1 + uploadedBytes = first.1 onProgress?(UploadProgress( - id: result["$id"] as? String ?? "", - progress: Double(min(offset, size))/Double(size) * 100.0, - sizeUploaded: min(offset, size), - chunksTotal: result["chunksTotal"] as? Int ?? -1, - chunksUploaded: result["chunksUploaded"] as? Int ?? -1 + id: uploadId ?? "", + progress: Double(uploadedBytes)/Double(size) * 100.0, + sizeUploaded: uploadedBytes, + chunksTotal: result["chunksTotal"] as? Int ?? totalChunks, + chunksUploaded: result["chunksUploaded"] as? Int ?? completedChunks )) } + let maxConcurrency = 8 + + try await withThrowingTaskGroup(of: (Int, Int, [String: Any]).self) { group in + var inFlight = 0 + + while inFlight < maxConcurrency && nextChunk < totalChunks { + let index = nextChunk + let currentUploadId = uploadId + group.addTask { try await uploadChunk(index: index, uploadId: currentUploadId) } + nextChunk += 1 + inFlight += 1 + } + + while let chunk = try await group.next() { + inFlight -= 1 + completedChunks += 1 + uploadedBytes += chunk.1 + if chunk.0 == totalChunks - 1 { + result = chunk.2 + } + + onProgress?(UploadProgress( + id: uploadId ?? "", + progress: Double(min(uploadedBytes, size))/Double(size) * 100.0, + sizeUploaded: min(uploadedBytes, size), + chunksTotal: chunk.2["chunksTotal"] as? Int ?? totalChunks, + chunksUploaded: chunk.2["chunksUploaded"] as? Int ?? completedChunks + )) + + while inFlight < maxConcurrency && nextChunk < totalChunks { + let index = nextChunk + let currentUploadId = uploadId + group.addTask { try await uploadChunk(index: index, uploadId: currentUploadId) } + nextChunk += 1 + inFlight += 1 + } + } + } + return try converter!(result) } diff --git a/templates/dart/lib/src/client_browser.dart.twig b/templates/dart/lib/src/client_browser.dart.twig index e4f92a5067..5fe9ebb1d8 100644 --- a/templates/dart/lib/src/client_browser.dart.twig +++ b/templates/dart/lib/src/client_browser.dart.twig @@ -122,6 +122,7 @@ class ClientBrowser extends ClientBase with ClientMixin { } var offset = 0; + String? uploadId; if (idParamName.isNotEmpty) { //make a request to check if a file already exists try { @@ -132,33 +133,97 @@ class ClientBrowser extends ClientBase with ClientMixin { ); final int chunksUploaded = res.data['chunksUploaded'] as int; offset = chunksUploaded * chunkSize; + uploadId = res.data['\$id'] ?? params[idParamName]?.toString(); } on {{spec.title | caseUcfirst}}Exception catch (_) {} } - while (offset < size) { + if (offset >= size) { + return res; + } + + final totalChunks = (size / chunkSize).ceil(); + + Future uploadChunk(int index, int start, int end, String? id) async { List chunk = []; - final end = min(offset + chunkSize, size); - chunk = file.bytes!.getRange(offset, end).toList(); - params[paramName] = + chunk = file.bytes!.getRange(start, end).toList(); + + final chunkParams = Map.from(params); + chunkParams[paramName] = http.MultipartFile.fromBytes(paramName, chunk, filename: file.filename); - headers['content-range'] = - 'bytes $offset-${min((offset + chunkSize - 1), size - 1)}/$size'; - res = await call(HttpMethod.post, - path: path, headers: headers, params: params); - offset += chunkSize; - if (offset < size) { - headers['x-{{spec.title | caseLower }}-id'] = res.data['\$id']; + final chunkHeaders = Map.from(headers); + if (id != null && id.isNotEmpty) { + chunkHeaders['x-{{spec.title | caseLower }}-id'] = id; } - final progress = UploadProgress( - $id: res.data['\$id'] ?? '', - progress: min(offset, size) / size * 100, - sizeUploaded: min(offset, size), - chunksTotal: res.data['chunksTotal'] ?? 0, - chunksUploaded: res.data['chunksUploaded'] ?? 0, + chunkHeaders['content-range'] = 'bytes $start-${end - 1}/$size'; + + return call( + HttpMethod.post, + path: path, + headers: chunkHeaders, + params: chunkParams, ); - onProgress?.call(progress); } - return res; + + final firstStart = offset; + final firstEnd = min(firstStart + chunkSize, size); + final firstIndex = firstStart ~/ chunkSize; + res = await uploadChunk(firstIndex, firstStart, firstEnd, uploadId); + uploadId = res.data['\$id'] ?? uploadId; + + var completedChunks = firstIndex + 1; + var uploadedBytes = firstEnd; + var lastResponse = res; + + final progress = UploadProgress( + $id: uploadId ?? '', + progress: min(uploadedBytes, size) / size * 100, + sizeUploaded: min(uploadedBytes, size), + chunksTotal: totalChunks, + chunksUploaded: completedChunks, + ); + onProgress?.call(progress); + + final chunks = >[]; + for (var start = firstEnd; start < size; start += chunkSize) { + final end = min(start + chunkSize, size); + chunks.add({ + 'index': start ~/ chunkSize, + 'start': start, + 'end': end, + }); + } + + var nextChunk = 0; + Future uploadNext() async { + while (nextChunk < chunks.length) { + final chunk = chunks[nextChunk++]; + final chunkResponse = await uploadChunk( + chunk['index']!, + chunk['start']!, + chunk['end']!, + uploadId, + ); + completedChunks++; + uploadedBytes += chunk['end']! - chunk['start']!; + if (chunk['index'] == totalChunks - 1) { + lastResponse = chunkResponse; + } + + final progress = UploadProgress( + $id: uploadId ?? '', + progress: min(uploadedBytes, size) / size * 100, + sizeUploaded: min(uploadedBytes, size), + chunksTotal: totalChunks, + chunksUploaded: completedChunks, + ); + onProgress?.call(progress); + } + } + + final concurrency = min(8, chunks.length); + await Future.wait(List.generate(concurrency, (_) => uploadNext())); + + return lastResponse; } @override diff --git a/templates/dart/lib/src/client_io.dart.twig b/templates/dart/lib/src/client_io.dart.twig index c5edb88754..09ae09f55b 100644 --- a/templates/dart/lib/src/client_io.dart.twig +++ b/templates/dart/lib/src/client_io.dart.twig @@ -143,6 +143,7 @@ class ClientIO extends ClientBase with ClientMixin { } var offset = 0; + String? uploadId; if (idParamName.isNotEmpty) { //make a request to check if a file already exists try { @@ -153,45 +154,107 @@ class ClientIO extends ClientBase with ClientMixin { ); final int chunksUploaded = res.data['chunksUploaded'] as int; offset = chunksUploaded * chunkSize; + uploadId = res.data['\$id'] ?? params[idParamName]?.toString(); } on {{spec.title | caseUcfirst}}Exception catch (_) {} } - RandomAccessFile? raf; - // read chunk and upload each chunk - if (iofile != null) { - raf = await iofile.open(mode: FileMode.read); + if (offset >= size) { + return res; } - while (offset < size) { + final totalChunks = (size / chunkSize).ceil(); + + Future uploadChunk(int index, int start, int end, String? id) async { List chunk = []; if (file.bytes != null) { - final end = min(offset + chunkSize, size); - chunk = file.bytes!.getRange(offset, end).toList(); + chunk = file.bytes!.getRange(start, end).toList(); } else { - raf!.setPositionSync(offset); - chunk = raf.readSync(chunkSize); + final raf = await iofile!.open(mode: FileMode.read); + try { + await raf.setPosition(start); + chunk = await raf.read(end - start); + } finally { + await raf.close(); + } } - params[paramName] = + + final chunkParams = Map.from(params); + chunkParams[paramName] = http.MultipartFile.fromBytes(paramName, chunk, filename: file.filename); - headers['content-range'] = - 'bytes $offset-${min((offset + chunkSize - 1), size - 1)}/$size'; - res = await call(HttpMethod.post, - path: path, headers: headers, params: params); - offset += chunkSize; - if (offset < size) { - headers['x-{{spec.title | caseLower }}-id'] = res.data['\$id']; + final chunkHeaders = Map.from(headers); + if (id != null && id.isNotEmpty) { + chunkHeaders['x-{{spec.title | caseLower }}-id'] = id; } - final progress = UploadProgress( - $id: res.data['\$id'] ?? '', - progress: min(offset, size) / size * 100, - sizeUploaded: min(offset, size), - chunksTotal: res.data['chunksTotal'] ?? 0, - chunksUploaded: res.data['chunksUploaded'] ?? 0, + chunkHeaders['content-range'] = 'bytes $start-${end - 1}/$size'; + + return call( + HttpMethod.post, + path: path, + headers: chunkHeaders, + params: chunkParams, ); - onProgress?.call(progress); } - raf?.close(); - return res; + + final firstStart = offset; + final firstEnd = min(firstStart + chunkSize, size); + final firstIndex = firstStart ~/ chunkSize; + res = await uploadChunk(firstIndex, firstStart, firstEnd, uploadId); + uploadId = res.data['\$id'] ?? uploadId; + + var completedChunks = firstIndex + 1; + var uploadedBytes = firstEnd; + var lastResponse = res; + + final progress = UploadProgress( + $id: uploadId ?? '', + progress: min(uploadedBytes, size) / size * 100, + sizeUploaded: min(uploadedBytes, size), + chunksTotal: totalChunks, + chunksUploaded: completedChunks, + ); + onProgress?.call(progress); + + final chunks = >[]; + for (var start = firstEnd; start < size; start += chunkSize) { + final end = min(start + chunkSize, size); + chunks.add({ + 'index': start ~/ chunkSize, + 'start': start, + 'end': end, + }); + } + + var nextChunk = 0; + Future uploadNext() async { + while (nextChunk < chunks.length) { + final chunk = chunks[nextChunk++]; + final chunkResponse = await uploadChunk( + chunk['index']!, + chunk['start']!, + chunk['end']!, + uploadId, + ); + completedChunks++; + uploadedBytes += chunk['end']! - chunk['start']!; + if (chunk['index'] == totalChunks - 1) { + lastResponse = chunkResponse; + } + + final progress = UploadProgress( + $id: uploadId ?? '', + progress: min(uploadedBytes, size) / size * 100, + sizeUploaded: min(uploadedBytes, size), + chunksTotal: totalChunks, + chunksUploaded: completedChunks, + ); + onProgress?.call(progress); + } + } + + final concurrency = min(8, chunks.length); + await Future.wait(List.generate(concurrency, (_) => uploadNext())); + + return lastResponse; } @override diff --git a/templates/deno/src/services/service.ts.twig b/templates/deno/src/services/service.ts.twig index f5544affff..1e246aeb94 100644 --- a/templates/deno/src/services/service.ts.twig +++ b/templates/deno/src/services/service.ts.twig @@ -125,6 +125,7 @@ export class {{ service.name | caseUcfirst }} extends Service { {% for parameter in method.parameters.all %} {% if parameter.isUploadID %} + id = {{ parameter.name }}; try { response = await this.client.call( 'get', @@ -137,80 +138,133 @@ export class {{ service.name | caseUcfirst }} extends Service { {% endif %} {% endfor %} - let currentChunk = 1; + const totalChunks = Math.ceil(size / Client.CHUNK_SIZE); + const chunks: { index: number; start: number; end: number; data: Uint8Array }[] = []; + let currentChunk = 0; let currentPosition = 0; let uploadableChunk = new Uint8Array(Client.CHUNK_SIZE); - const uploadChunk = async (lastUpload = false) => { - if(currentChunk <= chunksUploaded) { + const queueChunk = () => { + if (currentChunk < chunksUploaded) { + currentChunk++; + currentPosition = 0; + uploadableChunk = new Uint8Array(Client.CHUNK_SIZE); return; } - const start = ((currentChunk - 1) * Client.CHUNK_SIZE); - let end = start + currentPosition; + const start = currentChunk * Client.CHUNK_SIZE; + const end = Math.min(start + currentPosition, size); + const data = currentPosition >= Client.CHUNK_SIZE + ? uploadableChunk + : uploadableChunk.slice(0, currentPosition); - if (end === size) { - end -= 1; - } - - if(!lastUpload || currentChunk !== 1) { - apiHeaders['content-range'] = 'bytes ' + start + '-' + end + '/' + size; - } + chunks.push({ index: currentChunk, start, end, data }); + currentChunk++; + currentPosition = 0; + uploadableChunk = new Uint8Array(Client.CHUNK_SIZE); + }; - let uploadableChunkTrimmed: Uint8Array; + for await (const chunk of {{ parameter.name | caseCamel | escapeKeyword }}.stream) { + let i = 0; + for(const b of chunk) { + uploadableChunk[currentPosition] = chunk[i]; + currentPosition++; - if(currentPosition + 1 >= Client.CHUNK_SIZE) { - uploadableChunkTrimmed = uploadableChunk; - } else { - uploadableChunkTrimmed = new Uint8Array(currentPosition); - for(let i = 0; i <= currentPosition; i++) { - uploadableChunkTrimmed[i] = uploadableChunk[i]; + if(currentPosition >= Client.CHUNK_SIZE) { + queueChunk(); } - } - if (id) { - apiHeaders['x-{{spec.title | caseLower }}-id'] = id; + i++; } + } - payload['{{ parameter.name }}'] = { type: 'file', file: new File([uploadableChunkTrimmed], {{ parameter.name | caseCamel | escapeKeyword }}.filename), filename: {{ parameter.name | caseCamel | escapeKeyword }}.filename }; + if (currentPosition > 0) { + queueChunk(); + } - response = await this.client.call('{{ method.method | caseLower }}', apiPath, apiHeaders, payload{% if method.type == 'location' %}, 'arraybuffer'{% elseif method.type == 'webAuth' %}, 'location'{% endif %}); + if (chunks.length === 0) { + return response; + } - if (!id) { - id = response['$id']; - } + if (chunks.length === 1 && chunks[0].index === 0) { + payload['{{ parameter.name }}'] = { type: 'file', file: new File([chunks[0].data], {{ parameter.name | caseCamel | escapeKeyword }}.filename), filename: {{ parameter.name | caseCamel | escapeKeyword }}.filename }; + response = await this.client.call('{{ method.method | caseLower }}', apiPath, apiHeaders, payload{% if method.type == 'location' %}, 'arraybuffer'{% elseif method.type == 'webAuth' %}, 'location'{% endif %}); if (onProgress !== null) { onProgress({ $id: response['$id'], - progress: Math.min((currentChunk) * Client.CHUNK_SIZE, size) / size * 100, - sizeUploaded: end+1, - chunksTotal: response['chunksTotal'], - chunksUploaded: response['chunksUploaded'] + progress: 100, + sizeUploaded: size, + chunksTotal: response['chunksTotal'] ?? totalChunks, + chunksUploaded: response['chunksUploaded'] ?? 1 }); } - uploadableChunk = new Uint8Array(Client.CHUNK_SIZE); - currentPosition = 0; - currentChunk++; + return response; } - for await (const chunk of {{ parameter.name | caseCamel | escapeKeyword }}.stream) { - let i = 0; - for(const b of chunk) { - uploadableChunk[currentPosition] = chunk[i]; + const uploadChunk = async (chunk: typeof chunks[0]) => { + const chunkHeaders = { ...apiHeaders }; + chunkHeaders['content-range'] = 'bytes ' + chunk.start + '-' + (chunk.end - 1) + '/' + size; + if (id) { + chunkHeaders['x-{{spec.title | caseLower }}-id'] = id; + } + + const chunkPayload = { ...payload }; + chunkPayload['{{ parameter.name }}'] = { type: 'file', file: new File([chunk.data], {{ parameter.name | caseCamel | escapeKeyword }}.filename), filename: {{ parameter.name | caseCamel | escapeKeyword }}.filename }; + + return await this.client.call('{{ method.method | caseLower }}', apiPath, chunkHeaders, chunkPayload{% if method.type == 'location' %}, 'arraybuffer'{% elseif method.type == 'webAuth' %}, 'location'{% endif %}); + }; + + response = await uploadChunk(chunks[0]); + + if (!id) { + id = response['$id']; + } - if(currentPosition + 1 >= Client.CHUNK_SIZE) { - await uploadChunk(); - currentPosition--; + let completedCount = chunks[0].index + 1; + let uploadedBytes = chunks[0].end; + + if (onProgress !== null) { + onProgress({ + $id: response['$id'], + progress: uploadedBytes / size * 100, + sizeUploaded: uploadedBytes, + chunksTotal: response['chunksTotal'] ?? totalChunks, + chunksUploaded: response['chunksUploaded'] ?? completedCount + }); + } + + const CONCURRENCY = 8; + let nextChunk = 1; + let lastResponse = response; + + const uploadNext = async () => { + while (nextChunk < chunks.length) { + const chunk = chunks[nextChunk++]; + const chunkResponse = await uploadChunk(chunk); + completedCount++; + uploadedBytes += chunk.end - chunk.start; + + if (chunk.index === totalChunks - 1) { + lastResponse = chunkResponse; } - i++; - currentPosition++; + if (onProgress !== null) { + onProgress({ + $id: id!, + progress: uploadedBytes / size * 100, + sizeUploaded: uploadedBytes, + chunksTotal: chunkResponse['chunksTotal'] ?? totalChunks, + chunksUploaded: chunkResponse['chunksUploaded'] ?? completedCount + }); + } } - } + }; + + await Promise.all(Array.from({ length: Math.min(CONCURRENCY, chunks.length - 1) }, uploadNext)); - await uploadChunk(true); + response = lastResponse; return response; {% endif %} diff --git a/templates/dotnet/Package/Client.cs.twig b/templates/dotnet/Package/Client.cs.twig index f3901fb060..6d72275dc5 100644 --- a/templates/dotnet/Package/Client.cs.twig +++ b/templates/dotnet/Package/Client.cs.twig @@ -7,6 +7,7 @@ using System.Net.Http.Headers; using System.Text; using System.Text.Json; using System.Text.Json.Serialization; +using System.Threading; using System.Threading.Tasks; using {{ spec.title | caseUcfirst }}.Converters; using {{ spec.title | caseUcfirst }}.Extensions; @@ -26,6 +27,7 @@ namespace {{ spec.title | caseUcfirst }} private string _endpoint; private static readonly int ChunkSize = 5 * 1024 * 1024; + private static readonly int MaxConcurrentUploads = 8; public static JsonSerializerOptions DeserializerOptions { get; set; } = new JsonSerializerOptions { @@ -194,14 +196,14 @@ namespace {{ spec.title | caseUcfirst }} { if (header.Key.Equals("content-type", StringComparison.OrdinalIgnoreCase)) { - _http.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue(header.Value)); + request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(header.Value)); } else { - if (_http.DefaultRequestHeaders.Contains(header.Key)) { - _http.DefaultRequestHeaders.Remove(header.Key); + if (request.Headers.Contains(header.Key)) { + request.Headers.Remove(header.Key); } - _http.DefaultRequestHeaders.Add(header.Key, header.Value); + request.Headers.Add(header.Key, header.Value); } } @@ -436,69 +438,123 @@ namespace {{ spec.title | caseUcfirst }} ); } + var uploadId = string.Empty; + if (!string.IsNullOrEmpty(idParamName)) { try { - // Make a request to check if a file already exists - var current = await Call>( - method: "GET", - path: $"{path}/{parameters[idParamName!]}", - new Dictionary { { "content-type", "application/json" } }, - parameters: new Dictionary() - ); - if (current.TryGetValue("chunksUploaded", out var chunksUploadedValue) && chunksUploadedValue != null) - { - offset = Convert.ToInt64(chunksUploadedValue) * ChunkSize; + // Make a request to check if a file already exists + var current = await Call>( + method: "GET", + path: $"{path}/{parameters[idParamName!]}", + new Dictionary { { "content-type", "application/json" } }, + parameters: new Dictionary() + ); + if (current.TryGetValue("chunksUploaded", out var chunksUploadedValue) && chunksUploadedValue != null) + { + offset = Convert.ToInt64(chunksUploadedValue) * ChunkSize; + } + uploadId = parameters[idParamName!]?.ToString() ?? string.Empty; } - } catch { // ignored as it mostly means file not found } } - while (offset < size) + var readLock = new object(); + + async Task ReadChunkAsync(long start, long end) { + var length = (int)(end - start); + var chunk = new byte[length]; + switch(input.SourceType) { case "path": + using (var chunkStream = File.OpenRead(input.Path)) + { + chunkStream.Seek(start, SeekOrigin.Begin); + var read = 0; + while (read < length) + { + var count = await chunkStream.ReadAsync(chunk, read, length - read); + if (count == 0) + break; + read += count; + } + } + break; case "stream": var stream = input.Data as Stream; if (stream == null) throw new InvalidOperationException("Stream data is null"); - stream.Seek(offset, SeekOrigin.Begin); - await stream.ReadAsync(buffer, 0, ChunkSize); + lock (readLock) + { + stream.Seek(start, SeekOrigin.Begin); + var read = 0; + while (read < length) + { + var count = stream.Read(chunk, read, length - read); + if (count == 0) + break; + read += count; + } + } break; case "bytes": - buffer = ((byte[])input.Data) - .Skip((int)offset) - .Take((int)Math.Min(size - offset, ChunkSize - 1)) - .ToArray(); + Buffer.BlockCopy((byte[])input.Data, (int)start, chunk, 0, length); break; } - var content = new MultipartFormDataContent { - { new ByteArrayContent(buffer), paramName, input.Filename } + return chunk; + } + + async Task> UploadChunkAsync(int index, long start, long end, bool includeUploadId) + { + var chunkHeaders = new Dictionary(headers) + { + ["Content-Range"] = $"bytes {start}-{end - 1}/{size}" }; - parameters[paramName] = content; + if (includeUploadId && !string.IsNullOrEmpty(uploadId)) + { + chunkHeaders["x-appwrite-id"] = uploadId; + } - headers["Content-Range"] = - $"bytes {offset}-{Math.Min(offset + ChunkSize - 1, size - 1)}/{size}"; + var content = new MultipartFormDataContent { + { new ByteArrayContent(await ReadChunkAsync(start, end)), paramName, input.Filename } + }; - result = await Call>( + var chunkParameters = new Dictionary(parameters) + { + [paramName] = content + }; + + var chunkResult = await Call>( method: "POST", path, - headers, - parameters + chunkHeaders, + chunkParameters ); - offset += ChunkSize; + if (index == 0 || string.IsNullOrEmpty(uploadId)) + { + uploadId = chunkResult.ContainsKey("$id") + ? chunkResult["$id"]?.ToString() ?? string.Empty + : string.Empty; + } + + return chunkResult; + } + + if (offset == 0) + { + var firstChunkEnd = Math.Min(ChunkSize, size); + result = await UploadChunkAsync(0, 0, firstChunkEnd, false); + offset = firstChunkEnd; - var id = result.ContainsKey("$id") - ? result["$id"]?.ToString() ?? string.Empty - : string.Empty; var chunksTotal = result.TryGetValue("chunksTotal", out var chunksTotalValue) && chunksTotalValue != null ? Convert.ToInt64(chunksTotalValue) : 0L; @@ -506,17 +562,67 @@ namespace {{ spec.title | caseUcfirst }} ? Convert.ToInt64(chunksUploadedValue) : 0L; - headers["x-appwrite-id"] = id; - onProgress?.Invoke( new UploadProgress( - id: id, - progress: Math.Min(offset, size) / size * 100, + id: uploadId, + progress: Math.Min(offset, size) / (double)size * 100, sizeUploaded: Math.Min(offset, size), chunksTotal: chunksTotal, chunksUploaded: chunksUploaded)); } + var chunks = new List<(int Index, long Start, long End)>(); + var chunkOffset = offset; + while (chunkOffset < size) + { + var end = Math.Min(chunkOffset + ChunkSize, size); + chunks.Add(((int)(chunkOffset / ChunkSize), chunkOffset, end)); + chunkOffset = end; + } + + if (chunks.Count > 0) + { + var nextChunk = -1; + var completedChunks = (long)(offset / ChunkSize); + var uploadedBytes = offset; + var lastChunkIndex = (int)((size - 1) / ChunkSize); + + var workers = Enumerable.Range(0, Math.Min(MaxConcurrentUploads, chunks.Count)) + .Select(async _ => + { + while (true) + { + var chunkIndex = Interlocked.Increment(ref nextChunk); + if (chunkIndex >= chunks.Count) + break; + + var chunk = chunks[chunkIndex]; + var chunkResult = await UploadChunkAsync(chunk.Index, chunk.Start, chunk.End, true); + + if (chunk.Index == lastChunkIndex) + { + result = chunkResult; + } + + var chunksUploaded = Interlocked.Increment(ref completedChunks); + var sizeUploaded = Interlocked.Add(ref uploadedBytes, chunk.End - chunk.Start); + var chunksTotal = chunkResult.TryGetValue("chunksTotal", out var chunksTotalValue) && chunksTotalValue != null + ? Convert.ToInt64(chunksTotalValue) + : 0L; + + onProgress?.Invoke( + new UploadProgress( + id: uploadId, + progress: Math.Min(sizeUploaded, size) / (double)size * 100, + sizeUploaded: Math.Min(sizeUploaded, size), + chunksTotal: chunksTotal, + chunksUploaded: chunksUploaded)); + } + }); + + await Task.WhenAll(workers); + } + // Convert to non-nullable dictionary for converter var nonNullableResult = result.Where(kvp => kvp.Value != null) .ToDictionary(kvp => kvp.Key, kvp => kvp.Value!); diff --git a/templates/flutter/lib/src/client_browser.dart.twig b/templates/flutter/lib/src/client_browser.dart.twig index 7ad50dea9b..467f5963e6 100644 --- a/templates/flutter/lib/src/client_browser.dart.twig +++ b/templates/flutter/lib/src/client_browser.dart.twig @@ -150,6 +150,7 @@ class ClientBrowser extends ClientBase with ClientMixin { } var offset = 0; + String? uploadId; if (idParamName.isNotEmpty) { //make a request to check if a file already exists try { @@ -160,40 +161,100 @@ class ClientBrowser extends ClientBase with ClientMixin { ); final int chunksUploaded = res.data['chunksUploaded'] as int; offset = chunksUploaded * chunkSize; + uploadId = res.data['\$id'] ?? params[idParamName]?.toString(); } on {{spec.title | caseUcfirst}}Exception catch (_) {} } - while (offset < size) { + if (offset >= size) { + return res; + } + + final totalChunks = (size / chunkSize).ceil(); + + Future uploadChunk(int index, int start, int end, String? id) async { List chunk = []; - final end = min(offset + chunkSize, size); - chunk = file.bytes!.getRange(offset, end).toList(); - params[paramName] = http.MultipartFile.fromBytes( + chunk = file.bytes!.getRange(start, end).toList(); + + final chunkParams = Map.from(params); + chunkParams[paramName] = http.MultipartFile.fromBytes( paramName, chunk, filename: file.filename, ); - headers['content-range'] = - 'bytes $offset-${min((offset + chunkSize - 1), size - 1)}/$size'; - res = await call( + final chunkHeaders = Map.from(headers); + if (id != null && id.isNotEmpty) { + chunkHeaders['x-{{spec.title | caseLower }}-id'] = id; + } + chunkHeaders['content-range'] = 'bytes $start-${end - 1}/$size'; + + return call( HttpMethod.post, path: path, - headers: headers, - params: params, + headers: chunkHeaders, + params: chunkParams, ); - offset += chunkSize; - if (offset < size) { - headers['x-{{spec.title | caseLower }}-id'] = res.data['\$id']; + } + + final firstStart = offset; + final firstEnd = min(firstStart + chunkSize, size); + final firstIndex = firstStart ~/ chunkSize; + res = await uploadChunk(firstIndex, firstStart, firstEnd, uploadId); + uploadId = res.data['\$id'] ?? uploadId; + + var completedChunks = firstIndex + 1; + var uploadedBytes = firstEnd; + var lastResponse = res; + + final progress = UploadProgress( + $id: uploadId ?? '', + progress: min(uploadedBytes, size) / size * 100, + sizeUploaded: min(uploadedBytes, size), + chunksTotal: totalChunks, + chunksUploaded: completedChunks, + ); + onProgress?.call(progress); + + final chunks = >[]; + for (var start = firstEnd; start < size; start += chunkSize) { + final end = min(start + chunkSize, size); + chunks.add({ + 'index': start ~/ chunkSize, + 'start': start, + 'end': end, + }); + } + + var nextChunk = 0; + Future uploadNext() async { + while (nextChunk < chunks.length) { + final chunk = chunks[nextChunk++]; + final chunkResponse = await uploadChunk( + chunk['index']!, + chunk['start']!, + chunk['end']!, + uploadId, + ); + completedChunks++; + uploadedBytes += chunk['end']! - chunk['start']!; + if (chunk['index'] == totalChunks - 1) { + lastResponse = chunkResponse; + } + + final progress = UploadProgress( + $id: uploadId ?? '', + progress: min(uploadedBytes, size) / size * 100, + sizeUploaded: min(uploadedBytes, size), + chunksTotal: totalChunks, + chunksUploaded: completedChunks, + ); + onProgress?.call(progress); } - final progress = UploadProgress( - $id: res.data['\$id'] ?? '', - progress: min(offset, size) / size * 100, - sizeUploaded: min(offset, size), - chunksTotal: res.data['chunksTotal'] ?? 0, - chunksUploaded: res.data['chunksUploaded'] ?? 0, - ); - onProgress?.call(progress); } - return res; + + final concurrency = min(8, chunks.length); + await Future.wait(List.generate(concurrency, (_) => uploadNext())); + + return lastResponse; } @override diff --git a/templates/flutter/lib/src/client_io.dart.twig b/templates/flutter/lib/src/client_io.dart.twig index 14ea8bfd62..da5fc287f8 100644 --- a/templates/flutter/lib/src/client_io.dart.twig +++ b/templates/flutter/lib/src/client_io.dart.twig @@ -272,6 +272,7 @@ class ClientIO extends ClientBase with ClientMixin { } var offset = 0; + String? uploadId; if (idParamName.isNotEmpty) { //make a request to check if a file already exists try { @@ -282,52 +283,110 @@ class ClientIO extends ClientBase with ClientMixin { ); final int chunksUploaded = res.data['chunksUploaded'] as int; offset = chunksUploaded * chunkSize; + uploadId = res.data['\$id'] ?? params[idParamName]?.toString(); } on {{spec.title | caseUcfirst}}Exception catch (_) {} } - RandomAccessFile? raf; - // read chunk and upload each chunk - if (iofile != null) { - raf = await iofile.open(mode: FileMode.read); + if (offset >= size) { + return res; } - while (offset < size) { + final totalChunks = (size / chunkSize).ceil(); + + Future uploadChunk(int index, int start, int end, String? id) async { List chunk = []; if (file.bytes != null) { - final end = min(offset + chunkSize, size); - chunk = file.bytes!.getRange(offset, end).toList(); + chunk = file.bytes!.getRange(start, end).toList(); } else { - raf!.setPositionSync(offset); - chunk = raf.readSync(chunkSize); + final raf = await iofile!.open(mode: FileMode.read); + try { + await raf.setPosition(start); + chunk = await raf.read(end - start); + } finally { + await raf.close(); + } } - params[paramName] = http.MultipartFile.fromBytes( + + final chunkParams = Map.from(params); + chunkParams[paramName] = http.MultipartFile.fromBytes( paramName, chunk, filename: file.filename, ); - headers['content-range'] = - 'bytes $offset-${min((offset + chunkSize - 1), size - 1)}/$size'; - res = await call( + final chunkHeaders = Map.from(headers); + if (id != null && id.isNotEmpty) { + chunkHeaders['x-{{spec.title | caseLower }}-id'] = id; + } + chunkHeaders['content-range'] = 'bytes $start-${end - 1}/$size'; + + return call( HttpMethod.post, path: path, - headers: headers, - params: params, + headers: chunkHeaders, + params: chunkParams, ); - offset += chunkSize; - if (offset < size) { - headers['x-{{spec.title | caseLower }}-id'] = res.data['\$id']; + } + + final firstStart = offset; + final firstEnd = min(firstStart + chunkSize, size); + final firstIndex = firstStart ~/ chunkSize; + res = await uploadChunk(firstIndex, firstStart, firstEnd, uploadId); + uploadId = res.data['\$id'] ?? uploadId; + + var completedChunks = firstIndex + 1; + var uploadedBytes = firstEnd; + var lastResponse = res; + + final progress = UploadProgress( + $id: uploadId ?? '', + progress: min(uploadedBytes, size) / size * 100, + sizeUploaded: min(uploadedBytes, size), + chunksTotal: totalChunks, + chunksUploaded: completedChunks, + ); + onProgress?.call(progress); + + final chunks = >[]; + for (var start = firstEnd; start < size; start += chunkSize) { + final end = min(start + chunkSize, size); + chunks.add({ + 'index': start ~/ chunkSize, + 'start': start, + 'end': end, + }); + } + + var nextChunk = 0; + Future uploadNext() async { + while (nextChunk < chunks.length) { + final chunk = chunks[nextChunk++]; + final chunkResponse = await uploadChunk( + chunk['index']!, + chunk['start']!, + chunk['end']!, + uploadId, + ); + completedChunks++; + uploadedBytes += chunk['end']! - chunk['start']!; + if (chunk['index'] == totalChunks - 1) { + lastResponse = chunkResponse; + } + + final progress = UploadProgress( + $id: uploadId ?? '', + progress: min(uploadedBytes, size) / size * 100, + sizeUploaded: min(uploadedBytes, size), + chunksTotal: totalChunks, + chunksUploaded: completedChunks, + ); + onProgress?.call(progress); } - final progress = UploadProgress( - $id: res.data['\$id'] ?? '', - progress: min(offset, size) / size * 100, - sizeUploaded: min(offset, size), - chunksTotal: res.data['chunksTotal'] ?? 0, - chunksUploaded: res.data['chunksUploaded'] ?? 0, - ); - onProgress?.call(progress); } - raf?.close(); - return res; + + final concurrency = min(8, chunks.length); + await Future.wait(List.generate(concurrency, (_) => uploadNext())); + + return lastResponse; } bool get _customSchemeAllowed => Platform.isWindows || Platform.isLinux; diff --git a/templates/go/client.go.twig b/templates/go/client.go.twig index 06ff9af9a2..545c961f5b 100644 --- a/templates/go/client.go.twig +++ b/templates/go/client.go.twig @@ -150,6 +150,22 @@ func isFileUpload(headers map[string]interface{}) bool { return false } +func copyHeaders(headers map[string]interface{}) map[string]interface{} { + clone := make(map[string]interface{}, len(headers)) + for key, value := range headers { + clone[key] = value + } + return clone +} + +func copyParams(params map[string]interface{}) map[string]interface{} { + clone := make(map[string]interface{}, len(params)) + for key, value := range params { + clone[key] = value + } + return clone +} + func (client *Client) FileUpload(url string, headers map[string]interface{}, params map[string]interface{}, paramName string, uploadId string) (*ClientResponse, error) { inputFile, ok := params[paramName].(file.InputFile) if !ok { @@ -219,40 +235,108 @@ func (client *Client) FileUpload(url string, headers map[string]interface{}, par return result, nil } - for i := currentChunk; i < numChunks; i++ { + parseUploadId := func(resp *ClientResponse) string { + var parsed map[string]interface{} + if resp != nil && strings.HasPrefix(resp.Type, "application/json") { + err = json.Unmarshal([]byte(resp.Result.(string)), &parsed) + if err == nil { + id, _ := parsed["$id"].(string) + return id + } + } + return "" + } + + uploadChunk := func(i int64, id string) (*ClientResponse, error) { chunkSize := client.ChunkSize - offset := int64(i) * chunkSize + offset := i * client.ChunkSize if i == numChunks-1 { chunkSize = fileInfo.Size() - offset - inputFile.Data = make([]byte, chunkSize) } - _, err := file.ReadAt(inputFile.Data, offset) + + chunkFile := inputFile + chunkFile.Data = make([]byte, chunkSize) + _, err := file.ReadAt(chunkFile.Data, offset) if err != nil && err != io.EOF { return nil, err } - params[paramName] = inputFile - if uploadId != "" { - headers["x-appwrite-id"] = uploadId + + chunkParams := copyParams(params) + chunkParams[paramName] = chunkFile + chunkHeaders := copyHeaders(headers) + if id != "" { + chunkHeaders["x-appwrite-id"] = id } + totalSize := fileInfo.Size() start := offset - end := offset + client.ChunkSize - 1 - if end >= totalSize { - end = totalSize - 1 - } - headers["content-range"] = fmt.Sprintf("bytes %d-%d/%d", start, end, totalSize) - result, err = client.Call("POST", url, headers, params) + end := offset + chunkSize - 1 + chunkHeaders["content-range"] = fmt.Sprintf("bytes %d-%d/%d", start, end, totalSize) + + return client.Call("POST", url, chunkHeaders, chunkParams) + } + + if currentChunk == 0 { + result, err = uploadChunk(0, uploadId) if err != nil { return nil, err } + if id := parseUploadId(result); id != "" { + uploadId = id + } + currentChunk = 1 + } - var parsed map[string]interface{} - if strings.HasPrefix(result.Type, "application/json") { - err = json.Unmarshal([]byte(result.Result.(string)), &parsed) - if err == nil { - uploadId, _ = parsed["$id"].(string) + type chunkResult struct { + index int64 + resp *ClientResponse + err error + } + + remainingChunks := numChunks - currentChunk + if remainingChunks <= 0 { + return result, nil + } + + concurrency := int64(8) + if remainingChunks < concurrency { + concurrency = remainingChunks + } + + jobs := make(chan int64) + results := make(chan chunkResult, int(remainingChunks)) + + for worker := int64(0); worker < concurrency; worker++ { + go func() { + for i := range jobs { + resp, err := uploadChunk(i, uploadId) + results <- chunkResult{index: i, resp: resp, err: err} } + }() + } + + go func() { + for i := currentChunk; i < numChunks; i++ { + jobs <- i + } + close(jobs) + }() + + var firstErr error + for i := int64(0); i < remainingChunks; i++ { + chunk := <-results + if chunk.err != nil { + if firstErr == nil { + firstErr = chunk.err + } + continue } + if chunk.index == numChunks-1 { + result = chunk.resp + } + } + if firstErr != nil { + return nil, firstErr } return result, nil } diff --git a/templates/kotlin/src/main/kotlin/io/appwrite/Client.kt.twig b/templates/kotlin/src/main/kotlin/io/appwrite/Client.kt.twig index 17992731c0..45d159d810 100644 --- a/templates/kotlin/src/main/kotlin/io/appwrite/Client.kt.twig +++ b/templates/kotlin/src/main/kotlin/io/appwrite/Client.kt.twig @@ -8,6 +8,9 @@ import {{ sdk.namespace | caseDot }}.models.UploadProgress import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine import okhttp3.* import okhttp3.Headers.Companion.toHeaders @@ -24,6 +27,8 @@ import java.io.IOException import java.lang.IllegalArgumentException import java.security.SecureRandom import java.security.cert.X509Certificate +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong import javax.net.ssl.HostnameVerifier import javax.net.ssl.SSLContext import javax.net.ssl.SSLSocketFactory @@ -39,6 +44,7 @@ class Client @JvmOverloads constructor( companion object { const val CHUNK_SIZE = 5*1024*1024; // 5MB + const val MAX_CONCURRENT_UPLOADS = 8 } override val coroutineContext: CoroutineContext @@ -355,12 +361,10 @@ class Client @JvmOverloads constructor( idParamName: String? = null, onProgress: ((UploadProgress) -> Unit)? = null, ): T { - var file: RandomAccessFile? = null val input = params[paramName] as InputFile val size: Long = when(input.sourceType) { "path", "file" -> { - file = RandomAccessFile(input.path, "r") - file.length() + File(input.path).length() } "bytes" -> { (input.data as ByteArray).size.toLong() @@ -389,9 +393,9 @@ class Client @JvmOverloads constructor( ) } - val buffer = ByteArray(CHUNK_SIZE) var offset = 0L var result: Map<*, *>? = null + var uploadId: String? = null if (idParamName?.isNotEmpty() == true) { // Make a request to check if a file already exists @@ -404,51 +408,64 @@ class Client @JvmOverloads constructor( ) val chunksUploaded = current["chunksUploaded"] as Long offset = chunksUploaded * CHUNK_SIZE + uploadId = params[idParamName]?.toString() } - while (offset < size) { - when(input.sourceType) { + fun readChunk(start: Long, end: Long): ByteArray { + val length = (end - start).toInt() + return when(input.sourceType) { "file", "path" -> { - file!!.seek(offset) - file!!.read(buffer) + RandomAccessFile(input.path, "r").use { chunkFile -> + val chunk = ByteArray(length) + chunkFile.seek(start) + chunkFile.readFully(chunk) + chunk + } } "bytes" -> { - val end = if (offset + CHUNK_SIZE < size) { - offset + CHUNK_SIZE - 1 - } else { - size - 1 - } - (input.data as ByteArray).copyInto( - buffer, - startIndex = offset.toInt(), - endIndex = end.toInt() - ) + (input.data as ByteArray).copyOfRange(start.toInt(), end.toInt()) } else -> throw UnsupportedOperationException() } + } - params[paramName] = MultipartBody.Part.createFormData( + suspend fun uploadChunk(index: Int, start: Long, end: Long, includeUploadId: Boolean): Map<*, *> { + val chunkParams = params.toMutableMap() + val chunkHeaders = headers.toMutableMap() + + if (includeUploadId && uploadId != null) { + chunkHeaders["x-{{ spec.title | caseLower }}-id"] = uploadId!! + } + + chunkHeaders["Content-Range"] = "bytes $start-${end - 1}/$size" + chunkParams[paramName] = MultipartBody.Part.createFormData( paramName, input.filename, - buffer.toRequestBody() + readChunk(start, end).toRequestBody() ) - headers["Content-Range"] = - "bytes $offset-${((offset + CHUNK_SIZE) - 1).coerceAtMost(size - 1)}/$size" - - result = call( + val chunkResult = call( method = "POST", path, - headers, - params, + chunkHeaders, + chunkParams, responseType = Map::class.java ) - offset += CHUNK_SIZE - headers["x-{{ spec.title | caseLower }}-id"] = result!!["\$id"].toString() + if (index == 0 || uploadId == null) { + uploadId = chunkResult["\$id"].toString() + } + + return chunkResult + } + + if (offset == 0L) { + val firstChunkEnd = CHUNK_SIZE.toLong().coerceAtMost(size) + result = uploadChunk(0, 0, firstChunkEnd, false) + offset = firstChunkEnd onProgress?.invoke( UploadProgress( - id = result!!["\$id"].toString(), + id = uploadId ?: result!!["\$id"].toString(), progress = offset.coerceAtMost(size).toDouble() / size * 100, sizeUploaded = offset.coerceAtMost(size), chunksTotal = result!!["chunksTotal"].toString().toInt(), @@ -457,6 +474,53 @@ class Client @JvmOverloads constructor( ) } + val chunks = mutableListOf>() + var chunkOffset = offset + while (chunkOffset < size) { + val end = (chunkOffset + CHUNK_SIZE).coerceAtMost(size) + chunks.add(Triple((chunkOffset / CHUNK_SIZE).toInt(), chunkOffset, end)) + chunkOffset = end + } + + if (chunks.isNotEmpty()) { + val nextChunk = AtomicInteger(0) + val completedChunks = AtomicInteger((offset / CHUNK_SIZE).toInt()) + val uploadedBytes = AtomicLong(offset.coerceAtMost(size)) + + coroutineScope { + List(MAX_CONCURRENT_UPLOADS.coerceAtMost(chunks.size)) { + async { + while (true) { + val chunkIndex = nextChunk.getAndIncrement() + if (chunkIndex >= chunks.size) { + break + } + + val (index, start, end) = chunks[chunkIndex] + val chunkResult = uploadChunk(index, start, end, true) + + val chunksUploaded = completedChunks.incrementAndGet() + val sizeUploaded = uploadedBytes.addAndGet(end - start) + + if (index == ((size - 1) / CHUNK_SIZE).toInt()) { + result = chunkResult + } + + onProgress?.invoke( + UploadProgress( + id = uploadId ?: chunkResult["\$id"].toString(), + progress = sizeUploaded.coerceAtMost(size).toDouble() / size * 100, + sizeUploaded = sizeUploaded.coerceAtMost(size), + chunksTotal = chunkResult["chunksTotal"].toString().toInt(), + chunksUploaded = chunksUploaded, + ) + ) + } + } + }.awaitAll() + } + } + return converter(result as Map) } diff --git a/templates/php/base/requests/file.twig b/templates/php/base/requests/file.twig index 60bdc8ab07..ee0bb41d12 100644 --- a/templates/php/base/requests/file.twig +++ b/templates/php/base/requests/file.twig @@ -101,35 +101,180 @@ $handle = @fopen(${{parameter.name}}->getPath(), "rb"); } + $uploadId = ''; + {% for parameter in method.parameters.all %}{% if parameter.isUploadID %} + $uploadId = ${{ parameter.name }} ?? ''; + {% endif %}{% endfor %} + $totalChunks = (int) ceil($size / Client::CHUNK_SIZE); + $chunks = []; $start = $counter * Client::CHUNK_SIZE; while ($start < $size) { - $chunk = ''; + $chunks[] = [ + 'index' => $counter, + 'start' => $start, + 'end' => min($start + Client::CHUNK_SIZE, $size), + ]; + $counter++; + $start += Client::CHUNK_SIZE; + } + + $readChunk = function(int $start, int $end) use ($handle, ${{ parameter.name | caseCamel }}) { if(!empty($handle)) { fseek($handle, $start); - $chunk = @fread($handle, Client::CHUNK_SIZE); - } else { - $chunk = substr(${{ parameter.name | caseCamel }}->getData(), $start, Client::CHUNK_SIZE); + return @fread($handle, $end - $start); } - $apiParams['{{ parameter.name }}'] = new \CURLFile('data://' . $mimeType . ';base64,' . base64_encode($chunk), $mimeType, $postedName); - $apiHeaders['content-range'] = 'bytes ' . ($counter * Client::CHUNK_SIZE) . '-' . min(((($counter * Client::CHUNK_SIZE) + Client::CHUNK_SIZE) - 1), $size - 1) . '/' . $size; - if(!empty($id)) { - $apiHeaders['x-{{spec.title | caseLower }}-id'] = $id; + + return substr(${{ parameter.name | caseCamel }}->getData(), $start, $end - $start); + }; + + $uploadChunk = function(array $chunk, string $currentUploadId = '') use ($readChunk, $apiPath, $apiHeaders, $apiParams, $mimeType, $postedName, $size) { + $chunkParams = $apiParams; + $chunkHeaders = $apiHeaders; + $data = $readChunk($chunk['start'], $chunk['end']); + $chunkParams['{{ parameter.name }}'] = new \CURLFile('data://' . $mimeType . ';base64,' . base64_encode($data), $mimeType, $postedName); + $chunkHeaders['content-range'] = 'bytes ' . $chunk['start'] . '-' . ($chunk['end'] - 1) . '/' . $size; + if(!empty($currentUploadId)) { + $chunkHeaders['x-{{spec.title | caseLower }}-id'] = $currentUploadId; } - $response = $this->client->call(Client::METHOD_POST, $apiPath, $apiHeaders, $apiParams); - $counter++; - $start += Client::CHUNK_SIZE; - if(empty($id)) { - $id = $response['$id']; + + return $this->client->call(Client::METHOD_POST, $apiPath, $chunkHeaders, $chunkParams); + }; + + if (!empty($chunks)) { + $response = $uploadChunk($chunks[0], $uploadId); + if(empty($uploadId)) { + $uploadId = $response['$id']; } + $completedCount = $chunks[0]['index'] + 1; + $uploadedSize = $chunks[0]['end']; if($onProgress !== null) { $onProgress([ '$id' => $response['$id'], - 'progress' => min(((($counter * Client::CHUNK_SIZE) + Client::CHUNK_SIZE)), $size) / $size * 100, - 'sizeUploaded' => min($counter * Client::CHUNK_SIZE), - 'chunksTotal' => $response['chunksTotal'], - 'chunksUploaded' => $response['chunksUploaded'], + 'progress' => $uploadedSize / $size * 100, + 'sizeUploaded' => $uploadedSize, + 'chunksTotal' => $response['chunksTotal'] ?? $totalChunks, + 'chunksUploaded' => $response['chunksUploaded'] ?? $completedCount, ]); } + + $remainingChunks = array_slice($chunks, 1); + $clientConfig = \Closure::bind(function() { + if (property_exists($this, 'key') && $this->key !== null) { + $this->headers['authorization'] = $this->getAuthorization(); + } + + return [$this->endpoint, $this->headers, $this->selfSigned, $this->timeout, $this->connectTimeout]; + }, $this->client, Client::class); + $flattenParams = \Closure::bind(function(array $params): array { + return $this->flatten($params); + }, $this->client, Client::class); + [$endpoint, $globalHeaders, $selfSigned, $timeout, $connectTimeout] = $clientConfig(); + $responseHeaders = []; + + $makeHandle = function(array $chunk) use ($readChunk, $apiPath, $apiHeaders, $apiParams, $mimeType, $postedName, $size, $uploadId, $endpoint, $globalHeaders, $selfSigned, $timeout, $connectTimeout, $flattenParams, &$responseHeaders) { + $chunkParams = $apiParams; + $chunkHeaders = array_merge($globalHeaders, $apiHeaders); + $data = $readChunk($chunk['start'], $chunk['end']); + $chunkParams['{{ parameter.name }}'] = new \CURLFile('data://' . $mimeType . ';base64,' . base64_encode($data), $mimeType, $postedName); + $chunkHeaders['content-range'] = 'bytes ' . $chunk['start'] . '-' . ($chunk['end'] - 1) . '/' . $size; + if(!empty($uploadId)) { + $chunkHeaders['x-{{spec.title | caseLower }}-id'] = $uploadId; + } + + $headers = []; + foreach ($chunkHeaders as $key => $value) { + $headers[] = $key . ':' . $value; + } + + $ch = curl_init($endpoint . $apiPath); + $responseHeaders[(int) $ch] = []; + curl_setopt($ch, CURLOPT_CUSTOMREQUEST, Client::METHOD_POST); + curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); + curl_setopt($ch, CURLOPT_USERAGENT, php_uname('s') . '-' . php_uname('r') . ':{{ language.name | caseLower }}-' . phpversion()); + curl_setopt($ch, CURLOPT_HTTPHEADER, $headers); + curl_setopt($ch, CURLOPT_FOLLOWLOCATION, true); + curl_setopt($ch, CURLOPT_POSTFIELDS, $flattenParams($chunkParams)); + curl_setopt($ch, CURLOPT_HEADERFUNCTION, function($curl, $header) use (&$responseHeaders) { + $length = strlen($header); + $header = explode(':', strtolower($header), 2); + if (count($header) >= 2) { + $responseHeaders[(int) $curl][strtolower(trim($header[0]))] = trim($header[1]); + } + + return $length; + }); + if($selfSigned) { + curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false); + curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); + } + if($timeout !== null) { + curl_setopt($ch, CURLOPT_TIMEOUT, $timeout); + } + if($connectTimeout !== null) { + curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $connectTimeout); + } + + return $ch; + }; + + $nextChunk = 0; + while ($nextChunk < count($remainingChunks)) { + $multiHandle = curl_multi_init(); + $handles = []; + for ($i = 0; $i < 8 && $nextChunk < count($remainingChunks); $i++, $nextChunk++) { + $chunk = $remainingChunks[$nextChunk]; + $ch = $makeHandle($chunk); + $handles[(int) $ch] = ['handle' => $ch, 'chunk' => $chunk]; + curl_multi_add_handle($multiHandle, $ch); + } + + do { + $status = curl_multi_exec($multiHandle, $active); + if ($active) { + curl_multi_select($multiHandle); + } + } while ($active && $status == CURLM_OK); + + foreach ($handles as $handleInfo) { + $ch = $handleInfo['handle']; + $body = curl_multi_getcontent($ch); + $statusCode = curl_getinfo($ch, CURLINFO_HTTP_CODE); + $contentType = $responseHeaders[(int) $ch]['content-type'] ?? ''; + + if (curl_errno($ch)) { + throw new {{ spec.namespace | split('\\') | last | caseUcfirst}}Exception(curl_error($ch), $statusCode, '', $body); + } + + $chunkResponse = str_starts_with($contentType, 'application/json') ? json_decode($body, true) : $body; + + if($statusCode >= 400) { + if(is_array($chunkResponse)) { + throw new {{ spec.namespace | split('\\') | last | caseUcfirst}}Exception($chunkResponse['message'], $statusCode, $chunkResponse['type'] ?? '', json_encode($chunkResponse)); + } + + throw new {{ spec.namespace | split('\\') | last | caseUcfirst}}Exception($chunkResponse, $statusCode, '', $chunkResponse); + } + + $completedCount++; + $uploadedSize += $handleInfo['chunk']['end'] - $handleInfo['chunk']['start']; + if($handleInfo['chunk']['index'] === $totalChunks - 1) { + $response = $chunkResponse; + } + if($onProgress !== null) { + $onProgress([ + '$id' => $uploadId, + 'progress' => $uploadedSize / $size * 100, + 'sizeUploaded' => $uploadedSize, + 'chunksTotal' => $chunkResponse['chunksTotal'] ?? $totalChunks, + 'chunksUploaded' => $chunkResponse['chunksUploaded'] ?? $completedCount, + ]); + } + + curl_multi_remove_handle($multiHandle, $ch); + } + + curl_multi_close($multiHandle); + } } if(!empty($handle)) { @fclose($handle); diff --git a/templates/python/package/client.py.twig b/templates/python/package/client.py.twig index 75f5b6e6b0..f311d00a48 100644 --- a/templates/python/package/client.py.twig +++ b/templates/python/package/client.py.twig @@ -4,6 +4,8 @@ import os import platform import sys import requests +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Lock from .input_file import InputFile from .exception import {{spec.title | caseUcfirst}}Exception from .encoders.value_class_encoder import ValueClassEncoder @@ -162,46 +164,96 @@ class Client: if counter > 0: offset = counter * self._chunk_size - input.seek(offset) + if input_file.source_type == 'path': + input.seek(offset) + total_chunks = (size + self._chunk_size - 1) // self._chunk_size + chunks = [] while offset < size: - if input_file.source_type == 'path': - input_file.data = input.read(self._chunk_size) or input.read(size - offset) - elif input_file.source_type == 'bytes': - if offset + self._chunk_size < size: - end = offset + self._chunk_size - else: - end = size - input_file.data = input[offset:end] + end = min(offset + self._chunk_size, size) + chunks.append({ + 'index': counter, + 'start': offset, + 'end': end, + }) + offset = end + counter = counter + 1 - params[param_name] = input_file - headers["content-range"] = f'bytes {offset}-{min((offset + self._chunk_size) - 1, size - 1)}/{size}' + if not chunks: + return result + + def read_chunk(start, end): + if input_file.source_type == 'path': + with open(input_file.path, 'rb') as chunk_file: + chunk_file.seek(start) + return chunk_file.read(end - start) + return input[start:end] + + upload_id_header = upload_id + completed_count = chunks[0]['index'] + uploaded_size = chunks[0]['start'] + progress_lock = Lock() + last_result = None + + def upload_chunk(chunk, current_upload_id): + chunk_input = InputFile.from_bytes( + read_chunk(chunk['start'], chunk['end']), + input_file.filename, + getattr(input_file, 'mime_type', None) + ) + chunk_params = {**params, param_name: chunk_input} + chunk_headers = {**headers} + chunk_headers["content-range"] = f"bytes {chunk['start']}-{chunk['end'] - 1}/{size}" + if current_upload_id: + chunk_headers["x-{{ spec.title | caseLower }}-id"] = current_upload_id - result = self.call( + return self.call( 'post', path, - headers, - params, + chunk_headers, + chunk_params, ) - offset = offset + self._chunk_size - - if "$id" in result: - headers["x-{{ spec.title | caseLower }}-id"] = result["$id"] - - if on_progress is not None: - end = min((((counter * self._chunk_size) + self._chunk_size) - 1), size - 1) - on_progress({ - "$id": result["$id"], - "progress": min(offset, size)/size * 100, - "sizeUploaded": end+1, - "chunksTotal": result["chunksTotal"], - "chunksUploaded": result["chunksUploaded"], - }) - - counter = counter + 1 - - return result + result = upload_chunk(chunks[0], upload_id_header) + last_result = result + if "$id" in result: + upload_id_header = result["$id"] + + completed_count = chunks[0]['index'] + 1 + uploaded_size = chunks[0]['end'] + + if on_progress is not None: + on_progress({ + "$id": result["$id"], + "progress": uploaded_size / size * 100, + "sizeUploaded": uploaded_size, + "chunksTotal": result.get("chunksTotal", total_chunks), + "chunksUploaded": result.get("chunksUploaded", completed_count), + }) + + def upload_remaining_chunk(chunk): + nonlocal completed_count, uploaded_size, last_result + chunk_result = upload_chunk(chunk, upload_id_header) + with progress_lock: + completed_count = completed_count + 1 + uploaded_size = uploaded_size + (chunk['end'] - chunk['start']) + if chunk['index'] == total_chunks - 1: + last_result = chunk_result + if on_progress is not None: + on_progress({ + "$id": upload_id_header, + "progress": uploaded_size / size * 100, + "sizeUploaded": uploaded_size, + "chunksTotal": chunk_result.get("chunksTotal", total_chunks), + "chunksUploaded": chunk_result.get("chunksUploaded", completed_count), + }) + + with ThreadPoolExecutor(max_workers=8) as executor: + futures = [executor.submit(upload_remaining_chunk, chunk) for chunk in chunks[1:]] + for future in as_completed(futures): + future.result() + + return last_result def flatten(self, data, prefix='', stringify=False): output = {} diff --git a/templates/ruby/lib/container/client.rb.twig b/templates/ruby/lib/container/client.rb.twig index 4a0911f523..c971b45556 100644 --- a/templates/ruby/lib/container/client.rb.twig +++ b/templates/ruby/lib/container/client.rb.twig @@ -142,7 +142,9 @@ module {{ spec.title | caseUcfirst }} offset = 0 id_param_name = id_param_name.to_sym if id_param_name + upload_id = nil if id_param_name&.empty? == false + upload_id = params[id_param_name] # Make a request to check if a file already exists current = call( method: "GET", @@ -154,44 +156,88 @@ module {{ spec.title | caseUcfirst }} offset = chunks_uploaded * @chunk_size end + total_chunks = (size.to_f / @chunk_size).ceil + chunks = [] while offset < size + chunks << { + index: chunks_uploaded.to_i, + start: offset, + ending: [offset + @chunk_size, size].min + } + offset += @chunk_size + chunks_uploaded = chunks_uploaded.to_i + 1 + end + + result = current if defined?(current) + return result unless chunks.any? + + upload_chunk = lambda do |chunk, current_upload_id| case input_file.source_type when 'path' - string = IO.read(input_file.path, @chunk_size, offset) + string = IO.read(input_file.path, chunk[:ending] - chunk[:start], chunk[:start]) when 'string' - string = input_file.data.byteslice(offset, [@chunk_size, size - offset].min) + string = input_file.data.byteslice(chunk[:start], chunk[:ending] - chunk[:start]) end - params[param_name.to_sym] = InputFile::from_string( + chunk_params = params.merge(param_name.to_sym => InputFile::from_string( string, filename: input_file.filename, mime_type: input_file.mime_type - ) + )) - headers['content-range'] = "bytes #{offset}-#{[offset + @chunk_size - 1, size - 1].min}/#{size}" + chunk_headers = headers.merge('content-range' => "bytes #{chunk[:start]}-#{chunk[:ending] - 1}/#{size}") + chunk_headers['x-{{ spec.title | caseLower }}-id'] = current_upload_id if current_upload_id - result = call( + call( method: 'POST', path: path, - headers: headers, - params: params, + headers: chunk_headers, + params: chunk_params, ) + end - offset += @chunk_size - - if defined? result['$id'] - headers['x-{{ spec.title | caseLower }}-id'] = result['$id'] + result = upload_chunk.call(chunks.first, upload_id) + upload_id = result['$id'] if result['$id'] + completed_count = chunks.first[:index] + 1 + uploaded_size = chunks.first[:ending] + + on_progress.call({ + id: result['$id'], + progress: uploaded_size.to_f/size.to_f * 100.0, + size_uploaded: uploaded_size, + chunks_total: result['chunksTotal'] || total_chunks, + chunks_uploaded: result['chunksUploaded'] || completed_count + }) unless on_progress.nil? + + mutex = Mutex.new + queue = Queue.new + chunks.drop(1).each { |chunk| queue << chunk } + + workers = [8, queue.size].min.times.map do + Thread.new do + loop do + chunk = queue.pop(true) rescue nil + break unless chunk + + chunk_result = upload_chunk.call(chunk, upload_id) + mutex.synchronize do + completed_count += 1 + uploaded_size += chunk[:ending] - chunk[:start] + result = chunk_result if chunk[:index] == total_chunks - 1 + on_progress.call({ + id: upload_id, + progress: uploaded_size.to_f/size.to_f * 100.0, + size_uploaded: uploaded_size, + chunks_total: chunk_result['chunksTotal'] || total_chunks, + chunks_uploaded: chunk_result['chunksUploaded'] || completed_count + }) unless on_progress.nil? + end + end end - - on_progress.call({ - id: result['$id'], - progress: ([offset, size].min).to_f/size.to_f * 100.0, - size_uploaded: [offset, size].min, - chunks_total: result['chunksTotal'], - chunks_uploaded: result['chunksUploaded'] - }) unless on_progress.nil? end + workers.each(&:value) + return result unless response_type.respond_to?("from") response_type.from(map: result) @@ -209,8 +255,8 @@ module {{ spec.title | caseUcfirst }} ) raise ArgumentError, 'Too Many HTTP Redirects' if limit == 0 - @http = Net::HTTP.new(uri.host, uri.port) unless defined? @http - @http.use_ssl = !@self_signed + http = Net::HTTP.new(uri.host, uri.port) + http.use_ssl = !@self_signed payload = '' headers = @headers.merge(headers) @@ -231,7 +277,7 @@ module {{ spec.title | caseUcfirst }} end begin - response = @http.send_request(method, uri.request_uri, payload, headers) + response = http.send_request(method, uri.request_uri, payload, headers) rescue => error raise {{spec.title | caseUcfirst}}::Exception.new(error.message) end diff --git a/templates/rust/src/client.rs.twig b/templates/rust/src/client.rs.twig index 0c2c68016d..2b9e8ccc85 100644 --- a/templates/rust/src/client.rs.twig +++ b/templates/rust/src/client.rs.twig @@ -10,6 +10,7 @@ use serde_json::Value; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use tokio::task::JoinSet; use url::Url; /// Default request timeout in seconds @@ -748,99 +749,206 @@ impl Client { } } - let mut reader = input_file.chunked_reader().await?; - - if start_chunk > 0 { - let resume_offset = start_chunk * chunk_size as u64; - reader.seek(resume_offset).await?; - } - let mut last_response = None; + let mut next_chunk = start_chunk; + let mut uploaded_chunks = start_chunk; + let mut uploaded_bytes = (start_chunk * chunk_size as u64).min(file_size); + + if next_chunk == 0 { + let result = self.upload_file_chunk( + path, + headers.clone(), + params.clone(), + param_name, + input_file, + current_upload_id.clone(), + 0, + file_size, + chunk_size, + ).await?; + + if let Some(id) = result.get("$id").and_then(|v| v.as_str()) { + current_upload_id = Some(id.to_string()); + } - for chunk_index in start_chunk..total_chunks { - let chunk_data = match reader.read_next(chunk_size).await? { - Some(data) => data, - None => break, - }; - let actual_chunk_size = chunk_data.len(); - let start = reader.position() - actual_chunk_size as u64; + uploaded_chunks = 1; + uploaded_bytes = (chunk_size as u64).min(file_size); + last_response = Some(result.clone()); - if actual_chunk_size == 0 { - break; + if let Some(ref callback) = options.on_progress { + callback(UploadProgress { + bytes_uploaded: uploaded_bytes, + total_bytes: file_size, + chunks_uploaded: uploaded_chunks, + total_chunks, + }); } - let state = self.state.load_full(); - let mut form = multipart::Form::new(); - let mut file_part = multipart::Part::stream_with_length(chunk_data, actual_chunk_size as u64) - .file_name(input_file.filename().to_string()); + next_chunk = 1; + } + + let max_concurrency = 8usize; + let mut tasks = JoinSet::new(); + + while tasks.len() < max_concurrency && next_chunk < total_chunks { + let client = self.clone(); + let path = path.to_string(); + let headers = headers.clone(); + let params = params.clone(); + let param_name = param_name.to_string(); + let input_file = input_file.clone(); + let upload_id = current_upload_id.clone(); + let chunk_index = next_chunk; + + tasks.spawn(async move { + let result = client.upload_file_chunk( + &path, + headers, + params, + ¶m_name, + &input_file, + upload_id, + chunk_index, + file_size, + chunk_size, + ).await?; + + Ok::<_, {{ spec.title | caseUcfirst }}Error>((chunk_index, result)) + }); + next_chunk += 1; + } + + while let Some(joined) = tasks.join_next().await { + let (chunk_index, result) = joined + .map_err(|e| {{ spec.title | caseUcfirst }}Error::new(0, format!("Chunk upload task failed: {}", e), None, String::new()))??; - if let Some(mime_type) = input_file.mime_type() { - file_part = file_part.mime_str(mime_type) - .map_err(|e| {{ spec.title | caseUcfirst }}Error::new(0, format!("Invalid MIME type: {}", e), None, String::new()))?; + if chunk_index == total_chunks - 1 { + last_response = Some(result.clone()); } - form = form.part(param_name.to_string(), file_part); + uploaded_chunks += 1; + let chunk_start = chunk_index * chunk_size as u64; + let chunk_end = (chunk_start + chunk_size as u64).min(file_size); + uploaded_bytes += chunk_end - chunk_start; - let mut params_to_flatten = HashMap::new(); - for (key, value) in ¶ms { - if key != param_name { - params_to_flatten.insert(key.clone(), value.clone()); - } + if let Some(ref callback) = options.on_progress { + callback(UploadProgress { + bytes_uploaded: uploaded_bytes.min(file_size), + total_bytes: file_size, + chunks_uploaded: uploaded_chunks, + total_chunks, + }); } - for (key, value_str) in Self::flatten_multipart_params(¶ms_to_flatten, "") { - form = form.text(key, value_str); + while tasks.len() < max_concurrency && next_chunk < total_chunks { + let client = self.clone(); + let path = path.to_string(); + let headers = headers.clone(); + let params = params.clone(); + let param_name = param_name.to_string(); + let input_file = input_file.clone(); + let upload_id = current_upload_id.clone(); + let chunk_index = next_chunk; + + tasks.spawn(async move { + let result = client.upload_file_chunk( + &path, + headers, + params, + ¶m_name, + &input_file, + upload_id, + chunk_index, + file_size, + chunk_size, + ).await?; + + Ok::<_, {{ spec.title | caseUcfirst }}Error>((chunk_index, result)) + }); + next_chunk += 1; } + } - let url = format!("{}{}", state.config.endpoint, path); - let mut request_builder = state.http.post(url).headers(state.config.headers.clone()); + last_response + .ok_or_else(|| {{ spec.title | caseUcfirst }}Error::new(0, "No chunks uploaded", None, String::new())) + .and_then(|v| serde_json::from_value(v).map_err(Into::into)) + } - if let Some(ref custom_headers) = headers { - for (key, value) in custom_headers { - // Skip content-type for multipart - reqwest sets it automatically with boundary - if key.to_lowercase() != "content-type" { - request_builder = request_builder.header(key, value); - } - } - } + async fn upload_file_chunk( + &self, + path: &str, + headers: Option>, + params: HashMap, + param_name: &str, + input_file: &InputFile, + upload_id: Option, + chunk_index: u64, + file_size: u64, + chunk_size: usize, + ) -> Result { + let start = chunk_index * chunk_size as u64; + let mut reader = input_file.chunked_reader().await?; + reader.seek(start).await?; - if let Some(ref id) = current_upload_id { - request_builder = request_builder.header("x-appwrite-id", id); - } + let chunk_data = reader + .read_next(chunk_size) + .await? + .ok_or_else(|| {{ spec.title | caseUcfirst }}Error::new(0, "Chunk data not found", None, String::new()))?; + let actual_chunk_size = chunk_data.len(); - let chunk_end = start + actual_chunk_size as u64 - 1; - let content_range = format!("bytes {}-{}/{}", start, chunk_end, file_size); - request_builder = request_builder.header("content-range", content_range); + if actual_chunk_size == 0 { + return Err({{ spec.title | caseUcfirst }}Error::new(0, "Chunk data is empty", None, String::new())); + } - let response = request_builder - .multipart(form) - .send() - .await - ?; + let state = self.state.load_full(); + let mut form = multipart::Form::new(); + let mut file_part = multipart::Part::stream_with_length(chunk_data, actual_chunk_size as u64) + .file_name(input_file.filename().to_string()); - let result: Value = self.handle_response(response).await?; + if let Some(mime_type) = input_file.mime_type() { + file_part = file_part.mime_str(mime_type) + .map_err(|e| {{ spec.title | caseUcfirst }}Error::new(0, format!("Invalid MIME type: {}", e), None, String::new()))?; + } - if current_upload_id.is_none() { - if let Some(id) = result.get("$id").and_then(|v| v.as_str()) { - current_upload_id = Some(id.to_string()); - } + form = form.part(param_name.to_string(), file_part); + + let mut params_to_flatten = HashMap::new(); + for (key, value) in ¶ms { + if key != param_name { + params_to_flatten.insert(key.clone(), value.clone()); } + } - last_response = Some(result); + for (key, value_str) in Self::flatten_multipart_params(¶ms_to_flatten, "") { + form = form.text(key, value_str); + } - if let Some(ref callback) = options.on_progress { - callback(UploadProgress { - bytes_uploaded: start + actual_chunk_size as u64, - total_bytes: file_size, - chunks_uploaded: chunk_index + 1, - total_chunks, - }); + let url = format!("{}{}", state.config.endpoint, path); + let mut request_builder = state.http.post(url).headers(state.config.headers.clone()); + + if let Some(custom_headers) = headers { + for (key, value) in custom_headers { + // Skip content-type for multipart - reqwest sets it automatically with boundary + if key.to_lowercase() != "content-type" { + request_builder = request_builder.header(key, value); + } } } - last_response - .ok_or_else(|| {{ spec.title | caseUcfirst }}Error::new(0, "No chunks uploaded", None, String::new())) - .and_then(|v| serde_json::from_value(v).map_err(Into::into)) + if let Some(id) = upload_id { + request_builder = request_builder.header("x-appwrite-id", id); + } + + let chunk_end = start + actual_chunk_size as u64 - 1; + let content_range = format!("bytes {}-{}/{}", start, chunk_end, file_size); + request_builder = request_builder.header("content-range", content_range); + + let response = request_builder + .multipart(form) + .send() + .await?; + + self.handle_response(response).await } async fn handle_response(&self, response: Response) -> Result { diff --git a/templates/swift/Sources/Client.swift.twig b/templates/swift/Sources/Client.swift.twig index 100a00aeb4..c18263be9f 100644 --- a/templates/swift/Sources/Client.swift.twig +++ b/templates/swift/Sources/Client.swift.twig @@ -443,6 +443,7 @@ open class Client { var offset = 0 var result = [String:Any]() + var uploadId = idParamName != nil ? params[idParamName!] as? String : nil if idParamName != nil { // Make a request to check if a file already exists @@ -461,32 +462,89 @@ open class Client { } } - while offset < size { - let slice = (input.data as! ByteBuffer).getSlice(at: offset, length: Client.chunkSize) - ?? (input.data as! ByteBuffer).getSlice(at: offset, length: Int(size - offset)) - - params[paramName] = InputFile.fromBuffer(slice!, filename: input.filename, mimeType: input.mimeType) - headers["content-range"] = "bytes \(offset)-\(min((offset + Client.chunkSize) - 1, size - 1))/\(size)" + let totalChunks = Int(ceil(Double(size) / Double(Client.chunkSize))) + var nextChunk = offset / Client.chunkSize + var completedChunks = nextChunk + var uploadedBytes = min(offset, size) + + func uploadChunk(index: Int, uploadId: String?) async throws -> (Int, Int, [String: Any]) { + let chunkOffset = index * Client.chunkSize + let chunkLength = min(Client.chunkSize, size - chunkOffset) + let slice = (input.data as! ByteBuffer).getSlice(at: chunkOffset, length: chunkLength)! + var chunkParams = params + var chunkHeaders = headers + chunkParams[paramName] = InputFile.fromBuffer(slice, filename: input.filename, mimeType: input.mimeType) + chunkHeaders["content-range"] = "bytes \(chunkOffset)-\(chunkOffset + chunkLength - 1)/\(size)" + if let uploadId = uploadId { + chunkHeaders["x-{{ spec.title | caseLower }}-id"] = uploadId + } - result = try await call( + let chunkResult = try await call( method: "POST", path: path, - headers: headers, - params: params, + headers: chunkHeaders, + params: chunkParams, converter: { return $0 as! [String: Any] } ) - offset += Client.chunkSize - headers["x-{{ spec.title | caseLower }}-id"] = result["$id"] as? String + return (index, chunkLength, chunkResult) + } + + if nextChunk == 0 { + let first = try await uploadChunk(index: 0, uploadId: uploadId) + result = first.2 + uploadId = result["$id"] as? String + nextChunk = 1 + completedChunks = 1 + uploadedBytes = first.1 onProgress?(UploadProgress( - id: result["$id"] as? String ?? "", - progress: Double(min(offset, size))/Double(size) * 100.0, - sizeUploaded: min(offset, size), - chunksTotal: result["chunksTotal"] as? Int ?? -1, - chunksUploaded: result["chunksUploaded"] as? Int ?? -1 + id: uploadId ?? "", + progress: Double(uploadedBytes)/Double(size) * 100.0, + sizeUploaded: uploadedBytes, + chunksTotal: result["chunksTotal"] as? Int ?? totalChunks, + chunksUploaded: result["chunksUploaded"] as? Int ?? completedChunks )) } + let maxConcurrency = 8 + + try await withThrowingTaskGroup(of: (Int, Int, [String: Any]).self) { group in + var inFlight = 0 + + while inFlight < maxConcurrency && nextChunk < totalChunks { + let index = nextChunk + let currentUploadId = uploadId + group.addTask { try await uploadChunk(index: index, uploadId: currentUploadId) } + nextChunk += 1 + inFlight += 1 + } + + while let chunk = try await group.next() { + inFlight -= 1 + completedChunks += 1 + uploadedBytes += chunk.1 + if chunk.0 == totalChunks - 1 { + result = chunk.2 + } + + onProgress?(UploadProgress( + id: uploadId ?? "", + progress: Double(min(uploadedBytes, size))/Double(size) * 100.0, + sizeUploaded: min(uploadedBytes, size), + chunksTotal: chunk.2["chunksTotal"] as? Int ?? totalChunks, + chunksUploaded: chunk.2["chunksUploaded"] as? Int ?? completedChunks + )) + + while inFlight < maxConcurrency && nextChunk < totalChunks { + let index = nextChunk + let currentUploadId = uploadId + group.addTask { try await uploadChunk(index: index, uploadId: currentUploadId) } + nextChunk += 1 + inFlight += 1 + } + } + } + return try converter!(result) } From 7087aa38f4198cd6d3637360c0a1ec3c9622eb04 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Tue, 5 May 2026 17:46:26 +0400 Subject: [PATCH 3/3] Fix SDK upload validation failures --- templates/apple/Sources/Client.swift.twig | 6 ++++-- templates/dotnet/Package/Client.cs.twig | 2 +- templates/swift/Sources/Client.swift.twig | 6 ++++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/templates/apple/Sources/Client.swift.twig b/templates/apple/Sources/Client.swift.twig index 7922d4939f..348996c497 100644 --- a/templates/apple/Sources/Client.swift.twig +++ b/templates/apple/Sources/Client.swift.twig @@ -464,13 +464,15 @@ open class Client { var nextChunk = offset / Client.chunkSize var completedChunks = nextChunk var uploadedBytes = min(offset, size) + let baseParams = params + let baseHeaders = headers func uploadChunk(index: Int, uploadId: String?) async throws -> (Int, Int, [String: Any]) { let chunkOffset = index * Client.chunkSize let chunkLength = min(Client.chunkSize, size - chunkOffset) let slice = (input.data as! ByteBuffer).getSlice(at: chunkOffset, length: chunkLength)! - var chunkParams = params - var chunkHeaders = headers + var chunkParams = baseParams + var chunkHeaders = baseHeaders chunkParams[paramName] = InputFile.fromBuffer(slice, filename: input.filename, mimeType: input.mimeType) chunkHeaders["content-range"] = "bytes \(chunkOffset)-\(chunkOffset + chunkLength - 1)/\(size)" if let uploadId = uploadId { diff --git a/templates/dotnet/Package/Client.cs.twig b/templates/dotnet/Package/Client.cs.twig index 6d72275dc5..bb62c14157 100644 --- a/templates/dotnet/Package/Client.cs.twig +++ b/templates/dotnet/Package/Client.cs.twig @@ -473,7 +473,7 @@ namespace {{ spec.title | caseUcfirst }} switch(input.SourceType) { case "path": - using (var chunkStream = File.OpenRead(input.Path)) + using (var chunkStream = System.IO.File.OpenRead(input.Path)) { chunkStream.Seek(start, SeekOrigin.Begin); var read = 0; diff --git a/templates/swift/Sources/Client.swift.twig b/templates/swift/Sources/Client.swift.twig index c18263be9f..3ee17adf90 100644 --- a/templates/swift/Sources/Client.swift.twig +++ b/templates/swift/Sources/Client.swift.twig @@ -466,13 +466,15 @@ open class Client { var nextChunk = offset / Client.chunkSize var completedChunks = nextChunk var uploadedBytes = min(offset, size) + let baseParams = params + let baseHeaders = headers func uploadChunk(index: Int, uploadId: String?) async throws -> (Int, Int, [String: Any]) { let chunkOffset = index * Client.chunkSize let chunkLength = min(Client.chunkSize, size - chunkOffset) let slice = (input.data as! ByteBuffer).getSlice(at: chunkOffset, length: chunkLength)! - var chunkParams = params - var chunkHeaders = headers + var chunkParams = baseParams + var chunkHeaders = baseHeaders chunkParams[paramName] = InputFile.fromBuffer(slice, filename: input.filename, mimeType: input.mimeType) chunkHeaders["content-range"] = "bytes \(chunkOffset)-\(chunkOffset + chunkLength - 1)/\(size)" if let uploadId = uploadId {