From 8372816a343e3149bc7523c57c29177c9996dc91 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com> Date: Fri, 8 May 2026 15:03:07 +0800 Subject: [PATCH] Implement crsql_tracked_peers auto-tracking The crsql_tracked_peers table has been created at extension init since forever but nothing read or wrote it. Wire it up so the merge path auto-records a per-peer (db_version, seq) RECEIVED watermark, and expose crsql_set_tracked_peer for explicit application use (e.g. SENT watermarks after a successful push). Implementation -------------- - consts.h: TRACKED_EVENT_RECEIVED / TRACKED_EVENT_SENT constants and UPSERT_TRACKED_PEER, a monotonic upsert that only advances the watermark via row-tuple comparison (excluded > current). - ext-data.h/.c: cache pUpsertTrackedPeerStmt as a persistent prepared statement, prepared once at connection open and finalized in both teardown paths. crsql_tracked_peers is created in crsql_init_peer_tracking_table earlier in the init order, so the prepare always succeeds. - changes-vtab-impl.c: track_peer_recv() helper called once at the top of merge_insert_impl after site_id validation. Skips NULL/short site_ids and self-writes (memcmp against pExtData->siteId). Done eagerly so no-op merges (older CL, losing cid, idempotent reapplies) still advance the watermark; rolls back with the surrounding txn on merge failure. Best-effort: tracking failures do not fail an otherwise successful merge. - crsqlite.c: register crsql_set_tracked_peer(site_id, version, seq, tag, event) bound to ExtData. Reuses the same monotonic upsert, so application calls cannot roll the watermark backwards either. Performance ----------- Per merged row: one 16-byte memcmp, one prepared-statement reset, five binds, one step doing a single PK probe on a tiny table (one row per peer). If this ever shows up in a profile the next-level optimization is an in-ExtData accumulator flushed once per peer at commit; the shape is intentionally amenable to that. Tests ----- - Adds testTrackedPeers covering: empty initial state, watermark written on RECEIVED merge with the correct site_id and (version, seq), idempotent re-sync (no advance), no self-row from local writes, and the monotonic semantics of crsql_set_tracked_peer (forwards wins, backwards no-op, equal stays put). - Note: existing tests in crsqlite.test.c put sqlite3_step and sqlite3_bind_value calls inside assert(), which become no-ops under -DNDEBUG (Release). The new test uses a CHECK() macro and a local sync helper to actually exercise the code in Release builds. Fixing the rest of the suite is left out of this PR. - All 155 py/correctness tests pass. gitignore --------- - .claude/ (local Claude Code state) - npm/*/*.{dylib,so,dll} (prebuilt platform binaries staged into npm packages by the publish workflow; not source). --- .gitignore | 8 ++ core/src/changes-vtab-impl.c | 37 ++++++ core/src/consts.h | 18 +++ core/src/crsqlite.c | 55 +++++++++ core/src/crsqlite.test.c | 223 +++++++++++++++++++++++++++++++++++ core/src/ext-data.c | 11 ++ core/src/ext-data.h | 4 + 7 files changed, 356 insertions(+) diff --git a/.gitignore b/.gitignore index 7fc9c0519..6e76a9833 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,11 @@ tsconfig.tsbuildinfo .turbo/ target/ core/build/ + +# Claude Code local state +.claude/ + +# Prebuilt platform binaries materialized into npm packages +npm/*/*.dylib +npm/*/*.so +npm/*/*.dll diff --git a/core/src/changes-vtab-impl.c b/core/src/changes-vtab-impl.c index 948a8bfa4..0989a5470 100644 --- a/core/src/changes-vtab-impl.c +++ b/core/src/changes-vtab-impl.c @@ -1069,6 +1069,35 @@ static int merge_insert_impl(sqlite3_vtab *vtab, int argc, sqlite3_value **argv, sqlite3_int64 *rowid, char **errmsg); +// Best-effort auto-update of crsql_tracked_peers for the RECEIVED watermark. +// Skips when the peer site_id is missing or matches the local site_id (so +// loop-back inserts produced by local triggers do not pollute the table). +// Failures are intentionally swallowed: tracking is an optimization for the +// puller and must not fail an otherwise successful merge. The cached +// statement uses a monotonic upsert (UPSERT_TRACKED_PEER) so concurrent +// out-of-order arrivals never roll the watermark backwards. +static void track_peer_recv(crsql_ExtData *pExtData, + const unsigned char *siteId, int siteIdLen, + sqlite3_int64 version, sqlite3_int64 seq) { + if (!siteId || siteIdLen != SITE_ID_LEN) return; + if (memcmp(siteId, pExtData->siteId, SITE_ID_LEN) == 0) return; + + sqlite3_stmt *s = pExtData->pUpsertTrackedPeerStmt; + if (!s) return; + sqlite3_reset(s); + if (sqlite3_bind_blob(s, 1, siteId, SITE_ID_LEN, SQLITE_TRANSIENT) != + SQLITE_OK || + sqlite3_bind_int64(s, 2, version) != SQLITE_OK || + sqlite3_bind_int64(s, 3, seq) != SQLITE_OK || + sqlite3_bind_int(s, 4, 0) != SQLITE_OK || + sqlite3_bind_int(s, 5, TRACKED_EVENT_RECEIVED) != SQLITE_OK) { + sqlite3_reset(s); + return; + } + (void)sqlite3_step(s); + sqlite3_reset(s); +} + int crsql_changes_update(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv, sqlite3_int64 *pRowid) { if (argc > 1 && sqlite3_value_type(argv[0]) == SQLITE_NULL) { @@ -1137,6 +1166,14 @@ static int merge_insert_impl(sqlite3_vtab *vtab, int argc, (const unsigned char *)sqlite3_value_blob(insertSiteIdVal); int insertSiteIdLen = sqlite3_value_bytes(insertSiteIdVal); + // Auto-record the RECEIVED watermark for this peer. Done eagerly here so + // that no-op merges (older CL, losing cid, idempotent re-applies) still + // advance the puller's watermark — otherwise we'd keep refetching the + // same losing changes. Best-effort; rolls back with the surrounding txn + // if the merge ultimately fails. + track_peer_recv(tab->pExtData, insertSiteId, insertSiteIdLen, insertDbVrsn, + insertSeq); + // Find table info crsql_TableInfoVec *tblInfos = (crsql_TableInfoVec *)tab->pExtData->tableInfos; diff --git a/core/src/consts.h b/core/src/consts.h index d4044f85a..b6645e23a 100644 --- a/core/src/consts.h +++ b/core/src/consts.h @@ -37,6 +37,24 @@ #define ROW_TYPE_DELETE 1 #define ROW_TYPE_PKONLY 2 +// Tracked-peer event kinds. Recorded in crsql_tracked_peers.event. +// RECEIVED is auto-bumped by merge_insert_impl when a change is processed +// from a remote peer. SENT is reserved for application use (e.g. recording +// the watermark of changes the local node has shipped to a peer). +#define TRACKED_EVENT_RECEIVED 0 +#define TRACKED_EVENT_SENT 1 + +// Monotonic upsert into crsql_tracked_peers: only advances the watermark +// when the incoming (version, seq) is strictly greater than what is stored. +// Idempotent on equal values, safe under out-of-order arrival. +#define UPSERT_TRACKED_PEER \ + "INSERT INTO crsql_tracked_peers(site_id, version, seq, tag, event) " \ + "VALUES (?1, ?2, ?3, ?4, ?5) " \ + "ON CONFLICT(site_id, tag, event) DO UPDATE SET " \ + " version = excluded.version, seq = excluded.seq " \ + "WHERE (excluded.version, excluded.seq) > " \ + " (crsql_tracked_peers.version, crsql_tracked_peers.seq)" + // Version int: // MM.mm.pp.bb // 00 00 00 00 diff --git a/core/src/crsqlite.c b/core/src/crsqlite.c index 34f066799..26c0286fe 100644 --- a/core/src/crsqlite.c +++ b/core/src/crsqlite.c @@ -292,6 +292,55 @@ static void x_crsql_sync_bit(sqlite3_context *ctx, int argc, sqlite3_result_int(ctx, *syncBitPtr); } +// Monotonic upsert into crsql_tracked_peers. Mirrors what merge_insert_impl +// does for received changes, but lets the application record arbitrary +// (tag, event) watermarks — for example bumping TRACKED_EVENT_SENT (=1) +// after successfully shipping changes to a peer. The watermark only +// advances; an attempt to write a smaller (version, seq) is a no-op. +// +// Args: site_id BLOB(16), version INTEGER, seq INTEGER, tag INTEGER, +// event INTEGER. Returns NULL on success. +static void x_crsql_set_tracked_peer(sqlite3_context *ctx, int argc, + sqlite3_value **argv) { + if (argc != 5) { + sqlite3_result_error( + ctx, + "crsql_set_tracked_peer expects (site_id, version, seq, tag, event)", + -1); + return; + } + if (sqlite3_value_type(argv[0]) != SQLITE_BLOB || + sqlite3_value_bytes(argv[0]) != SITE_ID_LEN) { + sqlite3_result_error(ctx, "site_id must be a 16-byte BLOB", -1); + return; + } + crsql_ExtData *pExtData = (crsql_ExtData *)sqlite3_user_data(ctx); + sqlite3_stmt *s = pExtData->pUpsertTrackedPeerStmt; + if (!s) { + sqlite3_result_error(ctx, "tracked-peer statement not prepared", -1); + return; + } + sqlite3_reset(s); + int rc = sqlite3_bind_value(s, 1, argv[0]); + if (rc == SQLITE_OK) rc = sqlite3_bind_value(s, 2, argv[1]); + if (rc == SQLITE_OK) rc = sqlite3_bind_value(s, 3, argv[2]); + if (rc == SQLITE_OK) rc = sqlite3_bind_value(s, 4, argv[3]); + if (rc == SQLITE_OK) rc = sqlite3_bind_value(s, 5, argv[4]); + if (rc != SQLITE_OK) { + sqlite3_reset(s); + sqlite3_result_error(ctx, "failed to bind tracked-peer args", -1); + return; + } + rc = sqlite3_step(s); + sqlite3_reset(s); + if (rc != SQLITE_DONE && rc != SQLITE_ROW) { + sqlite3_result_error(ctx, sqlite3_errmsg(sqlite3_context_db_handle(ctx)), + -1); + return; + } + sqlite3_result_null(ctx); +} + static void x_crsql_sha(sqlite3_context *ctx, int argc, sqlite3_value **argv) { #ifdef CRSQLITE_COMMIT_SHA @@ -407,6 +456,12 @@ __declspec(dllexport) x_crsql_sha, 0, 0, 0); if (rc != SQLITE_OK) goto err_free_ext; + rc = sqlite3_create_function_v2( + db, "crsql_set_tracked_peer", 5, + SQLITE_UTF8 | SQLITE_DIRECTONLY, pExtData, + x_crsql_set_tracked_peer, 0, 0, 0); + if (rc != SQLITE_OK) goto err_free_ext; + rc = sqlite3_create_function_v2( db, "crsql_increment_and_get_seq", 0, SQLITE_UTF8 | SQLITE_INNOCUOUS, pExtData, diff --git a/core/src/crsqlite.test.c b/core/src/crsqlite.test.c index 03c322ab0..8775f9052 100644 --- a/core/src/crsqlite.test.c +++ b/core/src/crsqlite.test.c @@ -866,6 +866,228 @@ static void testModifyCompoundPK() { printf("\t\e[0;32mSuccess\e[0m\n"); } +// Hard check that survives -DNDEBUG (assert() is a no-op in release). +// Existing tests in this file embed sqlite3_step inside assert(), which +// makes them vacuous in release; the test suite as a whole inherits that +// limitation but this new test must not. +#define CHECK(cond) \ + do { \ + if (!(cond)) { \ + fprintf(stderr, "CHECK failed at %s:%d: %s\n", __FILE__, __LINE__, \ + #cond); \ + abort(); \ + } \ + } while (0) + +// Local sync helper that does not depend on assert(), unlike the file-wide +// syncLeftToRight which is a no-op under -DNDEBUG. +static int checkedSyncLeftToRight(sqlite3 *src, sqlite3 *dst) { + sqlite3_stmt *pSid = 0; + if (sqlite3_prepare_v2(dst, "SELECT crsql_site_id()", -1, &pSid, 0) != + SQLITE_OK) + return SQLITE_ERROR; + if (sqlite3_step(pSid) != SQLITE_ROW) { + sqlite3_finalize(pSid); + return SQLITE_ERROR; + } + sqlite3_stmt *pRead = 0; + if (sqlite3_prepare_v2( + src, + "SELECT * FROM crsql_changes WHERE site_id IS NOT ?", + -1, &pRead, 0) != SQLITE_OK) { + sqlite3_finalize(pSid); + return SQLITE_ERROR; + } + sqlite3_bind_value(pRead, 1, sqlite3_column_value(pSid, 0)); + + sqlite3_stmt *pWrite = 0; + if (sqlite3_prepare_v2( + dst, + "INSERT INTO crsql_changes VALUES (?,?,?,?,?,?,?,?,?)", -1, + &pWrite, 0) != SQLITE_OK) { + sqlite3_finalize(pRead); + sqlite3_finalize(pSid); + return SQLITE_ERROR; + } + + int rc; + while ((rc = sqlite3_step(pRead)) == SQLITE_ROW) { + for (int i = 0; i < 9; i++) { + if (sqlite3_bind_value(pWrite, i + 1, sqlite3_column_value(pRead, i)) != + SQLITE_OK) { + rc = SQLITE_ERROR; + goto done; + } + } + if (sqlite3_step(pWrite) != SQLITE_DONE) { + fprintf(stderr, "merge step failed: %s\n", sqlite3_errmsg(dst)); + rc = SQLITE_ERROR; + goto done; + } + sqlite3_reset(pWrite); + } + if (rc == SQLITE_DONE) rc = SQLITE_OK; +done: + sqlite3_finalize(pWrite); + sqlite3_finalize(pRead); + sqlite3_finalize(pSid); + return rc; +} + +// Verify the auto-tracking behavior of crsql_tracked_peers and that +// crsql_set_tracked_peer's monotonic upsert never rolls a watermark +// backwards. After db1 -> db2 sync, db2.crsql_tracked_peers must have +// exactly one row whose site_id matches db1, event = 0 (RECEIVED), and +// (version, seq) match the max db_version / seq actually merged. +static void testTrackedPeers() { + printf("trackedPeers\n"); + + sqlite3 *db1 = 0; + sqlite3 *db2 = 0; + sqlite3_stmt *pStmt = 0; + char *err = 0; + int rc; + + CHECK(sqlite3_open(":memory:", &db1) == SQLITE_OK); + CHECK(sqlite3_open(":memory:", &db2) == SQLITE_OK); + CHECK(createSimpleSchema(db1, &err) == SQLITE_OK); + CHECK(createSimpleSchema(db2, &err) == SQLITE_OK); + + // Sanity: each fresh db sees an empty peer-tracking table. + CHECK(sqlite3_prepare_v2(db2, "SELECT count(*) FROM crsql_tracked_peers", + -1, &pStmt, 0) == SQLITE_OK); + CHECK(sqlite3_step(pStmt) == SQLITE_ROW); + CHECK(sqlite3_column_int(pStmt, 0) == 0); + sqlite3_finalize(pStmt); + + CHECK(sqlite3_exec(db1, "insert into foo values (1, 'a');", 0, 0, &err) == + SQLITE_OK); + CHECK(sqlite3_exec(db1, "insert into foo values (2, 'b');", 0, 0, &err) == + SQLITE_OK); + + // Capture the max (db_version, seq) db1 produced; db2 must end up + // tracking exactly this watermark for db1 after the sync. + sqlite3_int64 expectedVersion = 0; + sqlite3_int64 expectedSeq = 0; + CHECK(sqlite3_prepare_v2( + db1, + "SELECT max(db_version), max(seq) FROM crsql_changes", + -1, &pStmt, 0) == SQLITE_OK); + CHECK(sqlite3_step(pStmt) == SQLITE_ROW); + expectedVersion = sqlite3_column_int64(pStmt, 0); + expectedSeq = sqlite3_column_int64(pStmt, 1); + sqlite3_finalize(pStmt); + CHECK(expectedVersion > 0); + + CHECK(checkedSyncLeftToRight(db1, db2) == SQLITE_OK); + + // db2 should now have exactly one peer row, for db1, with event=RECEIVED + // and the watermark we captured above. + CHECK(sqlite3_prepare_v2( + db2, + "SELECT site_id, version, seq, tag, event " + "FROM crsql_tracked_peers", + -1, &pStmt, 0) == SQLITE_OK); + CHECK(sqlite3_step(pStmt) == SQLITE_ROW); + + unsigned char trackedSiteId[16]; + CHECK(sqlite3_column_bytes(pStmt, 0) == 16); + memcpy(trackedSiteId, sqlite3_column_blob(pStmt, 0), 16); + CHECK(sqlite3_column_int64(pStmt, 1) == expectedVersion); + CHECK(sqlite3_column_int64(pStmt, 2) == expectedSeq); + CHECK(sqlite3_column_int(pStmt, 3) == 0); // tag + CHECK(sqlite3_column_int(pStmt, 4) == 0); // RECEIVED + + // No second row from this single sync. + CHECK(sqlite3_step(pStmt) == SQLITE_DONE); + sqlite3_finalize(pStmt); + + // Recorded site_id must be db1's, not db2's own. + CHECK(sqlite3_prepare_v2(db1, "SELECT crsql_site_id()", -1, &pStmt, 0) == + SQLITE_OK); + CHECK(sqlite3_step(pStmt) == SQLITE_ROW); + CHECK(sqlite3_column_bytes(pStmt, 0) == 16); + CHECK(memcmp(trackedSiteId, sqlite3_column_blob(pStmt, 0), 16) == 0); + sqlite3_finalize(pStmt); + + // Re-syncing the same changes must not advance the watermark (idempotent + // upsert with strict-greater guard). + CHECK(checkedSyncLeftToRight(db1, db2) == SQLITE_OK); + CHECK(sqlite3_prepare_v2(db2, + "SELECT version, seq FROM crsql_tracked_peers", + -1, &pStmt, 0) == SQLITE_OK); + CHECK(sqlite3_step(pStmt) == SQLITE_ROW); + CHECK(sqlite3_column_int64(pStmt, 0) == expectedVersion); + CHECK(sqlite3_column_int64(pStmt, 1) == expectedSeq); + sqlite3_finalize(pStmt); + + // Local writes on db2 must not produce a self-row. + CHECK(sqlite3_exec(db2, "insert into foo values (3, 'c');", 0, 0, &err) == + SQLITE_OK); + CHECK(sqlite3_prepare_v2( + db2, "SELECT count(*) FROM crsql_tracked_peers", -1, &pStmt, 0) == + SQLITE_OK); + CHECK(sqlite3_step(pStmt) == SQLITE_ROW); + CHECK(sqlite3_column_int(pStmt, 0) == 1); + sqlite3_finalize(pStmt); + + // crsql_set_tracked_peer writes a SENT watermark, then a backwards + // attempt is silently ignored (monotonic), then a forwards attempt wins. + rc = sqlite3_exec( + db2, + "SELECT crsql_set_tracked_peer(" + " (SELECT site_id FROM crsql_tracked_peers LIMIT 1), 100, 5, 0, 1)", + 0, 0, &err); + CHECK(rc == SQLITE_OK); + + CHECK(sqlite3_prepare_v2( + db2, + "SELECT version, seq FROM crsql_tracked_peers WHERE event = 1", + -1, &pStmt, 0) == SQLITE_OK); + CHECK(sqlite3_step(pStmt) == SQLITE_ROW); + CHECK(sqlite3_column_int64(pStmt, 0) == 100); + CHECK(sqlite3_column_int64(pStmt, 1) == 5); + sqlite3_finalize(pStmt); + + // Backwards write — should be a no-op. + rc = sqlite3_exec( + db2, + "SELECT crsql_set_tracked_peer(" + " (SELECT site_id FROM crsql_tracked_peers WHERE event = 1), " + " 50, 99, 0, 1)", + 0, 0, &err); + CHECK(rc == SQLITE_OK); + CHECK(sqlite3_prepare_v2( + db2, + "SELECT version, seq FROM crsql_tracked_peers WHERE event = 1", + -1, &pStmt, 0) == SQLITE_OK); + CHECK(sqlite3_step(pStmt) == SQLITE_ROW); + CHECK(sqlite3_column_int64(pStmt, 0) == 100); + CHECK(sqlite3_column_int64(pStmt, 1) == 5); + sqlite3_finalize(pStmt); + + // Forwards write — wins. + rc = sqlite3_exec( + db2, + "SELECT crsql_set_tracked_peer(" + " (SELECT site_id FROM crsql_tracked_peers WHERE event = 1), " + " 100, 6, 0, 1)", + 0, 0, &err); + CHECK(rc == SQLITE_OK); + CHECK(sqlite3_prepare_v2( + db2, + "SELECT version, seq FROM crsql_tracked_peers WHERE event = 1", + -1, &pStmt, 0) == SQLITE_OK); + CHECK(sqlite3_step(pStmt) == SQLITE_ROW); + CHECK(sqlite3_column_int64(pStmt, 0) == 100); + CHECK(sqlite3_column_int64(pStmt, 1) == 6); + sqlite3_finalize(pStmt); + + crsql_close(db1); + crsql_close(db2); + printf("\t\e[0;32mSuccess\e[0m\n"); +} + void crsqlTestSuite() { printf("\e[47m\e[1;30mSuite: crsql\e[0m\n"); @@ -880,4 +1102,5 @@ void crsqlTestSuite() { testRequiredPrimaryKey(); testModifySinglePK(); testModifyCompoundPK(); + testTrackedPeers(); } \ No newline at end of file diff --git a/core/src/ext-data.c b/core/src/ext-data.c index e603eddc2..2ee9c9cdc 100644 --- a/core/src/ext-data.c +++ b/core/src/ext-data.c @@ -45,6 +45,14 @@ crsql_ExtData *crsql_newExtData(sqlite3 *db, unsigned char *siteIdBuffer) { sqlite3_prepare_v3(db, CLOCK_TABLES_SELECT, -1, SQLITE_PREPARE_PERSISTENT, &(pExtData->pSelectClockTablesStmt), 0); + // crsql_tracked_peers is created at extension init in + // crsql_init_peer_tracking_table, before this function runs, so the + // statement compiles cleanly here. + pExtData->pUpsertTrackedPeerStmt = 0; + rc += sqlite3_prepare_v3(db, UPSERT_TRACKED_PEER, -1, + SQLITE_PREPARE_PERSISTENT, + &(pExtData->pUpsertTrackedPeerStmt), 0); + pExtData->dbVersion = -1; pExtData->pendingDbVersion = -1; pExtData->seq = 0; @@ -112,6 +120,7 @@ void crsql_freeExtData(crsql_ExtData *pExtData) { sqlite3_finalize(pExtData->pSetSiteIdOrdinalStmt); sqlite3_finalize(pExtData->pSelectSiteIdOrdinalStmt); sqlite3_finalize(pExtData->pSelectClockTablesStmt); + sqlite3_finalize(pExtData->pUpsertTrackedPeerStmt); crsql_clear_stmt_cache(pExtData); crsql_drop_table_info_vec(pExtData); sqlite3_free(pExtData); @@ -131,6 +140,7 @@ void crsql_finalize(crsql_ExtData *pExtData) { sqlite3_finalize(pExtData->pSetSiteIdOrdinalStmt); sqlite3_finalize(pExtData->pSelectSiteIdOrdinalStmt); sqlite3_finalize(pExtData->pSelectClockTablesStmt); + sqlite3_finalize(pExtData->pUpsertTrackedPeerStmt); crsql_clear_stmt_cache(pExtData); pExtData->pDbVersionStmt = 0; pExtData->pPragmaSchemaVersionStmt = 0; @@ -140,6 +150,7 @@ void crsql_finalize(crsql_ExtData *pExtData) { pExtData->pSetSiteIdOrdinalStmt = 0; pExtData->pSelectSiteIdOrdinalStmt = 0; pExtData->pSelectClockTablesStmt = 0; + pExtData->pUpsertTrackedPeerStmt = 0; } #define DB_VERSION_SCHEMA_VERSION 0 diff --git a/core/src/ext-data.h b/core/src/ext-data.h index c4ff801d0..20bef711c 100644 --- a/core/src/ext-data.h +++ b/core/src/ext-data.h @@ -42,6 +42,10 @@ struct crsql_ExtData { sqlite3_stmt *pSetSiteIdOrdinalStmt; sqlite3_stmt *pSelectSiteIdOrdinalStmt; sqlite3_stmt *pSelectClockTablesStmt; + // Monotonic upsert into crsql_tracked_peers. Used by the merge path to + // auto-record a per-peer (db_version, seq) watermark and by the + // crsql_set_tracked_peer SQL function for explicit application use. + sqlite3_stmt *pUpsertTrackedPeerStmt; int mergeEqualValues; };