diff --git a/lib/upload/errors/UploadFailedError.spec.ts b/lib/upload/errors/UploadFailedError.spec.ts new file mode 100644 index 00000000..1a16aa8f --- /dev/null +++ b/lib/upload/errors/UploadFailedError.spec.ts @@ -0,0 +1,44 @@ +/*! + * SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import type { AxiosRequestHeaders, AxiosResponse } from 'axios' + +import { AxiosError } from 'axios' +import { expect, test } from 'vitest' +import { UploadFailedError } from './UploadFailedError.ts' + +test('UploadFailedError - axios error but no response', () => { + const cause = new AxiosError('Network error') + const error = new UploadFailedError(cause) + expect(error).toBeInstanceOf(Error) + expect(error).toBeInstanceOf(UploadFailedError) + expect(error.message).toBe('Upload has failed') + expect(error.cause).toBe(cause) + expect(error).toHaveProperty('__UPLOAD_FAILED__') + expect(error.response).toBeUndefined() +}) + +test('UploadFailedError - axios error', () => { + const response = {} as AxiosResponse + const cause = new AxiosError('Network error', '200', { headers: {} as AxiosRequestHeaders }, {}, response) + const error = new UploadFailedError(cause) + expect(error).toBeInstanceOf(Error) + expect(error).toBeInstanceOf(UploadFailedError) + expect(error.message).toBe('Upload has failed') + expect(error.cause).toBe(cause) + expect(error).toHaveProperty('__UPLOAD_FAILED__') + expect(error.response).toBe(response) +}) + +test('UploadFailedError - generic error', () => { + const cause = new Error('Generic error') + const error = new UploadFailedError(cause) + expect(error).toBeInstanceOf(Error) + expect(error).toBeInstanceOf(UploadFailedError) + expect(error.message).toBe('Upload has failed') + expect(error.cause).toBe(cause) + expect(error).toHaveProperty('__UPLOAD_FAILED__') + expect(error.response).toBeUndefined() +}) diff --git a/lib/upload/errors/UploadFailedError.ts b/lib/upload/errors/UploadFailedError.ts new file mode 100644 index 00000000..e85b3f78 --- /dev/null +++ b/lib/upload/errors/UploadFailedError.ts @@ -0,0 +1,21 @@ +/*! + * SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import type { AxiosResponse } from '@nextcloud/axios' + +import { isAxiosError } from '@nextcloud/axios' + +export class UploadFailedError extends Error { + private __UPLOAD_FAILED__ = true + + readonly response?: AxiosResponse + + public constructor(cause?: unknown) { + super('Upload has failed', { cause }) + if (isAxiosError(cause) && cause.response) { + this.response = cause.response + } + } +} diff --git a/lib/upload/index.ts b/lib/upload/index.ts index 0891cb85..8d749dee 100644 --- a/lib/upload/index.ts +++ b/lib/upload/index.ts @@ -3,10 +3,7 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -export type { Eta, EtaEventsMap } from './uploader/index.ts' -export type { Directory, IDirectory } from './utils/fileTree.ts' - +export { UploadCancelledError } from './errors/UploadCancelledError.ts' +export { UploadFailedError } from './errors/UploadFailedError.ts' +export * from './uploader/index.ts' export { getUploader } from './getUploader.ts' -export { Upload, UploadStatus } from './uploader/Upload.ts' -export { EtaStatus, Uploader, UploaderStatus } from './uploader/index.ts' -export { getConflicts, hasConflict } from './utils/conflicts.ts' diff --git a/lib/upload/uploader/Upload.spec.ts b/lib/upload/uploader/Upload.spec.ts index 06f34a40..aaaad11f 100644 --- a/lib/upload/uploader/Upload.spec.ts +++ b/lib/upload/uploader/Upload.spec.ts @@ -3,132 +3,22 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' -import { Upload, UploadStatus } from './Upload.ts' +import type PQueue from 'p-queue' -describe('Upload', () => { - beforeEach(() => { - (window as any).OC = { - appConfig: { - files: { - max_chunk_size: 5 * 1024 * 1024, - }, - }, - } - }) - - afterEach(() => { - delete (window as any).OC - vi.useRealTimers() - }) - - it('initializes non-chunked uploads by default', () => { - const file = new File(['data'], 'document.txt') - const upload = new Upload('/remote.php/dav/files/user/document.txt', false, 1024, file) - - expect(upload.source).toBe('/remote.php/dav/files/user/document.txt') - expect(upload.file).toBe(file) - expect(upload.size).toBe(1024) - expect(upload.isChunked).toBe(false) - expect(upload.chunks).toBe(1) - expect(upload.status).toBe(UploadStatus.INITIALIZED) - expect(upload.uploaded).toBe(0) - expect(upload.startTime).toBe(0) - }) - - it('enables chunking when configured and multiple chunks are needed', () => { - const file = new File(['data'], 'video.mp4') - const upload = new Upload('/remote.php/dav/files/user/video.mp4', true, 12 * 1024 * 1024, file) - - expect(upload.isChunked).toBe(true) - expect(upload.chunks).toBe(3) - }) - - it('limits the number of chunks to 10000', () => { - const file = new File(['data'], 'huge.bin') - const upload = new Upload('/remote.php/dav/files/user/huge.bin', true, 5 * 1024 * 1024 * 20000, file) - - expect(upload.isChunked).toBe(true) - expect(upload.chunks).toBe(10000) - }) - - it('tracks upload progress and keeps first start time', () => { - vi.useFakeTimers() - vi.setSystemTime(new Date('2026-03-09T10:00:00.000Z')) - - const file = new File(['data'], 'archive.zip') - const upload = new Upload('/remote.php/dav/files/user/archive.zip', false, 100, file) - upload.uploaded = 10 - - expect(upload.status).toBe(UploadStatus.UPLOADING) - expect(upload.uploaded).toBe(10) - expect(upload.startTime).toBe(new Date('2026-03-09T10:00:00.000Z').getTime()) - - vi.setSystemTime(new Date('2026-03-09T10:00:10.000Z')) - upload.uploaded = 20 - - expect(upload.status).toBe(UploadStatus.UPLOADING) - expect(upload.uploaded).toBe(20) - expect(upload.startTime).toBe(new Date('2026-03-09T10:00:00.000Z').getTime()) - }) +import { describe, expect, it } from 'vitest' +import { Upload } from './Upload.ts' - it('marks non-chunked uploads as finished when all bytes are uploaded', () => { - const file = new File(['data'], 'photo.jpg') - const upload = new Upload('/remote.php/dav/files/user/photo.jpg', false, 10, file) +class TestUpload extends Upload { + public async start(queue: PQueue): Promise { + queue.add(() => Promise.resolve()) + } +} - upload.uploaded = 10 - - expect(upload.status).toBe(UploadStatus.FINISHED) - expect(upload.uploaded).toBe(10) - }) - - it('marks chunked uploads as assembling when all bytes are uploaded', () => { - const file = new File(['data'], 'dataset.csv') - const upload = new Upload('/remote.php/dav/files/user/dataset.csv', true, 12 * 1024 * 1024, file) - - upload.uploaded = 12 * 1024 * 1024 - - expect(upload.status).toBe(UploadStatus.ASSEMBLING) - expect(upload.uploaded).toBe(12 * 1024 * 1024) - }) - - it('stores and exposes the server response', () => { - const file = new File(['data'], 'result.txt') - const upload = new Upload('/remote.php/dav/files/user/result.txt', false, 10, file) - const response = { - status: 201, - statusText: 'Created', - headers: {}, - config: { headers: {} }, - data: { ok: true }, - } as any - - expect(upload.response).toBeNull() - upload.response = response - expect(upload.response).toBe(response) - - upload.response = null - expect(upload.response).toBeNull() - }) - - it('can update status directly', () => { - const file = new File(['data'], 'manual.txt') - const upload = new Upload('/remote.php/dav/files/user/manual.txt', false, 10, file) - - upload.status = UploadStatus.FAILED - - expect(upload.status).toBe(UploadStatus.FAILED) - }) - - it('aborts signal and marks upload as cancelled', () => { - const file = new File(['data'], 'cancelled.txt') - const upload = new Upload('/remote.php/dav/files/user/cancelled.txt', false, 10, file) - - expect(upload.signal.aborted).toBe(false) - - upload.cancel() - - expect(upload.signal.aborted).toBe(true) - expect(upload.status).toBe(UploadStatus.CANCELLED) +describe('Upload', () => { + it('cancels an upload', () => { + const a = new TestUpload() + expect(a.signal.aborted).toBe(false) + a.cancel() + expect(a.signal.aborted).toBe(true) }) }) diff --git a/lib/upload/uploader/Upload.ts b/lib/upload/uploader/Upload.ts index fa0608d3..7d3ef607 100644 --- a/lib/upload/uploader/Upload.ts +++ b/lib/upload/uploader/Upload.ts @@ -3,125 +3,104 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -import type { AxiosResponse } from 'axios' +import type PQueue from 'p-queue' -import { getMaxChunksSize } from '../utils/config.ts' +import { TypedEventTarget } from 'typescript-event-target' export const UploadStatus = Object.freeze({ + /** The upload was initialized */ INITIALIZED: 0, - UPLOADING: 1, - ASSEMBLING: 2, - FINISHED: 3, - CANCELLED: 4, - FAILED: 5, + /** The upload was scheduled but is not yet uploading */ + SCHEDULED: 1, + /** The upload itself is running */ + UPLOADING: 2, + /** Chunks are being assembled */ + ASSEMBLING: 3, + /** The upload finished successfully */ + FINISHED: 4, + /** The upload was cancelled by the user */ + CANCELLED: 5, + /** The upload failed */ + FAILED: 6, }) -type TUploadStatus = typeof UploadStatus[keyof typeof UploadStatus] +export type TUploadStatus = typeof UploadStatus[keyof typeof UploadStatus] -export class Upload { - private _source: string - private _file: File - private _isChunked: boolean - private _chunks: number - - private _size: number - private _uploaded = 0 - private _startTime = 0 - - private _status: TUploadStatus = UploadStatus.INITIALIZED - private _controller: AbortController - private _response: AxiosResponse | null = null - - constructor(source: string, chunked = false, size: number, file: File) { - const chunks = Math.min(getMaxChunksSize() > 0 ? Math.ceil(size / getMaxChunksSize()) : 1, 10000) - this._source = source - this._isChunked = chunked && getMaxChunksSize() > 0 && chunks > 1 - this._chunks = this._isChunked ? chunks : 1 - this._size = size - this._file = file - this._controller = new AbortController() - } - - get source(): string { - return this._source - } - - get file(): File { - return this._file - } - - get isChunked(): boolean { - return this._isChunked - } - - get chunks(): number { - return this._chunks - } - - get size(): number { - return this._size - } - - get startTime(): number { - return this._startTime - } - - set response(response: AxiosResponse | null) { - this._response = response - } - - get response(): AxiosResponse | null { - return this._response - } +interface UploadEvents { + finished: CustomEvent + progress: CustomEvent +} - get uploaded(): number { - return this._uploaded - } +export interface IUpload extends TypedEventTarget { + /** + * The source of the upload + */ + readonly source: string + /** + * Whether the upload is chunked or not + */ + readonly isChunked: boolean + /** + * The total size of the upload in bytes + */ + readonly totalBytes: number + /** + * Timestamp of when the upload started. + * Will return `undefined` if the upload has not started yet. + */ + readonly startTime?: number + /** + * The number of bytes that have been uploaded so far + */ + readonly uploadedBytes: number + /** + * The current status of the upload + */ + readonly status: TUploadStatus + /** + * The internal abort signal + */ + readonly signal: AbortSignal /** - * Update the uploaded bytes of this upload + * The children of this upload. + * - For a file upload, this will be an empty array. + * - For a folder upload, this will be the uploads of the children files and folders. */ - set uploaded(length: number) { - if (length >= this._size) { - this._status = this._isChunked - ? UploadStatus.ASSEMBLING - : UploadStatus.FINISHED - this._uploaded = this._size - return - } + readonly children: IUpload[] - this._status = UploadStatus.UPLOADING - this._uploaded = length + /** + * Cancels the upload + */ + cancel(): void +} - // If first progress, let's log the start time - if (this._startTime === 0) { - this._startTime = new Date().getTime() - } - } +export abstract class Upload extends TypedEventTarget implements Partial { + #abortController = new AbortController() - get status(): TUploadStatus { - return this._status + get signal(): AbortSignal { + return this.#abortController.signal } /** - * Update this upload status + * Cancels the upload */ - set status(status: TUploadStatus) { - this._status = status + public cancel(): void { + this.#abortController.abort() } /** - * Returns the axios cancel token source + * Get the children of this upload. + * For a file upload, this will be an empty array, for a folder upload, this will be the uploads of the children files and folders. */ - get signal(): AbortSignal { - return this._controller.signal + public get children(): IUpload[] { + return [] } /** - * Cancel any ongoing requests linked to this upload + * Start the upload + * + * @param queue - The job queue. It is used to limit the number of concurrent upload jobs. */ - cancel() { - this._controller.abort() - this._status = UploadStatus.CANCELLED - } + public abstract start(queue: PQueue): Promise } diff --git a/lib/upload/uploader/UploadFile.spec.ts b/lib/upload/uploader/UploadFile.spec.ts new file mode 100644 index 00000000..957a40fd --- /dev/null +++ b/lib/upload/uploader/UploadFile.spec.ts @@ -0,0 +1,280 @@ +/* eslint-disable @typescript-eslint/no-unused-vars */ +/*! + * SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import axios from '@nextcloud/axios' +import { CanceledError } from 'axios' +import { describe, expect, it, vi } from 'vitest' +import { UploadStatus } from './Upload.ts' +import { UploadFile } from './UploadFile.ts' + +const isPublicShareMock = vi.hoisted(() => vi.fn()) +vi.mock('@nextcloud/sharing/public', () => ({ isPublicShare: isPublicShareMock })) + +const initChunkWorkspaceMock = vi.hoisted(() => vi.fn()) +const uploadDataMock = vi.hoisted(() => vi.fn()) +vi.mock('../utils/upload.ts', async () => ({ + ...(await vi.importActual('../utils/upload.ts')), + initChunkWorkspace: initChunkWorkspaceMock, + uploadData: uploadDataMock, +})) + +const getMaxChunksSizeMock = vi.hoisted(() => vi.fn()) +const supportsPublicChunkingMock = vi.hoisted(() => vi.fn()) +vi.mock('../utils/config.ts', () => ({ + getMaxChunksSize: getMaxChunksSizeMock, + supportsPublicChunking: supportsPublicChunkingMock, +})) + +describe('chunking', () => { + it('enables chunking for non-public shares', () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024) + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(2048)], 'filename'), { noChunking: false }) + expect(uploadFile.isChunked).toBe(true) + }) + + it('enables chunking for public shares', () => { + isPublicShareMock.mockReturnValue(true) + supportsPublicChunkingMock.mockReturnValue(true) + getMaxChunksSizeMock.mockReturnValue(1024) + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(2048)], 'filename'), { noChunking: false }) + expect(uploadFile.isChunked).toBe(true) + }) + + it('disables chunking if too small', () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024) + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(1000)], 'filename'), { noChunking: false }) + expect(uploadFile.isChunked).toBe(false) + }) + + it('disables chunking if explicitly disabled', () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024) + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(2048)], 'filename'), { noChunking: true }) + expect(uploadFile.isChunked).toBe(false) + }) + + it('disables chunking if disabled by admin', () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(0) + const uploadFile = new UploadFile('/destination', new File([], 'filename'), { noChunking: true }) + expect(uploadFile.isChunked).toBe(false) + }) + + it('disables chunking if not supported by public shares', () => { + isPublicShareMock.mockReturnValue(true) + supportsPublicChunkingMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024) + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(2048)], 'filename'), { noChunking: false }) + expect(uploadFile.isChunked).toBe(false) + }) + + it.each([ + [0, 1], + [1024, 1], + [1025, 2], + [2048, 2], + [2049, 3], + ])('calculates number of chunks correctly for file size %i', async (fileSize, expectedChunks) => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024) + + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(fileSize)], 'filename'), { noChunking: false }) + expect(uploadFile.isChunked).toBe(expectedChunks > 1) + + const { resolve, promise } = Promise.withResolvers() + const queue = { add: vi.fn(() => resolve()) } + uploadFile.start(queue as never) + + // wait for queue to be called + await promise + expect(uploadFile.numberOfChunks).toBe(expectedChunks) + }) +}) + +describe('upload status and events', () => { + it('initialized', () => { + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(2048)], 'filename'), { noChunking: false }) + expect(uploadFile.status).toBe(UploadStatus.INITIALIZED) + }) + + it('converts FileSystemFileEntry to File when starting', async () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024 * 1024) + + const fileEntry = { + file: vi.fn((resolve: (f: File) => void) => resolve(new File(['x'.repeat(1024)], 'entry.txt'))), + } as unknown as FileSystemFileEntry + + uploadDataMock.mockImplementationOnce(() => Promise.resolve()) + + const onFinish = vi.fn() + const uploadFile = new UploadFile('/destination', fileEntry, { noChunking: false }) + uploadFile.addEventListener('finished', onFinish) + + const queue = { add: vi.fn((_fn: () => Promise) => {}) } + await uploadFile.start(queue as never) + expect(fileEntry.file).toHaveBeenCalledOnce() + // run the scheduled upload + await queue.add.mock.calls[0][0]() + expect(uploadFile.status).toBe(UploadStatus.FINISHED) + expect(onFinish).toHaveBeenCalledOnce() + }) + + it('throws if start called twice', async () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024 * 1024) + + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(100)], 'filename'), { noChunking: false }) + const queue = { add: vi.fn((_fn: () => Promise) => {}) } + await uploadFile.start(queue as never) + await expect(uploadFile.start(queue as never)).rejects.toThrow('Upload already started') + }) + + it('resets uploadedBytes on upload retry and emits progress', async () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024 * 1024) + + // Mock uploadData to call onUploadProgress and onUploadRetry synchronously + uploadDataMock.mockImplementationOnce((_url: string, _chunk: Blob, options: any) => { + options.onUploadProgress?.({ bytes: 100 }) + options.onUploadRetry?.() + return Promise.resolve() + }) + + const onProgress = vi.fn() + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(1024)], 'filename'), { noChunking: false }) + uploadFile.addEventListener('progress', onProgress) + + const queue = { add: vi.fn((fn: () => Promise) => fn()) } + await uploadFile.start(queue as never) + // the queued function was executed immediately by our queue stub — wait for it to finish + await queue.add.mock.calls[0][0]() + expect(uploadFile.uploadedBytes).toBe(1024) + expect(onProgress).toHaveBeenCalled() + }) + + it('chunked assemble finishes when MOVE succeeds', async () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024) + // make sure chunking is enabled + initChunkWorkspaceMock.mockResolvedValue('/tmp/temporary') + // each chunk upload succeeds + uploadDataMock.mockImplementation(() => Promise.resolve()) + // axios MOVE succeeds + vi.spyOn(axios, 'request').mockResolvedValueOnce({}) + + const onFinish = vi.fn() + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(4096)], 'bigfile'), { noChunking: false }) + uploadFile.addEventListener('finished', onFinish) + + // simple queue that executes tasks immediately and returns their promise + const queue = { add: vi.fn((fn: () => Promise) => fn()) } + + await uploadFile.start(queue as never) + // wait for all queued tasks to finish + await Promise.all(queue.add.mock.results.map((r) => r.value)) + + expect(uploadFile.status).toBe(UploadStatus.FINISHED) + expect(onFinish).toHaveBeenCalledOnce() + }) + + it('scheduled', async () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024) + + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(1024)], 'filename'), { noChunking: false }) + const { resolve, promise } = Promise.withResolvers() + const queue = { add: vi.fn(() => resolve()) } + + uploadFile.start(queue as never) + // wait for queue to be called + await promise + expect(uploadFile.status).toBe(UploadStatus.SCHEDULED) + }) + + it('uploading', async () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024) + + const { promise: uploadDataPromise } = Promise.withResolvers() + uploadDataMock.mockImplementationOnce(() => uploadDataPromise) + + const { promise: queuePromise, resolve: queueResolve } = Promise.withResolvers() + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(1024)], 'filename'), { noChunking: false }) + const queue = { add: vi.fn((fn: () => Promise) => (queueResolve(), fn())) } + // start upload and wait for queue to be called + uploadFile.start(queue as never) + await queuePromise + + expect(uploadFile.status).toBe(UploadStatus.UPLOADING) + }) + + it('finished', async () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024) + uploadDataMock.mockImplementationOnce(() => Promise.resolve()) + + const onFinish = vi.fn() + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(1024)], 'filename'), { noChunking: false }) + uploadFile.addEventListener('finished', onFinish) + + const queue = { add: vi.fn((_fn: () => Promise) => {}) } + await uploadFile.start(queue as never) + await queue.add.mock.calls[0][0]() + expect(uploadFile.status).toBe(UploadStatus.FINISHED) + expect(onFinish).toHaveBeenCalledOnce() + }) + + it('cancelled by DOM', async () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024) + uploadDataMock.mockImplementationOnce(() => Promise.reject(new DOMException('Aborted', 'AbortError'))) + + const onFinish = vi.fn() + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(1024)], 'filename'), { noChunking: false }) + uploadFile.addEventListener('finished', onFinish) + + const queue = { add: vi.fn((_fn: () => Promise) => {}) } + await uploadFile.start(queue as never) + await queue.add.mock.calls[0][0]().catch(() => {}) + expect(uploadFile.status).toBe(UploadStatus.CANCELLED) + expect(onFinish).toHaveBeenCalledOnce() + }) + + it('cancelled by axios', async () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024) + uploadDataMock.mockImplementationOnce(() => Promise.reject(new CanceledError())) + + const onFinish = vi.fn() + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(1024)], 'filename'), { noChunking: false }) + uploadFile.addEventListener('finished', onFinish) + + const queue = { add: vi.fn((_fn: () => Promise) => {}) } + await uploadFile.start(queue as never) + await queue.add.mock.calls[0][0]().catch(() => {}) + expect(uploadFile.status).toBe(UploadStatus.CANCELLED) + expect(onFinish).toHaveBeenCalledOnce() + }) + + it('failed', async () => { + isPublicShareMock.mockReturnValue(false) + getMaxChunksSizeMock.mockReturnValue(1024) + uploadDataMock.mockImplementationOnce(() => Promise.reject(new Error('generic error'))) + + const onFinish = vi.fn() + const uploadFile = new UploadFile('/destination', new File(['x'.repeat(1024)], 'filename'), { noChunking: false }) + uploadFile.addEventListener('finished', onFinish) + + const queue = { add: vi.fn((_fn: () => Promise) => {}) } + await uploadFile.start(queue as never) + await queue.add.mock.calls[0][0]().catch(() => {}) + expect(uploadFile.status).toBe(UploadStatus.FAILED) + expect(onFinish).toHaveBeenCalledOnce() + }) +}) diff --git a/lib/upload/uploader/UploadFile.ts b/lib/upload/uploader/UploadFile.ts new file mode 100644 index 00000000..3984f30f --- /dev/null +++ b/lib/upload/uploader/UploadFile.ts @@ -0,0 +1,214 @@ +/*! + * SPDX-FileCopyrightText: 2022 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import type PQueue from 'p-queue' +import type { IUpload, TUploadStatus } from './Upload.ts' + +import axios from '@nextcloud/axios' +import { join } from '@nextcloud/paths' +import { isPublicShare } from '@nextcloud/sharing/public' +import { UploadCancelledError } from '../errors/UploadCancelledError.ts' +import { UploadFailedError } from '../errors/UploadFailedError.ts' +import { getMaxChunksSize, supportsPublicChunking } from '../utils/config.ts' +import { getMtimeHeader, isRequestAborted } from '../utils/requests.ts' +import { getChunk, initChunkWorkspace, uploadData } from '../utils/upload.ts' +import { Upload, UploadStatus } from './Upload.ts' + +/** + * A class representing a single file to be uploaded + */ +export class UploadFile extends Upload implements IUpload { + #customHeaders: Record + #fileHandle: File | FileSystemFileEntry + #file?: File + #noChunking: boolean + + public source: string + public status: TUploadStatus = UploadStatus.INITIALIZED + public startTime?: number + public totalBytes: number = 0 + public uploadedBytes: number = -1 + public numberOfChunks: number = 1 + + constructor( + destination: string, + fileHandle: File | FileSystemFileEntry, + options: { headers?: Record, noChunking?: boolean }, + ) { + super() + const { + headers = {}, + noChunking = false, + } = options + + // exposed state + this.source = destination + this.totalBytes = 'size' in fileHandle ? fileHandle.size : -1 + + // private state + this.#fileHandle = fileHandle + this.#customHeaders = headers + this.#noChunking = noChunking + this.signal.addEventListener('abort', () => { + if (this.status !== UploadStatus.FAILED) { + this.status = UploadStatus.CANCELLED + } + }) + } + + get isChunked(): boolean { + const maxChunkSize = getMaxChunksSize('size' in this.#fileHandle ? this.#fileHandle.size : undefined) + return !this.#noChunking + && maxChunkSize > 0 + && this.totalBytes > maxChunkSize + && (!isPublicShare() || supportsPublicChunking()) + } + + async start(queue: PQueue): Promise { + if (this.status !== UploadStatus.INITIALIZED) { + throw new Error('Upload already started') + } + + this.startTime = Date.now() + this.#file = await getFile(this.#fileHandle) + this.totalBytes = this.#file.size + this.uploadedBytes = 0 + this.status = UploadStatus.SCHEDULED + + try { + if (this.isChunked) { + this.numberOfChunks = Math.ceil(this.totalBytes / getMaxChunksSize(this.totalBytes)) + await this.#uploadChunked(queue) + } else { + queue.add(() => this.#upload()) + } + } catch (error) { + this.cancel() + if (error instanceof UploadCancelledError || error instanceof UploadFailedError) { + throw error + } + this.status = UploadStatus.FAILED + throw new UploadFailedError(error) + } + } + + /** + * Internal implementation of the upload process for non-chunked uploads. + */ + async #upload() { + this.status = UploadStatus.UPLOADING + const chunk = await getChunk(this.#file!, 0, this.#file!.size) + try { + await this.#uploadChunk(chunk, this.source) + } finally { + this.dispatchTypedEvent('finished', new CustomEvent('finished', { detail: this })) + } + } + + /** + * Internal implementation of the upload process for chunked uploads. + * + * @param queue - The job queue to throttle number of concurrent chunk uploads + */ + async #uploadChunked(queue: PQueue) { + this.status = UploadStatus.UPLOADING + const temporaryUrl = await initChunkWorkspace(this.source, 5, isPublicShare(), this.#customHeaders) + + const promises: Promise[] = [] + for (let i = 0; i < this.numberOfChunks; i++) { + const chunk = await getChunk(this.#file!, i * getMaxChunksSize(this.totalBytes), (i + 1) * getMaxChunksSize(this.totalBytes)) + promises.push(queue.add(() => this.#uploadChunk(chunk, join(temporaryUrl, String(i))))) + } + this.status = UploadStatus.UPLOADING + + queue.add(async () => { + try { + await Promise.all(promises) + // Send the assemble request + this.status = UploadStatus.ASSEMBLING + await queue.add(async () => { + await axios.request({ + method: 'MOVE', + url: `${temporaryUrl}/.file`, + headers: { + ...this.#customHeaders, + ...getMtimeHeader(this.#file!), + 'OC-Total-Length': this.#file!.size, + Destination: this.source, + }, + }) + }) + this.status = UploadStatus.FINISHED + } catch (error) { + this.cancel() + if (isRequestAborted(error)) { + this.status = UploadStatus.CANCELLED + throw new UploadCancelledError(error) + } + this.status = UploadStatus.FAILED + throw new UploadFailedError(error) + } finally { + this.dispatchTypedEvent('finished', new CustomEvent('finished', { detail: this })) + } + }) + } + + /** + * Internal helper to share logic for uploading a chunk of data for both chunked and non-chunked uploads. + * + * @param chunk - The chunk to upload + * @param url - The target URL + */ + async #uploadChunk(chunk: Blob, url: string) { + try { + await uploadData( + url, + chunk, + { + signal: this.signal, + onUploadProgress: ({ bytes }) => { + // As this is only the sent bytes not the processed ones we only count 90%. + // When the upload is finished (server acknowledged the upload) the remaining 10% will be correctly set. + this.uploadedBytes += bytes * 0.9 + this.dispatchTypedEvent('progress', new CustomEvent('progress', { detail: this })) + }, + onUploadRetry: () => { + this.uploadedBytes = 0 + }, + headers: { + ...this.#customHeaders, + ...getMtimeHeader(this.#file!), + 'Content-Type': this.#file!.type, + }, + }, + ) + + // Update progress - now we set the uploaded size to 100% of the file size + this.uploadedBytes = this.totalBytes + this.status = UploadStatus.FINISHED + } catch (error) { + if (isRequestAborted(error)) { + this.status = UploadStatus.CANCELLED + throw new UploadCancelledError(error) + } + + this.status = UploadStatus.FAILED + throw new UploadFailedError(error) + } + } +} + +/** + * Converts a FileSystemFileEntry to a File if needed and returns it. + * + * @param fileHandle - The file handle + */ +async function getFile(fileHandle: File | FileSystemFileEntry): Promise { + if (fileHandle instanceof File) { + return fileHandle + } + + return await new Promise((resolve, reject) => fileHandle.file(resolve, reject)) +} diff --git a/lib/upload/uploader/UploadFileTree.spec.ts b/lib/upload/uploader/UploadFileTree.spec.ts new file mode 100644 index 00000000..d2f1c011 --- /dev/null +++ b/lib/upload/uploader/UploadFileTree.spec.ts @@ -0,0 +1,241 @@ +/* + * SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import type PQueue from 'p-queue' + +import { CanceledError } from 'axios' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { UploadCancelledError } from '../errors/UploadCancelledError.ts' +import { UploadFailedError } from '../errors/UploadFailedError.ts' +import { Directory } from '../utils/fileTree.ts' +import { UploadStatus } from './Upload.ts' + +const axiosRequestMock = vi.hoisted(() => vi.fn()) +const isAxiosErrorMock = vi.hoisted(() => vi.fn()) +const isCancelMock = vi.hoisted(() => vi.fn()) + +const uploadFileMocks = vi.hoisted(() => { + const instances: Array<{ + source: string + start: ReturnType + cancel: ReturnType + status: number + }> = [] + + class MockUploadFile { + public source: string + public status: number = UploadStatus.INITIALIZED + public start = vi.fn(async () => { + this.status = UploadStatus.FINISHED + }) + + public cancel = vi.fn(() => { + this.status = UploadStatus.CANCELLED + }) + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + public constructor(source: string, _file: File, _options: unknown) { + this.source = source + instances.push(this) + } + } + + return { instances, MockUploadFile } +}) + +vi.mock('@nextcloud/axios', () => ({ + default: { + request: axiosRequestMock, + }, + isCancel: isCancelMock, + isAxiosError: isAxiosErrorMock, +})) + +vi.mock('./UploadFile.ts', () => ({ + UploadFile: uploadFileMocks.MockUploadFile, +})) + +import { UploadFileTree } from './UploadFileTree.ts' + +function createQueue(): PQueue { + return { + add: vi.fn((job: () => Promise) => job()), + } as never +} + +async function createDirectoryTree(): Promise { + const root = new Directory('/destination') + const folder = new Directory('/destination/folder') + await folder.addChild(new File(['folder'], 'nested.txt', { lastModified: 1000 })) + + await root.addChildren([ + folder, + new File(['root'], 'root.txt', { lastModified: 2000 }), + ]) + + return root +} + +beforeEach(() => { + axiosRequestMock.mockReset() + isAxiosErrorMock.mockReset() + uploadFileMocks.instances.length = 0 +}) + +afterEach(() => { + vi.restoreAllMocks() +}) + +describe('UploadFileTree', () => { + it('initializes child uploads recursively and exposes a defensive children copy', async () => { + const directory = await createDirectoryTree() + const tree = new UploadFileTree('/destination', directory, {}) + + expect(tree.isChunked).toBe(false) + expect(tree.status).toBe(UploadStatus.INITIALIZED) + + const children = tree.initialize() + const snapshot = [...children] + + expect(children).toHaveLength(3) + expect(tree.children).toHaveLength(2) + expect(tree.children).not.toBe(children) + + children.pop() + expect(tree.children).toHaveLength(2) + + expect(tree.children).toHaveLength(2) + expect(tree.children[0]).toBeInstanceOf(UploadFileTree) + expect(tree.children.map((child) => child.source)).toEqual([ + '/destination/folder', + '/destination/root.txt', + ]) + expect(snapshot[0].source).toBe('/destination/folder') + expect(snapshot[1].source).toBe('/destination/root.txt') + expect(snapshot[2].source).toBe('/destination/folder/nested.txt') + }) + + it('cancels child uploads when aborted', async () => { + const directory = await createDirectoryTree() + const tree = new UploadFileTree('/destination', directory, {}) + tree.initialize() + + const child = tree.children[0] as UploadFileTree + const cancelSpy = vi.spyOn(child, 'cancel') + + expect(tree.signal.aborted).toBe(false) + tree.cancel() + + expect(tree.signal.aborted).toBe(true) + expect(tree.status).toBe(UploadStatus.CANCELLED) + expect(cancelSpy).toHaveBeenCalledOnce() + expect(child.signal.aborted).toBe(true) + }) + + it('starts once and marks the upload as finished after child uploads resolve', async () => { + axiosRequestMock.mockResolvedValue({}) + isAxiosErrorMock.mockReturnValue(false) + + const directory = await createDirectoryTree() + const tree = new UploadFileTree('/destination', directory, {}) + const onFinish = vi.fn() + tree.addEventListener('finished', onFinish) + tree.initialize() + + const queue = createQueue() + + await tree.start(queue) + + expect(axiosRequestMock).toHaveBeenCalledTimes(2) + expect(tree.status).toBe(UploadStatus.FINISHED) + expect(onFinish).toHaveBeenCalledOnce() + expect(uploadFileMocks.instances).toHaveLength(2) + expect(uploadFileMocks.instances[0].start).toHaveBeenCalledOnce() + expect(uploadFileMocks.instances[1].start).toHaveBeenCalledOnce() + + await expect(tree.start(queue)).rejects.toThrow('Upload already started') + }) + + it('renames children through the conflict callback when MKCOL reports an existing directory', async () => { + axiosRequestMock.mockRejectedValueOnce({ response: { status: 405 } }) + isAxiosErrorMock.mockReturnValue(true) + + const conflictCallback = vi.fn(async () => ({ + 'root.txt': 'root-renamed.txt', + })) + + const directory = new Directory('/destination') + await directory.addChild(new File(['root'], 'root.txt')) + + const tree = new UploadFileTree('/destination', directory, { callback: conflictCallback }) + tree.initialize() + + const queue = createQueue() + + await tree.start(queue) + + expect(conflictCallback).toHaveBeenCalledOnce() + expect(conflictCallback).toHaveBeenCalledWith(['root.txt'], '/destination') + expect(tree.children[0].source).toBe('/destination/root-renamed.txt') + expect(tree.status).toBe(UploadStatus.FINISHED) + }) + + it('cancels the upload when the conflict callback aborts it', async () => { + axiosRequestMock.mockRejectedValueOnce({ response: { status: 405 } }) + isAxiosErrorMock.mockReturnValue(true) + + const conflictCallback = vi.fn(async () => false) + const directory = new Directory('/destination') + await directory.addChild(new File(['root'], 'root.txt')) + + const tree = new UploadFileTree( + '/destination', + directory, + // @ts-expect-error -- mocked for testing purposes + { callback: conflictCallback }, + ) + tree.initialize() + + const queue = createQueue() + + await tree.start(queue) + + expect(conflictCallback).toHaveBeenCalledOnce() + expect(tree.status).toBe(UploadStatus.CANCELLED) + expect(uploadFileMocks.instances[0].start).not.toHaveBeenCalled() + }) + + it.each([ + ['request cancellation', new CanceledError()], + ['tree cancellation', new UploadCancelledError(new Error('cancelled'))], + ['tree failure', new UploadFailedError(new Error('failed'))], + ])('propagates %s from child uploads', async (_label, rejection) => { + axiosRequestMock.mockResolvedValue({}) + isAxiosErrorMock.mockReturnValue(false) + isCancelMock.mockImplementation((error: unknown) => error instanceof CanceledError) + + const directory = new Directory('/destination') + await directory.addChild(new File(['root'], 'root.txt')) + + const tree = new UploadFileTree('/destination', directory, {}) + tree.initialize() + uploadFileMocks.instances[0].start.mockRejectedValueOnce(rejection) + + const queue = createQueue() + + if (rejection instanceof CanceledError) { + await expect(tree.start(queue)).rejects.toBeInstanceOf(UploadCancelledError) + } else { + await expect(tree.start(queue)).rejects.toBe(rejection) + } + + if (rejection instanceof UploadFailedError) { + expect(tree.status).toBe(UploadStatus.FAILED) + } else { + expect(tree.status).toBe(UploadStatus.CANCELLED) + } + expect(uploadFileMocks.instances[0].cancel).toHaveBeenCalledOnce() + }) +}) diff --git a/lib/upload/uploader/UploadFileTree.ts b/lib/upload/uploader/UploadFileTree.ts new file mode 100644 index 00000000..863c8a63 --- /dev/null +++ b/lib/upload/uploader/UploadFileTree.ts @@ -0,0 +1,213 @@ +/*! + * SPDX-FileCopyrightText: 2022 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import type PQueue from 'p-queue' +import type { IUpload, TUploadStatus } from './Upload.ts' + +import axios, { isAxiosError } from '@nextcloud/axios' +import { basename, join } from '@nextcloud/paths' +import { UploadCancelledError } from '../errors/UploadCancelledError.ts' +import { UploadFailedError } from '../errors/UploadFailedError.ts' +import { Directory as FileTree } from '../utils/fileTree.ts' +import { getMtimeHeader, isRequestAborted } from '../utils/requests.ts' +import { Upload, UploadStatus } from './Upload.ts' +import { UploadFile } from './UploadFile.ts' + +/** + * Callback type for conflict resolution when uploading a folder tree. + * + * The callback receives the nodes in the current folder and the current path to upload to, + * it should return a list of nodes that should be uploaded (e.g. after resolving conflicts by renaming or selecting which files to upload). + * In case the upload should be cancelled, it should return `false`. + * The returned mapping allowes to resolve conflicts by renaming files or folders before upload, + * the key is the original name of the node and the value is the name to upload it as. + * + * @param nodes - The nodes to upload (list of filenames) + * @param currentPath - The current path to upload to + * @return A promise that resolves to a list of nodes that should be uploaded or false if the upload should be cancelled + */ +export type ConflictsCallback = (nodes: string[], currentPath: string) => Promise> + +/** + * A class representing a single file to be uploaded + */ +export class UploadFileTree extends Upload implements IUpload { + /** Customer headers passed */ + #customHeaders: Record + /** The current file tree to upload */ + #directory: FileTree + /** Whether chunking is disabled */ + #noChunking: boolean + /** Children uploads of this parent folder upload */ + #children: (Upload & IUpload)[] = [] + /** The callback to handle conflicts */ + #conflictsCallback?: ConflictsCallback + + /** Whether we need to check for conflicts or not (newly created parent folders = no conflict resolution needed) */ + protected needConflictResolution = true + + public source: string + public status: TUploadStatus = UploadStatus.INITIALIZED + public startTime?: number + public totalBytes: number = 0 + public uploadedBytes: number = -1 + + constructor( + destination: string, + directory: FileTree, + options: { + callback?: ConflictsCallback + headers?: Record + noChunking?: boolean + }, + ) { + super() + const { + headers = {}, + noChunking = false, + } = options + + // exposed state + this.source = destination + this.#directory = directory + this.#customHeaders = headers + this.#noChunking = noChunking + this.#conflictsCallback = options.callback + + this.signal.addEventListener('abort', () => { + for (const child of this.#children) { + child.cancel() + } + if (this.status !== UploadStatus.FAILED) { + this.status = UploadStatus.CANCELLED + } + }) + } + + get isChunked(): boolean { + return false + } + + get children(): IUpload[] { + return [...this.#children] + } + + /** + * Set up all child uploads for this upload tree. + */ + initialize(): (Upload & IUpload)[] { + const grandchildren: (Upload & IUpload)[] = [] + for (const child of this.#directory.children) { + if (child instanceof FileTree) { + const upload = new UploadFileTree( + join(this.source, child.originalName), + child, + { + callback: this.#conflictsCallback, + headers: this.#customHeaders, + noChunking: this.#noChunking, + }, + ) + this.#children.push(upload) + grandchildren.push(...upload.initialize()) + } else { + const upload = new UploadFile( + join(this.source, child.name), + child, + { headers: this.#customHeaders, noChunking: this.#noChunking }, + ) + this.#children.push(upload) + } + } + return [...this.#children, ...grandchildren] + } + + async start(queue: PQueue): Promise { + if (this.status !== UploadStatus.INITIALIZED) { + throw new Error('Upload already started') + } + this.status = UploadStatus.SCHEDULED + this.startTime = Date.now() + this.uploadedBytes = 0 + + this.status = UploadStatus.UPLOADING + await this.#createDirectory(queue) + if (this.needConflictResolution && this.#conflictsCallback) { + const nodes = await this.#conflictsCallback( + this.#directory.children.map((node) => basename(node.name)), + this.source, + ) + if (nodes === false) { + this.cancel() + return + } + + for (const [originalName, newName] of Object.entries(nodes)) { + const upload = this.#children.find((child) => basename(child.source) === originalName) + if (upload) { + Object.defineProperty(upload, 'source', { value: join(this.source, newName) }) + } + } + } + + const uploads: Promise[] = [] + for (const upload of this.#children) { + uploads.push(upload.start(queue)) + // for folder tree uploads store the conflict resolution state to prevent useless requests + if (upload instanceof UploadFileTree) { + upload.needConflictResolution = this.needConflictResolution + } + } + + try { + await Promise.all(uploads) + this.status = UploadStatus.FINISHED + } catch (error) { + this.cancel() + if (isRequestAborted(error)) { + this.status = UploadStatus.CANCELLED + throw new UploadCancelledError(error) + } else if (error instanceof UploadCancelledError) { + this.status = UploadStatus.CANCELLED + throw error + } else if (error instanceof UploadFailedError) { + this.status = UploadStatus.FAILED + throw error + } + } finally { + this.dispatchTypedEvent('finished', new CustomEvent('finished', { detail: this })) + } + } + + /** + * Helper to create the directory for this tree. + * + * @param queue - The job queue + */ + async #createDirectory(queue: PQueue): Promise { + await queue.add(async () => { + try { + await axios.request({ + method: 'MKCOL', + url: this.source, + headers: { + ...this.#customHeaders, + ...getMtimeHeader(this.#directory), + }, + signal: this.signal, + }) + // MKCOL worked so this is a new directory, no conflict resolution needed + this.needConflictResolution = false + } catch (error) { + // ignore 405 Method Not Allowed as it means the directory already exists and we can continue with uploading the children + if (isAxiosError(error) && error.response?.status === 405) { + this.needConflictResolution = true + return + } + throw error + } + }) + } +} diff --git a/lib/upload/uploader/Uploader.spec.ts b/lib/upload/uploader/Uploader.spec.ts index 0960f652..90d03b5c 100644 --- a/lib/upload/uploader/Uploader.spec.ts +++ b/lib/upload/uploader/Uploader.spec.ts @@ -3,186 +3,189 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +import type { IUpload, TUploadStatus } from './Upload.ts' + import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { Folder } from '../../node/folder.ts' -import { Permission } from '../../permissions.ts' -import { UploadCancelledError } from '../errors/UploadCancelledError.ts' import { UploadStatus } from './Upload.ts' import { Uploader, UploaderStatus } from './Uploader.ts' -const nextcloudAuth = vi.hoisted(() => ({ - getCurrentUser: vi.fn(() => ({ uid: 'test', displayName: 'Test', isAdmin: false })), -})) -vi.mock('@nextcloud/auth', () => nextcloudAuth) - -const nextcloudCapabilities = vi.hoisted(() => ({ - getCapabilities: vi.fn(() => ({ - files: { chunked_upload: { max_parallel_count: 2 } }, - dav: { public_shares_chunking: true }, - })), -})) -vi.mock('@nextcloud/capabilities', () => nextcloudCapabilities) - -const nextcloudAxios = vi.hoisted(() => ({ - default: { - request: vi.fn(), - }, - isCancel: vi.fn(() => false), +// Mock auth to provide a current user by default +const authMock = vi.hoisted(() => ({ + getCurrentUser: vi.fn<() => ({ uid: string }) | null>(() => ({ uid: 'tester' })), })) -vi.mock('@nextcloud/axios', () => nextcloudAxios) +vi.mock('@nextcloud/auth', () => authMock) -const dav = vi.hoisted(() => ({ - getClient: vi.fn(() => ({ - createDirectory: vi.fn(), - })), +vi.mock('../../dav/dav.ts', () => ({ defaultRemoteURL: 'https://localhost/remote.php/dav', defaultRootPath: '/files/test', })) -vi.mock('../../dav/dav.ts', () => dav) - -const uploadUtils = vi.hoisted(() => ({ - getChunk: vi.fn(async (file: File) => new Blob([file], { type: file.type || 'application/octet-stream' })), - initChunkWorkspace: vi.fn(), - uploadData: vi.fn(async (_url: string, _blob: Blob, options: any) => { - options?.onUploadProgress?.({ bytes: 50 }) - return { - status: 201, - statusText: 'Created', - headers: {}, - config: { headers: {} }, - data: { ok: true }, + +vi.mock('../../utils/logger.ts', () => ({ default: { debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn() } })) + +// Provide simple mocks for UploadFile and UploadFileTree so we can deterministically +// simulate progress/finished events and exercise uploader logic. +vi.mock('./UploadFile.ts', () => ({ + UploadFile: class implements IUpload { + source = 'file:///test' + isChunked = false + totalBytes: number + uploadedBytes: number + status: TUploadStatus + response: any + signal = new AbortController().signal + children: IUpload[] = [] + private listeners: Record void)[]> + constructor(..._args: any[]) { + const file = _args[1] + this.listeners = {} + this.totalBytes = (file && file.size) || 0 + this.uploadedBytes = 0 + this.status = UploadStatus.INITIALIZED + this.response = undefined } - }), -})) -vi.mock('../utils/upload.ts', () => uploadUtils) -vi.mock('../utils/filesystem.ts', () => ({ - isFileSystemDirectoryEntry: vi.fn(() => false), - isFileSystemFileEntry: vi.fn(() => false), -})) + addEventListener = ((ev: string, cb: (ev?: CustomEvent) => void) => { + this.listeners[ev] = this.listeners[ev] || [] + this.listeners[ev].push(cb) + }) as any + + removeEventListener() {} + dispatchEvent = (() => true) as any + dispatchTypedEvent = (() => true) as any + cancel = vi.fn(() => { + if (this.status !== UploadStatus.FINISHED) { + this.status = UploadStatus.CANCELLED + } + }) -vi.mock('../../utils/logger.ts', () => ({ - default: { - debug: vi.fn(), - info: vi.fn(), - warn: vi.fn(), - error: vi.fn(), + start = async () => { + // simulate progress then finish + this.uploadedBytes = this.totalBytes / 2 + this.listeners.progress?.forEach((cb) => cb(new CustomEvent('progress', { detail: this }))) + this.uploadedBytes = this.totalBytes + this.status = UploadStatus.FINISHED + this.response = { status: 201 } + this.listeners.finished?.forEach((cb) => cb(new CustomEvent('finished', { detail: this }))) + } }, })) -describe('Uploader', () => { - beforeEach(() => { - (window as any).OC = { - appConfig: { - files: { - max_chunk_size: 0, - }, - }, +vi.mock('./UploadFileTree.ts', () => ({ + UploadFileTree: class implements IUpload { + source = 'file:///test' + isChunked = false + totalBytes = 0 + uploadedBytes = 0 + status: TUploadStatus = UploadStatus.FINISHED as TUploadStatus + response = { status: 201 } + signal = new AbortController().signal + children: IUpload[] = [] + private listeners: Record void)[]> = {} + constructor() {} + addEventListener = ((ev: string, cb: (ev?: CustomEvent) => void) => { + this.listeners[ev] = this.listeners[ev] || [] + this.listeners[ev].push(cb) + }) as any + + removeEventListener = (() => {}) as any + dispatchEvent = (() => true) as any + dispatchTypedEvent = (() => true) as any + cancel = vi.fn(() => { + if (this.status !== UploadStatus.FINISHED) { + this.status = UploadStatus.CANCELLED as TUploadStatus + } + }) + + initialize = () => [] + start = async () => { + this.listeners.finished?.forEach((cb) => cb(new CustomEvent('finished', { detail: this }))) } + }, +})) - nextcloudAuth.getCurrentUser.mockReturnValue({ uid: 'test', displayName: 'Test', isAdmin: false }) - nextcloudCapabilities.getCapabilities.mockReturnValue({ - files: { chunked_upload: { max_parallel_count: 2 } }, - dav: { public_shares_chunking: true }, - }) - nextcloudAxios.isCancel.mockReturnValue(false) - uploadUtils.getChunk.mockClear() - uploadUtils.uploadData.mockClear() - dav.getClient.mockClear() +describe('Uploader (current API)', () => { + beforeEach(() => { + authMock.getCurrentUser.mockReturnValue({ uid: 'tester' }) }) afterEach(() => { - delete (window as any).OC vi.restoreAllMocks() }) - it('initializes with the default destination for logged in users', () => { + it('constructs with default destination and exposes status/destination', () => { const uploader = new Uploader() - + expect(uploader.status).toBe(UploaderStatus.IDLE) expect(uploader.destination).toBeInstanceOf(Folder) - expect(uploader.destination.owner).toBe('test') - expect(uploader.root).toBe('https://localhost/remote.php/dav/files/test') - expect(uploader.info.status).toBe(UploaderStatus.IDLE) }) - it('throws when no user is logged in and uploader is not public', () => { - nextcloudAuth.getCurrentUser.mockReturnValue(null as any) - - expect(() => new Uploader(false)).toThrowError('User is not logged in') + it('throws when no user and not public', () => { + authMock.getCurrentUser.mockReturnValue(null) + expect(() => new Uploader(false)).toThrow() }) - it('uses anonymous owner in public mode', () => { - nextcloudAuth.getCurrentUser.mockReturnValue(null as any) + it('allows public mode with anonymous owner', () => { + authMock.getCurrentUser.mockReturnValue(null) const uploader = new Uploader(true) - expect(uploader.destination.owner).toBe('anonymous') }) - it('manages custom headers through a cloned public getter', () => { + it('manages custom headers and exposes a cloned map', () => { const uploader = new Uploader() - uploader.setCustomHeader('X-NC-Test', '1') - + uploader.setCustomHeader('X-Test', '1') const headers = uploader.customHeaders - headers['X-NC-Test'] = '2' - - expect(uploader.customHeaders['X-NC-Test']).toBe('1') - - uploader.deleteCustomerHeader('X-NC-Test') - expect(uploader.customHeaders['X-NC-Test']).toBeUndefined() + expect(headers.get('X-Test')).toBe('1') + uploader.deleteCustomerHeader('X-Test') + expect(uploader.customHeaders.get('X-Test')).toBeUndefined() }) - it('rejects invalid destination values', () => { + it('can pause, start and reset', async () => { const uploader = new Uploader() + const paused = new Promise((res) => uploader.addEventListener('paused', () => res())) + const resumed = new Promise((res) => uploader.addEventListener('resumed', () => res())) - expect(() => { - uploader.destination = { type: null, source: '' } as any - }).toThrowError('Invalid destination folder') + await uploader.pause() + expect(uploader.status).toBe(UploaderStatus.PAUSED) + await paused + + uploader.start() + expect(uploader.status).toBe(UploaderStatus.UPLOADING) + await resumed + + // reset should clear queue and set IDLE + uploader.reset() + expect(uploader.status).toBe(UploaderStatus.IDLE) + expect(uploader.queue).toEqual([]) }) - it('uploads files in regular mode and notifies listeners', async () => { + it('uploads a file and emits progress and finished events', async () => { const uploader = new Uploader() - uploader.setCustomHeader('X-NC-Test', 'value') - const notifier = vi.fn() - uploader.addNotifier(notifier) + const file = new File(['hello'], 'hello.txt', { type: 'text/plain' }) - const file = new File(['x'.repeat(100)], 'report.txt', { type: 'text/plain', lastModified: 1000 }) - const upload = await uploader.upload('/docs/report.txt', file) + const started = vi.fn() + const progress = vi.fn() + const finished = vi.fn() - await vi.waitFor(() => { - expect(upload.status).toBe(UploadStatus.FINISHED) - }) + uploader.addEventListener('uploadStarted', () => started()) + uploader.addEventListener('uploadProgress', () => progress()) + uploader.addEventListener('uploadFinished', () => finished()) - expect(upload.uploaded).toBe(upload.size) - expect(upload.response?.status).toBe(201) - expect(uploadUtils.getChunk).toHaveBeenCalledTimes(1) - expect(uploadUtils.uploadData).toHaveBeenCalledTimes(1) - expect(notifier).toHaveBeenCalledTimes(1) - expect(notifier).toHaveBeenCalledWith(upload) + const upload = await uploader.upload('/hello.txt', file) + // wait for upload to finish await vi.waitFor(() => { - expect(uploader.info.status).toBe(UploaderStatus.IDLE) + expect(upload.status).toBe(UploadStatus.FINISHED) }) + + expect(started).toHaveBeenCalled() + expect(progress).toHaveBeenCalled() + expect(finished).toHaveBeenCalled() }) - it('converts callback cancellation in batch upload to UploadCancelledError', async () => { - const uploader = new Uploader(false, new Folder({ - id: 1, - owner: 'test', - permissions: Permission.ALL, - root: '/files/test', - source: 'https://localhost/dav/files/test', - })) - - const notifier = vi.fn() - uploader.addNotifier(notifier) - const file = new File(['data'], 'a.txt', { type: 'text/plain' }) - - await expect(uploader.batchUpload('/uploads', [file], { - callback: async () => false, - })).rejects.toBeInstanceOf(UploadCancelledError) - - expect(dav.getClient).toHaveBeenCalledTimes(1) - expect(notifier).toHaveBeenCalledTimes(1) - expect(notifier.mock.calls[0][0].status).toBe(UploadStatus.CANCELLED) + it('performs batchUpload using UploadFileTree and initializes children', async () => { + const uploader = new Uploader() + const uploads = await uploader.batchUpload('/dir', [new File(['a'], 'a.txt')]) + expect(Array.isArray(uploads)).toBe(true) + expect(uploads.length).toBeGreaterThanOrEqual(1) }) }) diff --git a/lib/upload/uploader/Uploader.ts b/lib/upload/uploader/Uploader.ts index 05ce2b96..68116437 100644 --- a/lib/upload/uploader/Uploader.ts +++ b/lib/upload/uploader/Uploader.ts @@ -3,28 +3,23 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -import type { AxiosError, AxiosResponse } from 'axios' -import type { WebDAVClient } from 'webdav' import type { IFolder } from '../../node/folder.ts' -import type { IDirectory } from '../utils/fileTree.ts' +import type { IUpload } from './Upload.ts' +import type { ConflictsCallback } from './UploadFileTree.ts' import { getCurrentUser } from '@nextcloud/auth' -import axios, { isCancel } from '@nextcloud/axios' -import { getCapabilities } from '@nextcloud/capabilities' -import { encodePath } from '@nextcloud/paths' import PQueue from 'p-queue' -import { normalize } from 'path' -import { defaultRemoteURL, defaultRootPath, getClient } from '../../dav/dav.ts' +import { TypedEventTarget } from 'typescript-event-target' +import { defaultRemoteURL, defaultRootPath } from '../../dav/dav.ts' import { FileType, Folder } from '../../node/index.ts' import { Permission } from '../../permissions.ts' import logger from '../../utils/logger.ts' -import { UploadCancelledError } from '../errors/UploadCancelledError.ts' -import { getMaxChunksSize } from '../utils/config.ts' -import { isFileSystemFileEntry } from '../utils/filesystem.ts' +import { getMaxChunksSize, getMaxParallelUploads } from '../utils/config.ts' import { Directory } from '../utils/fileTree.ts' -import { getChunk, initChunkWorkspace, uploadData } from '../utils/upload.ts' import { Eta } from './Eta.ts' -import { Upload, UploadStatus } from './Upload.ts' +import { UploadStatus } from './Upload.ts' +import { UploadFile } from './UploadFile.ts' +import { UploadFileTree } from './UploadFileTree.ts' export const UploaderStatus = Object.freeze({ IDLE: 0, @@ -43,7 +38,8 @@ interface BaseOptions { interface UploadOptions extends BaseOptions { /** - * The root folder where to upload + * The root folder where to upload. + * Allows to override the current root of the uploader for this upload */ root?: string @@ -55,38 +51,48 @@ interface UploadOptions extends BaseOptions { retries?: number } -interface DirectoryUploadOptions extends BaseOptions { - destination: string - directory: Directory - client: WebDAVClient +interface BatchUploadOptions extends UploadOptions { + callback?: ConflictsCallback } -interface BatchUploadOptions extends BaseOptions { - callback?: (nodes: Array, currentPath: string) => Promise | false> +interface UploaderEventsMap { + /** + * Dispatched when the uploader is paused + */ + paused: CustomEvent + /** + * Dispatched when the uploader is resumed + */ + resumed: CustomEvent + /** + * Dispatched when the uploader has finished all uploads (successfully, failed or cancelled) + */ + finished: CustomEvent + + /** + * Dispatched when a new upload has been started. + */ + uploadStarted: CustomEvent + /** + * Dispatched when an upload has made progress (e.g. a chunk has been uploaded). + */ + uploadProgress: CustomEvent + /** + * Dispatched when an upload has finished (successfully, failed or cancelled). + */ + uploadFinished: CustomEvent } -export class Uploader { - // Initialized via setter in the constructor - private _destinationFolder!: IFolder - private _isPublic: boolean - private _customHeaders: Record - - // Global upload queue - private _uploadQueue: Array = [] - private _jobQueue: PQueue = new PQueue({ - // Maximum number of concurrent uploads - // @ts-expect-error TS2339 Object has no defined properties - concurrency: getCapabilities().files?.chunked_upload?.max_parallel_count ?? 5, +export class Uploader extends TypedEventTarget { + #eta = new Eta() + #destinationFolder: IFolder + #customHeaders: Map = new Map() + #status: TUploaderStatus = UploaderStatus.IDLE + #uploadQueue: IUpload[] = [] + #jobQueue: PQueue = new PQueue({ + concurrency: getMaxParallelUploads(), }) - private _queueSize = 0 - private _queueProgress = 0 - private _queueStatus: TUploaderStatus = UploaderStatus.IDLE - - private _eta = new Eta() - - private _notifiers: Array<(upload: Upload) => void> = [] - /** * Initialize uploader * @@ -97,9 +103,7 @@ export class Uploader { isPublic = false, destinationFolder?: IFolder, ) { - this._isPublic = isPublic - this._customHeaders = {} - + super() if (!destinationFolder) { const source = `${defaultRemoteURL}${defaultRootPath}` let owner: string @@ -122,47 +126,43 @@ export class Uploader { source, }) } - this.destination = destinationFolder + this.#destinationFolder = destinationFolder logger.debug('Upload workspace initialized', { - destination: this.destination, - root: this.root, + root: this.#destinationFolder.source, isPublic, maxChunksSize: getMaxChunksSize(), }) } + public get status(): TUploaderStatus { + return this.#status + } + /** - * Get the upload destination path relative to the root folder + * Get the upload destination folder */ - get destination(): IFolder { - return this._destinationFolder + public get destination(): IFolder { + return this.#destinationFolder } /** * Set the upload destination path relative to the root folder */ - set destination(folder: IFolder) { + public set destination(folder: IFolder) { if (!folder || folder.type !== FileType.Folder || !folder.source) { throw new Error('Invalid destination folder') } logger.debug('Destination set', { folder }) - this._destinationFolder = folder - } - - /** - * Get the root folder - */ - get root() { - return this._destinationFolder.source + this.#destinationFolder = folder } /** * Get registered custom headers for uploads */ - get customHeaders(): Record { - return structuredClone(this._customHeaders) + public get customHeaders(): Map { + return structuredClone(this.#customHeaders) } /** @@ -171,8 +171,8 @@ export class Uploader { * @param name The header to set * @param value The string value */ - setCustomHeader(name: string, value: string = ''): void { - this._customHeaders[name] = value + public setCustomHeader(name: string, value: string = ''): void { + this.#customHeaders.set(name, value) } /** @@ -180,42 +180,27 @@ export class Uploader { * * @param name The header to unset */ - deleteCustomerHeader(name: string): void { - delete this._customHeaders[name] + public deleteCustomerHeader(name: string): void { + this.#customHeaders.delete(name) } /** * Get the upload queue */ - get queue(): Upload[] { - return this._uploadQueue - } - - private reset() { - // Reset the ETA - this._eta.reset() - // If there is no upload in the queue and no job in the queue - if (this._uploadQueue.length === 0 && this._jobQueue.size === 0) { - return - } - - // Reset upload queue but keep the reference - this._uploadQueue.splice(0, this._uploadQueue.length) - this._jobQueue.clear() - this._queueSize = 0 - this._queueProgress = 0 - this._queueStatus = UploaderStatus.IDLE - logger.debug('Uploader state reset') + public get queue(): IUpload[] { + return [...this.#uploadQueue] } /** - * Pause any ongoing upload(s) + * Pause the uploader. + * Already started uploads will continue, but all other (not yet started) uploads + * will be paused until `start()` is called. */ - public pause() { - this._eta.pause() - this._jobQueue.pause() - this._queueStatus = UploaderStatus.PAUSED - this.updateStats() + public async pause() { + this.#jobQueue.pause() + this.#status = UploaderStatus.PAUSED + await this.#jobQueue.onPendingZero() + this.dispatchTypedEvent('paused', new CustomEvent('paused')) logger.debug('Uploader paused') } @@ -223,70 +208,22 @@ export class Uploader { * Resume any pending upload(s) */ public start() { - this._eta.resume() - this._jobQueue.start() - this._queueStatus = UploaderStatus.UPLOADING - this.updateStats() + this.#jobQueue.start() + this.#status = UploaderStatus.UPLOADING + this.dispatchTypedEvent('resumed', new CustomEvent('resumed')) logger.debug('Uploader resumed') } - /** - * Get the estimation for the uploading time. - */ - get eta(): Eta { - return this._eta - } - - /** - * Get the upload queue stats - */ - get info() { - return { - size: this._queueSize, - progress: this._queueProgress, - status: this._queueStatus, - } - } - - private updateStats() { - const size = this._uploadQueue.map((upload) => upload.size) - .reduce((partialSum, a) => partialSum + a, 0) - const uploaded = this._uploadQueue.map((upload) => upload.uploaded) - .reduce((partialSum, a) => partialSum + a, 0) - - this._eta.update(uploaded, size) - this._queueSize = size - this._queueProgress = uploaded - - // If already paused keep it that way - if (this._queueStatus !== UploaderStatus.PAUSED) { - const pending = this._uploadQueue.find(({ status }) => ([UploadStatus.INITIALIZED, UploadStatus.UPLOADING, UploadStatus.ASSEMBLING] as number[]).includes(status)) - if (this._jobQueue.size > 0 || pending) { - this._queueStatus = UploaderStatus.UPLOADING - } else { - this.eta.reset() - this._queueStatus = UploaderStatus.IDLE - } + public reset() { + for (const upload of this.#uploadQueue) { + upload.cancel() } - } - - addNotifier(notifier: (upload: Upload) => void) { - this._notifiers.push(notifier) - } - /** - * Notify listeners of the upload completion - * - * @param upload The upload that finished - */ - private _notifyAll(upload: Upload): void { - for (const notifier of this._notifiers) { - try { - notifier(upload) - } catch (error) { - logger.warn('Error in upload notifier', { error, source: upload.source }) - } - } + this.#uploadQueue = [] + this.#jobQueue.clear() + this.#eta.reset() + this.#status = UploaderStatus.IDLE + logger.debug('Uploader reset') } /** @@ -322,155 +259,33 @@ export class Uploader { * } * ``` */ - async batchUpload( + public async batchUpload( destination: string, files: (File | FileSystemEntry)[], options?: BatchUploadOptions, - ): Promise { + ): Promise { const rootFolder = new Directory('') await rootFolder.addChildren(files) // create a meta upload to ensure all ongoing child requests are listed - const target = `${this.root.replace(/\/$/, '')}/${destination.replace(/^\//, '')}` - const upload = new Upload(target, false, 0, rootFolder) - upload.status = UploadStatus.UPLOADING - this._uploadQueue.push(upload) - - logger.debug('Starting new batch upload', { target }) - try { - // setup client with root and custom header - const client = getClient(this.root, this._customHeaders) - // Create the promise for the virtual root directory - const promise = this.uploadDirectory({ - ...options, - destination, - directory: rootFolder, - client, - }) - // await the uploads and resolve with "finished" status - const uploads = await promise - upload.status = UploadStatus.FINISHED - return uploads - } catch (error) { - if (isCancel(error) || error instanceof UploadCancelledError || (error instanceof DOMException && error.name === 'AbortError')) { - logger.info('Upload cancelled by user', { error }) - upload.status = UploadStatus.CANCELLED - throw new UploadCancelledError(error) - } else { - logger.error('Error in batch upload', { error }) - upload.status = UploadStatus.FAILED - throw error - } - } finally { - // Upload queue is cleared when all the uploading jobs are done - // Meta upload unlike real uploading does not create a job - // Removing it manually here to make sure it is remove even when no uploading happened and there was nothing to finish - this._uploadQueue.splice(this._uploadQueue.indexOf(upload), 1) - this._notifyAll(upload) - this.updateStats() - } - } - - /** - * Helper to create a directory wrapped inside an Upload class - * - * @param options - the options for the directory upload - * @param options.destination Destination where to create the directory - * @param options.directory The directory to create - * @param options.client The cached WebDAV client - */ - private async createDirectory(options: DirectoryUploadOptions): Promise { - if (!options.directory.name) { - throw new Error('Can not create empty directory') - } - - const folderPath = normalize(`${options.destination}/${options.directory.name}`).replace(/\/$/, '') - const rootPath = `${this.root.replace(/\/$/, '')}/${folderPath.replace(/^\//, '')}` - - // Add a new upload to the upload queue - const currentUpload: Upload = new Upload(rootPath, false, 0, options.directory) - if (options.signal) { - options.signal.addEventListener('abort', currentUpload.cancel) - } - this._uploadQueue.push(currentUpload) - - try { - // Add the request to the job queue -> wait for finish to resolve the promise - return await this._jobQueue.add(async () => { - currentUpload.status = UploadStatus.UPLOADING - await options.client.createDirectory(folderPath, { signal: currentUpload.signal }) - return currentUpload - }) - } catch (error) { - if (isCancel(error) || error instanceof UploadCancelledError || (error instanceof DOMException && error.name === 'AbortError')) { - currentUpload.status = UploadStatus.CANCELLED - throw new UploadCancelledError(error) - } else if (error && typeof error === 'object' && 'status' in error && error.status === 405) { - // Directory already exists, so just write into it and ignore the error - logger.debug('Directory already exists, writing into it', { directory: options.directory.name }) - currentUpload.status = UploadStatus.FINISHED - return currentUpload - } else { - // Another error happened, so abort uploading the directory - currentUpload.status = UploadStatus.FAILED - throw error - } - } finally { - // Update statistics - this._notifyAll(currentUpload) - this.updateStats() - } - } - - // Helper for uploading directories (recursively) - private async uploadDirectory(options: BatchUploadOptions & DirectoryUploadOptions): Promise { - // we use an internal abort controller to also cancel uploads if an error happened. - // So if a signal is provided we connect it to our controller. - const internalAbortController = new AbortController() - if (options.signal) { - options.signal.addEventListener('abort', () => internalAbortController.abort()) - } - - const internalOptions = { ...options, signal: internalAbortController.signal } - const folderPath = normalize(`${internalOptions.destination}/${internalOptions.directory.name}`).replace(/\/$/, '') - - // Let the user handle conflicts - const selectedForUpload = await (internalOptions.callback?.(internalOptions.directory.children, folderPath) ?? internalOptions.directory.children) - if (selectedForUpload === false) { - logger.debug('Upload canceled by user', { directory: internalOptions.directory }) - throw new UploadCancelledError('Conflict resolution cancelled by user') - } else if (selectedForUpload.length === 0 && internalOptions.directory.children.length > 0) { - logger.debug('Skipping directory, as all files were skipped by user', { directory: internalOptions.directory }) - return [] + const target = `${this.destination.source.replace(/\/$/, '')}/${destination.replace(/^\//, '')}` + const headers = Object.fromEntries(this.#customHeaders.entries()) + const upload = new UploadFileTree( + target, + rootFolder, + { ...options, headers }, + ) + if (options?.signal) { + options.signal.addEventListener('abort', upload.cancel) } - logger.debug('Start directory upload', { directory: internalOptions.directory }) - const directories: Promise[] = [] - const uploads: Promise[] = [] - try { - if (internalOptions.directory.name) { - // If not the virtual root we need to create the directory first before uploading - // Make sure the promise is listed in the final result - uploads.push(this.createDirectory(internalOptions)) - // Ensure the directory is created before uploading / creating children - await uploads.at(-1) - } - - for (const node of selectedForUpload) { - if (node instanceof Directory) { - directories.push(this.uploadDirectory({ ...internalOptions, directory: node })) - } else { - uploads.push(this.upload(`${folderPath}/${node.name}`, node, { signal: internalOptions.signal })) - } - } - - const resolvedUploads = await Promise.all(uploads) - const resolvedDirectoryUploads = await Promise.all(directories) - return [resolvedUploads, ...resolvedDirectoryUploads].flat() - } catch (e) { - // Ensure a failure cancels all other requests - internalAbortController.abort() - throw e + const uploads = [...upload.initialize(), upload] + for (const upload of uploads) { + this.#attachEventListeners(upload) } + this.#uploadQueue.push(...uploads) + this.dispatchTypedEvent('uploadStarted', new CustomEvent('uploadStarted', { detail: upload })) + await upload.start(this.#jobQueue) + return uploads } /** @@ -480,247 +295,65 @@ export class Uploader { * @param fileHandle - The file to upload * @param options - Optional parameters */ - async upload(destination: string, fileHandle: File | FileSystemFileEntry, options?: UploadOptions): Promise { - const root = options?.root ?? this.root - const destinationPath = `${root.replace(/\/$/, '')}/${destination.replace(/^\//, '')}` - - // Get the encoded source url to this object for requests purposes - const { origin } = new URL(destinationPath) - const encodedDestinationFile = origin + encodePath(destinationPath.slice(origin.length)) - - this.eta.resume() - logger.debug(`Uploading ${fileHandle.name} to ${encodedDestinationFile}`) - - // Handle file system entries by retrieving the file handle - if (isFileSystemFileEntry(fileHandle)) { - fileHandle = await new Promise((resolve, reject) => (fileHandle as FileSystemFileEntry).file(resolve, reject)) - } - // We can cast here as we handled system entries in the if above - const file = fileHandle as File - - // @ts-expect-error TS2339 Object has no defined properties - const supportsPublicChunking = getCapabilities().dav?.public_shares_chunking ?? false - const maxChunkSize = getMaxChunksSize('size' in file ? file.size : undefined) - // If manually disabled or if the file is too small - const disabledChunkUpload = (this._isPublic && !supportsPublicChunking) - || maxChunkSize === 0 - || ('size' in file && file.size < maxChunkSize) - - const upload = new Upload(destinationPath, !disabledChunkUpload, file.size, file) - this._uploadQueue.push(upload) - this.updateStats() - - // Register cancellation caller + public async upload(destination: string, fileHandle: File | FileSystemFileEntry, options?: UploadOptions): Promise { + const target = `${this.destination.source.replace(/\/$/, '')}/${destination.replace(/^\//, '')}` + const headers = Object.fromEntries(this.#customHeaders.entries()) + const upload = new UploadFile(target, fileHandle, { ...options, headers }) if (options?.signal) { options.signal.addEventListener('abort', upload.cancel) } - const retries = options?.retries ?? 5 - if (!disabledChunkUpload) { - logger.debug('Initializing chunked upload', { file, upload }) - - // Let's initialize a chunk upload - const tempUrl = await initChunkWorkspace(encodedDestinationFile, retries, this._isPublic, this._customHeaders) - const chunksQueue: Array> = [] - - // Generate chunks array - for (let chunk = 0; chunk < upload.chunks; chunk++) { - const bufferStart = chunk * maxChunkSize - // Don't go further than the file size - const bufferEnd = Math.min(bufferStart + maxChunkSize, upload.size) - // Make it a Promise function for better memory management - const blob = () => getChunk(file, bufferStart, maxChunkSize) - - // Init request queue - const request = () => { - // bytes uploaded on this chunk (as upload.uploaded tracks all chunks) - let chunkBytes = 0 - return uploadData( - `${tempUrl}/${chunk + 1}`, - blob, - { - signal: upload.signal, - destinationFile: encodedDestinationFile, - retries, - onUploadProgress: ({ bytes }) => { - // Only count 90% of bytes as the request is not yet processed by server - // we set the remaining 10% when the request finished (server responded). - const progressBytes = bytes * 0.9 - chunkBytes += progressBytes - upload.uploaded += progressBytes - this.updateStats() - }, - onUploadRetry: () => { - // Current try failed, so reset the stats for this chunk - // meaning remove the uploaded chunk bytes from stats - upload.uploaded -= chunkBytes - chunkBytes = 0 - this.updateStats() - }, - headers: { - ...this._customHeaders, - ...this._mtimeHeader(file), - 'OC-Total-Length': file.size, - 'Content-Type': 'application/octet-stream', - }, - }, - ) - // Update upload progress on chunk completion - .then(() => { - // request fully done so we uploaded the full chunk - // we first remove the intermediate chunkBytes from progress events - // and then add the real full size - upload.uploaded += bufferEnd - bufferStart - chunkBytes - this.updateStats() - }) - .catch((error) => { - if (error?.response?.status === 507) { - logger.error('Upload failed, not enough space on the server or quota exceeded. Cancelling the remaining chunks', { error, upload }) - upload.cancel() - upload.status = UploadStatus.FAILED - throw error - } - - if (!isCancel(error)) { - logger.error(`Chunk ${chunk + 1} ${bufferStart} - ${bufferEnd} uploading failed`, { error, upload }) - upload.cancel() - upload.status = UploadStatus.FAILED - } - throw error - }) - } - chunksQueue.push(this._jobQueue.add(request)) - } + this.#attachEventListeners(upload) + this.#uploadQueue.push(upload) + this.dispatchTypedEvent('uploadStarted', new CustomEvent('uploadStarted', { detail: upload })) + await upload.start(this.#jobQueue) + return upload + } - const request = async () => { - try { - // Once all chunks are sent, assemble the final file - await Promise.all(chunksQueue) - - // Assemble the chunks - upload.status = UploadStatus.ASSEMBLING - this.updateStats() - - // Send the assemble request - upload.response = await axios.request({ - method: 'MOVE', - url: `${tempUrl}/.file`, - headers: { - ...this._customHeaders, - ...this._mtimeHeader(file), - 'OC-Total-Length': file.size, - Destination: encodedDestinationFile, - }, - }) - upload.status = UploadStatus.FINISHED - this.updateStats() - - logger.debug(`Successfully uploaded ${file.name}`, { file, upload }) - return upload - } catch (error) { - // Cleaning up temp directory - axios.request({ - method: 'DELETE', - url: `${tempUrl}`, - }) - - if (isCancel(error) || error instanceof UploadCancelledError) { - upload.status = UploadStatus.CANCELLED - throw new UploadCancelledError(error) - } else { - upload.status = UploadStatus.FAILED - throw new Error('Failed to assemble the chunks together') - } - } finally { - // Notify listeners of the upload completion - this._notifyAll(upload) - } - } + /** + * Handle the progress event of an upload. + * Update the ETA and dispatch a progress event for the uploader. + * + * @param event - The progress event of an upload + */ + #onProgress(event: CustomEvent) { + const totalBytes = this.#uploadQueue.reduce((acc, upload) => acc + upload.totalBytes, 0) + const uploadedBytes = this.#uploadQueue.reduce((acc, upload) => acc + upload.uploadedBytes, 0) + this.#eta.update(uploadedBytes, totalBytes) + this.dispatchTypedEvent('uploadProgress', new CustomEvent('uploadProgress', { detail: event.detail })) + } - this._jobQueue.add(request) - } else { - logger.debug('Initializing regular upload', { file, upload }) - - // Generating upload limit - const blob = await getChunk(file, 0, upload.size) - const request = async () => { - try { - upload.response = await uploadData( - encodedDestinationFile, - blob, - { - signal: upload.signal, - onUploadProgress: ({ bytes }) => { - // As this is only the sent bytes not the processed ones we only count 90%. - // When the upload is finished (server acknowledged the upload) the remaining 10% will be correctly set. - upload.uploaded += bytes * 0.9 - this.updateStats() - }, - onUploadRetry: () => { - upload.uploaded = 0 - this.updateStats() - }, - headers: { - ...this._customHeaders, - ...this._mtimeHeader(file), - 'Content-Type': file.type, - }, - }, - ) - - // Update progress - now we set the uploaded size to 100% of the file size - upload.uploaded = upload.size - this.updateStats() - - // Resolve - logger.debug(`Successfully uploaded ${file.name}`, { file, upload }) - return upload - } catch (error) { - if (isCancel(error) || error instanceof UploadCancelledError) { - upload.status = UploadStatus.CANCELLED - throw new UploadCancelledError(error) - } - - // Attach response to the upload object - if ((error as AxiosError)?.response) { - upload.response = (error as AxiosError).response as AxiosResponse - } - - upload.status = UploadStatus.FAILED - logger.error(`Failed uploading ${file.name}`, { error, file, upload }) - throw new Error('Failed to upload the file') - } finally { - // Notify listeners of the upload completion - this._notifyAll(upload) - } - } - this._jobQueue.add(request) + /** + * Handle the finished event of an upload. + * + * 1. Update the progress + * 2. if all uploads are finished dispatch a finished event for the uploader and clear the queue + * + * @param event - The finished event of an upload + */ + async #onFinished(event: CustomEvent) { + this.#onProgress(event) + this.dispatchTypedEvent('uploadFinished', new CustomEvent('uploadFinished', { detail: event.detail })) + + const finalStates = [ + UploadStatus.FINISHED, + UploadStatus.CANCELLED, + UploadStatus.FAILED, + ] as number[] + if (this.#uploadQueue.every((upload) => finalStates.includes(upload.status))) { + await this.#jobQueue.onIdle() + this.dispatchTypedEvent('finished', new CustomEvent('finished')) + this.reset() } - - // Reset when upload queue is done - // Only when we know we're closing on the last chunks - // and/or assembling we can reset the uploader. - // Otherwise he queue might be idle for a short time - // and clear the Upload queue before we're done. - this._jobQueue.onIdle() - .then(() => this.reset()) - - // Finally return the Upload - return upload } /** - * Create modification time headers if valid value is available. - * It can be invalid on Android devices if SD cards with NTFS / FAT are used, - * as those files might use the NT epoch for time so the value will be negative. + * Attach progress listeners to an upload. * - * @param file The file to upload + * @param upload - The upload to attach listeners to */ - private _mtimeHeader(file: File): { 'X-OC-Mtime'?: number } { - const mtime = Math.floor(file.lastModified / 1000) - if (mtime > 0) { - return { 'X-OC-Mtime': mtime } - } - return {} + #attachEventListeners(upload: IUpload) { + upload.addEventListener('progress', this.#onProgress.bind(this)) + upload.addEventListener('finished', this.#onFinished.bind(this)) } } diff --git a/lib/upload/uploader/index.ts b/lib/upload/uploader/index.ts index 13de3ad6..3f4f94d2 100644 --- a/lib/upload/uploader/index.ts +++ b/lib/upload/uploader/index.ts @@ -3,14 +3,9 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -export { - type Eta, - type EtaEventsMap, +export type { Eta } from './Eta.ts' +export type { IUpload } from './Upload.ts' - EtaStatus, -} from './Eta.ts' - -export { - Uploader, - UploaderStatus, -} from './Uploader.ts' +export { EtaStatus } from './Eta.ts' +export { UploadStatus } from './Upload.ts' +export { Uploader, UploaderStatus } from './Uploader.ts' diff --git a/lib/upload/utils/config.ts b/lib/upload/utils/config.ts index 511460a6..cddefe39 100644 --- a/lib/upload/utils/config.ts +++ b/lib/upload/utils/config.ts @@ -3,6 +3,25 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +import { getCapabilities } from '@nextcloud/capabilities' + +/** + * Get the maximum number of parallel uploads based on the server configuration. + */ +export function getMaxParallelUploads(): number { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const capabilities = getCapabilities() as Record + return capabilities.files?.chunked_upload?.max_parallel_count ?? 5 +} + +/** + * Checks if the server supports chunking for public shares. + */ +export function supportsPublicChunking(): boolean { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return (getCapabilities() as Record).dav?.public_shares_chunking ?? false +} + /** * Get the maximum chunk size for chunked uploads based on the server configuration and file size. * diff --git a/lib/upload/utils/conflicts.ts b/lib/upload/utils/conflicts.ts deleted file mode 100644 index 2d4e5adb..00000000 --- a/lib/upload/utils/conflicts.ts +++ /dev/null @@ -1,34 +0,0 @@ -/*! - * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -import type { INode } from '../../node/index.ts' - -/** - * Check if there is a conflict between two sets of files - * - * @param files the incoming files - * @param content all the existing files in the directory - * @return true if there is a conflict - */ -export function hasConflict(files: (File | FileSystemEntry | INode)[], content: INode[]): boolean { - return getConflicts(files, content).length > 0 -} - -/** - * Get the conflicts between two sets of files - * - * @param files the incoming files - * @param content all the existing files in the directory - * @return true if there is a conflict - */ -export function getConflicts(files: T[], content: INode[]): T[] { - const contentNames = content.map((node: INode) => node.basename) - const conflicts = files.filter((node: File | FileSystemEntry | INode) => { - const name = 'basename' in node ? node.basename : node.name - return contentNames.indexOf(name) !== -1 - }) - - return conflicts -} diff --git a/lib/upload/utils/requests.spec.ts b/lib/upload/utils/requests.spec.ts new file mode 100644 index 00000000..4632d2bd --- /dev/null +++ b/lib/upload/utils/requests.spec.ts @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + * SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors + */ + +import { CanceledError } from 'axios' +import { expect, test } from 'vitest' +import { getMtimeHeader, isRequestAborted } from './requests.ts' + +test('getMtimeHeader - valid mtime', () => { + const file = new File([''], 'test.txt', { lastModified: 1620000000000 }) + const headers = getMtimeHeader(file) + expect(headers).toHaveProperty('X-OC-Mtime', 1620000000) +}) + +test('getMtimeHeader - invalid mtime', () => { + const file = new File([''], 'test.txt', { lastModified: -1000 }) + const headers = getMtimeHeader(file) + expect(headers).not.toHaveProperty('X-OC-Mtime') +}) + +test('isRequestAborted - axios cancel error', () => { + const error = new CanceledError('Upload cancelled') + expect(isRequestAborted(error)).toBe(true) +}) + +test('isRequestAborted - DOMException with AbortError name', () => { + const error = new DOMException('Upload cancelled', 'AbortError') + expect(isRequestAborted(error)).toBe(true) +}) + +test('isRequestAborted - other error', () => { + const error = new Error('Some other error') + expect(isRequestAborted(error)).toBe(false) +}) diff --git a/lib/upload/utils/requests.ts b/lib/upload/utils/requests.ts new file mode 100644 index 00000000..36fe4e17 --- /dev/null +++ b/lib/upload/utils/requests.ts @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + * SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors + */ + +import { isCancel } from '@nextcloud/axios' + +/** + * Create modification time headers if valid value is available. + * It can be invalid on Android devices if SD cards with NTFS / FAT are used, + * as those files might use the NT epoch for time so the value will be negative. + * + * @param file - The file to upload + */ +export function getMtimeHeader(file: File): { 'X-OC-Mtime'?: number } { + const mtime = Math.floor(file.lastModified / 1000) + if (mtime > 0) { + return { 'X-OC-Mtime': mtime } + } + return {} +} + +/** + * Check if the given error is an abort error + * + * @param error - Error to check + */ +export function isRequestAborted(error: unknown): boolean { + return isCancel(error) + || (error instanceof DOMException && error.name === 'AbortError') +}