diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index be1ab8c21..9cf7d4cab 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,5 +1,35 @@ # @vlcn.io/crsqlite +## Unreleased + +### Patch Changes + +- Large speedup (~8x) of merging changes via `INSERT INTO crsql_changes`: + - Removed `RETURNING` clauses from hot-path statements (winner clock, pk + lookaside, site_id ordinal). `RETURNING` forced SQLite to materialize an + ephemeral btree -- with its own pager and page cache -- on every executed + row; values are now read from bound parameters or + `sqlite3_last_insert_rowid`. + - Unpacked primary keys are bound directly to the key lookaside statements + instead of round-tripping through a per-row `SELECT ?,?,...` prepare. + - The sync bit is toggled through a direct pointer instead of executing + `SELECT crsql_internal_sync_bit(x)` per merged change. + - `(table, pk) -> (lookaside key, causal length)` of the last merged row is + memoized, skipping redundant lookups for the N column changes of a row + (changesets arrive ordered by `db_version, seq`). Invalidated on + commit/rollback, savepoint rollback, local CRR writes, schema reloads, + post-alter compaction and merge errors. + - The last `site_id -> ordinal` resolution is memoized (changesets are + virtually always single-site). + - Equal-version changes originating from the site that authored the local + clock entry are rejected as identical without fetching the local value + for comparison (a site's col_version is monotonic per cell). Idempotent + re-imports and own-changes echoes become pure clock-table reads; true + concurrent edits (different sites) still tie-break on value. + - `crsql_next_db_version()` is computed in C and skips its + `PRAGMA data_version` probe when a transaction already established the + pending version. 
+ ## 0.16.3 ### Patch Changes diff --git a/core/src/automigrate.c b/core/src/automigrate.c index 0eb7a9f0d..77fb90bc4 100644 --- a/core/src/automigrate.c +++ b/core/src/automigrate.c @@ -724,6 +724,9 @@ int crsql_compact_post_alter(sqlite3 *db, const char *tblName, // (forward declaration - implemented in db-version.c) extern int crsql_fill_db_version_if_needed(sqlite3 *, crsql_ExtData *, char **); + // Compaction can delete pk lookaside rows and clock entries; drop any + // merge memos that might reference them. + crsql_invalidate_merge_memos(pExtData); int rc = crsql_fill_db_version_if_needed(db, pExtData, errmsg); if (rc != SQLITE_OK) return rc; diff --git a/core/src/changes-vtab-impl.c b/core/src/changes-vtab-impl.c index 948a8bfa4..e959519e6 100644 --- a/core/src/changes-vtab-impl.c +++ b/core/src/changes-vtab-impl.c @@ -11,6 +11,7 @@ #include #include "consts.h" +#include "db-version.h" #include "ext-data.h" #include "pack-columns.h" #include "tableinfo.h" @@ -640,6 +641,62 @@ int crsql_changes_rowid(sqlite3_vtab_cursor *cur, sqlite_int64 *pRowid) { // ---------- Merge helpers ---------- +/** + * If the last-row memo currently describes (tblInfoIdx, key), update its + * cached causal length to reflect a clock write we just performed. + */ +static void update_row_memo_cl(crsql_ExtData *extData, int tblInfoIdx, + sqlite3_int64 key, sqlite3_int64 newCl) { + if (extData->rowMemoTblInfoIdx == tblInfoIdx && + extData->rowMemoKey == key) { + extData->rowMemoLocalCl = newCl; + } +} + +/** + * Resolve the ordinal for a site_id if it is already known to this db, + * consulting the (single-entry) memo first. Returns 1 and sets *outOrdinal + * when found, 0 when the site has no ordinal yet or on lookup failure. 
+ */ +static int lookup_site_ordinal(crsql_ExtData *extData, + const unsigned char *siteId, int siteIdLen, + sqlite3_int64 *outOrdinal) { + if (siteIdLen <= 0 || siteId == 0) return 0; + + if (extData->cachedSiteIdLen == siteIdLen && + memcmp(extData->cachedSiteId, siteId, siteIdLen) == 0) { + *outOrdinal = extData->cachedSiteIdOrdinal; + return 1; + } + + int rc = sqlite3_bind_blob(extData->pSelectSiteIdOrdinalStmt, 1, siteId, + siteIdLen, SQLITE_STATIC); + if (rc != SQLITE_OK) { + sqlite3_clear_bindings(extData->pSelectSiteIdOrdinalStmt); + sqlite3_reset(extData->pSelectSiteIdOrdinalStmt); + return 0; + } + rc = sqlite3_step(extData->pSelectSiteIdOrdinalStmt); + int found = 0; + sqlite3_int64 ordinal = 0; + if (rc == SQLITE_ROW) { + ordinal = sqlite3_column_int64(extData->pSelectSiteIdOrdinalStmt, 0); + found = 1; + } + sqlite3_clear_bindings(extData->pSelectSiteIdOrdinalStmt); + sqlite3_reset(extData->pSelectSiteIdOrdinalStmt); + + if (found) { + *outOrdinal = ordinal; + if (siteIdLen <= CRSQL_SITE_ID_MEMO_LEN) { + memcpy(extData->cachedSiteId, siteId, siteIdLen); + extData->cachedSiteIdLen = siteIdLen; + extData->cachedSiteIdOrdinal = ordinal; + } + } + return found; +} + /** * Get the local causal length for a key. * Returns 0 if no record exists. @@ -710,6 +767,7 @@ static int did_cid_win(sqlite3 *db, crsql_ExtData *extData, rc = sqlite3_step(colVrsnStmt); if (rc == SQLITE_ROW) { sqlite3_int64 localVersion = sqlite3_column_int64(colVrsnStmt, 0); + sqlite3_int64 localSiteOrdinal = sqlite3_column_int64(colVrsnStmt, 1); resetCachedStmt(colVrsnStmt); // causal lengths are the same. Fall back to original algorithm. if (colVersion > localVersion) { @@ -719,7 +777,22 @@ static int did_cid_win(sqlite3 *db, crsql_ExtData *extData, *outWon = 0; return SQLITE_OK; } - // versions equal, fall through to value comparison + + // Versions equal. 
If the incoming change originates from the same site + // that authored the local entry, it is the identical change (a site's + // col_version is monotonic per cell): reject it without fetching the + // local value. This makes idempotent re-imports and own-changes echoes + // pure clock-table reads. Different sites with equal versions are true + // concurrent edits and still fall through to the deterministic value + // comparison. + sqlite3_int64 insertOrdinal = 0; + if (lookup_site_ordinal(extData, insertSiteId, insertSiteIdLen, + &insertOrdinal) && + insertOrdinal == localSiteOrdinal) { + *outWon = 0; + return SQLITE_OK; + } + // fall through to value comparison } else if (rc == SQLITE_DONE) { resetCachedStmt(colVrsnStmt); // no rows -- incoming wins @@ -824,25 +897,13 @@ static int set_winner_clock(sqlite3 *db, crsql_ExtData *extData, sqlite3_int64 ordinal = 0; if (insertSiteIdLen > 0 && insertSiteId != 0) { - // Try to select existing ordinal - rc = sqlite3_bind_blob(extData->pSelectSiteIdOrdinalStmt, 1, insertSiteId, - insertSiteIdLen, SQLITE_STATIC); - if (rc != SQLITE_OK) { - sqlite3_clear_bindings(extData->pSelectSiteIdOrdinalStmt); - sqlite3_reset(extData->pSelectSiteIdOrdinalStmt); - return rc; - } - rc = sqlite3_step(extData->pSelectSiteIdOrdinalStmt); - if (rc == SQLITE_ROW) { - ordinal = sqlite3_column_int64(extData->pSelectSiteIdOrdinalStmt, 0); + // Memoized lookup first: a changeset is virtually always single-site, + // so this resolves without a query for every change after the first. 
+ if (lookup_site_ordinal(extData, insertSiteId, insertSiteIdLen, + &ordinal)) { hasOrdinal = 1; - sqlite3_clear_bindings(extData->pSelectSiteIdOrdinalStmt); - sqlite3_reset(extData->pSelectSiteIdOrdinalStmt); } else { - sqlite3_clear_bindings(extData->pSelectSiteIdOrdinalStmt); - sqlite3_reset(extData->pSelectSiteIdOrdinalStmt); - - // Insert new ordinal + // Unknown site -- insert a new ordinal rc = sqlite3_bind_blob(extData->pSetSiteIdOrdinalStmt, 1, insertSiteId, insertSiteIdLen, SQLITE_STATIC); if (rc != SQLITE_OK) { @@ -851,16 +912,20 @@ static int set_winner_clock(sqlite3 *db, crsql_ExtData *extData, return rc; } rc = sqlite3_step(extData->pSetSiteIdOrdinalStmt); - if (rc == SQLITE_DONE) { - sqlite3_clear_bindings(extData->pSetSiteIdOrdinalStmt); - sqlite3_reset(extData->pSetSiteIdOrdinalStmt); + sqlite3_clear_bindings(extData->pSetSiteIdOrdinalStmt); + sqlite3_reset(extData->pSetSiteIdOrdinalStmt); + if (rc != SQLITE_DONE) { return SQLITE_ABORT; } - // Should be SQLITE_ROW with the returning ordinal - ordinal = sqlite3_column_int64(extData->pSetSiteIdOrdinalStmt, 0); + // ordinal is an INTEGER PRIMARY KEY (rowid alias) + ordinal = sqlite3_last_insert_rowid(db); hasOrdinal = 1; - sqlite3_clear_bindings(extData->pSetSiteIdOrdinalStmt); - sqlite3_reset(extData->pSetSiteIdOrdinalStmt); + + if (insertSiteIdLen <= CRSQL_SITE_ID_MEMO_LEN) { + memcpy(extData->cachedSiteId, insertSiteId, insertSiteIdLen); + extData->cachedSiteIdLen = insertSiteIdLen; + extData->cachedSiteIdOrdinal = ordinal; + } } } @@ -868,6 +933,16 @@ static int set_winner_clock(sqlite3 *db, crsql_ExtData *extData, rc = crsql_get_set_winner_clock_stmt(db, tblInfo, &setStmt); if (rc != SQLITE_OK || !setStmt) return SQLITE_ERROR; + // Compute the next db version in C rather than via the + // crsql_next_db_version SQL function inside the insert statement. 
+ char *dbVrsnErr = 0; + sqlite3_int64 nextDbVrsn = + crsql_next_db_version(db, extData, insertDbVrsn, &dbVrsnErr); + if (nextDbVrsn < 0) { + sqlite3_free(dbVrsnErr); + return SQLITE_ERROR; + } + rc = sqlite3_bind_int64(setStmt, 1, key); if (rc != SQLITE_OK) { resetCachedStmt(setStmt); @@ -883,7 +958,7 @@ static int set_winner_clock(sqlite3 *db, crsql_ExtData *extData, resetCachedStmt(setStmt); return rc; } - rc = sqlite3_bind_int64(setStmt, 4, insertDbVrsn); + rc = sqlite3_bind_int64(setStmt, 4, nextDbVrsn); if (rc != SQLITE_OK) { resetCachedStmt(setStmt); return rc; @@ -904,20 +979,21 @@ static int set_winner_clock(sqlite3 *db, crsql_ExtData *extData, } rc = sqlite3_step(setStmt); - if (rc == SQLITE_ROW) { - *outRowid = sqlite3_column_int64(setStmt, 0); - resetCachedStmt(setStmt); + resetCachedStmt(setStmt); + if (rc == SQLITE_DONE) { + // The clock table's key for this entry is exactly the key we bound; no + // need for a RETURNING round-trip. + *outRowid = key; return SQLITE_OK; - } else { - resetCachedStmt(setStmt); - return SQLITE_ERROR; } + return SQLITE_ERROR; } /** * Reset clocks when a row is resurrected (causal length increased). 
*/ -static int zero_clocks_on_resurrect(sqlite3 *db, crsql_TableInfo *tblInfo, +static int zero_clocks_on_resurrect(sqlite3 *db, crsql_ExtData *extData, + crsql_TableInfo *tblInfo, sqlite3_int64 key, sqlite3_int64 insertDbVrsn) { sqlite3_stmt *zeroStmt = 0; @@ -925,7 +1001,15 @@ static int zero_clocks_on_resurrect(sqlite3 *db, crsql_TableInfo *tblInfo, crsql_get_zero_clocks_on_resurrect_stmt(db, tblInfo, &zeroStmt); if (rc != SQLITE_OK || !zeroStmt) return SQLITE_ERROR; - rc = sqlite3_bind_int64(zeroStmt, 1, insertDbVrsn); + char *dbVrsnErr = 0; + sqlite3_int64 nextDbVrsn = + crsql_next_db_version(db, extData, insertDbVrsn, &dbVrsnErr); + if (nextDbVrsn < 0) { + sqlite3_free(dbVrsnErr); + return SQLITE_ERROR; + } + + rc = sqlite3_bind_int64(zeroStmt, 1, nextDbVrsn); if (rc != SQLITE_OK) { resetCachedStmt(zeroStmt); return rc; @@ -962,30 +1046,18 @@ static int merge_sentinel_only_insert( } // Set sync bit, execute merge, clear sync bit - rc = sqlite3_step(extData->pSetSyncBitStmt); - if (rc != SQLITE_ROW && rc != SQLITE_DONE) { - sqlite3_reset(extData->pSetSyncBitStmt); - resetCachedStmt(mergeStmt); - return rc; - } - sqlite3_reset(extData->pSetSyncBitStmt); - + *extData->syncBitPtr = 1; rc = sqlite3_step(mergeStmt); int mergeRc = rc; resetCachedStmt(mergeStmt); - - int syncRc = sqlite3_step(extData->pClearSyncBitStmt); - sqlite3_reset(extData->pClearSyncBitStmt); - if (syncRc != SQLITE_ROW && syncRc != SQLITE_DONE) { - return syncRc; - } + *extData->syncBitPtr = 0; if (mergeRc != SQLITE_DONE && mergeRc != SQLITE_ROW) { return mergeRc; } // Success: zero clocks on resurrect, then set winner clock - rc = zero_clocks_on_resurrect(db, tblInfo, key, remoteDbVsn); + rc = zero_clocks_on_resurrect(db, extData, tblInfo, key, remoteDbVsn); if (rc != SQLITE_OK) return rc; return set_winner_clock(db, extData, tblInfo, key, SENTINEL_CID, @@ -1014,24 +1086,11 @@ static int merge_delete(sqlite3 *db, crsql_ExtData *extData, return rc; } - // Set sync bit - rc = 
sqlite3_step(extData->pSetSyncBitStmt); - if (rc != SQLITE_ROW && rc != SQLITE_DONE) { - sqlite3_reset(extData->pSetSyncBitStmt); - resetCachedStmt(deleteStmt); - return rc; - } - sqlite3_reset(extData->pSetSyncBitStmt); - + // Set sync bit, execute delete, clear sync bit + *extData->syncBitPtr = 1; rc = sqlite3_step(deleteStmt); resetCachedStmt(deleteStmt); - - // Clear sync bit - int syncRc = sqlite3_step(extData->pClearSyncBitStmt); - sqlite3_reset(extData->pClearSyncBitStmt); - if (syncRc != SQLITE_ROW && syncRc != SQLITE_DONE) { - return syncRc; - } + *extData->syncBitPtr = 0; if (rc != SQLITE_DONE && rc != SQLITE_ROW) { return rc; @@ -1076,6 +1135,10 @@ int crsql_changes_update(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv, char *errMsg = 0; int rc = merge_insert_impl(pVTab, argc, argv, pRowid, &errMsg); if (rc != SQLITE_OK) { + // A failed insert aborts the statement and SQLite's statement journal + // undoes any partial writes (including prior rows of a multi-row + // INSERT), so the merge memos no longer describe reality. + crsql_invalidate_merge_memos(((crsql_Changes_vtab *)pVTab)->pExtData); pVTab->zErrMsg = errMsg; } return rc; @@ -1162,71 +1225,55 @@ static int merge_insert_impl(sqlite3_vtab *vtab, int argc, return SQLITE_ERROR; } - // Convert unpacked PKs to sqlite3_value** for crsql_get_or_create_key. - // We prepare a temporary "SELECT ?,?,?..." statement, bind the unpacked - // column values, step it, then extract sqlite3_column_value pointers. + // Resolve (pk lookaside key, local causal length) for this row. + // Changesets are ordered by (db_version, seq) so the N column changes of a + // single row arrive back-to-back: memoize the last resolution and skip the + // lookaside + causal length queries when the next change hits the same row. 
+ crsql_ExtData *extData = tab->pExtData; sqlite3_int64 key = -1; - { - char *bindList = crsql_binding_list(numUnpackedPks); - if (!bindList) { - crsql_free_column_values(unpackedPks, numUnpackedPks); - return SQLITE_NOMEM; - } - char *tmpSql = sqlite3_mprintf("SELECT %s", bindList); - sqlite3_free(bindList); - if (!tmpSql) { - crsql_free_column_values(unpackedPks, numUnpackedPks); - return SQLITE_NOMEM; - } - sqlite3_stmt *pTmpStmt = 0; - rc = sqlite3_prepare_v2(db, tmpSql, -1, &pTmpStmt, 0); - sqlite3_free(tmpSql); - if (rc != SQLITE_OK) { + sqlite3_int64 localCl = 0; + + if (extData->rowMemoTblInfoIdx == tblInfoIdx && + extData->rowMemoPkLen == pkBlobLen && pkBlobLen > 0 && + memcmp(extData->rowMemoPkBlob, pkBlob, pkBlobLen) == 0) { + key = extData->rowMemoKey; + localCl = extData->rowMemoLocalCl; + } else { + key = crsql_get_or_create_key_packed(db, tblInfo, unpackedPks, + numUnpackedPks, errmsg); + if (key < 0) { crsql_free_column_values(unpackedPks, numUnpackedPks); - return rc; + if (!*errmsg) { + *errmsg = sqlite3_mprintf("crsql - failed to get or create key"); + } + return SQLITE_ERROR; } - rc = crsql_bind_package_to_stmt(pTmpStmt, unpackedPks, numUnpackedPks, 0); + + rc = get_local_cl(db, tblInfo, key, &localCl); if (rc != SQLITE_OK) { - sqlite3_finalize(pTmpStmt); crsql_free_column_values(unpackedPks, numUnpackedPks); return rc; } - rc = sqlite3_step(pTmpStmt); - if (rc != SQLITE_ROW) { - sqlite3_finalize(pTmpStmt); - crsql_free_column_values(unpackedPks, numUnpackedPks); - return SQLITE_ERROR; - } - // Extract column values as sqlite3_value* - sqlite3_value **pkValues = - (sqlite3_value **)sqlite3_malloc(sizeof(sqlite3_value *) * numUnpackedPks); - if (!pkValues) { - sqlite3_finalize(pTmpStmt); - crsql_free_column_values(unpackedPks, numUnpackedPks); - return SQLITE_NOMEM; - } - for (int i = 0; i < numUnpackedPks; i++) { - pkValues[i] = sqlite3_column_value(pTmpStmt, i); - } - - key = crsql_get_or_create_key(db, tblInfo, pkValues, numUnpackedPks, 
errmsg); - sqlite3_free(pkValues); - sqlite3_finalize(pTmpStmt); - } - if (key < 0) { - crsql_free_column_values(unpackedPks, numUnpackedPks); - if (!*errmsg) { - *errmsg = sqlite3_mprintf("crsql - failed to get or create key"); + // Memoize for the subsequent changes of this row. + extData->rowMemoTblInfoIdx = -1; + if (pkBlobLen > 0) { + if (pkBlobLen > extData->rowMemoPkCap) { + unsigned char *newBlob = + sqlite3_realloc(extData->rowMemoPkBlob, pkBlobLen); + if (newBlob) { + extData->rowMemoPkBlob = newBlob; + extData->rowMemoPkCap = pkBlobLen; + } + } + if (pkBlobLen <= extData->rowMemoPkCap) { + memcpy(extData->rowMemoPkBlob, pkBlob, pkBlobLen); + extData->rowMemoPkLen = pkBlobLen; + extData->rowMemoTblInfoIdx = tblInfoIdx; + extData->rowMemoKey = key; + extData->rowMemoLocalCl = localCl; + } } - return SQLITE_ERROR; - } - - sqlite3_int64 localCl = 0; - rc = get_local_cl(db, tblInfo, key, &localCl); - if (rc != SQLITE_OK) { - crsql_free_column_values(unpackedPks, numUnpackedPks); - return rc; } // We can ignore all updates from older causal lengths. @@ -1253,6 +1300,8 @@ static int merge_insert_impl(sqlite3_vtab *vtab, int argc, insertSiteIdLen, insertSeq, &innerRowid); crsql_free_column_values(unpackedPks, numUnpackedPks); if (rc != SQLITE_OK) return rc; + // The delete sentinel's col_version is the row's new causal length. + update_row_memo_cl(extData, tblInfoIdx, key, insertColVrsn); tab->pExtData->rowsImpacted += 1; *rowid = crsql_slab_rowid(tblInfoIdx, innerRowid); return SQLITE_OK; @@ -1271,6 +1320,8 @@ static int merge_insert_impl(sqlite3_vtab *vtab, int argc, insertSiteIdLen, insertSeq, &innerRowid); crsql_free_column_values(unpackedPks, numUnpackedPks); if (rc != SQLITE_OK) return rc; + // The sentinel's col_version is the row's new causal length. 
+ update_row_memo_cl(extData, tblInfoIdx, key, insertColVrsn); if (innerRowid != -1) { tab->pExtData->rowsImpacted += 1; *rowid = crsql_slab_rowid(tblInfoIdx, innerRowid); @@ -1292,6 +1343,7 @@ static int merge_insert_impl(sqlite3_vtab *vtab, int argc, crsql_free_column_values(unpackedPks, numUnpackedPks); return rc; } + update_row_memo_cl(extData, tblInfoIdx, key, insertCl); tab->pExtData->rowsImpacted += 1; } @@ -1344,29 +1396,15 @@ static int merge_insert_impl(sqlite3_vtab *vtab, int argc, } // Set sync bit, merge, clear sync bit - rc = sqlite3_step(tab->pExtData->pSetSyncBitStmt); - if (rc != SQLITE_ROW && rc != SQLITE_DONE) { - sqlite3_reset(tab->pExtData->pSetSyncBitStmt); - resetCachedStmt(mergeStmt); - crsql_free_column_values(unpackedPks, numUnpackedPks); - return rc; - } - sqlite3_reset(tab->pExtData->pSetSyncBitStmt); - + *tab->pExtData->syncBitPtr = 1; rc = sqlite3_step(mergeStmt); resetCachedStmt(mergeStmt); - - int syncRc = sqlite3_step(tab->pExtData->pClearSyncBitStmt); - sqlite3_reset(tab->pExtData->pClearSyncBitStmt); + *tab->pExtData->syncBitPtr = 0; if (rc != SQLITE_DONE && rc != SQLITE_ROW) { crsql_free_column_values(unpackedPks, numUnpackedPks); return rc; } - if (syncRc != SQLITE_ROW && syncRc != SQLITE_DONE) { - crsql_free_column_values(unpackedPks, numUnpackedPks); - return syncRc; - } crsql_free_column_values(unpackedPks, numUnpackedPks); @@ -1376,6 +1414,13 @@ static int merge_insert_impl(sqlite3_vtab *vtab, int argc, insertSiteIdLen, insertSeq, &innerRowid); if (rc != SQLITE_OK) return rc; + // A new row (or a resurrection without an explicit sentinel in the + // changeset, possible when insertCl == 1) now exists with causal length + // insertCl: a clock entry for this column implies the row is alive. 
+ if (needsResurrect) { + update_row_memo_cl(extData, tblInfoIdx, key, insertCl); + } + tab->pExtData->rowsImpacted += 1; *rowid = crsql_slab_rowid(tblInfoIdx, innerRowid); return SQLITE_OK; diff --git a/core/src/changes-vtab.c b/core/src/changes-vtab.c index fbbdd3484..c38c4ad7a 100644 --- a/core/src/changes-vtab.c +++ b/core/src/changes-vtab.c @@ -138,8 +138,37 @@ int crsql_changes_column( ); int crsql_changes_eof(sqlite3_vtab_cursor *cur); +/** + * The merge path keeps per-transaction memos (see crsql_ExtData). A rollback + * -- full or to a savepoint -- undoes clock/lookaside writes those memos may + * describe, so they must be dropped. Full rollbacks are also covered by the + * connection-level rollback hook; savepoint rollbacks are only visible here. + */ +static int changesRollback(sqlite3_vtab *pVTab) { + crsql_invalidate_merge_memos(((crsql_Changes_vtab *)pVTab)->pExtData); + return SQLITE_OK; +} + +static int changesSavepoint(sqlite3_vtab *pVTab, int n) { + (void)pVTab; + (void)n; + return SQLITE_OK; +} + +static int changesRelease(sqlite3_vtab *pVTab, int n) { + (void)pVTab; + (void)n; + return SQLITE_OK; +} + +static int changesRollbackTo(sqlite3_vtab *pVTab, int n) { + (void)n; + crsql_invalidate_merge_memos(((crsql_Changes_vtab *)pVTab)->pExtData); + return SQLITE_OK; +} + sqlite3_module crsql_changesModule = { - /* iVersion */ 0, + /* iVersion */ 2, /* xCreate */ 0, /* xConnect */ changesConnect, /* xBestIndex */ crsql_changes_best_index, @@ -156,12 +185,12 @@ sqlite3_module crsql_changesModule = { /* xBegin */ crsql_changes_begin, /* xSync */ 0, /* xCommit */ crsql_changes_commit, - /* xRollback */ 0, + /* xRollback */ changesRollback, /* xFindMethod */ 0, /* xRename */ 0, - /* xSavepoint */ 0, - /* xRelease */ 0, - /* xRollbackTo */ 0, + /* xSavepoint */ changesSavepoint, + /* xRelease */ changesRelease, + /* xRollbackTo */ changesRollbackTo, /* xShadowName */ 0 #ifdef LIBSQL , diff --git a/core/src/changes-vtab.test.c 
b/core/src/changes-vtab.test.c index 56650ba0e..653f787c5 100644 --- a/core/src/changes-vtab.test.c +++ b/core/src/changes-vtab.test.c @@ -138,6 +138,101 @@ static void testFilters() { // { // } +/** + * The merge path memoizes (table, pk) -> (key, causal length) for consecutive + * changes of the same row. A savepoint rollback undoes merge writes the memo + * may describe; the changes vtab must invalidate it (xRollbackTo) or stale + * causal lengths would make subsequent merges no-ops. + */ +// assert() is compiled out by NDEBUG in Release builds; these helpers are +// always active. +static void checkOk(int rc, char *err, const char *what) { + if (rc != SQLITE_OK) { + printf("\t\e[0;31m%s failed: %d (%s)\e[0m\n", what, rc, + err ? err : "no message"); + sqlite3_free(err); + abort(); + } + sqlite3_free(err); +} + +static void checkCount(sqlite3 *db, const char *sql, int expected) { + sqlite3_stmt *pStmt; + int rc = sqlite3_prepare_v2(db, sql, -1, &pStmt, 0); + if (rc != SQLITE_OK || sqlite3_step(pStmt) != SQLITE_ROW) { + printf("\t\e[0;31mfailed to run: %s\e[0m\n", sql); + abort(); + } + int actual = sqlite3_column_int(pStmt, 0); + sqlite3_finalize(pStmt); + if (actual != expected) { + printf("\t\e[0;31m%s: expected %d, got %d\e[0m\n", sql, expected, actual); + abort(); + } +} + +static void testRowMemoSavepointRollback() { + printf("RowMemoSavepointRollback\n"); + + sqlite3 *db; + char *err = 0; + int rc = sqlite3_open(":memory:", &db); + if (rc != SQLITE_OK) abort(); + + rc = sqlite3_exec( + db, "CREATE TABLE foo (a INTEGER PRIMARY KEY NOT NULL, b);", 0, 0, &err); + checkOk(rc, err, "create table"); + err = 0; + rc = sqlite3_exec(db, "SELECT crsql_as_crr('foo');", 0, 0, &err); + checkOk(rc, err, "as_crr"); + err = 0; + + const char *insertChange = + "INSERT INTO crsql_changes " + "([table], pk, cid, val, col_version, db_version, site_id, cl, seq) " + "VALUES ('foo', X'010901', 'b', 1, 1, 1, X'00000000000000000000000000000001', 1, 0)"; + const char *deleteChange 
= + "INSERT INTO crsql_changes " + "([table], pk, cid, val, col_version, db_version, site_id, cl, seq) " + "VALUES ('foo', X'010901', '-1', NULL, 2, 2, X'00000000000000000000000000000001', 2, 0)"; + + rc = sqlite3_exec(db, "BEGIN", 0, 0, &err); + checkOk(rc, err, "begin"); + err = 0; + // Create the row via a merged change -- memoizes (key, cl = 1) + rc = sqlite3_exec(db, insertChange, 0, 0, &err); + checkOk(rc, err, "insert change"); + err = 0; + checkCount(db, "SELECT count(*) FROM foo", 1); + + // Delete it inside a savepoint -- memo cl becomes 2 -- then roll back + rc = sqlite3_exec(db, "SAVEPOINT sp", 0, 0, &err); + checkOk(rc, err, "savepoint"); + err = 0; + rc = sqlite3_exec(db, deleteChange, 0, 0, &err); + checkOk(rc, err, "delete change"); + err = 0; + checkCount(db, "SELECT count(*) FROM foo", 0); + rc = sqlite3_exec(db, "ROLLBACK TO sp", 0, 0, &err); + checkOk(rc, err, "rollback to"); + err = 0; + checkCount(db, "SELECT count(*) FROM foo", 1); + + // Re-apply the delete: with a stale memo (cl = 2) this would be treated as + // already-processed and skipped; it must actually delete the row. 
+ rc = sqlite3_exec(db, deleteChange, 0, 0, &err); + checkOk(rc, err, "re-applied delete change"); + err = 0; + checkCount(db, "SELECT count(*) FROM foo", 0); + + rc = sqlite3_exec(db, "COMMIT", 0, 0, &err); + checkOk(rc, err, "commit"); + checkCount(db, "SELECT count(*) FROM foo", 0); + + crsql_close(db); + printf("\t\e[0;32mSuccess\e[0m\n"); +} + // static void testOnlyPkTable() // { // } @@ -154,4 +249,5 @@ void crsqlChangesVtabTestSuite() { printf("\e[47m\e[1;30mSuite: crsql_changesVtab\e[0m\n"); testManyPkTable(); testFilters(); + testRowMemoSavepointRollback(); } diff --git a/core/src/consts.h b/core/src/consts.h index d4044f85a..d17efa0f4 100644 --- a/core/src/consts.h +++ b/core/src/consts.h @@ -16,9 +16,6 @@ "SELECT tbl_name FROM sqlite_master WHERE type='table' AND tbl_name LIKE " \ "'%__crsql_clock'" -#define SET_SYNC_BIT "SELECT crsql_internal_sync_bit(1)" -#define CLEAR_SYNC_BIT "SELECT crsql_internal_sync_bit(0)" - #define TBL_SITE_ID "crsql_site_id" #define TBL_DB_VERSION "db_version" #define TBL_SCHEMA "crsql_master" diff --git a/core/src/crr.c b/core/src/crr.c index 02ae2cefc..9e4aa708d 100644 --- a/core/src/crr.c +++ b/core/src/crr.c @@ -111,7 +111,8 @@ int crsql_create_clock_table(sqlite3 *db, crsql_TableInfo *tableInfo, * read from read_stmt columns [0..numPks). * Returns key >= 0 on success, < 0 on error. 
*/ -static sqlite3_int64 backfill_get_or_create_key(sqlite3_stmt *selectKeyStmt, +static sqlite3_int64 backfill_get_or_create_key(sqlite3 *db, + sqlite3_stmt *selectKeyStmt, sqlite3_stmt *createKeyStmt, int numPks, sqlite3_stmt *readStmt) { @@ -130,8 +131,9 @@ static sqlite3_int64 backfill_get_or_create_key(sqlite3_stmt *selectKeyStmt, } sqlite3_reset(selectKeyStmt); - if (sqlite3_step(createKeyStmt) == SQLITE_ROW) { - key = sqlite3_column_int64(createKeyStmt, 0); + // __crsql_key is an INTEGER PRIMARY KEY (rowid alias) + if (sqlite3_step(createKeyStmt) == SQLITE_DONE) { + key = sqlite3_last_insert_rowid(db); sqlite3_reset(createKeyStmt); return key; } @@ -184,7 +186,7 @@ static int create_clock_rows_from_stmt(sqlite3_stmt *readStmt, sqlite3 *db, // Prepare create key stmt char *createKeySql = sqlite3_mprintf( - "INSERT INTO \"%s__crsql_pks\" (%s) VALUES (%s) RETURNING __crsql_key", + "INSERT INTO \"%s__crsql_pks\" (%s) VALUES (%s)", escTable, pkColList, pkBindings); sqlite3_free(pkColList); sqlite3_free(pkBindings); @@ -227,8 +229,9 @@ static int create_clock_rows_from_stmt(sqlite3_stmt *readStmt, sqlite3 *db, // Iterate rows while (sqlite3_step(readStmt) == SQLITE_ROW) { - sqlite3_int64 key = backfill_get_or_create_key(selectKeyStmt, createKeyStmt, - numPks, readStmt); + sqlite3_int64 key = backfill_get_or_create_key(db, selectKeyStmt, + createKeyStmt, numPks, + readStmt); if (key < 0) { rc = SQLITE_ERROR; break; diff --git a/core/src/crsqlite.c b/core/src/crsqlite.c index 34f066799..48e64e080 100644 --- a/core/src/crsqlite.c +++ b/core/src/crsqlite.c @@ -35,6 +35,7 @@ static int commitHook(void *pUserData) { pExtData->pendingDbVersion = -1; pExtData->seq = 0; pExtData->updatedTableInfosThisTx = 0; + crsql_invalidate_merge_memos(pExtData); return SQLITE_OK; } @@ -43,6 +44,7 @@ static void rollbackHook(void *pUserData) { pExtData->pendingDbVersion = -1; pExtData->seq = 0; pExtData->updatedTableInfosThisTx = 0; + crsql_invalidate_merge_memos(pExtData); } #ifdef 
LIBSQL @@ -380,6 +382,10 @@ __declspec(dllexport) crsql_ExtData *pExtData = crsql_newExtData(db, siteIdBuffer); if (!pExtData) return SQLITE_ERROR; + // Let the merge path flip the sync bit directly rather than through + // `SELECT crsql_internal_sync_bit(x)` statements. + pExtData->syncBitPtr = syncBitPtr; + // --- Register functions that need ExtData --- rc = sqlite3_create_function_v2( diff --git a/core/src/db-version.c b/core/src/db-version.c index 6a44db9c7..cb652920c 100644 --- a/core/src/db-version.c +++ b/core/src/db-version.c @@ -89,6 +89,20 @@ int crsql_fill_db_version_if_needed(sqlite3 *db, crsql_ExtData *pExtData, sqlite_int64 crsql_next_db_version(sqlite3 *db, crsql_ExtData *pExtData, sqlite3_int64 merging_version, char **errmsg) { + // Fast path: pendingDbVersion is only ever set inside a write transaction + // (and reset by the commit/rollback hooks). While that transaction is open + // no other connection can advance our stored db_version, so the cached + // values are authoritative and we can skip the PRAGMA data_version probe. + // This runs once per clock-table write during sync, so it matters. 
+ if (pExtData->pendingDbVersion != -1) { + sqlite3_int64 fastRet = pExtData->pendingDbVersion; + if (merging_version >= 0 && fastRet < merging_version) { + fastRet = merging_version; + } + pExtData->pendingDbVersion = fastRet; + return fastRet; + } + int rc = crsql_fill_db_version_if_needed(db, pExtData, errmsg); if (rc != SQLITE_OK) { return -1; diff --git a/core/src/ext-data.c b/core/src/ext-data.c index e603eddc2..4578e6200 100644 --- a/core/src/ext-data.c +++ b/core/src/ext-data.c @@ -23,16 +23,12 @@ crsql_ExtData *crsql_newExtData(sqlite3 *db, unsigned char *siteIdBuffer) { rc += sqlite3_prepare_v3(db, "PRAGMA data_version", -1, SQLITE_PREPARE_PERSISTENT, &(pExtData->pPragmaDataVersionStmt), 0); - pExtData->pSetSyncBitStmt = 0; - rc += sqlite3_prepare_v3(db, SET_SYNC_BIT, -1, SQLITE_PREPARE_PERSISTENT, - &(pExtData->pSetSyncBitStmt), 0); - pExtData->pClearSyncBitStmt = 0; - rc += sqlite3_prepare_v3(db, CLEAR_SYNC_BIT, -1, SQLITE_PREPARE_PERSISTENT, - &(pExtData->pClearSyncBitStmt), 0); - + // `ordinal` is an INTEGER PRIMARY KEY (rowid alias) so the assigned value + // is read via sqlite3_last_insert_rowid -- a RETURNING clause would force + // an ephemeral btree per execution. pExtData->pSetSiteIdOrdinalStmt = 0; rc += sqlite3_prepare_v3( - db, "INSERT INTO crsql_site_id (site_id) VALUES (?) 
RETURNING ordinal", + db, "INSERT INTO crsql_site_id (site_id) VALUES (?)", -1, SQLITE_PREPARE_PERSISTENT, &(pExtData->pSetSiteIdOrdinalStmt), 0); pExtData->pSelectSiteIdOrdinalStmt = 0; @@ -55,6 +51,15 @@ crsql_ExtData *crsql_newExtData(sqlite3 *db, unsigned char *siteIdBuffer) { pExtData->tableInfos = 0; pExtData->rowsImpacted = 0; pExtData->updatedTableInfosThisTx = 0; + pExtData->syncBitPtr = 0; + pExtData->cachedSiteIdLen = 0; + pExtData->cachedSiteIdOrdinal = -1; + pExtData->rowMemoTblInfoIdx = -1; + pExtData->rowMemoPkLen = 0; + pExtData->rowMemoPkCap = 0; + pExtData->rowMemoPkBlob = 0; + pExtData->rowMemoKey = -1; + pExtData->rowMemoLocalCl = 0; crsql_init_table_info_vec(pExtData); sqlite3_stmt *pStmt; @@ -102,13 +107,21 @@ crsql_ExtData *crsql_newExtData(sqlite3 *db, unsigned char *siteIdBuffer) { return pExtData; } +void crsql_invalidate_merge_memos(crsql_ExtData *pExtData) { + pExtData->cachedSiteIdLen = 0; + pExtData->cachedSiteIdOrdinal = -1; + pExtData->rowMemoTblInfoIdx = -1; + pExtData->rowMemoPkLen = 0; + pExtData->rowMemoKey = -1; + pExtData->rowMemoLocalCl = 0; +} + void crsql_freeExtData(crsql_ExtData *pExtData) { sqlite3_free(pExtData->siteId); + sqlite3_free(pExtData->rowMemoPkBlob); sqlite3_finalize(pExtData->pDbVersionStmt); sqlite3_finalize(pExtData->pPragmaSchemaVersionStmt); sqlite3_finalize(pExtData->pPragmaDataVersionStmt); - sqlite3_finalize(pExtData->pSetSyncBitStmt); - sqlite3_finalize(pExtData->pClearSyncBitStmt); sqlite3_finalize(pExtData->pSetSiteIdOrdinalStmt); sqlite3_finalize(pExtData->pSelectSiteIdOrdinalStmt); sqlite3_finalize(pExtData->pSelectClockTablesStmt); @@ -126,17 +139,14 @@ void crsql_finalize(crsql_ExtData *pExtData) { sqlite3_finalize(pExtData->pDbVersionStmt); sqlite3_finalize(pExtData->pPragmaSchemaVersionStmt); sqlite3_finalize(pExtData->pPragmaDataVersionStmt); - sqlite3_finalize(pExtData->pSetSyncBitStmt); - sqlite3_finalize(pExtData->pClearSyncBitStmt); sqlite3_finalize(pExtData->pSetSiteIdOrdinalStmt); 
sqlite3_finalize(pExtData->pSelectSiteIdOrdinalStmt); sqlite3_finalize(pExtData->pSelectClockTablesStmt); crsql_clear_stmt_cache(pExtData); + crsql_invalidate_merge_memos(pExtData); pExtData->pDbVersionStmt = 0; pExtData->pPragmaSchemaVersionStmt = 0; pExtData->pPragmaDataVersionStmt = 0; - pExtData->pSetSyncBitStmt = 0; - pExtData->pClearSyncBitStmt = 0; pExtData->pSetSiteIdOrdinalStmt = 0; pExtData->pSelectSiteIdOrdinalStmt = 0; pExtData->pSelectClockTablesStmt = 0; diff --git a/core/src/ext-data.h b/core/src/ext-data.h index c4ff801d0..1ada7b960 100644 --- a/core/src/ext-data.h +++ b/core/src/ext-data.h @@ -4,8 +4,8 @@ #include "sqlite3ext.h" SQLITE_EXTENSION_INIT3 -// NOTE: any changes here must be updated in `c.rs` until we've finished porting -// to rust. +#define CRSQL_SITE_ID_MEMO_LEN 16 + typedef struct crsql_ExtData crsql_ExtData; struct crsql_ExtData { // perma statement -- used to check db schema version @@ -37,13 +37,40 @@ struct crsql_ExtData { int seq; - sqlite3_stmt *pSetSyncBitStmt; - sqlite3_stmt *pClearSyncBitStmt; sqlite3_stmt *pSetSiteIdOrdinalStmt; sqlite3_stmt *pSelectSiteIdOrdinalStmt; sqlite3_stmt *pSelectClockTablesStmt; int mergeEqualValues; + + // Borrowed pointer to the sync bit owned by the + // `crsql_internal_sync_bit` SQL function. Lets the merge path toggle the + // bit directly instead of executing `SELECT crsql_internal_sync_bit(x)` + // per merged change. + int *syncBitPtr; + + // ---- merge fast-path memos ---- + // These are pure caches for the crsql_changes insert (merge) path. + // They are invalidated via crsql_invalidate_merge_memos on transaction + // commit/rollback, on savepoint rollback over the changes vtab, on local + // writes to CRRs and whenever table infos are reloaded. + + // Memo of the last site_id -> ordinal lookup. A changeset is typically + // authored by a single site so this hits ~100% during sync. 
+ int cachedSiteIdLen; // 0 = no memo + unsigned char cachedSiteId[CRSQL_SITE_ID_MEMO_LEN]; + sqlite3_int64 cachedSiteIdOrdinal; + + // Memo of the last (table, packed pks) -> (lookaside key, causal length) + // resolution. Changesets are ordered by (db_version, seq) so the N column + // changes of a single row arrive back-to-back; this skips the pk lookaside + // lookup and causal length select for all but the first change of a row. + int rowMemoTblInfoIdx; // -1 = no memo + int rowMemoPkLen; + int rowMemoPkCap; + unsigned char *rowMemoPkBlob; // sqlite3_malloc'd, owned + sqlite3_int64 rowMemoKey; + sqlite3_int64 rowMemoLocalCl; }; crsql_ExtData *crsql_newExtData(sqlite3 *db, unsigned char *siteIdBuffer); @@ -54,4 +81,9 @@ int crsql_fetchPragmaDataVersion(sqlite3 *db, crsql_ExtData *pExtData); int crsql_recreate_db_version_stmt(sqlite3 *db, crsql_ExtData *pExtData); void crsql_finalize(crsql_ExtData *pExtData); +// Drop the merge fast-path memos (site_id ordinal + last row key/cl). +// Must be called whenever cached state could go stale: tx boundaries, +// savepoint rollbacks, local CRR writes, table info reloads. +void crsql_invalidate_merge_memos(crsql_ExtData *pExtData); + #endif \ No newline at end of file diff --git a/core/src/local-writes.c b/core/src/local-writes.c index f6142e708..61bc367bd 100644 --- a/core/src/local-writes.c +++ b/core/src/local-writes.c @@ -62,6 +62,10 @@ static int trigger_fn_preamble(sqlite3_context *ctx, int argc, crsql_ExtData *extData = (crsql_ExtData *)sqlite3_user_data(ctx); *ppExtData = extData; + // Local writes mutate clock state outside the crsql_changes merge path; + // any memoized (key, causal length) could go stale. 
+ crsql_invalidate_merge_memos(extData); + char *innerErr = 0; int rc = crsql_ensure_table_infos_are_up_to_date( sqlite3_context_db_handle(ctx), extData, &innerErr); diff --git a/core/src/tableinfo.c b/core/src/tableinfo.c index ec57fa7c3..8fe4feecf 100644 --- a/core/src/tableinfo.c +++ b/core/src/tableinfo.c @@ -449,6 +449,9 @@ static int pull_all_table_infos(sqlite3 *db, crsql_ExtData *pExtData, char **errmsg) { crsql_TableInfoVec *vec = (crsql_TableInfoVec *)pExtData->tableInfos; + // Table info indices are about to change; drop merge memos keyed on them. + crsql_invalidate_merge_memos(pExtData); + // Free old table infos for (int i = 0; i < vec->len; i++) { free_table_info_contents(&vec->aInfos[i]); @@ -602,11 +605,15 @@ int crsql_get_set_winner_clock_stmt(sqlite3 *db, crsql_TableInfo *tblInfo, return SQLITE_OK; } char *esc = crsql_escape_ident(tblInfo->tblName); + // No RETURNING here: the caller already knows the key it binds, and a + // RETURNING clause makes SQLite materialize an ephemeral btree (with its + // own pager + page cache) on every execution. The db_version is computed + // in C and bound directly rather than invoking the crsql_next_db_version + // SQL function per row. char *sql = sqlite3_mprintf( "INSERT OR REPLACE INTO \"%s__crsql_clock\"" " (key, col_name, col_version, db_version, seq, site_id)" - " VALUES (?, ?, ?, crsql_next_db_version(?), ?, ?)" - " RETURNING key", esc); + " VALUES (?, ?, ?, ?, ?, ?)", esc); sqlite3_free(esc); return lazy_prepare(db, &tblInfo->pSetWinnerClockStmt, sql, ppStmt); } @@ -634,8 +641,12 @@ int crsql_get_col_version_stmt(sqlite3 *db, crsql_TableInfo *tblInfo, return SQLITE_OK; } char *esc = crsql_escape_ident(tblInfo->tblName); + // site_id rides along in the same clock row at no extra cost; it lets the + // merge skip the value comparison when an equal-version change originates + // from the same site (i.e. it is the identical change). 
char *sql = sqlite3_mprintf( - "SELECT col_version FROM \"%s__crsql_clock\" WHERE key = ? AND col_name = ?", + "SELECT col_version, site_id FROM \"%s__crsql_clock\"" + " WHERE key = ? AND col_name = ?", esc); sqlite3_free(esc); return lazy_prepare(db, &tblInfo->pColVersionStmt, sql, ppStmt); @@ -711,8 +722,9 @@ int crsql_get_zero_clocks_on_resurrect_stmt(sqlite3 *db, return SQLITE_OK; } char *esc = crsql_escape_ident(tblInfo->tblName); + // db_version computed in C and bound directly (see set_winner_clock). char *sql = sqlite3_mprintf( - "UPDATE \"%s__crsql_clock\" SET col_version = 0, db_version = crsql_next_db_version(?)" + "UPDATE \"%s__crsql_clock\" SET col_version = 0, db_version = ?" " WHERE key = ? AND col_name IS NOT '" SENTINEL_CID "'", esc); sqlite3_free(esc); @@ -820,65 +832,83 @@ int crsql_get_maybe_mark_locally_reinserted_stmt(sqlite3 *db, // ---------- Key management ---------- -sqlite3_int64 crsql_get_key(sqlite3 *db, crsql_TableInfo *tblInfo, - sqlite3_value **pks, int numPks) { - if (!tblInfo->pSelectKeyStmt) { - char *esc = crsql_escape_ident(tblInfo->tblName); - char *where_list = crsql_where_list(tblInfo->pks, tblInfo->pksLen, 0); - char *sql = sqlite3_mprintf( - "SELECT __crsql_key FROM \"%s__crsql_pks\" WHERE %s", esc, where_list); - sqlite3_free(esc); - sqlite3_free(where_list); - if (!sql) return -1; - int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, - &tblInfo->pSelectKeyStmt, 0); - sqlite3_free(sql); - if (rc != SQLITE_OK) return -1; - } +static int ensure_select_key_stmt(sqlite3 *db, crsql_TableInfo *tblInfo) { + if (tblInfo->pSelectKeyStmt) return SQLITE_OK; + char *esc = crsql_escape_ident(tblInfo->tblName); + char *where_list = crsql_where_list(tblInfo->pks, tblInfo->pksLen, 0); + char *sql = sqlite3_mprintf( + "SELECT __crsql_key FROM \"%s__crsql_pks\" WHERE %s", esc, where_list); + sqlite3_free(esc); + sqlite3_free(where_list); + if (!sql) return SQLITE_NOMEM; + int rc = sqlite3_prepare_v3(db, sql, -1, 
SQLITE_PREPARE_PERSISTENT, + &tblInfo->pSelectKeyStmt, 0); + sqlite3_free(sql); + return rc; +} - for (int i = 0; i < numPks; i++) { - sqlite3_bind_value(tblInfo->pSelectKeyStmt, i + 1, pks[i]); - } +static int ensure_insert_key_stmt(sqlite3 *db, crsql_TableInfo *tblInfo) { + if (tblInfo->pInsertKeyStmt) return SQLITE_OK; + char *esc = crsql_escape_ident(tblInfo->tblName); + char *pk_list = crsql_as_identifier_list(tblInfo->pks, tblInfo->pksLen, 0); + char *pk_bindings = crsql_binding_list(tblInfo->pksLen); + // __crsql_key is an INTEGER PRIMARY KEY (rowid alias): the assigned key is + // read via sqlite3_last_insert_rowid. A RETURNING clause would force an + // ephemeral btree (own pager + page cache) on every execution. + char *sql = sqlite3_mprintf( + "INSERT INTO \"%s__crsql_pks\" (%s) VALUES (%s)", + esc, pk_list, pk_bindings); + sqlite3_free(esc); + sqlite3_free(pk_list); + sqlite3_free(pk_bindings); + if (!sql) return SQLITE_NOMEM; + int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, + &tblInfo->pInsertKeyStmt, 0); + sqlite3_free(sql); + return rc; +} - int rc = sqlite3_step(tblInfo->pSelectKeyStmt); +// Step a select-key stmt with bindings already applied. 
+static sqlite3_int64 step_key_stmt(sqlite3_stmt *pStmt) { + int rc = sqlite3_step(pStmt); sqlite3_int64 key = -1; if (rc == SQLITE_ROW) { - key = sqlite3_column_int64(tblInfo->pSelectKeyStmt, 0); + key = sqlite3_column_int64(pStmt, 0); } - reset_cached_stmt(tblInfo->pSelectKeyStmt); + reset_cached_stmt(pStmt); return key; } -static sqlite3_int64 create_key_raw(sqlite3 *db, crsql_TableInfo *tblInfo, - sqlite3_value **pks, int numPks) { - if (!tblInfo->pInsertKeyStmt) { - char *esc = crsql_escape_ident(tblInfo->tblName); - char *pk_list = crsql_as_identifier_list(tblInfo->pks, tblInfo->pksLen, 0); - char *pk_bindings = crsql_binding_list(tblInfo->pksLen); - char *sql = sqlite3_mprintf( - "INSERT INTO \"%s__crsql_pks\" (%s) VALUES (%s) RETURNING __crsql_key", - esc, pk_list, pk_bindings); - sqlite3_free(esc); - sqlite3_free(pk_list); - sqlite3_free(pk_bindings); - if (!sql) return -1; - int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, - &tblInfo->pInsertKeyStmt, 0); - sqlite3_free(sql); - if (rc != SQLITE_OK) return -1; +// Step an insert-key stmt with bindings already applied; the new key is the +// rowid assigned to the inserted row. 
+static sqlite3_int64 step_insert_key_stmt(sqlite3 *db, sqlite3_stmt *pStmt) { + int rc = sqlite3_step(pStmt); + sqlite3_int64 key = -1; + if (rc == SQLITE_DONE) { + key = sqlite3_last_insert_rowid(db); } + reset_cached_stmt(pStmt); + return key; +} + +sqlite3_int64 crsql_get_key(sqlite3 *db, crsql_TableInfo *tblInfo, + sqlite3_value **pks, int numPks) { + if (ensure_select_key_stmt(db, tblInfo) != SQLITE_OK) return -1; for (int i = 0; i < numPks; i++) { - sqlite3_bind_value(tblInfo->pInsertKeyStmt, i + 1, pks[i]); + sqlite3_bind_value(tblInfo->pSelectKeyStmt, i + 1, pks[i]); } + return step_key_stmt(tblInfo->pSelectKeyStmt); +} - int rc = sqlite3_step(tblInfo->pInsertKeyStmt); - sqlite3_int64 key = -1; - if (rc == SQLITE_ROW) { - key = sqlite3_column_int64(tblInfo->pInsertKeyStmt, 0); +static sqlite3_int64 create_key_raw(sqlite3 *db, crsql_TableInfo *tblInfo, + sqlite3_value **pks, int numPks) { + if (ensure_insert_key_stmt(db, tblInfo) != SQLITE_OK) return -1; + + for (int i = 0; i < numPks; i++) { + sqlite3_bind_value(tblInfo->pInsertKeyStmt, i + 1, pks[i]); } - reset_cached_stmt(tblInfo->pInsertKeyStmt); - return key; + return step_insert_key_stmt(db, tblInfo->pInsertKeyStmt); } sqlite3_int64 crsql_get_or_create_key(sqlite3 *db, crsql_TableInfo *tblInfo, @@ -896,19 +926,58 @@ sqlite3_int64 crsql_get_or_create_key(sqlite3 *db, crsql_TableInfo *tblInfo, return key; } +/** + * Same as crsql_get_or_create_key but takes unpacked ColumnValues directly, + * avoiding the temporary `SELECT ?,?,...` statement previously needed to + * convert them into sqlite3_value pointers. This is the merge hot path: it + * runs once per imported change. 
+ */ +sqlite3_int64 crsql_get_or_create_key_packed(sqlite3 *db, + crsql_TableInfo *tblInfo, + crsql_ColumnValue *pks, + int numPks, char **errmsg) { + if (ensure_select_key_stmt(db, tblInfo) != SQLITE_OK) return -1; + + int rc = crsql_bind_package_to_stmt(tblInfo->pSelectKeyStmt, pks, numPks, 0); + if (rc != SQLITE_OK) { + reset_cached_stmt(tblInfo->pSelectKeyStmt); + return -1; + } + sqlite3_int64 key = step_key_stmt(tblInfo->pSelectKeyStmt); + if (key >= 0) { + return key; + } + + if (ensure_insert_key_stmt(db, tblInfo) != SQLITE_OK) return -1; + rc = crsql_bind_package_to_stmt(tblInfo->pInsertKeyStmt, pks, numPks, 0); + if (rc != SQLITE_OK) { + reset_cached_stmt(tblInfo->pInsertKeyStmt); + return -1; + } + key = step_insert_key_stmt(db, tblInfo->pInsertKeyStmt); + if (key < 0 && errmsg) { + *errmsg = sqlite3_mprintf("Failed to create key for table %s", + tblInfo->tblName); + } + return key; +} + sqlite3_int64 crsql_get_or_create_key_for_insert(sqlite3 *db, crsql_TableInfo *tblInfo, sqlite3_value **pks, int numPks, char **errmsg) { - // Try INSERT OR IGNORE RETURNING first + // Try INSERT OR IGNORE first if (!tblInfo->pInsertOrIgnoreReturningKeyStmt) { char *esc = crsql_escape_ident(tblInfo->tblName); char *pk_list = crsql_as_identifier_list(tblInfo->pks, tblInfo->pksLen, 0); char *pk_bindings = crsql_binding_list(tblInfo->pksLen); + // No RETURNING: whether the insert happened is read via + // sqlite3_changes64 and the new key via sqlite3_last_insert_rowid + // (__crsql_key is a rowid alias). RETURNING would force an ephemeral + // btree per execution. 
char *sql = sqlite3_mprintf( - "INSERT OR IGNORE INTO \"%s__crsql_pks\" (%s) VALUES (%s)" - " RETURNING __crsql_key", + "INSERT OR IGNORE INTO \"%s__crsql_pks\" (%s) VALUES (%s)", esc, pk_list, pk_bindings); sqlite3_free(esc); sqlite3_free(pk_list); @@ -926,15 +995,14 @@ sqlite3_int64 crsql_get_or_create_key_for_insert(sqlite3 *db, } int rc = sqlite3_step(tblInfo->pInsertOrIgnoreReturningKeyStmt); - if (rc == SQLITE_ROW) { + if (rc == SQLITE_DONE && sqlite3_changes64(db) > 0) { // Newly inserted - sqlite3_int64 key = - sqlite3_column_int64(tblInfo->pInsertOrIgnoreReturningKeyStmt, 0); + sqlite3_int64 key = sqlite3_last_insert_rowid(db); reset_cached_stmt(tblInfo->pInsertOrIgnoreReturningKeyStmt); return key; } - // Already existed (DONE = insert was ignored), fall back to select + // Already existed (insert was ignored), fall back to select reset_cached_stmt(tblInfo->pInsertOrIgnoreReturningKeyStmt); sqlite3_int64 key = crsql_get_key(db, tblInfo, pks, numPks); if (key < 0 && errmsg) { diff --git a/core/src/tableinfo.h b/core/src/tableinfo.h index 29fb39eb2..ff786ffe7 100644 --- a/core/src/tableinfo.h +++ b/core/src/tableinfo.h @@ -3,6 +3,7 @@ #include "crsqlite.h" #include "ext-data.h" +#include "pack-columns.h" typedef struct crsql_ColumnInfo crsql_ColumnInfo; struct crsql_ColumnInfo { @@ -81,6 +82,10 @@ sqlite3_int64 crsql_get_or_create_key_for_insert(sqlite3 *db, int numPks, char **errmsg); sqlite3_int64 crsql_get_key(sqlite3 *db, crsql_TableInfo *tblInfo, sqlite3_value **pks, int numPks); +sqlite3_int64 crsql_get_or_create_key_packed(sqlite3 *db, + crsql_TableInfo *tblInfo, + crsql_ColumnValue *pks, + int numPks, char **errmsg); // --- Lazy statement getters --- // These prepare statements on first call, cache for subsequent calls. 
diff --git a/core/test/perf/bench-import.sh b/core/test/perf/bench-import.sh new file mode 100755 index 000000000..8c5316e7d --- /dev/null +++ b/core/test/perf/bench-import.sh @@ -0,0 +1,57 @@ +#!/bin/bash +# Benchmark: import a crsql_changes diff into a fresh database. +# Usage: ./bench-import.sh [num_rows] [path-to-sqlite3-cli] +set -e + +ROWS="${1:-20000}" +SQLITE="${2:-$(dirname "$0")/../../build/sqlite3}" +DIR=$(mktemp -d) +trap 'rm -rf "$DIR"' EXIT + +# ---- 1. Build a source db with ROWS rows (4 data cols => 5 changes/row) ---- +"$SQLITE" "$DIR/source.db" >/dev/null <