Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,11 @@ tsconfig.tsbuildinfo
.turbo/
target/
core/build/

# Claude Code local state
.claude/

# Prebuilt platform binaries materialized into npm packages
npm/*/*.dylib
npm/*/*.so
npm/*/*.dll
37 changes: 37 additions & 0 deletions core/src/changes-vtab-impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,35 @@ static int merge_insert_impl(sqlite3_vtab *vtab, int argc,
sqlite3_value **argv, sqlite3_int64 *rowid,
char **errmsg);

// Best-effort auto-update of crsql_tracked_peers for the RECEIVED watermark.
// Skips when the peer site_id is missing or matches the local site_id (so
// loop-back inserts produced by local triggers do not pollute the table).
// Failures are intentionally swallowed: tracking is an optimization for the
// puller and must not fail an otherwise successful merge. The cached
// statement uses a monotonic upsert (UPSERT_TRACKED_PEER) so concurrent
// out-of-order arrivals never roll the watermark backwards.
static void track_peer_recv(crsql_ExtData *pExtData,
const unsigned char *siteId, int siteIdLen,
sqlite3_int64 version, sqlite3_int64 seq) {
if (!siteId || siteIdLen != SITE_ID_LEN) return;
if (memcmp(siteId, pExtData->siteId, SITE_ID_LEN) == 0) return;

sqlite3_stmt *s = pExtData->pUpsertTrackedPeerStmt;
if (!s) return;
sqlite3_reset(s);
if (sqlite3_bind_blob(s, 1, siteId, SITE_ID_LEN, SQLITE_TRANSIENT) !=
SQLITE_OK ||
sqlite3_bind_int64(s, 2, version) != SQLITE_OK ||
sqlite3_bind_int64(s, 3, seq) != SQLITE_OK ||
sqlite3_bind_int(s, 4, 0) != SQLITE_OK ||
sqlite3_bind_int(s, 5, TRACKED_EVENT_RECEIVED) != SQLITE_OK) {
sqlite3_reset(s);
return;
}
(void)sqlite3_step(s);
sqlite3_reset(s);
}

int crsql_changes_update(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv,
sqlite3_int64 *pRowid) {
if (argc > 1 && sqlite3_value_type(argv[0]) == SQLITE_NULL) {
Expand Down Expand Up @@ -1137,6 +1166,14 @@ static int merge_insert_impl(sqlite3_vtab *vtab, int argc,
(const unsigned char *)sqlite3_value_blob(insertSiteIdVal);
int insertSiteIdLen = sqlite3_value_bytes(insertSiteIdVal);

// Auto-record the RECEIVED watermark for this peer. Done eagerly here so
// that no-op merges (older CL, losing cid, idempotent re-applies) still
// advance the puller's watermark — otherwise we'd keep refetching the
// same losing changes. Best-effort; rolls back with the surrounding txn
// if the merge ultimately fails.
track_peer_recv(tab->pExtData, insertSiteId, insertSiteIdLen, insertDbVrsn,
insertSeq);

// Find table info
crsql_TableInfoVec *tblInfos =
(crsql_TableInfoVec *)tab->pExtData->tableInfos;
Expand Down
18 changes: 18 additions & 0 deletions core/src/consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,24 @@
#define ROW_TYPE_DELETE 1
#define ROW_TYPE_PKONLY 2

// Tracked-peer event kinds. Recorded in crsql_tracked_peers.event.
// RECEIVED is auto-bumped by merge_insert_impl when a change is processed
// from a remote peer. SENT is reserved for application use (e.g. recording
// the watermark of changes the local node has shipped to a peer).
#define TRACKED_EVENT_RECEIVED 0
#define TRACKED_EVENT_SENT 1

// Monotonic upsert into crsql_tracked_peers: only advances the watermark
// when the incoming (version, seq) is strictly greater than what is stored.
// Idempotent on equal values, safe under out-of-order arrival.
#define UPSERT_TRACKED_PEER \
"INSERT INTO crsql_tracked_peers(site_id, version, seq, tag, event) " \
"VALUES (?1, ?2, ?3, ?4, ?5) " \
"ON CONFLICT(site_id, tag, event) DO UPDATE SET " \
" version = excluded.version, seq = excluded.seq " \
"WHERE (excluded.version, excluded.seq) > " \
" (crsql_tracked_peers.version, crsql_tracked_peers.seq)"

// Version int:
// MM.mm.pp.bb
// 00 00 00 00
Expand Down
55 changes: 55 additions & 0 deletions core/src/crsqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,55 @@ static void x_crsql_sync_bit(sqlite3_context *ctx, int argc,
sqlite3_result_int(ctx, *syncBitPtr);
}

// Monotonic upsert into crsql_tracked_peers. Mirrors what merge_insert_impl
// does for received changes, but lets the application record arbitrary
// (tag, event) watermarks — for example bumping TRACKED_EVENT_SENT (=1)
// after successfully shipping changes to a peer. The watermark only
// advances; an attempt to write a smaller (version, seq) is a no-op.
//
// Args: site_id BLOB(16), version INTEGER, seq INTEGER, tag INTEGER,
// event INTEGER. Returns NULL on success.
static void x_crsql_set_tracked_peer(sqlite3_context *ctx, int argc,
sqlite3_value **argv) {
if (argc != 5) {
sqlite3_result_error(
ctx,
"crsql_set_tracked_peer expects (site_id, version, seq, tag, event)",
-1);
return;
}
if (sqlite3_value_type(argv[0]) != SQLITE_BLOB ||
sqlite3_value_bytes(argv[0]) != SITE_ID_LEN) {
sqlite3_result_error(ctx, "site_id must be a 16-byte BLOB", -1);
return;
}
crsql_ExtData *pExtData = (crsql_ExtData *)sqlite3_user_data(ctx);
sqlite3_stmt *s = pExtData->pUpsertTrackedPeerStmt;
if (!s) {
sqlite3_result_error(ctx, "tracked-peer statement not prepared", -1);
return;
}
sqlite3_reset(s);
int rc = sqlite3_bind_value(s, 1, argv[0]);
if (rc == SQLITE_OK) rc = sqlite3_bind_value(s, 2, argv[1]);
if (rc == SQLITE_OK) rc = sqlite3_bind_value(s, 3, argv[2]);
if (rc == SQLITE_OK) rc = sqlite3_bind_value(s, 4, argv[3]);
if (rc == SQLITE_OK) rc = sqlite3_bind_value(s, 5, argv[4]);
if (rc != SQLITE_OK) {
sqlite3_reset(s);
sqlite3_result_error(ctx, "failed to bind tracked-peer args", -1);
return;
}
rc = sqlite3_step(s);
sqlite3_reset(s);
if (rc != SQLITE_DONE && rc != SQLITE_ROW) {
sqlite3_result_error(ctx, sqlite3_errmsg(sqlite3_context_db_handle(ctx)),
-1);
return;
}
sqlite3_result_null(ctx);
}

static void x_crsql_sha(sqlite3_context *ctx, int argc,
sqlite3_value **argv) {
#ifdef CRSQLITE_COMMIT_SHA
Expand Down Expand Up @@ -407,6 +456,12 @@ __declspec(dllexport)
x_crsql_sha, 0, 0, 0);
if (rc != SQLITE_OK) goto err_free_ext;

rc = sqlite3_create_function_v2(
db, "crsql_set_tracked_peer", 5,
SQLITE_UTF8 | SQLITE_DIRECTONLY, pExtData,
x_crsql_set_tracked_peer, 0, 0, 0);
if (rc != SQLITE_OK) goto err_free_ext;

rc = sqlite3_create_function_v2(
db, "crsql_increment_and_get_seq", 0,
SQLITE_UTF8 | SQLITE_INNOCUOUS, pExtData,
Expand Down
223 changes: 223 additions & 0 deletions core/src/crsqlite.test.c
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,228 @@ static void testModifyCompoundPK() {
printf("\t\e[0;32mSuccess\e[0m\n");
}

// Hard check that survives -DNDEBUG (assert() is a no-op in release).
// Existing tests in this file embed sqlite3_step inside assert(), which
// makes them vacuous in release; the test suite as a whole inherits that
// limitation but this new test must not.
#define CHECK(cond) \
do { \
if (!(cond)) { \
fprintf(stderr, "CHECK failed at %s:%d: %s\n", __FILE__, __LINE__, \
#cond); \
abort(); \
} \
} while (0)

// Local sync helper that does not depend on assert(), unlike the file-wide
// syncLeftToRight which is a no-op under -DNDEBUG.
static int checkedSyncLeftToRight(sqlite3 *src, sqlite3 *dst) {
sqlite3_stmt *pSid = 0;
if (sqlite3_prepare_v2(dst, "SELECT crsql_site_id()", -1, &pSid, 0) !=
SQLITE_OK)
return SQLITE_ERROR;
if (sqlite3_step(pSid) != SQLITE_ROW) {
sqlite3_finalize(pSid);
return SQLITE_ERROR;
}
sqlite3_stmt *pRead = 0;
if (sqlite3_prepare_v2(
src,
"SELECT * FROM crsql_changes WHERE site_id IS NOT ?",
-1, &pRead, 0) != SQLITE_OK) {
sqlite3_finalize(pSid);
return SQLITE_ERROR;
}
sqlite3_bind_value(pRead, 1, sqlite3_column_value(pSid, 0));

sqlite3_stmt *pWrite = 0;
if (sqlite3_prepare_v2(
dst,
"INSERT INTO crsql_changes VALUES (?,?,?,?,?,?,?,?,?)", -1,
&pWrite, 0) != SQLITE_OK) {
sqlite3_finalize(pRead);
sqlite3_finalize(pSid);
return SQLITE_ERROR;
}

int rc;
while ((rc = sqlite3_step(pRead)) == SQLITE_ROW) {
for (int i = 0; i < 9; i++) {
if (sqlite3_bind_value(pWrite, i + 1, sqlite3_column_value(pRead, i)) !=
SQLITE_OK) {
rc = SQLITE_ERROR;
goto done;
}
}
if (sqlite3_step(pWrite) != SQLITE_DONE) {
fprintf(stderr, "merge step failed: %s\n", sqlite3_errmsg(dst));
rc = SQLITE_ERROR;
goto done;
}
sqlite3_reset(pWrite);
}
if (rc == SQLITE_DONE) rc = SQLITE_OK;
done:
sqlite3_finalize(pWrite);
sqlite3_finalize(pRead);
sqlite3_finalize(pSid);
return rc;
}

// Verify the auto-tracking behavior of crsql_tracked_peers and that
// crsql_set_tracked_peer's monotonic upsert never rolls a watermark
// backwards. After db1 -> db2 sync, db2.crsql_tracked_peers must have
// exactly one row whose site_id matches db1, event = 0 (RECEIVED), and
// (version, seq) match the max db_version / seq actually merged.
static void testTrackedPeers() {
printf("trackedPeers\n");

sqlite3 *db1 = 0;
sqlite3 *db2 = 0;
sqlite3_stmt *pStmt = 0;
char *err = 0;
int rc;

CHECK(sqlite3_open(":memory:", &db1) == SQLITE_OK);
CHECK(sqlite3_open(":memory:", &db2) == SQLITE_OK);
CHECK(createSimpleSchema(db1, &err) == SQLITE_OK);
CHECK(createSimpleSchema(db2, &err) == SQLITE_OK);

// Sanity: each fresh db sees an empty peer-tracking table.
CHECK(sqlite3_prepare_v2(db2, "SELECT count(*) FROM crsql_tracked_peers",
-1, &pStmt, 0) == SQLITE_OK);
CHECK(sqlite3_step(pStmt) == SQLITE_ROW);
CHECK(sqlite3_column_int(pStmt, 0) == 0);
sqlite3_finalize(pStmt);

CHECK(sqlite3_exec(db1, "insert into foo values (1, 'a');", 0, 0, &err) ==
SQLITE_OK);
CHECK(sqlite3_exec(db1, "insert into foo values (2, 'b');", 0, 0, &err) ==
SQLITE_OK);

// Capture the max (db_version, seq) db1 produced; db2 must end up
// tracking exactly this watermark for db1 after the sync.
sqlite3_int64 expectedVersion = 0;
sqlite3_int64 expectedSeq = 0;
CHECK(sqlite3_prepare_v2(
db1,
"SELECT max(db_version), max(seq) FROM crsql_changes",
-1, &pStmt, 0) == SQLITE_OK);
CHECK(sqlite3_step(pStmt) == SQLITE_ROW);
expectedVersion = sqlite3_column_int64(pStmt, 0);
expectedSeq = sqlite3_column_int64(pStmt, 1);
sqlite3_finalize(pStmt);
CHECK(expectedVersion > 0);

CHECK(checkedSyncLeftToRight(db1, db2) == SQLITE_OK);

// db2 should now have exactly one peer row, for db1, with event=RECEIVED
// and the watermark we captured above.
CHECK(sqlite3_prepare_v2(
db2,
"SELECT site_id, version, seq, tag, event "
"FROM crsql_tracked_peers",
-1, &pStmt, 0) == SQLITE_OK);
CHECK(sqlite3_step(pStmt) == SQLITE_ROW);

unsigned char trackedSiteId[16];
CHECK(sqlite3_column_bytes(pStmt, 0) == 16);
memcpy(trackedSiteId, sqlite3_column_blob(pStmt, 0), 16);
CHECK(sqlite3_column_int64(pStmt, 1) == expectedVersion);
CHECK(sqlite3_column_int64(pStmt, 2) == expectedSeq);
CHECK(sqlite3_column_int(pStmt, 3) == 0); // tag
CHECK(sqlite3_column_int(pStmt, 4) == 0); // RECEIVED

// No second row from this single sync.
CHECK(sqlite3_step(pStmt) == SQLITE_DONE);
sqlite3_finalize(pStmt);

// Recorded site_id must be db1's, not db2's own.
CHECK(sqlite3_prepare_v2(db1, "SELECT crsql_site_id()", -1, &pStmt, 0) ==
SQLITE_OK);
CHECK(sqlite3_step(pStmt) == SQLITE_ROW);
CHECK(sqlite3_column_bytes(pStmt, 0) == 16);
CHECK(memcmp(trackedSiteId, sqlite3_column_blob(pStmt, 0), 16) == 0);
sqlite3_finalize(pStmt);

// Re-syncing the same changes must not advance the watermark (idempotent
// upsert with strict-greater guard).
CHECK(checkedSyncLeftToRight(db1, db2) == SQLITE_OK);
CHECK(sqlite3_prepare_v2(db2,
"SELECT version, seq FROM crsql_tracked_peers",
-1, &pStmt, 0) == SQLITE_OK);
CHECK(sqlite3_step(pStmt) == SQLITE_ROW);
CHECK(sqlite3_column_int64(pStmt, 0) == expectedVersion);
CHECK(sqlite3_column_int64(pStmt, 1) == expectedSeq);
sqlite3_finalize(pStmt);

// Local writes on db2 must not produce a self-row.
CHECK(sqlite3_exec(db2, "insert into foo values (3, 'c');", 0, 0, &err) ==
SQLITE_OK);
CHECK(sqlite3_prepare_v2(
db2, "SELECT count(*) FROM crsql_tracked_peers", -1, &pStmt, 0) ==
SQLITE_OK);
CHECK(sqlite3_step(pStmt) == SQLITE_ROW);
CHECK(sqlite3_column_int(pStmt, 0) == 1);
sqlite3_finalize(pStmt);

// crsql_set_tracked_peer writes a SENT watermark, then a backwards
// attempt is silently ignored (monotonic), then a forwards attempt wins.
rc = sqlite3_exec(
db2,
"SELECT crsql_set_tracked_peer("
" (SELECT site_id FROM crsql_tracked_peers LIMIT 1), 100, 5, 0, 1)",
0, 0, &err);
CHECK(rc == SQLITE_OK);

CHECK(sqlite3_prepare_v2(
db2,
"SELECT version, seq FROM crsql_tracked_peers WHERE event = 1",
-1, &pStmt, 0) == SQLITE_OK);
CHECK(sqlite3_step(pStmt) == SQLITE_ROW);
CHECK(sqlite3_column_int64(pStmt, 0) == 100);
CHECK(sqlite3_column_int64(pStmt, 1) == 5);
sqlite3_finalize(pStmt);

// Backwards write — should be a no-op.
rc = sqlite3_exec(
db2,
"SELECT crsql_set_tracked_peer("
" (SELECT site_id FROM crsql_tracked_peers WHERE event = 1), "
" 50, 99, 0, 1)",
0, 0, &err);
CHECK(rc == SQLITE_OK);
CHECK(sqlite3_prepare_v2(
db2,
"SELECT version, seq FROM crsql_tracked_peers WHERE event = 1",
-1, &pStmt, 0) == SQLITE_OK);
CHECK(sqlite3_step(pStmt) == SQLITE_ROW);
CHECK(sqlite3_column_int64(pStmt, 0) == 100);
CHECK(sqlite3_column_int64(pStmt, 1) == 5);
sqlite3_finalize(pStmt);

// Forwards write — wins.
rc = sqlite3_exec(
db2,
"SELECT crsql_set_tracked_peer("
" (SELECT site_id FROM crsql_tracked_peers WHERE event = 1), "
" 100, 6, 0, 1)",
0, 0, &err);
CHECK(rc == SQLITE_OK);
CHECK(sqlite3_prepare_v2(
db2,
"SELECT version, seq FROM crsql_tracked_peers WHERE event = 1",
-1, &pStmt, 0) == SQLITE_OK);
CHECK(sqlite3_step(pStmt) == SQLITE_ROW);
CHECK(sqlite3_column_int64(pStmt, 0) == 100);
CHECK(sqlite3_column_int64(pStmt, 1) == 6);
sqlite3_finalize(pStmt);

crsql_close(db1);
crsql_close(db2);
printf("\t\e[0;32mSuccess\e[0m\n");
}

void crsqlTestSuite() {
printf("\e[47m\e[1;30mSuite: crsql\e[0m\n");

Expand All @@ -880,4 +1102,5 @@ void crsqlTestSuite() {
testRequiredPrimaryKey();
testModifySinglePK();
testModifyCompoundPK();
testTrackedPeers();
}
Loading
Loading