Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions package-lock.json

This file was deleted.

126 changes: 95 additions & 31 deletions templates/android/library/src/main/java/io/package/Client.kt.twig
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import {{ sdk.namespace | caseDot }}.models.UploadProgress
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
import okhttp3.*
import okhttp3.Headers.Companion.toHeaders
Expand All @@ -30,6 +33,8 @@ import java.net.CookieManager
import java.net.CookiePolicy
import java.security.SecureRandom
import java.security.cert.X509Certificate
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLSocketFactory
import javax.net.ssl.TrustManager
Expand All @@ -49,6 +54,7 @@ class Client @JvmOverloads constructor(
* The size for chunked uploads in bytes.
*/
internal const val CHUNK_SIZE = 5*1024*1024; // 5MB
internal const val MAX_CONCURRENT_UPLOADS = 8
internal const val GLOBAL_PREFS = "{{ sdk.namespace | caseDot }}"
internal const val COOKIE_PREFS = "myCookie"
}
Expand Down Expand Up @@ -374,12 +380,10 @@ class Client @JvmOverloads constructor(
idParamName: String? = null,
onProgress: ((UploadProgress) -> Unit)? = null,
): T {
var file: RandomAccessFile? = null
val input = params[paramName] as InputFile
val size: Long = when(input.sourceType) {
"path", "file" -> {
file = RandomAccessFile(input.path, "r")
file.length()
File(input.path).length()
}
"bytes" -> {
(input.data as ByteArray).size.toLong()
Expand Down Expand Up @@ -408,9 +412,9 @@ class Client @JvmOverloads constructor(
)
}

val buffer = ByteArray(CHUNK_SIZE)
var offset = 0L
var result: Map<*, *>? = null
var uploadId: String? = null

if (idParamName?.isNotEmpty() == true) {
// Make a request to check if a file already exists
Expand All @@ -423,59 +427,119 @@ class Client @JvmOverloads constructor(
)
val chunksUploaded = current["chunksUploaded"] as Long
offset = chunksUploaded * CHUNK_SIZE
uploadId = params[idParamName]?.toString()
}

while (offset < size) {
when(input.sourceType) {
fun readChunk(start: Long, end: Long): ByteArray {
val length = (end - start).toInt()
return when(input.sourceType) {
"file", "path" -> {
file!!.seek(offset)
file!!.read(buffer)
RandomAccessFile(input.path, "r").use { chunkFile ->
val chunk = ByteArray(length)
chunkFile.seek(start)
chunkFile.readFully(chunk)
chunk
}
}
"bytes" -> {
val end = if (offset + CHUNK_SIZE < size) {
offset + CHUNK_SIZE - 1
} else {
size - 1
}
(input.data as ByteArray).copyInto(
buffer,
startIndex = offset.toInt(),
endIndex = end.toInt()
)
(input.data as ByteArray).copyOfRange(start.toInt(), end.toInt())
}
else -> throw UnsupportedOperationException()
}
}

params[paramName] = MultipartBody.Part.createFormData(
suspend fun uploadChunk(index: Int, start: Long, end: Long, includeUploadId: Boolean): Map<*, *> {
val chunkParams = params.toMutableMap()
val chunkHeaders = headers.toMutableMap()

if (includeUploadId && uploadId != null) {
chunkHeaders["x-{{ spec.title | caseLower }}-id"] = uploadId!!
}

chunkHeaders["Content-Range"] = "bytes $start-${end - 1}/$size"
chunkParams[paramName] = MultipartBody.Part.createFormData(
paramName,
input.filename,
buffer.toRequestBody()
readChunk(start, end).toRequestBody()
)

headers["Content-Range"] =
"bytes $offset-${((offset + CHUNK_SIZE) - 1).coerceAtMost(size - 1)}/$size"

result = call(
val chunkResult = call(
method = "POST",
path,
headers,
params,
chunkHeaders,
chunkParams,
responseType = Map::class.java
)

offset += CHUNK_SIZE
headers["x-{{ spec.title | caseLower }}-id"] = result["\$id"].toString()
if (index == 0 || uploadId == null) {
uploadId = chunkResult["\$id"].toString()
}

return chunkResult
}

if (offset == 0L) {
val firstChunkEnd = CHUNK_SIZE.toLong().coerceAtMost(size)
result = uploadChunk(0, 0, firstChunkEnd, false)
offset = firstChunkEnd
onProgress?.invoke(
UploadProgress(
id = result["\$id"].toString(),
id = uploadId ?: result!!["\$id"].toString(),
progress = offset.coerceAtMost(size).toDouble() / size * 100,
sizeUploaded = offset.coerceAtMost(size),
chunksTotal = result["chunksTotal"].toString().toInt(),
chunksUploaded = result["chunksUploaded"].toString().toInt(),
chunksTotal = result!!["chunksTotal"].toString().toInt(),
chunksUploaded = result!!["chunksUploaded"].toString().toInt(),
)
)
}

val chunks = mutableListOf<Triple<Int, Long, Long>>()
var chunkOffset = offset
while (chunkOffset < size) {
val end = (chunkOffset + CHUNK_SIZE).coerceAtMost(size)
chunks.add(Triple((chunkOffset / CHUNK_SIZE).toInt(), chunkOffset, end))
chunkOffset = end
}

if (chunks.isNotEmpty()) {
val nextChunk = AtomicInteger(0)
val completedChunks = AtomicInteger((offset / CHUNK_SIZE).toInt())
val uploadedBytes = AtomicLong(offset.coerceAtMost(size))

coroutineScope {
List(MAX_CONCURRENT_UPLOADS.coerceAtMost(chunks.size)) {
async {
while (true) {
val chunkIndex = nextChunk.getAndIncrement()
if (chunkIndex >= chunks.size) {
break
}

val (index, start, end) = chunks[chunkIndex]
val chunkResult = uploadChunk(index, start, end, true)

val chunksUploaded = completedChunks.incrementAndGet()
val sizeUploaded = uploadedBytes.addAndGet(end - start)

if (index == ((size - 1) / CHUNK_SIZE).toInt()) {
result = chunkResult
}

onProgress?.invoke(
UploadProgress(
id = uploadId ?: chunkResult["\$id"].toString(),
progress = sizeUploaded.coerceAtMost(size).toDouble() / size * 100,
sizeUploaded = sizeUploaded.coerceAtMost(size),
chunksTotal = chunkResult["chunksTotal"].toString().toInt(),
chunksUploaded = chunksUploaded,
)
)
}
}
}.awaitAll()
}
}

return converter(result as Map<String, Any>)
}

Expand Down
92 changes: 76 additions & 16 deletions templates/apple/Sources/Client.swift.twig
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ open class Client {

var offset = 0
var result = [String:Any]()
var uploadId = idParamName != nil ? params[idParamName!] as? String : nil

if idParamName != nil {
// Make a request to check if a file already exists
Expand All @@ -459,32 +460,91 @@ open class Client {
}
}

while offset < size {
let slice = (input.data as! ByteBuffer).getSlice(at: offset, length: Client.chunkSize)
?? (input.data as! ByteBuffer).getSlice(at: offset, length: Int(size - offset))

params[paramName] = InputFile.fromBuffer(slice!, filename: input.filename, mimeType: input.mimeType)
headers["content-range"] = "bytes \(offset)-\(min((offset + Client.chunkSize) - 1, size - 1))/\(size)"
let totalChunks = Int(ceil(Double(size) / Double(Client.chunkSize)))
var nextChunk = offset / Client.chunkSize
var completedChunks = nextChunk
var uploadedBytes = min(offset, size)
let baseParams = params
let baseHeaders = headers

func uploadChunk(index: Int, uploadId: String?) async throws -> (Int, Int, [String: Any]) {
let chunkOffset = index * Client.chunkSize
let chunkLength = min(Client.chunkSize, size - chunkOffset)
let slice = (input.data as! ByteBuffer).getSlice(at: chunkOffset, length: chunkLength)!
var chunkParams = baseParams
var chunkHeaders = baseHeaders
chunkParams[paramName] = InputFile.fromBuffer(slice, filename: input.filename, mimeType: input.mimeType)
chunkHeaders["content-range"] = "bytes \(chunkOffset)-\(chunkOffset + chunkLength - 1)/\(size)"
if let uploadId = uploadId {
chunkHeaders["x-{{ spec.title | caseLower }}-id"] = uploadId
}

result = try await call(
let chunkResult = try await call(
method: "POST",
path: path,
headers: headers,
params: params,
headers: chunkHeaders,
params: chunkParams,
converter: { return $0 as! [String: Any] }
)

offset += Client.chunkSize
headers["x-{{ spec.title | caseLower }}-id"] = result["$id"] as? String
return (index, chunkLength, chunkResult)
}

if nextChunk == 0 {
let first = try await uploadChunk(index: 0, uploadId: uploadId)
result = first.2
uploadId = result["$id"] as? String
nextChunk = 1
completedChunks = 1
uploadedBytes = first.1
onProgress?(UploadProgress(
id: result["$id"] as? String ?? "",
progress: Double(min(offset, size))/Double(size) * 100.0,
sizeUploaded: min(offset, size),
chunksTotal: result["chunksTotal"] as? Int ?? -1,
chunksUploaded: result["chunksUploaded"] as? Int ?? -1
id: uploadId ?? "",
progress: Double(uploadedBytes)/Double(size) * 100.0,
sizeUploaded: uploadedBytes,
chunksTotal: result["chunksTotal"] as? Int ?? totalChunks,
chunksUploaded: result["chunksUploaded"] as? Int ?? completedChunks
))
}

let maxConcurrency = 8

try await withThrowingTaskGroup(of: (Int, Int, [String: Any]).self) { group in
var inFlight = 0

while inFlight < maxConcurrency && nextChunk < totalChunks {
let index = nextChunk
let currentUploadId = uploadId
group.addTask { try await uploadChunk(index: index, uploadId: currentUploadId) }
nextChunk += 1
inFlight += 1
}

while let chunk = try await group.next() {
inFlight -= 1
completedChunks += 1
uploadedBytes += chunk.1
if chunk.0 == totalChunks - 1 {
result = chunk.2
}

onProgress?(UploadProgress(
id: uploadId ?? "",
progress: Double(min(uploadedBytes, size))/Double(size) * 100.0,
sizeUploaded: min(uploadedBytes, size),
chunksTotal: chunk.2["chunksTotal"] as? Int ?? totalChunks,
chunksUploaded: chunk.2["chunksUploaded"] as? Int ?? completedChunks
))

while inFlight < maxConcurrency && nextChunk < totalChunks {
let index = nextChunk
let currentUploadId = uploadId
group.addTask { try await uploadChunk(index: index, uploadId: currentUploadId) }
nextChunk += 1
inFlight += 1
}
}
}

return try converter!(result)
}

Expand Down
Loading
Loading