Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs-src/docs/migration-schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ myDatabase.addCollections({
By default, the migration automatically happens when the collection is created. Calling `RxDatabase.addCollections()` returns only when the migration has finished.
If you have lots of data or the migrationStrategies take a long time, it might be better to start the migration 'by hand' and show the migration-state to the user as a loading-bar.

:::warning No writes during a running migration
While a schema migration is running on a collection, writes to that collection are not allowed.
Calls that would write will throw a `COL25` error until the migration finishes.
Wait for `collection.migratePromise()` to resolve (or observe `collection.getMigrationState().$` until status is `DONE`)
before performing writes.
:::

```javascript
const messageCol = await myDatabase.addCollections({
messages: {
Expand Down
1 change: 1 addition & 0 deletions orga/changelog/prevent-writes-during-migration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- FIX writes to an `RxCollection` while a schema migration is pending or running now fail fast with a clear `COL25` error instead of racing the migration and surfacing as `RC_PUSH`.
3 changes: 3 additions & 0 deletions src/plugins/attachments/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
assignMethodsToAttachment,
ensureSchemaSupportsAttachments
} from './attachments-utils.ts';
import { isWriteAllowed } from '../../rx-collection-helper.ts';



Expand Down Expand Up @@ -56,6 +57,7 @@ export class RxAttachment {
}

remove(): Promise<void> {
isWriteAllowed(this.doc.collection);
return this.doc.collection.incrementalWriteQueue.addWrite(
this.doc._data,
docWriteData => {
Expand Down Expand Up @@ -113,6 +115,7 @@ async function _putAttachmentsImpl<RxDocType>(
attachments: RxAttachmentCreator[]
): Promise<RxAttachment[]> {
ensureSchemaSupportsAttachments(doc);
isWriteAllowed(doc.collection);

if (attachments.length === 0) {
return [];
Expand Down
6 changes: 6 additions & 0 deletions src/plugins/dev-mode/error-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,12 @@ export const ERROR_MESSAGES = {
fix: 'Reduce the number of open collections or upgrade to premium.',
docs: 'https://rxdb.info/premium.html?console=errors&code=COL23'
},
COL25: {
message: 'Cannot write to RxCollection while a schema migration is running. Wait for the migration to finish before writing.',
cause: 'You tried to insert/upsert/remove or modify a document while the collection is in the middle of a schema migration.',
fix: 'Await the migration (e.g. via collection.migratePromise() or by observing collection.getMigrationState().$) before writing.',
docs: 'https://rxdb.info/migration-schema.html?console=errors&code=COL25'
},
COL24: {
message: 'inline _attachments must be an array of { id, type, data } objects; the map format is reserved for internal use only',
cause: 'An object was passed as _attachments that is neither an array of attachment creators nor a fully-normalized internal map.',
Expand Down
25 changes: 25 additions & 0 deletions src/plugins/migration-schema/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,31 @@ export const RxDBMigrationPlugin: RxPlugin = {
hooks: {
preCloseRxDatabase: {
after: onDatabaseClose
},
/**
* Block writes to the new collection while a migration is pending.
* For schemas with version > 0 we optimistically set the flag, then
* release it once we know no migration is actually needed. If a
* migration is needed, the flag stays set until startMigration()
* runs to completion (or fails/cancels), which clears it.
*/
createRxCollection: {
after: (i: any) => {
const collection: RxCollection = i.collection;
if (collection.schema.version === 0) {
return;
}
collection.migrationInProgress = true;
collection.migrationNeeded()
.then((needed: boolean) => {
if (!needed) {
collection.migrationInProgress = false;
}
})
.catch(() => {
collection.migrationInProgress = false;
});
}
}
},
prototypes: {
Expand Down
20 changes: 17 additions & 3 deletions src/plugins/migration-schema/rx-migration-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,24 @@ export class RxMigrationState {
* is run on a different browser tab.
*/
async startMigration(batchSize: number = MIGRATION_DEFAULT_BATCH_SIZE): Promise<void> {
if (this.started) {
throw newRxError('DM1');
}
/**
* Block outside writes to the collection while the migration is running.
* The migration replication fills the new storage and concurrent writes
* could conflict with that process.
* We set the flag synchronously (before the `mustMigrate` await) so that
* any code calling `migratePromise()` and then immediately performing a
* write will reliably observe the block.
* If no migration is actually needed, the flag is cleared again below.
*/
this.collection.migrationInProgress = true;
const must = await this.mustMigrate;
if (!must) {
this.collection.migrationInProgress = false;
return;
}
if (this.started) {
throw newRxError('DM1');
}
this.started = true;


Expand Down Expand Up @@ -228,6 +239,7 @@ export class RxMigrationState {
);
} catch (err) {
await oldStorageInstance.close();
this.collection.migrationInProgress = false;
await this.updateStatus(s => {
s.status = 'ERROR';
s.error = errorToPlainJson(err as Error);
Expand Down Expand Up @@ -276,6 +288,7 @@ export class RxMigrationState {
}
}

this.collection.migrationInProgress = false;
await this.updateStatus(s => {
s.status = 'DONE';
return s;
Expand Down Expand Up @@ -503,6 +516,7 @@ export class RxMigrationState {
*/
public async cancel() {
this.canceled = true;
this.collection.migrationInProgress = false;
await Promise.all(
Array.from(this.replicationStates.values())
.map(state => cancelRxStorageReplication(state))
Expand Down
29 changes: 29 additions & 0 deletions src/rx-collection-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,32 @@ export function ensureRxCollectionIsNotClosed(
);
}
}

/**
* Asserts that a write to the given collection is currently allowed.
* Throws if the collection is closed or if a schema migration is
* pending or running, in which cases external writes would either
* fail or conflict with the migration replication.
*/
export function isWriteAllowed(
collection: RxCollection | RxCollectionBase<any, any, any, any, any>
) {
if (collection.closed) {
throw newRxError(
'COL21',
{
collection: collection.name,
version: collection.schema.version
}
);
}
if (collection.migrationInProgress) {
throw newRxError(
'COL25',
{
collection: collection.name,
version: collection.schema.version
}
);
}
}
22 changes: 15 additions & 7 deletions src/rx-collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import {
normalizeInlineAttachments,
createRxCollectionStorageInstance,
removeCollectionStorages,
ensureRxCollectionIsNotClosed
ensureRxCollectionIsNotClosed,
isWriteAllowed
} from './rx-collection-helper.ts';
import {
createRxQuery,
Expand Down Expand Up @@ -229,6 +230,13 @@ export class RxCollectionBase<
public onClose: (() => MaybePromise<any>)[] = [];
public closed = false;

/**
* Set to true while a schema migration is running for this collection.
* Writes are blocked while this is true to ensure the migration
* replication can fill the new storage without external interference.
*/
public migrationInProgress = false;

public onRemove: (() => MaybePromise<any>)[] = [];

public async prepare(): Promise<void> {
Expand Down Expand Up @@ -378,7 +386,7 @@ export class RxCollectionBase<
async insert(
json: RxDocumentType | RxDocument
): Promise<RxDocument<RxDocumentType, OrmMethods, Reactivity>> {
ensureRxCollectionIsNotClosed(this);
isWriteAllowed(this);
const writeResult = await this.bulkInsert([json as any]);

const isError = writeResult.error[0];
Expand Down Expand Up @@ -410,7 +418,7 @@ export class RxCollectionBase<
success: RxDocument<RxDocumentType, OrmMethods, Reactivity>[];
error: RxStorageWriteError<RxDocumentType>[];
}> {
ensureRxCollectionIsNotClosed(this);
isWriteAllowed(this);
/**
* Optimization shortcut,
* do nothing when called with an empty array
Expand Down Expand Up @@ -559,7 +567,7 @@ export class RxCollectionBase<
success: RxDocument<RxDocumentType, OrmMethods, Reactivity>[];
error: RxStorageWriteError<RxDocumentType>[];
}> {
ensureRxCollectionIsNotClosed(this);
isWriteAllowed(this);
const primaryPath = this.schema.primaryPath;
/**
* Optimization shortcut,
Expand Down Expand Up @@ -648,7 +656,7 @@ export class RxCollectionBase<
success: RxDocument<RxDocumentType, OrmMethods, Reactivity>[];
error: RxStorageWriteError<RxDocumentType>[];
}> {
ensureRxCollectionIsNotClosed(this);
isWriteAllowed(this);
const insertData: RxDocumentType[] = [];
const useJsonByDocId: Map<string, RxDocumentType> = new Map();

Expand Down Expand Up @@ -738,7 +746,7 @@ export class RxCollectionBase<
* same as insert but overwrites existing document with same primary
*/
async upsert(json: Partial<RxDocumentType>, options?: UpsertOptions): Promise<RxDocument<RxDocumentType, OrmMethods, Reactivity>> {
ensureRxCollectionIsNotClosed(this);
isWriteAllowed(this);
const bulkResult = await this.bulkUpsert([json], options);
throwIfIsStorageWriteError<RxDocumentType>(
this.asRxCollection,
Expand All @@ -753,7 +761,7 @@ export class RxCollectionBase<
* upserts to a RxDocument, uses incrementalModify if document already exists
*/
incrementalUpsert(json: Partial<RxDocumentType>, options?: UpsertOptions): Promise<RxDocument<RxDocumentType, OrmMethods, Reactivity>> {
ensureRxCollectionIsNotClosed(this);
isWriteAllowed(this);
const useJson = fillObjectDataBeforeInsert(this.schema, json);
const primary: string = useJson[this.schema.primaryPath] as any;
if (!primary) {
Expand Down
3 changes: 3 additions & 0 deletions src/rx-document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import { overwritable } from './overwritable.ts';
import { getSchemaByObjectPath } from './rx-schema-helper.ts';
import { getWrittenDocumentsFromBulkWriteResponse, throwIfIsStorageWriteError } from './rx-storage-helper.ts';
import { modifierFromPublicToInternal } from './incremental-write.ts';
import { isWriteAllowed } from './rx-collection-helper.ts';

export const basePrototype = {
get primaryPath() {
Expand Down Expand Up @@ -325,6 +326,7 @@ export const basePrototype = {
// used by some plugins that wrap the method
_context?: string
): Promise<RxDocument> {
isWriteAllowed(this.collection);
return this.collection.incrementalWriteQueue.addWrite(
this._data,
modifierFromPublicToInternal(mutationFunction)
Expand Down Expand Up @@ -371,6 +373,7 @@ export const basePrototype = {
newData: RxDocumentWriteData<RxDocType>,
oldData: RxDocumentData<RxDocType>
): Promise<RxDocument<RxDocType>> {
isWriteAllowed(this.collection);
newData = flatClone(newData);

// deleted documents cannot be changed
Expand Down
Loading
Loading