diff --git a/docs-src/docs/migration-schema.md b/docs-src/docs/migration-schema.md index 8248c1a37b0..df2c3a19184 100644 --- a/docs-src/docs/migration-schema.md +++ b/docs-src/docs/migration-schema.md @@ -107,6 +107,13 @@ myDatabase.addCollections({ By default, the migration automatically happens when the collection is created. Calling `RxDatabase.addCollections()` returns only when the migration has finished. If you have lots of data or the migrationStrategies take a long time, it might be better to start the migration 'by hand' and show the migration-state to the user as a loading-bar. +:::warning No writes during a running migration +While a schema migration is running on a collection, writes to that collection are not allowed. +Calls that would write will throw a `COL25` error until the migration finishes. +Wait for `collection.migratePromise()` to resolve (or observe `collection.getMigrationState().$` until status is `DONE`) +before performing writes. +::: + ```javascript const messageCol = await myDatabase.addCollections({ messages: { diff --git a/orga/changelog/prevent-writes-during-migration.md b/orga/changelog/prevent-writes-during-migration.md new file mode 100644 index 00000000000..3bca9d6504c --- /dev/null +++ b/orga/changelog/prevent-writes-during-migration.md @@ -0,0 +1 @@ +- FIX writes to an `RxCollection` while a schema migration is pending or running now fail fast with a clear `COL25` error instead of racing the migration and surfacing as `RC_PUSH`. diff --git a/src/plugins/attachments/index.ts b/src/plugins/attachments/index.ts index 003be6bf47e..9c1a5e2dd49 100644 --- a/src/plugins/attachments/index.ts +++ b/src/plugins/attachments/index.ts @@ -26,6 +26,7 @@ import { assignMethodsToAttachment, ensureSchemaSupportsAttachments } from './attachments-utils.ts'; +import { isWriteAllowed } from '../../rx-collection-helper.ts'; @@ -56,6 +57,7 @@ export class RxAttachment { } remove(): Promise { + isWriteAllowed(this.doc.collection); return this.doc.collection.incrementalWriteQueue.addWrite( this.doc._data, docWriteData => { @@ -113,6 +115,7 @@ async function _putAttachmentsImpl( attachments: RxAttachmentCreator[] ): Promise { ensureSchemaSupportsAttachments(doc); + isWriteAllowed(doc.collection); if (attachments.length === 0) { return []; diff --git a/src/plugins/dev-mode/error-messages.ts b/src/plugins/dev-mode/error-messages.ts index 90ce850f5de..e4a94c8a862 100644 --- a/src/plugins/dev-mode/error-messages.ts +++ b/src/plugins/dev-mode/error-messages.ts @@ -461,6 +461,12 @@ export const ERROR_MESSAGES = { fix: 'Reduce the number of open collections or upgrade to premium.', docs: 'https://rxdb.info/premium.html?console=errors&code=COL23' }, + COL25: { + message: 'Cannot write to RxCollection while a schema migration is running. Wait for the migration to finish before writing.', + cause: 'You tried to insert/upsert/remove or modify a document while the collection is in the middle of a schema migration.', + fix: 'Await the migration (e.g. via collection.migratePromise() or by observing collection.getMigrationState().$) before writing.', + docs: 'https://rxdb.info/migration-schema.html?console=errors&code=COL25' + }, COL24: { message: 'inline _attachments must be an array of { id, type, data } objects; the map format is reserved for internal use only', cause: 'An object was passed as _attachments that is neither an array of attachment creators nor a fully-normalized internal map.', diff --git a/src/plugins/migration-schema/index.ts b/src/plugins/migration-schema/index.ts index 51fec716740..369e8e58284 100644 --- a/src/plugins/migration-schema/index.ts +++ b/src/plugins/migration-schema/index.ts @@ -36,6 +36,31 @@ export const RxDBMigrationPlugin: RxPlugin = { hooks: { preCloseRxDatabase: { after: onDatabaseClose + }, + /** + * Block writes to the new collection while a migration is pending. + * For schemas with version > 0 we optimistically set the flag, then + * release it once we know no migration is actually needed. If a + * migration is needed, the flag stays set until startMigration() + * runs to completion (or fails/cancels), which clears it. + */ + createRxCollection: { + after: (i: any) => { + const collection: RxCollection = i.collection; + if (collection.schema.version === 0) { + return; + } + collection.migrationInProgress = true; + collection.migrationNeeded() + .then((needed: boolean) => { + if (!needed) { + collection.migrationInProgress = false; + } + }) + .catch(() => { + collection.migrationInProgress = false; + }); + } } }, prototypes: { diff --git a/src/plugins/migration-schema/rx-migration-state.ts b/src/plugins/migration-schema/rx-migration-state.ts index 6b346f5c8ba..de3a69adef5 100644 --- a/src/plugins/migration-schema/rx-migration-state.ts +++ b/src/plugins/migration-schema/rx-migration-state.ts @@ -134,13 +134,24 @@ export class RxMigrationState { * is run on a different browser tab. */ async startMigration(batchSize: number = MIGRATION_DEFAULT_BATCH_SIZE): Promise { + if (this.started) { + throw newRxError('DM1'); + } + /** + * Block outside writes to the collection while the migration is running. + * The migration replication fills the new storage and concurrent writes + * could conflict with that process. + * We set the flag synchronously (before the `mustMigrate` await) so that + * any code calling `migratePromise()` and then immediately performing a + * write will reliably observe the block. + * If no migration is actually needed, the flag is cleared again below. + */ + this.collection.migrationInProgress = true; const must = await this.mustMigrate; if (!must) { + this.collection.migrationInProgress = false; return; } - if (this.started) { - throw newRxError('DM1'); - } this.started = true; @@ -228,6 +239,7 @@ export class RxMigrationState { ); } catch (err) { await oldStorageInstance.close(); + this.collection.migrationInProgress = false; await this.updateStatus(s => { s.status = 'ERROR'; s.error = errorToPlainJson(err as Error); @@ -276,6 +288,7 @@ export class RxMigrationState { } } + this.collection.migrationInProgress = false; await this.updateStatus(s => { s.status = 'DONE'; return s; @@ -503,6 +516,7 @@ export class RxMigrationState { */ public async cancel() { this.canceled = true; + this.collection.migrationInProgress = false; await Promise.all( Array.from(this.replicationStates.values()) .map(state => cancelRxStorageReplication(state)) diff --git a/src/rx-collection-helper.ts b/src/rx-collection-helper.ts index 15234c6c2bb..273d140989d 100644 --- a/src/rx-collection-helper.ts +++ b/src/rx-collection-helper.ts @@ -266,3 +266,32 @@ export function ensureRxCollectionIsNotClosed( ); } } + +/** + * Asserts that a write to the given collection is currently allowed. + * Throws if the collection is closed or if a schema migration is + * pending or running, in which cases external writes would either + * fail or conflict with the migration replication. + */ +export function isWriteAllowed( + collection: RxCollection | RxCollectionBase +) { + if (collection.closed) { + throw newRxError( + 'COL21', + { + collection: collection.name, + version: collection.schema.version + } + ); + } + if (collection.migrationInProgress) { + throw newRxError( + 'COL25', + { + collection: collection.name, + version: collection.schema.version + } + ); + } +} diff --git a/src/rx-collection.ts b/src/rx-collection.ts index 0fff34388b1..d8fb5bf90dd 100644 --- a/src/rx-collection.ts +++ b/src/rx-collection.ts @@ -21,7 +21,8 @@ import { normalizeInlineAttachments, createRxCollectionStorageInstance, removeCollectionStorages, - ensureRxCollectionIsNotClosed + ensureRxCollectionIsNotClosed, + isWriteAllowed } from './rx-collection-helper.ts'; import { createRxQuery, @@ -229,6 +230,13 @@ export class RxCollectionBase< public onClose: (() => MaybePromise)[] = []; public closed = false; + /** + * Set to true while a schema migration is running for this collection. + * Writes are blocked while this is true to ensure the migration + * replication can fill the new storage without external interference. + */ + public migrationInProgress = false; + public onRemove: (() => MaybePromise)[] = []; public async prepare(): Promise { @@ -378,7 +386,7 @@ export class RxCollectionBase< async insert( json: RxDocumentType | RxDocument ): Promise> { - ensureRxCollectionIsNotClosed(this); + isWriteAllowed(this); const writeResult = await this.bulkInsert([json as any]); const isError = writeResult.error[0]; @@ -410,7 +418,7 @@ export class RxCollectionBase< success: RxDocument[]; error: RxStorageWriteError[]; }> { - ensureRxCollectionIsNotClosed(this); + isWriteAllowed(this); /** * Optimization shortcut, * do nothing when called with an empty array @@ -559,7 +567,7 @@ export class RxCollectionBase< success: RxDocument[]; error: RxStorageWriteError[]; }> { - ensureRxCollectionIsNotClosed(this); + isWriteAllowed(this); const primaryPath = this.schema.primaryPath; /** * Optimization shortcut, @@ -648,7 +656,7 @@ export class RxCollectionBase< success: RxDocument[]; error: RxStorageWriteError[]; }> { - ensureRxCollectionIsNotClosed(this); + isWriteAllowed(this); const insertData: RxDocumentType[] = []; const useJsonByDocId: Map = new Map(); @@ -738,7 +746,7 @@ export class RxCollectionBase< * same as insert but overwrites existing document with same primary */ async upsert(json: Partial, options?: UpsertOptions): Promise> { - ensureRxCollectionIsNotClosed(this); + isWriteAllowed(this); const bulkResult = await this.bulkUpsert([json], options); throwIfIsStorageWriteError( this.asRxCollection, @@ -753,7 +761,7 @@ export class RxCollectionBase< * upserts to a RxDocument, uses incrementalModify if document already exists */ incrementalUpsert(json: Partial, options?: UpsertOptions): Promise> { - ensureRxCollectionIsNotClosed(this); + isWriteAllowed(this); const useJson = fillObjectDataBeforeInsert(this.schema, json); const primary: string = useJson[this.schema.primaryPath] as any; if (!primary) { diff --git a/src/rx-document.ts b/src/rx-document.ts index b92a6c3f5f5..70c3e27f24e 100644 --- a/src/rx-document.ts +++ b/src/rx-document.ts @@ -42,6 +42,7 @@ import { overwritable } from './overwritable.ts'; import { getSchemaByObjectPath } from './rx-schema-helper.ts'; import { getWrittenDocumentsFromBulkWriteResponse, throwIfIsStorageWriteError } from './rx-storage-helper.ts'; import { modifierFromPublicToInternal } from './incremental-write.ts'; +import { isWriteAllowed } from './rx-collection-helper.ts'; export const basePrototype = { get primaryPath() { @@ -325,6 +326,7 @@ export const basePrototype = { // used by some plugins that wrap the method _context?: string ): Promise { + isWriteAllowed(this.collection); return this.collection.incrementalWriteQueue.addWrite( this._data, modifierFromPublicToInternal(mutationFunction) @@ -371,6 +373,7 @@ export const basePrototype = { newData: RxDocumentWriteData, oldData: RxDocumentData ): Promise> { + isWriteAllowed(this.collection); newData = flatClone(newData); // deleted documents cannot be changed diff --git a/test/unit/migration-schema.test.ts b/test/unit/migration-schema.test.ts index d9025a0319e..49482325d60 100644 --- a/test/unit/migration-schema.test.ts +++ b/test/unit/migration-schema.test.ts @@ -418,6 +418,134 @@ describe('migration-schema.test.ts', function () { }); }); }); + describe('writes during migration', () => { + it('should block writes while a migration is running and allow them again after it finishes', async () => { + const col = await humansCollection.createMigrationCollection( + isFastMode() ? 3 : 10, + { + 3: async (doc: any) => { + await promiseWait(20); + doc.age = parseInt(doc.age, 10); + return doc; + } + } + ); + + const migrationDone = col.migratePromise(1); + // wait until the migration flag is actually set + await waitUntil(() => (col as any).migrationInProgress === true); + + await assertThrows( + () => col.insert(schemaObjects.simpleHumanAge() as any), + 'RxError', + 'COL25' + ); + await assertThrows( + () => col.bulkInsert([schemaObjects.simpleHumanAge() as any]), + 'RxError', + 'COL25' + ); + await assertThrows( + () => col.upsert(schemaObjects.simpleHumanAge() as any), + 'RxError', + 'COL25' + ); + await assertThrows( + () => col.bulkUpsert([schemaObjects.simpleHumanAge() as any]), + 'RxError', + 'COL25' + ); + await assertThrows( + () => col.incrementalUpsert(schemaObjects.simpleHumanAge() as any), + 'RxError', + 'COL25' + ); + await assertThrows( + () => col.bulkRemove(['nonexistent']), + 'RxError', + 'COL25' + ); + + // reads must still work + await col.find().exec(); + + await migrationDone; + assert.strictEqual((col as any).migrationInProgress, false); + + // now writes work again (use V3 data since migration converted age to number) + await col.insert(schemaObjects.simpleHumanV3Data()); + + await col.database.close(); + }); + + // Previously a bulkInsert called directly after migratePromise + // could race with the migration replication and surface as a + // confusing RC_PUSH error. Now the write must fail fast with COL25. + it('should block bulkInsert that races with a starting migration (no waitUntil)', async () => { + const col = await humansCollection.createMigrationCollection( + isFastMode() ? 3 : 10, + { + 3: async (doc: any) => { + await promiseWait(20); + doc.age = parseInt(doc.age, 10); + return doc; + } + } + ); + + const migrationDone = col.migratePromise(1); + + await assertThrows( + () => col.bulkInsert([schemaObjects.simpleHumanAge() as any]), + 'RxError', + 'COL25' + ); + + await migrationDone; + await col.database.close(); + }); + + it('should block writes when a migration is needed but not yet started', async () => { + // collection is created with autoMigrate=false and old data is present, + // so the migration is required but startMigration() has not been called. + const col = await humansCollection.createMigrationCollection( + isFastMode() ? 3 : 5, + { + 3: (doc: any) => { + doc.age = parseInt(doc.age, 10); + return doc; + } + } + ); + + await assertThrows( + () => col.insert(schemaObjects.simpleHumanAge() as any), + 'RxError', + 'COL25' + ); + + await col.migratePromise(); + assert.strictEqual((col as any).migrationInProgress, false); + + // writes are allowed after the migration completes (use V3 data since age is now a number) + await col.insert(schemaObjects.simpleHumanV3Data()); + + await col.database.close(); + }); + + it('should reset migrationInProgress on migration error', async () => { + const col = await humansCollection.createMigrationCollection(3, { + 3: () => { + throw new Error('migration-failed-on-purpose'); + } + }); + let failed = false; + await col.migratePromise().catch(() => failed = true); + assert.ok(failed); + assert.strictEqual((col as any).migrationInProgress, false); + await col.database.close(); + }); + }); }); describeParallel('integration into collection', () => { describe('run', () => {