Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/twelve-keys-hunt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"monarch-orm": minor
---

Add searchIndexes support. Use `schema.searchIndexes()` for defining Atlas Search Indexes
63 changes: 41 additions & 22 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Collection } from "./collection/collection";
import { applyIndexes } from "./schema/indexes";
import type { AnySchema, Schemas } from "./schema/schema";
import { Schema } from "./schema/schema";
import { applySearchIndexes } from "./schema/search-indexes";
import { getValidator, type SchemaValidation, type Validator } from "./schema/validation";
import type { DbCollections } from "./type-helpers";
import { createAsyncLimiter, createAsyncResolver, type AsyncResolver } from "./utils/misc";
Expand Down Expand Up @@ -92,16 +93,22 @@ export class Database<TSchemas extends Schemas<any, any>> {
/**
* Creates collections with indexes and document validation if provided.
*
* @param options - Init options
* @param options - Init options. Pass `true` to run all steps, or an object to selectively enable steps.
* @param collections - Select collections. Omit to initialize all collections, or an object to selectively enable collections.
*/
public async initialize(options?: InitOptions<keyof TSchemas["schemas"] & string>): Promise<void> {
public async initialize(
options?: InitOptions | true,
collections?: InitCollections<keyof TSchemas["schemas"] & string>,
): Promise<void> {
const promises: Promise<void>[] = [];
const collections = Object.values(this.collections).map((c: Collection<any, any>): CollectionInit => {
const resolver = createAsyncResolver();
promises.push(resolver.promise);
return { schema: c.schema, defaultValidation: this.options?.validation, resolver };
});
initializeCollections(this.db, collections, options);
const collectionInits = (Object.values(this.collections) as Collection<any, any>[])
.filter((c) => !collections || collections[c.schema.name] === true)
.map((c): CollectionInit => {
const resolver = createAsyncResolver();
promises.push(resolver.promise);
return { schema: c.schema, defaultValidation: this.options?.validation, resolver };
});
initializeCollections(this.db, collectionInits, options);
return Promise.all(promises).then<void>(() => undefined);
}

Expand Down Expand Up @@ -145,33 +152,40 @@ export function createDatabase<T extends Schemas<any, any>>(
return new Database(db, schemas, options);
}

type InitOptions<T extends string> = {
indexes?: boolean;
validation?: boolean;
collections?: Partial<Record<T, true>>;
/**
* Initialization options. When provided, only fields explicitly set to `true` will run.
* When omitted, all initialization steps run.
*/
export type InitOptions = {
/** Create or update schema indexes. */
indexes?: true;
/** Create or update search indexes. */
searchIndexes?: true;
/** Apply document validation rules. */
validation?: true;
};

/**
* Limit initialization to specific collections.
* When provided, only collections with a `true` value will be initialized.
*/
type InitCollections<T extends string> = { [K in T]?: true };

type CollectionInit = {
schema: AnySchema;
defaultValidation?: SchemaValidation;
resolver: AsyncResolver;
};

function initializeCollections(db: Db, collections: CollectionInit[], options?: InitOptions<any>) {
function initializeCollections(db: Db, collections: CollectionInit[], options?: InitOptions | true) {
const opts = options === true ? undefined : options;
const run = createAsyncLimiter(10);
const existingPromise = db
.listCollections({}, { nameOnly: true })
.toArray()
.then((colls) => new Set(colls.map((c) => c.name)));

for (const c of collections) {
// Skip disabled collections
const enabled = options?.collections ? options.collections[c.schema.name] === true : true;
if (!enabled) {
c.resolver.resolve();
continue;
}

run(async () => {
const existing = await existingPromise;
const exists = existing.has(c.schema.name);
Expand All @@ -180,7 +194,7 @@ function initializeCollections(db: Db, collections: CollectionInit[], options?:
// Get schema validation
let validation: (SchemaValidation & { validator: Validator }) | undefined;
const validationOptions = schemaOptions.validation ?? c.defaultValidation;
if ((options?.validation ?? true) && validationOptions) {
if ((opts === undefined || opts.validation) && validationOptions) {
validation = { ...validationOptions, validator: getValidator(c.schema) };
}

Expand All @@ -194,9 +208,14 @@ function initializeCollections(db: Db, collections: CollectionInit[], options?:
}

// Create schema indexes
if ((options?.indexes ?? true) && schemaOptions.indexes) {
if ((opts === undefined || opts.indexes) && schemaOptions.indexes) {
await applyIndexes(coll, schemaOptions.indexes);
}

// Create schema search indexes
if ((opts === undefined || opts.searchIndexes) && schemaOptions.searchIndexes) {
await applySearchIndexes(coll, schemaOptions.searchIndexes);
}
})
.then(c.resolver.resolve)
.catch(c.resolver.reject);
Expand Down
9 changes: 8 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export type { ReplaceOneQueryOptions } from "./collection/query/replace-one";
export type { UpdateManyQueryOptions } from "./collection/query/update-many";
export type { UpdateOneQueryOptions } from "./collection/query/update-one";
export type { PipelineStage } from "./collection/types/pipeline-stage";
export { createClient, createDatabase, Database, type DatabaseOptions } from "./database";
export { createClient, createDatabase, Database, type DatabaseOptions, type InitOptions } from "./database";
export { MonarchError, MonarchParseError } from "./errors";
export {
mergeRelations,
Expand All @@ -46,7 +46,14 @@ export {
type RelationsFn,
type SchemasRelations,
} from "./relations/relations";
export type { CreateIndexesOptions, SchemaIndex } from "./schema/indexes";
export { createSchema, defineSchemas, mergeSchemas, Schema, Schemas } from "./schema/schema";
export type {
SchemaSearchIndex,
SchemaSearchIndexDefinition,
SearchIndexDefinition,
VectorSearchIndexDefinition,
} from "./schema/search-indexes";
export type {
Condition,
CreateIndexKey,
Expand Down
10 changes: 6 additions & 4 deletions src/schema/indexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { MonarchError } from "../errors";
import type { AnyMonarchType } from "../types/type";
import type { CreateIndexKey } from "./type-helpers";

export type { CreateIndexesOptions };

export type CreateIndex<T extends Record<string, AnyMonarchType>> = (
key: CreateIndexKey<T>,
options?: CreateIndexesOptions,
Expand All @@ -28,15 +30,15 @@ export type SchemaIndexes<T extends Record<string, AnyMonarchType>> = (options:
[k: string]: SchemaIndex<T>;
};

export function makeIndexes<T extends Record<string, AnyMonarchType>>(indexesFn: SchemaIndexes<T>) {
return indexesFn({
export function makeIndexes<T extends Record<string, AnyMonarchType>>(fn: SchemaIndexes<T>) {
return fn({
createIndex: (key, options) => ({ key, options }),
unique: (key) => ({ key: { [key]: 1 } as CreateIndexKey<T>, options: { unique: true } }),
});
}

export async function applyIndexes(coll: MongoCollection, indexesFn: SchemaIndexes<any>) {
const indexes = Object.entries(makeIndexes(indexesFn));
export async function applyIndexes(coll: MongoCollection, fn: SchemaIndexes<any>) {
const indexes = Object.entries(makeIndexes(fn));
const desiredIndexes = new Map(indexes.map(([_, idx]) => [JSON.stringify(idx.key), idx.options ?? {}]));
const existingIndexes = await coll.indexes();
const indexesToDrop: string[] = [];
Expand Down
30 changes: 29 additions & 1 deletion src/schema/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { MonarchObjectId, objectId } from "../types/objectId";
import { MonarchNullable, MonarchOptional, MonarchType, type AnyMonarchType } from "../types/type";
import type { MergeAll, MergeN1All, Pretty, RequiredObject } from "../utils/type-helpers";
import type { SchemaIndexes } from "./indexes";
import type { SchemaSearchIndexes } from "./search-indexes";
import type {
InferSchemaData,
InferSchemaInput,
Expand Down Expand Up @@ -52,6 +53,7 @@ export class Schema<
private options: {
omit?: SchemaOmit<TTypes>;
indexes?: SchemaIndexes<TTypes>;
searchIndexes?: SchemaSearchIndexes<TTypes>;
validation?: SchemaValidation;
virtuals?: SchemaVirtuals<TTypes, TVirtuals>;
renames?: TRenames;
Expand Down Expand Up @@ -104,7 +106,6 @@ export class Schema<
* This method allows you to specify indexes that should be created for the schema.
*
* @param indexes - A function that defines the indexes to be created.
*
* @returns The current schema instance for method chaining.
*
* @example
Expand All @@ -121,6 +122,33 @@ export class Schema<
return this;
}

/**
* Defines the search indexes for the schema.
*
* Search indexes are only supported on MongoDB Atlas clusters and are applied during initialization.
*
* @param searchIndexes - A function that defines the search indexes to be created.
* @returns The current schema instance for method chaining.
*
* @example
* const articleSchema = createSchema("articles", {
* title: string(),
* body: string(),
* embedding: array(number()),
* }).searchIndexes(({ searchIndex, vectorSearchIndex }) => ({
* fullText: searchIndex("articles_search", {
* mappings: { dynamic: false, fields: { title: { type: "string" }, body: { type: "string" } } },
* }),
* semantic: vectorSearchIndex("articles_vector", {
* fields: [{ type: "vector", path: "embedding", numDimensions: 1536, similarity: "cosine" }],
* }),
* }));
*/
public searchIndexes(searchIndexes: SchemaSearchIndexes<TTypes>) {
this.options.searchIndexes = searchIndexes;
return this;
}

/**
* Sets MongoDB document validation for this schema.
*
Expand Down
94 changes: 94 additions & 0 deletions src/schema/search-indexes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import type { Document, Collection as MongoCollection } from "mongodb";
import { MonarchError } from "../errors";
import type { AnyMonarchType } from "../types/type";
import type { CreateIndexKey } from "./type-helpers";

export type SearchIndexDefinition<T extends Record<string, AnyMonarchType>> = {
mappings?: {
dynamic?: boolean;
fields?: { [K in keyof CreateIndexKey<T>]?: Document };
};
};
export type VectorSearchIndexDefinition<T extends Record<string, AnyMonarchType>> = {
fields?: Array<{
type: "vector" | "filter";
path: keyof CreateIndexKey<T>;
numDimensions?: number;
similarity?: "euclidean" | "cosine" | "dotProduct";
[key: string]: unknown;
}>;
};
export type SchemaSearchIndexDefinition<T extends Record<string, AnyMonarchType>> =
| SearchIndexDefinition<T>
| VectorSearchIndexDefinition<T>;

export type SchemaSearchIndex<T extends Record<string, AnyMonarchType>> = {
name: string;
type?: "search" | "vectorSearch";
definition: SchemaSearchIndexDefinition<T>;
};

export type SchemaSearchIndexes<T extends Record<string, AnyMonarchType>> = (options: {
searchIndex: (name: string, definition: SearchIndexDefinition<T>) => SchemaSearchIndex<T>;
vectorSearchIndex: (name: string, definition: VectorSearchIndexDefinition<T>) => SchemaSearchIndex<T>;
}) => Record<string, SchemaSearchIndex<T>>;

export function makeSearchIndexes<T extends Record<string, AnyMonarchType>>(fn: SchemaSearchIndexes<T>) {
return fn({
searchIndex: (name, definition) => ({ name, type: "search", definition }),
vectorSearchIndex: (name, definition) => ({ name, type: "vectorSearch", definition }),
});
}

type ExistingSearchIndex = {
id: string;
name: string;
status: string;
queryable: boolean;
latestDefinition: Document;
};

export async function applySearchIndexes(coll: MongoCollection, fn: SchemaSearchIndexes<any>) {
const desired = Object.values(makeSearchIndexes(fn));
const desiredByName = new Map(desired.map((idx) => [idx.name, idx]));

let existing: ExistingSearchIndex[];
try {
existing = (await coll.listSearchIndexes().toArray()) as ExistingSearchIndex[];
} catch {
return;
}

const existingByName = new Map(existing.map((idx) => [idx.name, idx]));

// Drop stale indexes
await Promise.all(
Array.from(existingByName.keys())
.filter((name) => !desiredByName.has(name))
.map((name) =>
coll.dropSearchIndex(name).catch((error) => {
throw new MonarchError(`failed to drop search index '${name}': ${error}`, error);
}),
),
);

// Create or update indexes
await Promise.all(
desired.map(async (idx) => {
const existing = existingByName.get(idx.name);
if (!existing) {
await coll.createSearchIndex({ name: idx.name, type: idx.type, definition: idx.definition }).catch((error) => {
throw new MonarchError(`failed to create search index '${idx.name}': ${error}`, error);
});
return;
}

const defChanged = JSON.stringify(existing.latestDefinition) !== JSON.stringify(idx.definition);
if (defChanged) {
await coll.updateSearchIndex(idx.name, idx.definition).catch((error) => {
throw new MonarchError(`failed to update search index '${idx.name}': ${error}`, error);
});
}
}),
);
}
12 changes: 6 additions & 6 deletions tests/database.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ describe("Database options", async () => {
).rejects.toThrow("Document failed validation");
});

it("initialize validation false skips applying validators", async () => {
it("initialize without validation option skips applying validators", async () => {
const schema = createSchema("users", {
name: string(),
nickname: string(),
Expand All @@ -86,21 +86,21 @@ describe("Database options", async () => {
validationAction: "error",
},
});
await db.initialize({ validation: false });
await db.initialize({});

const rawCollection = client.db().collection("users");
await expect(rawCollection.insertOne({})).resolves.toMatchObject({ acknowledged: true });
});

it("initialize indexes false skips schema index creation", async () => {
it("initialize without indexes option skips schema index creation", async () => {
const schema = createSchema("users", {
username: string(),
}).indexes(({ unique }) => ({
username: unique("username"),
}));

const db = createDatabase(client.db(), defineSchemas({ users: schema }), { initialize: false });
await db.initialize({ indexes: false });
await db.initialize({});

const rawCollection = client.db().collection("users");
await rawCollection.insertOne({ username: "same-user" });
Expand All @@ -123,7 +123,7 @@ describe("Database options", async () => {
validationAction: "error",
},
});
await db.initialize({ collections: { users: true } });
await db.initialize(true, { users: true });

const existing = await client.db().listCollections({}, { nameOnly: true }).toArray();
const existingNames = new Set(existing.map((collection) => collection.name));
Expand Down Expand Up @@ -164,7 +164,7 @@ describe("Database options", async () => {
});

const firstDb = createDatabase(client.db(), defineSchemas({ users: schema }), { initialize: false });
await firstDb.initialize({ validation: false });
await firstDb.initialize({});

const rawCollection = client.db().collection("users");
await expect(rawCollection.insertOne({})).resolves.toMatchObject({ acknowledged: true });
Expand Down
Loading