From 1afc5729c5af96007aab396459c7ce5fe85ec9ec Mon Sep 17 00:00:00 2001 From: Arda Nakisci Date: Wed, 1 Jul 2026 11:40:57 +0300 Subject: [PATCH] Use sqlite-web transaction local DB API --- .../commands/local_db/pipeline/bootstrap.rs | 102 +++-- crates/common/src/local_db/executor.rs | 20 +- .../local_db/pipeline/adapters/bootstrap.rs | 101 +++-- .../src/local_db/query/clear_tables/mod.rs | 33 +- .../src/local_db/query/create_tables/mod.rs | 6 +- .../src/local_db/query/create_views/mod.rs | 40 +- crates/common/src/local_db/query/mod.rs | 2 + .../common/src/local_db/query/sql_script.rs | 159 +++++++ .../src/local_db/query/sql_statement_batch.rs | 8 + .../src/raindex_client/local_db/executor.rs | 425 +++++++++--------- .../common/src/raindex_client/local_db/mod.rs | 35 +- .../src/raindex_client/local_db/orders.rs | 19 +- .../local_db/pipeline/bootstrap.rs | 110 ++--- .../pipeline/runner/scheduler/wasm.rs | 16 +- .../local_db/query/clear_tables.rs | 76 +++- .../local_db/query/create_tables.rs | 58 ++- .../src/raindex_client/local_db/vaults.rs | 112 +++-- crates/common/src/raindex_client/mod.rs | 247 ++++++++-- .../common/src/raindex_client/raindex_yaml.rs | 32 +- .../src/raindex_client/trades/get_by_owner.rs | 11 +- .../src/raindex_client/trades/get_by_tx.rs | 7 +- .../common/src/raindex_client/trades/mod.rs | 7 +- crates/common/src/raindex_client/vaults.rs | 19 +- crates/js_api/src/registry.rs | 29 +- package-lock.json | 8 +- package.json | 2 +- packages/raindex/ARCHITECTURE.md | 6 +- packages/raindex/README.md | 21 +- packages/webapp/ARCHITECTURE.md | 7 +- packages/webapp/src/routes/+layout.ts | 18 +- 30 files changed, 1143 insertions(+), 593 deletions(-) create mode 100644 crates/common/src/local_db/query/sql_script.rs diff --git a/crates/cli/src/commands/local_db/pipeline/bootstrap.rs b/crates/cli/src/commands/local_db/pipeline/bootstrap.rs index dc7bf8daa7..5983291b26 100644 --- a/crates/cli/src/commands/local_db/pipeline/bootstrap.rs +++ b/crates/cli/src/commands/local_db/pipeline/bootstrap.rs @@ -45,8 +45,8 @@ mod tests { use alloy::primitives::Address; use async_trait::async_trait; use raindex_app_settings::local_db_manifest::DB_SCHEMA_VERSION; - use raindex_common::local_db::query::clear_tables::clear_tables_stmt; - use raindex_common::local_db::query::create_tables::create_tables_stmt; + use raindex_common::local_db::query::clear_tables::{clear_tables_batch, vacuum_stmt}; + use raindex_common::local_db::query::create_tables::create_tables_batch; use raindex_common::local_db::query::insert_db_metadata::insert_db_metadata_stmt; use raindex_common::local_db::query::{ FromDbJson, LocalDbQueryError, LocalDbQueryExecutor, SqlStatement, SqlStatementBatch, @@ -78,6 +78,48 @@ mod tests { .iter() .fold(self, |db, stmt| db.with_text(stmt, "ok")) } + + fn with_batch(self, batch: &SqlStatementBatch) -> Self { + batch + .statements() + .iter() + .fold(self, |db, stmt| db.with_text(stmt, "ok")) + } + + fn with_reset_batches(self) -> Self { + self.with_batch(&clear_tables_batch()) + .with_text(&vacuum_stmt(), "ok") + .with_batch(&create_tables_batch()) + } + } + + fn reset_batch_sqls() -> Vec { + let mut statements = clear_tables_batch().statements().to_vec(); + statements.push(vacuum_stmt()); + statements.extend(create_tables_batch().statements().iter().cloned()); + statements + .iter() + .map(|stmt| stmt.sql().to_string()) + .collect() + } + + fn assert_reset_batches_were_called(calls: &[String]) { + for sql in reset_batch_sqls() { + assert!(calls.contains(&sql), "missing reset SQL: {sql}"); + } + } + + fn first_reset_sql() -> String { + clear_tables_batch().statements()[0].sql().to_string() + } + + fn last_reset_sql() -> String { + create_tables_batch() + .statements() + .last() + .unwrap() + .sql() + .to_string() } #[cfg_attr(target_family = "wasm", async_trait(?Send))] @@ -119,8 +161,7 @@ mod tests { async fn engine_run_resets_and_does_not_import_when_no_dump() { let adapter = ProducerBootstrapAdapter::new(); let db = MockDb::default() - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_reset_batches() .with_text(&insert_db_metadata_stmt(DB_SCHEMA_VERSION), "ok") .with_views(); @@ -135,27 +176,23 @@ mod tests { adapter.engine_run(&db, &cfg).await.unwrap(); let calls = db.calls(); - // Presence assertions - let clear = clear_tables_stmt().sql().to_string(); - let create = create_tables_stmt().sql().to_string(); + let reset_start = first_reset_sql(); + let reset_end = last_reset_sql(); let insert = insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql().to_string(); - assert!(calls.contains(&clear)); - assert!(calls.contains(&create)); + assert_reset_batches_were_called(&calls); assert!(calls.contains(&insert)); - // Ordering: clear -> create -> insert let idx = |s: &String| calls.iter().position(|c| c == s).unwrap(); - assert!(idx(&clear) < idx(&create)); - assert!(idx(&create) < idx(&insert)); + assert!(idx(&reset_start) < idx(&reset_end)); + assert!(idx(&reset_end) < idx(&insert)); } #[tokio::test] async fn engine_run_executes_view_creation() { let adapter = ProducerBootstrapAdapter::new(); let db = MockDb::default() - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_reset_batches() .with_text(&insert_db_metadata_stmt(DB_SCHEMA_VERSION), "ok") .with_views(); @@ -187,8 +224,7 @@ mod tests { let adapter = ProducerBootstrapAdapter::new(); let dump_stmt = SqlStatement::new("--dump-sql"); let db = MockDb::default() - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_reset_batches() .with_text(&insert_db_metadata_stmt(DB_SCHEMA_VERSION), "ok") .with_text(&dump_stmt, "ok") .with_views(); @@ -204,21 +240,18 @@ mod tests { adapter.engine_run(&db, &cfg).await.unwrap(); let calls = db.calls(); - // Presence assertions - let clear = clear_tables_stmt().sql().to_string(); - let create = create_tables_stmt().sql().to_string(); + let reset_start = first_reset_sql(); + let reset_end = last_reset_sql(); let insert = insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql().to_string(); let dump = dump_stmt.sql().to_string(); - assert!(calls.contains(&clear)); - assert!(calls.contains(&create)); + assert_reset_batches_were_called(&calls); assert!(calls.contains(&insert)); assert!(calls.contains(&dump)); - // Ordering: clear -> create -> insert -> dump let idx = |s: &String| calls.iter().position(|c| c == s).unwrap(); - assert!(idx(&clear) < idx(&create)); - assert!(idx(&create) < idx(&insert)); + assert!(idx(&reset_start) < idx(&reset_end)); + assert!(idx(&reset_end) < idx(&insert)); assert!(idx(&insert) < idx(&dump)); } @@ -227,8 +260,7 @@ mod tests { let adapter = ProducerBootstrapAdapter::new(); let dump_stmt = SqlStatement::new("--dump-sql-missing"); let db = MockDb::default() - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_reset_batches() .with_text(&insert_db_metadata_stmt(DB_SCHEMA_VERSION), "ok") .with_views(); @@ -245,20 +277,18 @@ mod tests { assert!(result.is_err()); let calls = db.calls(); - let clear = clear_tables_stmt().sql().to_string(); - let create = create_tables_stmt().sql().to_string(); + let reset_start = first_reset_sql(); + let reset_end = last_reset_sql(); let insert = insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql().to_string(); let dump = dump_stmt.sql().to_string(); - assert!(calls.contains(&clear)); - assert!(calls.contains(&create)); + assert_reset_batches_were_called(&calls); assert!(calls.contains(&insert)); assert!(calls.contains(&dump)); - // Ordering: clear -> create -> insert -> dump (dump last attempted and fails) let idx = |s: &String| calls.iter().position(|c| c == s).unwrap(); - assert!(idx(&clear) < idx(&create)); - assert!(idx(&create) < idx(&insert)); + assert!(idx(&reset_start) < idx(&reset_end)); + assert!(idx(&reset_end) < idx(&insert)); assert!(idx(&insert) < idx(&dump)); } @@ -266,7 +296,7 @@ mod tests { async fn engine_run_propagates_reset_error() { let adapter = ProducerBootstrapAdapter::new(); let db = MockDb::default() - .with_text(&clear_tables_stmt(), "ok") + .with_batch(&clear_tables_batch()) .with_views(); let cfg = BootstrapConfig { @@ -284,8 +314,8 @@ mod tests { } let calls = db.calls(); - assert!(calls.contains(&clear_tables_stmt().sql().to_string())); - assert!(calls.contains(&create_tables_stmt().sql().to_string())); + assert_eq!(calls.len(), clear_tables_batch().len() + 1); + assert_eq!(calls.last().unwrap(), vacuum_stmt().sql()); } #[tokio::test] diff --git a/crates/common/src/local_db/executor.rs b/crates/common/src/local_db/executor.rs index ce047e9724..bb017fb755 100644 --- a/crates/common/src/local_db/executor.rs +++ b/crates/common/src/local_db/executor.rs @@ -295,7 +295,7 @@ fn sqlite_file_paths(db_path: &Path) -> Vec { #[cfg(test)] mod tests { use super::*; - use crate::local_db::query::create_tables::create_tables_stmt; + use crate::local_db::query::create_tables::{create_tables_batch, create_tables_sql}; use tempfile::TempDir; #[test] @@ -309,7 +309,7 @@ mod tests { #[test] fn stamped_db_passes_schema_guard() { let conn = Connection::open_in_memory().unwrap(); - conn.execute_batch(create_tables_stmt().sql()) + conn.execute_batch(create_tables_sql()) .expect("create tables stamps the header"); verify_schema_guard(&conn).expect("correctly stamped db should pass the guard"); } @@ -317,7 +317,7 @@ mod tests { #[test] fn wrong_application_id_is_detected() { let conn = Connection::open_in_memory().unwrap(); - conn.execute_batch(create_tables_stmt().sql()) + conn.execute_batch(create_tables_sql()) .expect("create tables stamps the header"); // Re-stamp the header with a foreign application_id (a SQLite file that // is not a raindex local-db) while leaving user_version valid. @@ -337,7 +337,7 @@ mod tests { #[test] fn negative_application_id_is_detected() { let conn = Connection::open_in_memory().unwrap(); - conn.execute_batch(create_tables_stmt().sql()) + conn.execute_batch(create_tables_sql()) .expect("create tables stamps the header"); // A foreign file whose application_id has the top bit set reads back as // a negative i32. It is neither zero (unstamped) nor the raindex magic, @@ -358,7 +358,7 @@ mod tests { #[test] fn wrong_user_version_is_detected() { let conn = Connection::open_in_memory().unwrap(); - conn.execute_batch(create_tables_stmt().sql()) + conn.execute_batch(create_tables_sql()) .expect("create tables stamps the header"); // Keep the raindex application_id but advance user_version to a stale / // future schema number. @@ -378,7 +378,7 @@ mod tests { #[test] fn stale_user_version_is_detected() { let conn = Connection::open_in_memory().unwrap(); - conn.execute_batch(create_tables_stmt().sql()) + conn.execute_batch(create_tables_sql()) .expect("create tables stamps the header"); // Keep the raindex application_id but roll user_version back to an older // schema number. A stale (lower) version must be rejected just like a @@ -405,7 +405,7 @@ mod tests { // in the file header so a fresh open must reject it. { let conn = Connection::open(&db_path).unwrap(); - conn.execute_batch(create_tables_stmt().sql()).unwrap(); + conn.execute_batch(create_tables_sql()).unwrap(); conn.pragma_update(None, "application_id", RAINDEX_APPLICATION_ID + 7) .unwrap(); } @@ -428,7 +428,7 @@ mod tests { { let conn = Connection::open(&db_path).unwrap(); - conn.execute_batch(create_tables_stmt().sql()).unwrap(); + conn.execute_batch(create_tables_sql()).unwrap(); conn.pragma_update(None, "user_version", DB_SCHEMA_VERSION as i32 + 1) .unwrap(); } @@ -451,7 +451,7 @@ mod tests { let exec = RusqliteExecutor::new(&db_path); // First open hits an empty file (application_id == 0) and stamps it. - exec.query_text(&create_tables_stmt()).await.unwrap(); + exec.execute_batch(&create_tables_batch()).await.unwrap(); // A subsequent open must read back the stamped header and succeed. #[derive(serde::Deserialize)] @@ -477,7 +477,7 @@ mod tests { let db_path = temp_dir.path().join("dump.db"); let exec = RusqliteExecutor::new(&db_path); - exec.query_text(&create_tables_stmt()).await.unwrap(); + exec.execute_batch(&create_tables_batch()).await.unwrap(); // Apply a data-only insert batch (the shape produced by export_data_only). let mut batch = SqlStatementBatch::new(); diff --git a/crates/common/src/local_db/pipeline/adapters/bootstrap.rs b/crates/common/src/local_db/pipeline/adapters/bootstrap.rs index 53def3a0d4..e4a8f3b80f 100644 --- a/crates/common/src/local_db/pipeline/adapters/bootstrap.rs +++ b/crates/common/src/local_db/pipeline/adapters/bootstrap.rs @@ -1,6 +1,6 @@ use crate::local_db::query::clear_raindex_data::clear_raindex_data_batch; -use crate::local_db::query::clear_tables::clear_tables_stmt; -use crate::local_db::query::create_tables::create_tables_stmt; +use crate::local_db::query::clear_tables::{clear_tables_batch, vacuum_stmt}; +use crate::local_db::query::create_tables::create_tables_batch; use crate::local_db::query::create_tables::REQUIRED_TABLES; use crate::local_db::query::create_views::create_views_batch; use crate::local_db::query::fetch_db_metadata::{fetch_db_metadata_stmt, DbMetadataRow}; @@ -109,8 +109,9 @@ pub trait BootstrapPipeline { where DB: LocalDbQueryExecutor + ?Sized, { - db.query_text(&clear_tables_stmt()).await?; - db.query_text(&create_tables_stmt()).await?; + db.execute_batch(&clear_tables_batch()).await?; + db.query_text(&vacuum_stmt()).await?; + db.execute_batch(&create_tables_batch()).await?; db.query_text(&insert_db_metadata_stmt( db_schema_version.unwrap_or(DB_SCHEMA_VERSION), )) @@ -166,6 +167,8 @@ mod tests { use std::sync::Mutex; use super::*; + use crate::local_db::query::clear_tables::{clear_tables_batch, vacuum_stmt}; + use crate::local_db::query::create_tables::create_tables_batch; use crate::local_db::query::create_views::create_views_batch; use crate::local_db::query::fetch_db_metadata::{fetch_db_metadata_stmt, DbMetadataRow}; use crate::local_db::query::fetch_tables::{fetch_tables_stmt, TableResponse}; @@ -198,11 +201,27 @@ mod tests { .iter() .fold(self, |db, stmt| db.with_text(stmt, "ok")) } + fn with_batch(self, batch: &SqlStatementBatch) -> Self { + batch + .statements() + .iter() + .fold(self, |db, stmt| db.with_text(stmt, "ok")) + } fn calls(&self) -> Vec { self.calls_text.lock().unwrap().clone() } } + fn reset_prefix_sql() -> Vec { + let mut statements = clear_tables_batch().statements().to_vec(); + statements.push(vacuum_stmt()); + statements.extend(create_tables_batch().statements().iter().cloned()); + statements + .iter() + .map(|stmt| stmt.sql().to_string()) + .collect() + } + struct RecordingTextExecutor { result: Mutex>>, captured_sql: Mutex>, @@ -602,8 +621,9 @@ mod tests { async fn reset_db_runs_clear_create_and_insert() { let adapter = TestBootstrapPipeline::new(); let db = MockDb::default() - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_batch(&clear_tables_batch()) + .with_text(&vacuum_stmt(), "ok") + .with_batch(&create_tables_batch()) .with_text(&insert_db_metadata_stmt(DB_SCHEMA_VERSION), "ok") .with_views(); @@ -618,26 +638,30 @@ mod tests { .iter() .map(|s| s.sql().to_string()) .collect(); + let expected_prefix = reset_prefix_sql(); assert_eq!( calls.len(), - 3 + expected_views.len(), + expected_prefix.len() + 1 + expected_views.len(), "unexpected number of executed statements" ); - assert_eq!(calls[0], clear_tables_stmt().sql().to_string()); - assert_eq!(calls[1], create_tables_stmt().sql().to_string()); + assert_eq!(&calls[..expected_prefix.len()], expected_prefix.as_slice()); assert_eq!( - calls[2], + calls[expected_prefix.len()], insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql().to_string() ); - assert_eq!(&calls[3..], expected_views.as_slice()); + assert_eq!( + &calls[expected_prefix.len() + 1..], + expected_views.as_slice() + ); } #[tokio::test] async fn reset_db_uses_default_version_when_none() { let adapter = TestBootstrapPipeline::new(); let db = MockDb::default() - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_batch(&clear_tables_batch()) + .with_text(&vacuum_stmt(), "ok") + .with_batch(&create_tables_batch()) .with_text(&insert_db_metadata_stmt(DB_SCHEMA_VERSION), "ok") .with_views(); @@ -649,10 +673,16 @@ mod tests { .iter() .map(|s| s.sql().to_string()) .collect(); - assert_eq!(calls[0], clear_tables_stmt().sql()); - assert_eq!(calls[1], create_tables_stmt().sql()); - assert_eq!(calls[2], insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql()); - assert_eq!(&calls[3..], expected_views.as_slice()); + let expected_prefix = reset_prefix_sql(); + assert_eq!(&calls[..expected_prefix.len()], expected_prefix.as_slice()); + assert_eq!( + calls[expected_prefix.len()], + insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql() + ); + assert_eq!( + &calls[expected_prefix.len() + 1..], + expected_views.as_slice() + ); } #[tokio::test] @@ -660,8 +690,9 @@ mod tests { let adapter = TestBootstrapPipeline::new(); let custom_version = DB_SCHEMA_VERSION + 9; let db = MockDb::default() - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_batch(&clear_tables_batch()) + .with_text(&vacuum_stmt(), "ok") + .with_batch(&create_tables_batch()) .with_text(&insert_db_metadata_stmt(custom_version), "ok") .with_views(); @@ -673,17 +704,23 @@ mod tests { .iter() .map(|s| s.sql().to_string()) .collect(); - assert_eq!(calls[0], clear_tables_stmt().sql()); - assert_eq!(calls[1], create_tables_stmt().sql()); - assert_eq!(calls[2], insert_db_metadata_stmt(custom_version).sql()); - assert_eq!(&calls[3..], expected_views.as_slice()); + let expected_prefix = reset_prefix_sql(); + assert_eq!(&calls[..expected_prefix.len()], expected_prefix.as_slice()); + assert_eq!( + calls[expected_prefix.len()], + insert_db_metadata_stmt(custom_version).sql() + ); + assert_eq!( + &calls[expected_prefix.len() + 1..], + expected_views.as_slice() + ); } #[tokio::test] async fn reset_db_propagates_errors() { let adapter = TestBootstrapPipeline::new(); - // Only the first statement is present; second will fail. - let db = MockDb::default().with_text(&clear_tables_stmt(), "ok"); + // Only the clear transaction is present; vacuum will fail. + let db = MockDb::default().with_batch(&clear_tables_batch()); let err = adapter.reset_db(&db, None).await.unwrap_err(); match err { @@ -692,9 +729,17 @@ mod tests { } let calls = db.calls(); - assert_eq!(calls.len(), 2); // attempted clear and create - assert_eq!(calls[0], clear_tables_stmt().sql()); - assert_eq!(calls[1], create_tables_stmt().sql()); + assert_eq!(calls.len(), clear_tables_batch().len() + 1); + assert_eq!( + &calls[..clear_tables_batch().len()], + clear_tables_batch() + .statements() + .iter() + .map(|stmt| stmt.sql().to_string()) + .collect::>() + .as_slice() + ); + assert_eq!(calls.last().unwrap(), vacuum_stmt().sql()); } #[tokio::test] diff --git a/crates/common/src/local_db/query/clear_tables/mod.rs b/crates/common/src/local_db/query/clear_tables/mod.rs index 57f453ee30..c268b3a867 100644 --- a/crates/common/src/local_db/query/clear_tables/mod.rs +++ b/crates/common/src/local_db/query/clear_tables/mod.rs @@ -1,11 +1,16 @@ -use crate::local_db::query::SqlStatement; +use crate::local_db::query::{SqlScript, SqlStatement, SqlStatementBatch}; pub const CLEAR_TABLES_SQL: &str = include_str!("query.sql"); -/// Returns the SQL statement that drops all local database tables and performs -/// cleanup (transaction + vacuum). No parameters are bound for this script. -pub fn clear_tables_stmt() -> SqlStatement { - SqlStatement::new(CLEAR_TABLES_SQL) +pub fn clear_tables_batch() -> SqlStatementBatch { + let mut statements = SqlScript::new(CLEAR_TABLES_SQL).statements(); + statements.retain(|sql| !sql.trim().eq_ignore_ascii_case("VACUUM;")); + + SqlStatementBatch::with_statements(statements.into_iter().map(SqlStatement::new).collect()) +} + +pub fn vacuum_stmt() -> SqlStatement { + SqlStatement::new("VACUUM;") } #[cfg(test)] @@ -26,17 +31,25 @@ mod tests { } #[test] - fn stmt_is_static_and_param_free() { - let stmt = clear_tables_stmt(); - assert_eq!(stmt.sql, CLEAR_TABLES_SQL); - assert!(stmt.params.is_empty()); - let lower = stmt.sql.to_lowercase(); + fn script_drops_tables_and_vacuums() { + let lower = CLEAR_TABLES_SQL.to_lowercase(); assert!(lower.contains("begin transaction")); assert!(lower.contains("drop view if exists vault_deltas")); assert!(lower.contains("drop table if exists")); assert!(lower.contains("vacuum")); } + #[test] + fn batch_excludes_vacuum_from_transaction() { + let batch = clear_tables_batch(); + assert!(batch.is_transaction()); + assert!(!batch + .statements() + .iter() + .any(|stmt| stmt.sql().trim().eq_ignore_ascii_case("VACUUM;"))); + assert_eq!(vacuum_stmt().sql().trim(), "VACUUM;"); + } + #[test] fn drops_all_required_tables() { let sql = CLEAR_TABLES_SQL.to_lowercase(); diff --git a/crates/common/src/local_db/query/create_tables/mod.rs b/crates/common/src/local_db/query/create_tables/mod.rs index f82d6025b8..647257f01f 100644 --- a/crates/common/src/local_db/query/create_tables/mod.rs +++ b/crates/common/src/local_db/query/create_tables/mod.rs @@ -1,4 +1,4 @@ -use crate::local_db::query::SqlStatement; +use crate::local_db::query::{SqlScript, SqlStatementBatch}; pub const CREATE_TABLES_SQL: &str = include_str!("query.sql"); @@ -41,8 +41,8 @@ pub fn create_tables_sql() -> &'static str { CREATE_TABLES_SQL } -pub fn create_tables_stmt() -> SqlStatement { - SqlStatement::new(CREATE_TABLES_SQL) +pub fn create_tables_batch() -> SqlStatementBatch { + SqlScript::new(CREATE_TABLES_SQL).statement_batch() } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/crates/common/src/local_db/query/create_views/mod.rs b/crates/common/src/local_db/query/create_views/mod.rs index 48661c2709..cc46452ccc 100644 --- a/crates/common/src/local_db/query/create_views/mod.rs +++ b/crates/common/src/local_db/query/create_views/mod.rs @@ -1,13 +1,27 @@ -use crate::local_db::query::{SqlStatement, SqlStatementBatch}; +use crate::local_db::query::{SqlScript, SqlStatement, SqlStatementBatch}; const VAULT_DELTAS_VIEW_SQL: &str = include_str!("./vault_deltas.sql"); pub fn create_views_batch() -> SqlStatementBatch { - SqlStatementBatch::with_statements(vec![vault_deltas_view_stmt()]).ensure_transaction() + SqlScript::new(VAULT_DELTAS_VIEW_SQL) + .statement_batch() + .ensure_transaction() } -pub fn vault_deltas_view_stmt() -> SqlStatement { - SqlStatement::new(VAULT_DELTAS_VIEW_SQL) +pub fn drop_vault_deltas_view_stmt() -> SqlStatement { + SqlStatement::new(view_statement(0)) +} + +pub fn create_vault_deltas_view_stmt() -> SqlStatement { + SqlStatement::new(view_statement(1)) +} + +fn view_statement(index: usize) -> &'static str { + SqlScript::new(VAULT_DELTAS_VIEW_SQL) + .statements() + .get(index) + .copied() + .expect("vault_deltas view SQL contains drop and create statements") } #[cfg(test)] @@ -18,20 +32,22 @@ mod tests { fn batch_wraps_transaction() { let batch = create_views_batch(); assert!(batch.is_transaction()); - assert_eq!(batch.len(), 3); // begin + drop/create view + commit + assert_eq!(batch.len(), 4); // begin + drop view + create view + commit let statements = batch.statements(); assert_eq!(statements.first().unwrap().sql(), "BEGIN TRANSACTION"); assert_eq!(statements.last().unwrap().sql(), "COMMIT"); - assert_eq!(statements[1].sql(), VAULT_DELTAS_VIEW_SQL); + assert_eq!(statements[1].sql(), drop_vault_deltas_view_stmt().sql()); + assert_eq!(statements[2].sql(), create_vault_deltas_view_stmt().sql()); } #[test] - fn single_stmt_matches_constant() { - let stmt = vault_deltas_view_stmt(); - assert_eq!(stmt.sql(), VAULT_DELTAS_VIEW_SQL); - assert!(stmt.sql().starts_with("DROP VIEW IF EXISTS vault_deltas;")); - assert!(stmt.sql().contains("CREATE VIEW vault_deltas AS")); - assert!(stmt.params().is_empty()); + fn statements_come_from_combined_script() { + let statements = SqlScript::new(VAULT_DELTAS_VIEW_SQL).statements(); + assert_eq!(statements.len(), 2); + assert!(statements[0].starts_with("DROP VIEW IF EXISTS vault_deltas;")); + assert!(statements[1] + .trim_start() + .starts_with("CREATE VIEW vault_deltas AS")); } } diff --git a/crates/common/src/local_db/query/mod.rs b/crates/common/src/local_db/query/mod.rs index 8220743050..1b292d20a6 100644 --- a/crates/common/src/local_db/query/mod.rs +++ b/crates/common/src/local_db/query/mod.rs @@ -27,6 +27,7 @@ pub mod fetch_vault_balance_changes; pub mod fetch_vaults; pub mod insert_db_metadata; pub mod integrity_check; +pub mod sql_script; pub mod sql_statement; pub mod sql_statement_batch; pub mod update_last_synced_block; @@ -36,6 +37,7 @@ pub mod upsert_target_watermark; pub mod upsert_vault_balances; pub use executor::LocalDbQueryExecutor; +pub use sql_script::SqlScript; pub use sql_statement::{SqlBuildError, SqlStatement, SqlValue}; pub use sql_statement_batch::SqlStatementBatch; diff --git a/crates/common/src/local_db/query/sql_script.rs b/crates/common/src/local_db/query/sql_script.rs new file mode 100644 index 0000000000..371a762b00 --- /dev/null +++ b/crates/common/src/local_db/query/sql_script.rs @@ -0,0 +1,159 @@ +use super::{SqlStatement, SqlStatementBatch}; + +#[derive(Debug, Clone, Copy)] +pub struct SqlScript { + sql: &'static str, +} + +impl SqlScript { + pub const fn new(sql: &'static str) -> Self { + Self { sql } + } + + pub fn statement_batch(self) -> SqlStatementBatch { + SqlStatementBatch::with_statements( + split_statements(self.sql) + .into_iter() + .map(SqlStatement::new) + .collect(), + ) + } + + pub fn statements(self) -> Vec<&'static str> { + split_statements(self.sql) + } +} + +fn split_statements(sql: &'static str) -> Vec<&'static str> { + let mut statements = Vec::new(); + let mut start = 0usize; + let mut chars = sql.char_indices().peekable(); + let mut in_single_quote = false; + let mut in_double_quote = false; + let mut in_line_comment = false; + let mut in_block_comment = false; + let mut has_executable_sql = false; + + while let Some((idx, ch)) = chars.next() { + let next = chars.peek().map(|(_, ch)| *ch); + + if in_line_comment { + if ch == '\n' { + in_line_comment = false; + } + continue; + } + + if in_block_comment { + if ch == '*' && next == Some('/') { + chars.next(); + in_block_comment = false; + } + continue; + } + + if in_single_quote { + if ch == '\'' { + if next == Some('\'') { + chars.next(); + } else { + in_single_quote = false; + } + } + continue; + } + + if in_double_quote { + if ch == '"' { + if next == Some('"') { + chars.next(); + } else { + in_double_quote = false; + } + } + continue; + } + + match (ch, next) { + ('-', Some('-')) => { + chars.next(); + in_line_comment = true; + } + ('/', Some('*')) => { + chars.next(); + in_block_comment = true; + } + ('\'', _) => { + has_executable_sql = true; + in_single_quote = true; + } + ('"', _) => { + has_executable_sql = true; + in_double_quote = true; + } + (';', _) => { + let end = idx + ch.len_utf8(); + let statement = sql[start..end].trim(); + if has_executable_sql && !statement.is_empty() { + statements.push(statement); + } + start = end; + has_executable_sql = false; + } + _ if !ch.is_whitespace() => has_executable_sql = true, + _ => {} + } + } + + let statement = sql[start..].trim(); + if has_executable_sql && !statement.is_empty() { + statements.push(statement); + } + + statements +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn splits_on_statement_semicolons() { + assert_eq!( + split_statements("BEGIN;\nSELECT 1;\nCOMMIT;"), + vec!["BEGIN;", "SELECT 1;", "COMMIT;"] + ); + } + + #[test] + fn ignores_semicolons_inside_quotes_and_comments() { + assert_eq!( + split_statements( + "SELECT ';' AS single_quote;\n\ + SELECT \";\" AS double_quote;\n\ + -- comment;\n\ + SELECT 1;\n\ + /* block; comment */\n\ + SELECT 2;" + ), + vec![ + "SELECT ';' AS single_quote;", + "SELECT \";\" AS double_quote;", + "-- comment;\nSELECT 1;", + "/* block; comment */\nSELECT 2;" + ] + ); + } + + #[test] + fn keeps_trailing_statement_without_semicolon() { + assert_eq!(split_statements("SELECT 1"), vec!["SELECT 1"]); + } + + #[test] + fn skips_comment_only_fragments() { + assert_eq!(split_statements("SELECT 1; -- note"), vec!["SELECT 1;"]); + assert_eq!(split_statements("SELECT 1; /* note */"), vec!["SELECT 1;"]); + assert!(split_statements("-- note;\n/* also note; */").is_empty()); + } +} diff --git a/crates/common/src/local_db/query/sql_statement_batch.rs b/crates/common/src/local_db/query/sql_statement_batch.rs index 6bb8d73a06..fd2ddf83a2 100644 --- a/crates/common/src/local_db/query/sql_statement_batch.rs +++ b/crates/common/src/local_db/query/sql_statement_batch.rs @@ -63,6 +63,14 @@ impl SqlStatementBatch { &self.statements } + pub fn inner_statements(&self) -> &[SqlStatement] { + if self.is_transaction() { + &self.statements[1..self.statements.len() - 1] + } else { + &self.statements + } + } + pub fn len(&self) -> usize { self.statements.len() } diff --git a/crates/common/src/raindex_client/local_db/executor.rs b/crates/common/src/raindex_client/local_db/executor.rs index 6d418a25b5..825c297977 100644 --- a/crates/common/src/raindex_client/local_db/executor.rs +++ b/crates/common/src/raindex_client/local_db/executor.rs @@ -4,29 +4,46 @@ use crate::local_db::query::{ }; use async_trait::async_trait; use futures::lock::Mutex; -use js_sys::{Array, BigInt}; +use js_sys::{Array, BigInt, Object, Reflect}; use std::rc::Rc; use wasm_bindgen_utils::prelude::wasm_bindgen_futures::JsFuture; +use wasm_bindgen_utils::prelude::JsCast; use wasm_bindgen_utils::result::WasmEncodedResult; #[derive(Clone)] pub struct JsCallbackExecutor { + local_db: JsValue, query_callback: js_sys::Function, - wipe_callback: Option, + wipe_callback: js_sys::Function, + transaction_callback: js_sys::Function, serialize: Rc>, } impl JsCallbackExecutor { - pub fn new(query_callback: js_sys::Function, wipe_callback: Option) -> Self { - Self { + pub fn new(local_db: JsValue) -> Result { + let query_callback = method(&local_db, "query")?; + let wipe_callback = method(&local_db, "wipeAndRecreate")?; + let transaction_callback = method(&local_db, "transaction")?; + + Ok(Self { + local_db, query_callback, wipe_callback, + transaction_callback, serialize: Rc::new(Mutex::new(())), - } + }) } pub fn from_ref(query_callback: &js_sys::Function) -> Self { - Self::new(query_callback.clone(), None) + let local_db = Object::new(); + Reflect::set(&local_db, &JsValue::from_str("query"), query_callback).unwrap(); + Self { + local_db: local_db.into(), + query_callback: query_callback.clone(), + wipe_callback: js_sys::Function::new_no_args("return undefined"), + transaction_callback: js_sys::Function::new_no_args("return undefined"), + serialize: Rc::new(Mutex::new(())), + } } fn function(&self) -> &js_sys::Function { @@ -39,27 +56,17 @@ impl JsCallbackExecutor { ) -> Result { // If there are no parameters, pass `undefined` to the JS callback // instead of an empty array to match the SDK's expected semantics. - let js_params_val = if stmt.params.is_empty() { + let js_params_val = if stmt.params().is_empty() { JsValue::UNDEFINED } else { - let array = Array::new(); - for param in stmt.params() { - let js_param = match param { - SqlValue::Text(text) => JsValue::from_str(text), - SqlValue::I64(value) => JsValue::from(BigInt::from(*value)), - SqlValue::U64(value) => JsValue::from(BigInt::from(*value)), - SqlValue::Null => JsValue::NULL, - }; - array.push(&js_param); - } - JsValue::from(array) + sql_params_to_js(stmt.params()) }; let result = self .function() .call2( - &JsValue::NULL, - &JsValue::from_str(&stmt.sql), + &self.local_db, + &JsValue::from_str(stmt.sql()), &js_params_val, ) .map_err(|e| { @@ -90,6 +97,79 @@ impl JsCallbackExecutor { let _guard = self.serialize.lock().await; self.invoke_statement_unlocked(stmt).await } + + async fn invoke_transaction_unlocked( + &self, + batch: &SqlStatementBatch, + ) -> Result<(), LocalDbQueryError> { + let statements = Array::new(); + batch.inner_statements().iter().try_for_each(|stmt| { + statements.push(&transaction_statement(stmt.sql(), stmt.params())?.into()); + Ok::<(), LocalDbQueryError>(()) + })?; + + let result = self + .transaction_callback + .call1(&self.local_db, &statements) + .map_err(|e| { + LocalDbQueryError::database(format!( + "JavaScript transaction callback invocation failed: {:?}", + e + )) + })?; + + let promise = js_sys::Promise::resolve(&result); + let future = JsFuture::from(promise); + let js_result = future.await.map_err(|e| { + LocalDbQueryError::database(format!("Transaction promise resolution failed: {:?}", e)) + })?; + + let wasm_result: WasmEncodedResult = serde_wasm_bindgen::from_value(js_result) + .map_err(|_| LocalDbQueryError::invalid_response())?; + + match wasm_result { + WasmEncodedResult::Success { .. } => Ok(()), + WasmEncodedResult::Err { error, .. } => { + Err(LocalDbQueryError::database(error.readable_msg)) + } + } + } +} + +fn transaction_statement(sql: &str, params: &[SqlValue]) -> Result { + let item = Object::new(); + Reflect::set(&item, &JsValue::from_str("sql"), &JsValue::from_str(sql)) + .map_err(|e| LocalDbQueryError::database(format!("Failed to set SQL: {:?}", e)))?; + if !params.is_empty() { + Reflect::set( + &item, + &JsValue::from_str("params"), + &sql_params_to_js(params), + ) + .map_err(|e| LocalDbQueryError::database(format!("Failed to set params: {:?}", e)))?; + } + Ok(item) +} + +fn method(local_db: &JsValue, name: &str) -> Result { + Reflect::get(local_db, &JsValue::from_str(name)) + .map_err(|e| LocalDbQueryError::database(format!("Failed to read localDb.{name}: {e:?}")))? + .dyn_into::() + .map_err(|_| LocalDbQueryError::database(format!("localDb.{name} must be a function"))) +} + +fn sql_params_to_js(params: &[SqlValue]) -> JsValue { + let array = Array::new(); + params.iter().for_each(|param| { + let js_param = match param { + SqlValue::Text(text) => JsValue::from_str(text), + SqlValue::I64(value) => JsValue::from(BigInt::from(*value)), + SqlValue::U64(value) => JsValue::from(BigInt::from(*value)), + SqlValue::Null => JsValue::NULL, + }; + array.push(&js_param); + }); + JsValue::from(array) } // SAFETY: WASM builds run on a single thread; the wrapped JavaScript callback is only invoked on @@ -107,14 +187,7 @@ impl LocalDbQueryExecutor for JsCallbackExecutor { )); } - for stmt in batch { - if let Err(err) = self.invoke_statement_unlocked(stmt).await { - let rollback_stmt = SqlStatement::new("ROLLBACK"); - let _ = self.invoke_statement_unlocked(&rollback_stmt).await; - return Err(err); - } - } - Ok(()) + self.invoke_transaction_unlocked(batch).await } async fn query_text(&self, stmt: &SqlStatement) -> Result { @@ -132,11 +205,7 @@ impl LocalDbQueryExecutor for JsCallbackExecutor { async fn wipe_and_recreate(&self) -> Result<(), LocalDbQueryError> { let _guard = self.serialize.lock().await; - let wipe_callback = self.wipe_callback.as_ref().ok_or_else(|| { - LocalDbQueryError::database("wipe_and_recreate callback not configured") - })?; - - let result = wipe_callback.call0(&JsValue::NULL).map_err(|e| { + let result = self.wipe_callback.call0(&self.local_db).map_err(|e| { LocalDbQueryError::database(format!( "JavaScript wipe callback invocation failed: {:?}", e @@ -226,6 +295,32 @@ pub mod tests { func } + fn create_local_db( + query: Option, + wipe: Option, + transaction: Option, + ) -> JsValue { + let local_db = js_sys::Object::new(); + if let Some(query) = query { + Reflect::set(&local_db, &JsValue::from_str("query"), &query).unwrap(); + } + if let Some(wipe) = wipe { + Reflect::set(&local_db, &JsValue::from_str("wipeAndRecreate"), &wipe).unwrap(); + } + if let Some(transaction) = transaction { + Reflect::set(&local_db, &JsValue::from_str("transaction"), &transaction).unwrap(); + } + local_db.into() + } + + fn success_wipe_callback() -> Function { + Function::new_no_args("return { value: undefined, error: null };") + } + + fn success_transaction_callback() -> Function { + Function::new_no_args("return { value: '', error: null };") + } + #[wasm_bindgen_test] async fn test_query_json_success_case() { let test_data = vec![ @@ -321,27 +416,31 @@ pub mod tests { } #[wasm_bindgen_test] - async fn execute_batch_invokes_all_statements_in_order() { + async fn execute_batch_uses_transaction_callback_with_inner_statements() { use std::cell::RefCell; use std::rc::Rc; use wasm_bindgen::prelude::Closure; - let calls: Rc>> = Rc::new(RefCell::new(Vec::new())); + let calls: Rc>> = Rc::new(RefCell::new(Vec::new())); let calls_clone = calls.clone(); - let closure = Closure::wrap(Box::new(move |sql: String, params: JsValue| -> JsValue { - calls_clone.borrow_mut().push((sql, params.clone())); + let closure = Closure::wrap(Box::new(move |statements: JsValue| -> JsValue { + calls_clone.borrow_mut().push(statements); let result = WasmEncodedResult::Success:: { value: String::new(), error: None, }; serde_wasm_bindgen::to_value(&result).unwrap() - }) - as Box JsValue>); + }) as Box JsValue>); let callback: Function = closure.as_ref().clone().unchecked_into(); closure.forget(); - let exec = JsCallbackExecutor::from_ref(&callback); + let exec = JsCallbackExecutor::new(create_local_db( + Some(create_success_callback("[]")), + Some(success_wipe_callback()), + Some(callback), + )) + .unwrap(); let mut batch = SqlStatementBatch::new(); batch.add(SqlStatement::new("CREATE TABLE example (val INTEGER)")); @@ -355,194 +454,87 @@ pub mod tests { exec.execute_batch(&batch).await.unwrap(); let calls = calls.borrow(); - assert_eq!(calls.len(), 5); - - assert_eq!(calls[0].0, "BEGIN TRANSACTION"); - assert!(calls[0].1.is_undefined()); - - assert_eq!(calls[1].0, "CREATE TABLE example (val INTEGER)"); - assert!(calls[1].1.is_undefined()); + assert_eq!(calls.len(), 1); + let statements = Array::from(&calls[0]); + assert_eq!(statements.length(), 3); - assert_eq!(calls[2].0, "INSERT INTO example (val) VALUES (?1)"); - let params_value = calls[2].1.clone(); - - assert_eq!(calls[3].0, "DELETE FROM example WHERE val = 0"); - assert!(calls[3].1.is_undefined()); - - assert_eq!(calls[4].0, "COMMIT"); - assert!(calls[4].1.is_undefined()); - drop(calls); + let first = js_sys::Object::from(statements.get(0)); + assert_eq!( + Reflect::get(&first, &JsValue::from_str("sql")) + .unwrap() + .as_string() + .unwrap(), + "CREATE TABLE example (val INTEGER)" + ); + assert!(Reflect::get(&first, &JsValue::from_str("params")) + .unwrap() + .is_undefined()); + let second = js_sys::Object::from(statements.get(1)); + assert_eq!( + Reflect::get(&second, &JsValue::from_str("sql")) + .unwrap() + .as_string() + .unwrap(), + "INSERT INTO example (val) VALUES (?1)" + ); + let params_value = Reflect::get(&second, &JsValue::from_str("params")).unwrap(); assert!(Array::is_array(¶ms_value)); let decoded = Array::from(¶ms_value); assert_eq!(decoded.length(), 1); let first = decoded.get(0).dyn_into::().unwrap(); assert_eq!(first.to_string(10).unwrap().as_string().unwrap(), "42"); - } - - #[wasm_bindgen_test] - async fn execute_batch_rolls_back_on_failure_without_params() { - use std::cell::RefCell; - use std::rc::Rc; - use wasm_bindgen::prelude::Closure; - use wasm_bindgen_utils::prelude::JsValue; - - let calls: Rc>> = Rc::new(RefCell::new(Vec::new())); - let calls_clone = calls.clone(); - let closure = Closure::wrap(Box::new(move |sql: String, params: JsValue| -> JsValue { - calls_clone.borrow_mut().push((sql.clone(), params)); - let result: WasmEncodedResult = - if sql == "INSERT INTO rollback_test (value) VALUES ('fail')" { - WasmEncodedResult::Err { - value: None, - error: WasmEncodedError { - msg: "boom".to_string(), - readable_msg: "boom".to_string(), - }, - } - } else { - WasmEncodedResult::Success { - value: String::new(), - error: None, - } - }; - serde_wasm_bindgen::to_value(&result).unwrap() - }) - as Box JsValue>); - let callback: Function = closure.as_ref().clone().unchecked_into(); - closure.forget(); - - let exec = JsCallbackExecutor::from_ref(&callback); - - let mut batch = SqlStatementBatch::new(); - batch.add(SqlStatement::new("CREATE TABLE rollback_test (value TEXT)")); - batch.add(SqlStatement::new( - "INSERT INTO rollback_test (value) VALUES ('ok')", - )); - batch.add(SqlStatement::new( - "INSERT INTO rollback_test (value) VALUES ('fail')", - )); - let batch = batch.ensure_transaction(); - - let err = exec.execute_batch(&batch).await.unwrap_err(); - assert!(matches!(err, LocalDbQueryError::Database { .. })); - let calls = calls.borrow(); - assert_eq!(calls.len(), 5); - assert_eq!(calls[0].0, "BEGIN TRANSACTION"); - assert_eq!(calls[1].0, "CREATE TABLE rollback_test (value TEXT)"); - assert_eq!( - calls[2].0, - "INSERT INTO rollback_test (value) VALUES ('ok')" - ); + let third = js_sys::Object::from(statements.get(2)); assert_eq!( - calls[3].0, - "INSERT INTO rollback_test (value) VALUES ('fail')" + Reflect::get(&third, &JsValue::from_str("sql")) + .unwrap() + .as_string() + .unwrap(), + "DELETE FROM example WHERE val = 0" ); - assert_eq!(calls[4].0, "ROLLBACK"); - assert!(!calls.iter().any(|(sql, _)| sql == "COMMIT")); + assert!(Reflect::get(&third, &JsValue::from_str("params")) + .unwrap() + .is_undefined()); } #[wasm_bindgen_test] - async fn execute_batch_rolls_back_on_failure_with_params() { + async fn execute_batch_propagates_transaction_error() { use std::cell::RefCell; use std::rc::Rc; use wasm_bindgen::prelude::Closure; - use wasm_bindgen_utils::prelude::JsValue; - let calls: Rc>> = Rc::new(RefCell::new(Vec::new())); + let calls: Rc> = Rc::new(RefCell::new(0)); let calls_clone = calls.clone(); - let seen_insert = Rc::new(RefCell::new(false)); - let seen_insert_clone = seen_insert.clone(); - let closure = Closure::wrap(Box::new(move |sql: String, params: JsValue| -> JsValue { - calls_clone.borrow_mut().push((sql.clone(), params.clone())); - let result: WasmEncodedResult = - if sql == "INSERT INTO rollback_param (id, value) VALUES (?1, ?2)" { - let mut seen = seen_insert_clone.borrow_mut(); - if !*seen { - *seen = true; - WasmEncodedResult::Success { - value: String::new(), - error: None, - } - } else { - WasmEncodedResult::Err { - value: None, - error: WasmEncodedError { - msg: "boom".to_string(), - readable_msg: "boom".to_string(), - }, - } - } - } else { - WasmEncodedResult::Success { - value: String::new(), - error: None, - } - }; + let closure = Closure::wrap(Box::new(move |_statements: JsValue| -> JsValue { + *calls_clone.borrow_mut() += 1; + let result = WasmEncodedResult::Err:: { + value: None, + error: WasmEncodedError { + msg: "boom".to_string(), + readable_msg: "boom readable".to_string(), + }, + }; serde_wasm_bindgen::to_value(&result).unwrap() - }) - as Box JsValue>); + }) as Box JsValue>); let callback: Function = closure.as_ref().clone().unchecked_into(); closure.forget(); - let exec = JsCallbackExecutor::from_ref(&callback); + let exec = JsCallbackExecutor::new(create_local_db( + Some(create_success_callback("[]")), + Some(success_wipe_callback()), + Some(callback), + )) + .unwrap(); let mut batch = SqlStatementBatch::new(); - batch.add(SqlStatement::new( - "CREATE TABLE rollback_param (id INTEGER PRIMARY KEY, value TEXT)", - )); - - let mut insert_ok = - SqlStatement::new("INSERT INTO rollback_param (id, value) VALUES (?1, ?2)"); - insert_ok.push(1i64); - insert_ok.push("ok"); - batch.add(insert_ok); - - let mut insert_fail = - SqlStatement::new("INSERT INTO rollback_param (id, value) VALUES (?1, ?2)"); - insert_fail.push(2i64); - insert_fail.push("fail"); - batch.add(insert_fail); - + batch.add(SqlStatement::new("INSERT INTO rollback_test VALUES (1)")); let batch = batch.ensure_transaction(); let err = exec.execute_batch(&batch).await.unwrap_err(); assert!(matches!(err, LocalDbQueryError::Database { .. })); - - let calls = calls.borrow(); - assert_eq!(calls.len(), 5); - assert_eq!(calls[0].0, "BEGIN TRANSACTION"); - assert_eq!( - calls[1].0, - "CREATE TABLE rollback_param (id INTEGER PRIMARY KEY, value TEXT)" - ); - assert_eq!( - calls[2].0, - "INSERT INTO rollback_param (id, value) VALUES (?1, ?2)" - ); - assert!(Array::is_array(&calls[2].1)); - let params_ok = Array::from(&calls[2].1); - assert_eq!(params_ok.length(), 2); - let first = params_ok.get(0).dyn_into::().unwrap(); - assert_eq!(first.to_string(10).unwrap().as_string().unwrap(), "1"); - let second = params_ok.get(1); - assert_eq!(second.as_string().unwrap(), "ok"); - assert_eq!( - calls[3].0, - "INSERT INTO rollback_param (id, value) VALUES (?1, ?2)" - ); - assert!(Array::is_array(&calls[3].1)); - let params_fail = Array::from(&calls[3].1); - assert_eq!(params_fail.length(), 2); - let first = params_fail.get(0).dyn_into::().unwrap(); - assert_eq!(first.to_string(10).unwrap().as_string().unwrap(), "2"); - let second = params_fail.get(1); - assert_eq!(second.as_string().unwrap(), "fail"); - assert_eq!(calls[4].0, "ROLLBACK"); - assert!(!calls.iter().any(|(sql, _)| sql == "COMMIT")); - - assert!(*seen_insert.borrow()); + assert!(err.to_string().contains("boom readable")); + assert_eq!(*calls.borrow(), 1); } #[wasm_bindgen_test] @@ -629,16 +621,21 @@ pub mod tests { } #[wasm_bindgen_test] - async fn wipe_and_recreate_returns_error_when_no_callback() { + async fn constructor_requires_wipe_callback() { let callback = create_success_callback("[]"); - let exec = JsCallbackExecutor::from_ref(&callback); + let result = JsCallbackExecutor::new(create_local_db( + Some(callback), + None, + Some(success_transaction_callback()), + )); - let result = exec.wipe_and_recreate().await; - assert!(matches!(result, Err(LocalDbQueryError::Database { .. }))); - assert!(result - .unwrap_err() + let Err(err) = result else { + panic!("constructor should reject missing wipe callback"); + }; + assert!(matches!(err, LocalDbQueryError::Database { .. })); + assert!(err .to_string() - .contains("wipe_and_recreate callback not configured")); + .contains("localDb.wipeAndRecreate must be a function")); } #[wasm_bindgen_test] @@ -661,7 +658,12 @@ pub mod tests { wipe_closure.forget(); let callback = create_success_callback("[]"); - let exec = JsCallbackExecutor::new(callback, Some(wipe_callback)); + let exec = JsCallbackExecutor::new(create_local_db( + Some(callback), + Some(wipe_callback), + Some(success_transaction_callback()), + )) + .unwrap(); exec.wipe_and_recreate().await.unwrap(); @@ -692,7 +694,12 @@ pub mod tests { }); let callback = create_success_callback("[]"); - let exec = JsCallbackExecutor::new(callback, Some(wipe_callback)); + let exec = JsCallbackExecutor::new(create_local_db( + Some(callback), + Some(wipe_callback), + Some(success_transaction_callback()), + )) + .unwrap(); let result = exec.wipe_and_recreate().await; assert!(matches!(result, Err(LocalDbQueryError::Database { .. }))); diff --git a/crates/common/src/raindex_client/local_db/mod.rs b/crates/common/src/raindex_client/local_db/mod.rs index c5013dad48..596249daed 100644 --- a/crates/common/src/raindex_client/local_db/mod.rs +++ b/crates/common/src/raindex_client/local_db/mod.rs @@ -140,11 +140,8 @@ impl LocalDb { } } - pub(crate) fn from_js_callback( - query_callback: js_sys::Function, - wipe_callback: Option, - ) -> Self { - Self::new(JsCallbackExecutor::new(query_callback, wipe_callback)) + pub(crate) fn from_js_local_db(local_db: JsValue) -> Result { + Ok(Self::new(JsCallbackExecutor::new(local_db)?)) } } @@ -426,6 +423,26 @@ raindexes: ) } + fn test_local_db(query: js_sys::Function) -> JsValue { + let local_db = js_sys::Object::new(); + js_sys::Reflect::set(&local_db, &JsValue::from_str("query"), &query).unwrap(); + js_sys::Reflect::set( + &local_db, + &JsValue::from_str("wipeAndRecreate"), + &js_sys::Function::new_no_args( + "return Promise.resolve({ value: undefined, error: null });", + ), + ) + .unwrap(); + js_sys::Reflect::set( + &local_db, + &JsValue::from_str("transaction"), + &js_sys::Function::new_no_args("return Promise.resolve({ value: '', error: null });"), + ) + .unwrap(); + local_db.into() + } + fn recording_status_callback( store: Rc>>, ) -> js_sys::Function { @@ -441,8 +458,8 @@ raindexes: } #[wasm_bindgen_test] - async fn local_db_from_js_callback_executes_queries() { - let db = LocalDb::from_js_callback(success_callback(), None); + async fn local_db_from_js_object_executes_queries() { + let db = LocalDb::from_js_local_db(test_local_db(success_callback())).unwrap(); let stmt = SqlStatement::new("SELECT 1"); let rows: Vec = db.query_json(&stmt).await.unwrap(); @@ -453,7 +470,7 @@ raindexes: } #[wasm_bindgen_test] - async fn local_db_from_js_callback_surfaces_errors() { + async fn local_db_from_js_object_surfaces_errors() { let error = WasmEncodedResult::Err:: { value: None, error: WasmEncodedError { @@ -470,7 +487,7 @@ raindexes: .unwrap() )); - let db = LocalDb::from_js_callback(callback, None); + let db = LocalDb::from_js_local_db(test_local_db(callback)).unwrap(); let stmt = SqlStatement::new("SELECT 1"); let err = db.query_text(&stmt).await.unwrap_err(); assert!(matches!(err, LocalDbQueryError::Database { .. })); diff --git a/crates/common/src/raindex_client/local_db/orders.rs b/crates/common/src/raindex_client/local_db/orders.rs index fd05e5ca2f..ee5017d5c1 100644 --- a/crates/common/src/raindex_client/local_db/orders.rs +++ b/crates/common/src/raindex_client/local_db/orders.rs @@ -232,7 +232,8 @@ mod tests { use super::*; use crate::local_db::query::{fetch_orders::LocalDbOrder, fetch_vaults::LocalDbVault}; use crate::raindex_client::tests::{ - get_local_db_test_yaml, new_test_client_with_db_callback, + get_local_db_test_yaml, local_db_object_from_query_callback, + new_test_client_with_local_db, }; use crate::raindex_client::ChainIds; use alloy::primitives::{address, b256, bytes, Bytes, U256}; @@ -392,9 +393,9 @@ mod tests { let callback = make_local_db_callback(vec![local_order.clone()]); - let client = new_test_client_with_db_callback( + let client = new_test_client_with_local_db( vec![get_local_db_test_yaml()], - callback, + local_db_object_from_query_callback(callback), vec![42161], ); @@ -575,9 +576,9 @@ mod tests { let callback = make_local_db_callback(vec![local_order.clone()]); - let client = new_test_client_with_db_callback( + let client = new_test_client_with_local_db( vec![get_local_db_test_yaml()], - callback, + local_db_object_from_query_callback(callback), vec![42161], ); @@ -631,9 +632,9 @@ mod tests { }; let callback = make_local_db_callback(vec![local_order.clone()]); - let client = new_test_client_with_db_callback( + let client = new_test_client_with_local_db( vec![get_local_db_test_yaml()], - callback, + local_db_object_from_query_callback(callback), vec![42161, 137], ); @@ -689,9 +690,9 @@ mod tests { }; let callback = make_local_db_callback(vec![local_order.clone()]); - let client = new_test_client_with_db_callback( + let client = new_test_client_with_local_db( vec![get_local_db_test_yaml()], - callback, + local_db_object_from_query_callback(callback), vec![42161, 137], ); diff --git a/crates/common/src/raindex_client/local_db/pipeline/bootstrap.rs b/crates/common/src/raindex_client/local_db/pipeline/bootstrap.rs index e06cc8c6db..3ca4729323 100644 --- a/crates/common/src/raindex_client/local_db/pipeline/bootstrap.rs +++ b/crates/common/src/raindex_client/local_db/pipeline/bootstrap.rs @@ -203,8 +203,8 @@ mod tests { use super::*; use crate::local_db::query::clear_raindex_data::clear_raindex_data_batch; - use crate::local_db::query::clear_tables::clear_tables_stmt; - use crate::local_db::query::create_tables::create_tables_stmt; + use crate::local_db::query::clear_tables::{clear_tables_batch, vacuum_stmt}; + use crate::local_db::query::create_tables::create_tables_batch; use crate::local_db::query::create_tables::REQUIRED_TABLES; use crate::local_db::query::create_views::create_views_batch; use crate::local_db::query::fetch_db_metadata::{fetch_db_metadata_stmt, DbMetadataRow}; @@ -244,6 +244,17 @@ mod tests { .insert(stmt.sql().to_string(), value.to_string()); self } + fn with_batch(self, batch: &SqlStatementBatch) -> Self { + batch + .statements() + .iter() + .fold(self, |db, stmt| db.with_text(stmt, "ok")) + } + fn with_reset_batches(self) -> Self { + self.with_batch(&clear_tables_batch()) + .with_text(&vacuum_stmt(), "ok") + .with_batch(&create_tables_batch()) + } fn calls(&self) -> Vec { self.calls_text.lock().unwrap().clone() } @@ -389,6 +400,29 @@ mod tests { .unwrap() } + fn reset_batch_sqls() -> Vec { + let mut statements = clear_tables_batch().statements().to_vec(); + statements.push(vacuum_stmt()); + statements.extend(create_tables_batch().statements().iter().cloned()); + statements + .iter() + .map(|stmt| stmt.sql().to_string()) + .collect() + } + + fn assert_reset_batches_were_called(calls: &[String]) { + let expected = reset_batch_sqls(); + for sql in expected { + assert!(calls.contains(&sql), "missing reset SQL: {sql}"); + } + } + + fn insert_reset_text_map(text_map: &mut HashMap) { + for sql in reset_batch_sqls() { + text_map.insert(sql, "ok".to_string()); + } + } + fn watermark_row(last_block: u64) -> TargetWatermarkRow { TargetWatermarkRow { chain_id: sample_ob_id().chain_id, @@ -415,8 +449,7 @@ mod tests { .with_json(&fetch_tables_stmt(), tables_json) .with_json(&fetch_db_metadata_stmt(), json!([db_meta_row])) .with_required_schema_columns() - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_reset_batches() .with_text(&insert_db_metadata_stmt(DB_SCHEMA_VERSION), "ok") .with_views(); adapter @@ -430,13 +463,16 @@ mod tests { .iter() .map(|s| s.sql().to_string()) .collect(); - assert_eq!(calls[0], clear_tables_stmt().sql().to_string()); - assert_eq!(calls[1], create_tables_stmt().sql().to_string()); + let expected_reset = reset_batch_sqls(); + assert_eq!(&calls[..expected_reset.len()], expected_reset.as_slice()); assert_eq!( - calls[2], + calls[expected_reset.len()], insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql().to_string() ); - assert_eq!(&calls[3..], expected_views.as_slice()); + assert_eq!( + &calls[expected_reset.len() + 1..], + expected_views.as_slice() + ); } #[tokio::test] @@ -447,8 +483,7 @@ mod tests { .with_healthy_integrity() .with_json(&fetch_tables_stmt(), table_names_json(&["db_metadata"])) .with_json(&fetch_db_metadata_stmt(), json!([])) // triggers reset - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_reset_batches() .with_text(&insert_db_metadata_stmt(DB_SCHEMA_VERSION), "ok") .with_views(); adapter @@ -462,8 +497,7 @@ mod tests { .iter() .map(|s| s.sql().to_string()) .collect(); - assert!(calls.contains(&clear_tables_stmt().sql().to_string())); - assert!(calls.contains(&create_tables_stmt().sql().to_string())); + assert_reset_batches_were_called(&calls); assert!(calls.contains(&insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql().to_string())); assert!( expected_views.iter().all(|stmt| calls.contains(stmt)), @@ -489,8 +523,7 @@ mod tests { &fetch_tables_stmt(), required_tables_without_db_metadata_json(), ) - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_reset_batches() .with_text(&insert_db_metadata_stmt(DB_SCHEMA_VERSION), "ok") .with_views(); @@ -505,8 +538,7 @@ mod tests { .iter() .map(|s| s.sql().to_string()) .collect(); - assert!(calls.contains(&clear_tables_stmt().sql().to_string())); - assert!(calls.contains(&create_tables_stmt().sql().to_string())); + assert_reset_batches_were_called(&calls); assert!(calls.contains(&insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql().to_string())); assert!( expected_views.iter().all(|stmt| calls.contains(stmt)), @@ -542,8 +574,7 @@ mod tests { .with_healthy_integrity() .with_json(&fetch_tables_stmt(), table_names_json(&["db_metadata"])) .with_json(&fetch_db_metadata_stmt(), json!([mismatched_row])) - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_reset_batches() .with_text(&insert_db_metadata_stmt(DB_SCHEMA_VERSION), "ok") .with_views(); @@ -558,8 +589,7 @@ mod tests { .iter() .map(|s| s.sql().to_string()) .collect(); - assert!(calls.contains(&clear_tables_stmt().sql().to_string())); - assert!(calls.contains(&create_tables_stmt().sql().to_string())); + assert_reset_batches_were_called(&calls); assert!( expected_views.iter().all(|stmt| calls.contains(stmt)), "missing view creation statements" @@ -588,8 +618,7 @@ mod tests { .with_healthy_integrity() .with_json(&fetch_tables_stmt(), table_names_json(&["db_metadata"])) .with_json(&fetch_db_metadata_stmt(), json!([old_schema_row])) - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_reset_batches() .with_text(&insert_db_metadata_stmt(DB_SCHEMA_VERSION), "ok") .with_views(); @@ -604,8 +633,7 @@ mod tests { .iter() .map(|s| s.sql().to_string()) .collect(); - assert!(calls.contains(&clear_tables_stmt().sql().to_string())); - assert!(calls.contains(&create_tables_stmt().sql().to_string())); + assert_reset_batches_were_called(&calls); assert!(calls.contains(&insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql().to_string())); assert!( expected_views.iter().all(|stmt| calls.contains(stmt)), @@ -681,8 +709,7 @@ mod tests { .with_json(&fetch_tables_stmt(), required_tables_json()) .with_json(&fetch_db_metadata_stmt(), json!([db_row])) .with_required_schema_columns_missing("target_watermarks", "raindex_address") - .with_text(&clear_tables_stmt(), "ok") - .with_text(&create_tables_stmt(), "ok") + .with_reset_batches() .with_text(&insert_db_metadata_stmt(DB_SCHEMA_VERSION), "ok") .with_views(); @@ -697,8 +724,7 @@ mod tests { .iter() .map(|s| s.sql().to_string()) .collect(); - assert!(calls.contains(&clear_tables_stmt().sql().to_string())); - assert!(calls.contains(&create_tables_stmt().sql().to_string())); + assert_reset_batches_were_called(&calls); assert!(calls.contains(&insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql().to_string())); assert!( expected_views.iter().all(|stmt| calls.contains(stmt)), @@ -924,10 +950,7 @@ mod tests { integrity_check_stmt().sql().to_string(), json!([corrupted_row]).to_string(), ); - db.text_map - .insert(clear_tables_stmt().sql().to_string(), "ok".to_string()); - db.text_map - .insert(create_tables_stmt().sql().to_string(), "ok".to_string()); + insert_reset_text_map(&mut db.text_map); db.text_map.insert( insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql().to_string(), "ok".to_string(), @@ -1002,14 +1025,7 @@ mod tests { .iter() .map(|s| s.sql().to_string()) .collect(); - assert!( - calls.contains(&clear_tables_stmt().sql().to_string()), - "should have cleared tables" - ); - assert!( - calls.contains(&create_tables_stmt().sql().to_string()), - "should have created tables" - ); + assert_reset_batches_were_called(&calls); assert!( calls.contains(&insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql().to_string()), "should have inserted db metadata" @@ -1033,10 +1049,7 @@ mod tests { calls_text: Mutex::new(Vec::new()), wipe_called: Mutex::new(false), }; - db.text_map - .insert(clear_tables_stmt().sql().to_string(), "ok".to_string()); - db.text_map - .insert(create_tables_stmt().sql().to_string(), "ok".to_string()); + insert_reset_text_map(&mut db.text_map); db.text_map.insert( insert_db_metadata_stmt(DB_SCHEMA_VERSION).sql().to_string(), "ok".to_string(), @@ -1106,14 +1119,7 @@ mod tests { ); let calls = db.calls(); - assert!( - calls.contains(&clear_tables_stmt().sql().to_string()), - "should have cleared tables when integrity check errors" - ); - assert!( - calls.contains(&create_tables_stmt().sql().to_string()), - "should have created tables" - ); + assert_reset_batches_were_called(&calls); } struct WipeFailsDb; diff --git a/crates/common/src/raindex_client/local_db/pipeline/runner/scheduler/wasm.rs b/crates/common/src/raindex_client/local_db/pipeline/runner/scheduler/wasm.rs index 8ffbd22d98..91f2ff4297 100644 --- a/crates/common/src/raindex_client/local_db/pipeline/runner/scheduler/wasm.rs +++ b/crates/common/src/raindex_client/local_db/pipeline/runner/scheduler/wasm.rs @@ -474,7 +474,21 @@ mod wasm_tests { } fn noop_local_db() -> LocalDb { - LocalDb::from_js_callback(noop_callback(), None) + let local_db = js_sys::Object::new(); + js_sys::Reflect::set(&local_db, &JsValue::from_str("query"), &noop_callback()).unwrap(); + js_sys::Reflect::set( + &local_db, + &JsValue::from_str("wipeAndRecreate"), + &Function::new_no_args("return Promise.resolve({ value: undefined, error: null });"), + ) + .unwrap(); + js_sys::Reflect::set( + &local_db, + &JsValue::from_str("transaction"), + &Function::new_no_args("return Promise.resolve({ value: '', error: null });"), + ) + .unwrap(); + LocalDb::from_js_local_db(local_db.into()).unwrap() } impl SchedulerHandle { diff --git a/crates/common/src/raindex_client/local_db/query/clear_tables.rs b/crates/common/src/raindex_client/local_db/query/clear_tables.rs index fd8ac6ea0b..4d67be4b62 100644 --- a/crates/common/src/raindex_client/local_db/query/clear_tables.rs +++ b/crates/common/src/raindex_client/local_db/query/clear_tables.rs @@ -1,11 +1,11 @@ -use crate::local_db::query::clear_tables::clear_tables_stmt; +use crate::local_db::query::clear_tables::{clear_tables_batch, vacuum_stmt}; use crate::local_db::query::{LocalDbQueryError, LocalDbQueryExecutor}; pub async fn clear_tables( exec: &E, ) -> Result<(), LocalDbQueryError> { - let stmt = clear_tables_stmt(); - exec.query_text(&stmt).await.map(|_| ()) + exec.execute_batch(&clear_tables_batch()).await?; + exec.query_text(&vacuum_stmt()).await.map(|_| ()) } #[cfg(target_family = "wasm")] @@ -17,10 +17,12 @@ mod wasm { #[wasm_export(js_name = "clearTables", unchecked_return_type = "void")] pub async fn clear_tables_wasm( - #[wasm_export(param_description = "JavaScript function to execute database queries")] - db_callback: js_sys::Function, + #[wasm_export( + param_description = "Local database object with query, wipeAndRecreate, and transaction functions" + )] + local_db: JsValue, ) -> Result<(), LocalDbError> { - let exec = JsCallbackExecutor::from_ref(&db_callback); + let exec = JsCallbackExecutor::new(local_db).map_err(LocalDbError::from)?; clear_tables(&exec).await.map_err(LocalDbError::from) } } @@ -28,21 +30,67 @@ mod wasm { #[cfg(all(test, target_family = "wasm"))] mod wasm_tests { use super::*; - use crate::raindex_client::local_db::executor::tests::create_sql_capturing_callback; use crate::raindex_client::local_db::executor::JsCallbackExecutor; use std::cell::RefCell; use std::rc::Rc; + use wasm_bindgen::prelude::Closure; + use wasm_bindgen::JsCast; use wasm_bindgen_test::*; - use wasm_bindgen_utils::prelude::JsValue; + use wasm_bindgen_utils::prelude::{serde_wasm_bindgen, JsValue}; + use wasm_bindgen_utils::result::WasmEncodedResult; + use web_sys::js_sys::{Array, Function, Object, Reflect}; #[wasm_bindgen_test] - async fn wrapper_uses_raw_sql_exactly() { - let expected_sql = clear_tables_stmt(); - let store = Rc::new(RefCell::new((String::new(), JsValue::UNDEFINED))); - let callback = create_sql_capturing_callback("OK", store.clone()); - let exec = JsCallbackExecutor::from_ref(&callback); + async fn wrapper_uses_transaction_then_vacuum() { + let transaction_calls: Rc>> = Rc::new(RefCell::new(Vec::new())); + let transaction_calls_clone = transaction_calls.clone(); + let transaction = Closure::wrap(Box::new(move |statements: JsValue| -> JsValue { + transaction_calls_clone.borrow_mut().push(statements); + serde_wasm_bindgen::to_value(&WasmEncodedResult::Success:: { + value: String::new(), + error: None, + }) + .unwrap() + }) as Box JsValue>); + + let query_calls: Rc>> = Rc::new(RefCell::new(Vec::new())); + let query_calls_clone = query_calls.clone(); + let query = Closure::wrap(Box::new(move |sql: String, _params: JsValue| -> JsValue { + query_calls_clone.borrow_mut().push(sql); + serde_wasm_bindgen::to_value(&WasmEncodedResult::Success:: { + value: String::new(), + error: None, + }) + .unwrap() + }) as Box JsValue>); + + let local_db = Object::new(); + Reflect::set( + &local_db, + &JsValue::from_str("query"), + query.as_ref().unchecked_ref(), + ) + .unwrap(); + Reflect::set( + &local_db, + &JsValue::from_str("wipeAndRecreate"), + &Function::new_no_args("return { value: undefined, error: null };"), + ) + .unwrap(); + Reflect::set( + &local_db, + &JsValue::from_str("transaction"), + transaction.as_ref().unchecked_ref(), + ) + .unwrap(); + query.forget(); + transaction.forget(); + + let exec = JsCallbackExecutor::new(local_db.into()).unwrap(); let res = super::clear_tables(&exec).await; assert!(res.is_ok()); - assert_eq!(store.borrow().clone().0, expected_sql.sql); + assert_eq!(transaction_calls.borrow().len(), 1); + assert!(Array::from(&transaction_calls.borrow()[0]).length() > 1); + assert_eq!(query_calls.borrow().as_slice(), [vacuum_stmt().sql()]); } } diff --git a/crates/common/src/raindex_client/local_db/query/create_tables.rs b/crates/common/src/raindex_client/local_db/query/create_tables.rs index 6704bee910..87c8305222 100644 --- a/crates/common/src/raindex_client/local_db/query/create_tables.rs +++ b/crates/common/src/raindex_client/local_db/query/create_tables.rs @@ -1,34 +1,62 @@ -use crate::local_db::query::create_tables::create_tables_stmt; +use crate::local_db::query::create_tables::create_tables_batch; use crate::local_db::query::{LocalDbQueryError, LocalDbQueryExecutor}; pub async fn create_tables( exec: &E, ) -> Result<(), LocalDbQueryError> { - let stmt = create_tables_stmt(); - exec.query_text(&stmt).await.map(|_| ()) + exec.execute_batch(&create_tables_batch()).await } #[cfg(all(test, target_family = "wasm"))] mod wasm_tests { - use super::*; - use crate::raindex_client::local_db::executor::tests::create_sql_capturing_callback; use crate::raindex_client::local_db::executor::JsCallbackExecutor; use std::cell::RefCell; use std::rc::Rc; + use wasm_bindgen::prelude::Closure; + use wasm_bindgen::JsCast; use wasm_bindgen_test::*; - use wasm_bindgen_utils::prelude::*; + use wasm_bindgen_utils::prelude::{serde_wasm_bindgen, JsValue}; + use wasm_bindgen_utils::result::WasmEncodedResult; + use web_sys::js_sys::{Array, Function, Object, Reflect}; #[wasm_bindgen_test] - async fn wrapper_uses_raw_sql_exactly() { - let expected_stmt = create_tables_stmt(); - let store = Rc::new(RefCell::new(( - String::new(), - wasm_bindgen::JsValue::UNDEFINED, - ))); - let callback = create_sql_capturing_callback("OK", store.clone()); - let exec = JsCallbackExecutor::from_ref(&callback); + async fn wrapper_uses_transaction_batch() { + let calls: Rc>> = Rc::new(RefCell::new(Vec::new())); + let calls_clone = calls.clone(); + let transaction = Closure::wrap(Box::new(move |statements: JsValue| -> JsValue { + calls_clone.borrow_mut().push(statements); + serde_wasm_bindgen::to_value(&WasmEncodedResult::Success:: { + value: String::new(), + error: None, + }) + .unwrap() + }) as Box JsValue>); + + let local_db = Object::new(); + Reflect::set( + &local_db, + &JsValue::from_str("query"), + &Function::new_no_args("return { value: '', error: null };"), + ) + .unwrap(); + Reflect::set( + &local_db, + &JsValue::from_str("wipeAndRecreate"), + &Function::new_no_args("return { value: undefined, error: null };"), + ) + .unwrap(); + Reflect::set( + &local_db, + &JsValue::from_str("transaction"), + transaction.as_ref().unchecked_ref(), + ) + .unwrap(); + transaction.forget(); + + let exec = JsCallbackExecutor::new(local_db.into()).unwrap(); let res = super::create_tables(&exec).await; assert!(res.is_ok()); - assert_eq!(store.borrow().clone().0, expected_stmt.sql); + assert_eq!(calls.borrow().len(), 1); + assert!(Array::from(&calls.borrow()[0]).length() > 1); } } diff --git a/crates/common/src/raindex_client/local_db/vaults.rs b/crates/common/src/raindex_client/local_db/vaults.rs index 6ab5eeae89..31ed817d04 100644 --- a/crates/common/src/raindex_client/local_db/vaults.rs +++ b/crates/common/src/raindex_client/local_db/vaults.rs @@ -186,8 +186,10 @@ mod tests { use crate::local_db::query::fetch_vaults::LocalDbVault; use crate::raindex_client::local_db::executor::tests::create_sql_capturing_callback; use crate::raindex_client::local_db::LocalDb; - use crate::raindex_client::tests::get_local_db_test_yaml; - use crate::raindex_client::RaindexClient; + use crate::raindex_client::tests::{ + get_local_db_test_yaml, local_db_object_from_query_callback, + new_test_client_with_local_db, + }; use alloy::primitives::{address, Address, Bytes, U256}; use rain_math_float::Float; use serde_json; @@ -244,11 +246,14 @@ mod tests { make_local_vault("0x01", token, owner, Float::parse("1".to_string()).unwrap()); let callback = make_local_db_vaults_callback(vec![vault]); - let local_db = LocalDb::from_js_callback(callback, None); + let local_db_value = local_db_object_from_query_callback(callback); + let local_db = LocalDb::from_js_local_db(local_db_value.clone()).unwrap(); - let client = RaindexClient::new(vec![get_local_db_test_yaml()], None, None, None, None) - .await - .unwrap(); + let client = new_test_client_with_local_db( + vec![get_local_db_test_yaml()], + local_db_value, + vec![42161], + ); let data_source = LocalDbVaults::new(&local_db, Rc::new(client)); let vaults = data_source .list(Some(vec![42161]), &GetVaultsFilters::default(), None, None) @@ -276,11 +281,14 @@ mod tests { make_local_vault("0x02", token, owner, Float::parse("5".to_string()).unwrap()); let callback = make_local_db_vaults_callback(vec![local_vault.clone()]); - let local_db = LocalDb::from_js_callback(callback, None); + let local_db_value = local_db_object_from_query_callback(callback); + let local_db = LocalDb::from_js_local_db(local_db_value.clone()).unwrap(); - let client = RaindexClient::new(vec![get_local_db_test_yaml()], None, None, None, None) - .await - .unwrap(); + let client = new_test_client_with_local_db( + vec![get_local_db_test_yaml()], + local_db_value, + vec![42161], + ); let rc_client = Rc::new(client.clone()); let derived_vault = RaindexVault::try_from_local_db(Rc::clone(&rc_client), local_vault, None) @@ -321,11 +329,14 @@ mod tests { let captured_sql = Rc::new(RefCell::new((String::new(), JsValue::UNDEFINED))); let json = serde_json::to_string(&vec![vault]).unwrap(); let callback = create_sql_capturing_callback(&json, captured_sql.clone()); - let local_db = LocalDb::from_js_callback(callback, None); + let local_db_value = local_db_object_from_query_callback(callback); + let local_db = LocalDb::from_js_local_db(local_db_value.clone()).unwrap(); - let client = RaindexClient::new(vec![get_local_db_test_yaml()], None, None, None, None) - .await - .unwrap(); + let client = new_test_client_with_local_db( + vec![get_local_db_test_yaml()], + local_db_value, + vec![42161], + ); let data_source = LocalDbVaults::new(&local_db, Rc::new(client)); let vaults = data_source @@ -353,11 +364,14 @@ mod tests { let captured_sql = Rc::new(RefCell::new((String::new(), JsValue::UNDEFINED))); let json = serde_json::to_string(&vec![vault]).unwrap(); let callback = create_sql_capturing_callback(&json, captured_sql.clone()); - let local_db = LocalDb::from_js_callback(callback, None); + let local_db_value = local_db_object_from_query_callback(callback); + let local_db = LocalDb::from_js_local_db(local_db_value.clone()).unwrap(); - let client = RaindexClient::new(vec![get_local_db_test_yaml()], None, None, None, None) - .await - .unwrap(); + let client = new_test_client_with_local_db( + vec![get_local_db_test_yaml()], + local_db_value, + vec![42161], + ); let data_source = LocalDbVaults::new(&local_db, Rc::new(client)); let vaults = data_source @@ -391,11 +405,14 @@ mod tests { let captured_sql = Rc::new(RefCell::new((String::new(), JsValue::UNDEFINED))); let json = serde_json::to_string(&vec![keep_vault]).unwrap(); let callback = create_sql_capturing_callback(&json, captured_sql.clone()); - let local_db = LocalDb::from_js_callback(callback, None); + let local_db_value = local_db_object_from_query_callback(callback); + let local_db = LocalDb::from_js_local_db(local_db_value.clone()).unwrap(); - let client = RaindexClient::new(vec![get_local_db_test_yaml()], None, None, None, None) - .await - .unwrap(); + let client = new_test_client_with_local_db( + vec![get_local_db_test_yaml()], + local_db_value, + vec![42161], + ); let data_source = LocalDbVaults::new(&local_db, Rc::new(client)); let filters = GetVaultsFilters { @@ -473,11 +490,14 @@ mod tests { let captured_sql = Rc::new(RefCell::new((String::new(), JsValue::UNDEFINED))); let json = serde_json::to_string(&vec![token]).unwrap(); let callback = create_sql_capturing_callback(&json, captured_sql.clone()); - let local_db = LocalDb::from_js_callback(callback, None); + let local_db_value = local_db_object_from_query_callback(callback); + let local_db = LocalDb::from_js_local_db(local_db_value.clone()).unwrap(); - let client = RaindexClient::new(vec![get_local_db_test_yaml()], None, None, None, None) - .await - .unwrap(); + let client = new_test_client_with_local_db( + vec![get_local_db_test_yaml()], + local_db_value, + vec![42161], + ); let data_source = LocalDbVaults::new(&local_db, Rc::new(client)); let tokens = data_source @@ -512,11 +532,14 @@ mod tests { let captured_sql = Rc::new(RefCell::new((String::new(), JsValue::UNDEFINED))); let json = serde_json::to_string(&vec![token]).unwrap(); let callback = create_sql_capturing_callback(&json, captured_sql.clone()); - let local_db = LocalDb::from_js_callback(callback, None); + let local_db_value = local_db_object_from_query_callback(callback); + let local_db = LocalDb::from_js_local_db(local_db_value.clone()).unwrap(); - let client = RaindexClient::new(vec![get_local_db_test_yaml()], None, None, None, None) - .await - .unwrap(); + let client = new_test_client_with_local_db( + vec![get_local_db_test_yaml()], + local_db_value, + vec![42161], + ); let data_source = LocalDbVaults::new(&local_db, Rc::new(client)); let tokens = data_source @@ -550,11 +573,14 @@ mod tests { let captured_sql = Rc::new(RefCell::new((String::new(), JsValue::UNDEFINED))); let json = serde_json::to_string(&vec![token]).unwrap(); let callback = create_sql_capturing_callback(&json, captured_sql.clone()); - let local_db = LocalDb::from_js_callback(callback, None); + let local_db_value = local_db_object_from_query_callback(callback); + let local_db = LocalDb::from_js_local_db(local_db_value.clone()).unwrap(); - let client = RaindexClient::new(vec![get_local_db_test_yaml()], None, None, None, None) - .await - .unwrap(); + let client = new_test_client_with_local_db( + vec![get_local_db_test_yaml()], + local_db_value, + vec![42161], + ); let data_source = LocalDbVaults::new(&local_db, Rc::new(client)); let tokens = data_source @@ -610,19 +636,21 @@ mod tests { output_orders: None, }; - let client = RaindexClient::new(vec![get_local_db_test_yaml()], None, None, None, None) - .await - .unwrap(); + let json = serde_json::to_string(&vec![balance_change]).unwrap(); + let captured_sql = Rc::new(RefCell::new((String::new(), JsValue::UNDEFINED))); + let callback = create_sql_capturing_callback(&json, captured_sql.clone()); + let local_db_value = local_db_object_from_query_callback(callback); + let local_db = LocalDb::from_js_local_db(local_db_value.clone()).unwrap(); + let client = new_test_client_with_local_db( + vec![get_local_db_test_yaml()], + local_db_value, + vec![42161], + ); let rc_client = Rc::new(client.clone()); let raindex_vault = RaindexVault::try_from_local_db(Rc::clone(&rc_client), local_vault, None) .expect("should convert vault"); - let json = serde_json::to_string(&vec![balance_change]).unwrap(); - let captured_sql = Rc::new(RefCell::new((String::new(), JsValue::UNDEFINED))); - let callback = create_sql_capturing_callback(&json, captured_sql.clone()); - let local_db = LocalDb::from_js_callback(callback, None); - let data_source = LocalDbVaults::new(&local_db, Rc::new(client)); let changes = data_source diff --git a/crates/common/src/raindex_client/mod.rs b/crates/common/src/raindex_client/mod.rs index 76e241cc9c..200a2cc0b4 100644 --- a/crates/common/src/raindex_client/mod.rs +++ b/crates/common/src/raindex_client/mod.rs @@ -48,6 +48,8 @@ use std::sync::Arc; #[cfg(target_family = "wasm")] use std::{cell::RefCell, rc::Rc}; use std::{collections::BTreeMap, fmt, num::ParseIntError, str::FromStr}; +#[cfg(target_family = "wasm")] +use wasm_bindgen_utils::prelude::js_sys::Reflect; #[cfg(target_family = "wasm")] pub(crate) type ClientRef = std::rc::Rc; @@ -126,14 +128,11 @@ impl RaindexClient { /// // Subgraph-only (no local-db-sync in YAML) /// const result = await RaindexClient.new([yamlConfig]); /// - /// // With local DB (YAML has local-db-sync, pass callbacks) - /// const result = await RaindexClient.new( - /// [yamlConfig], - /// undefined, - /// localDb.query.bind(localDb), - /// localDb.wipeAndRecreate.bind(localDb), - /// updateStatus, - /// ); + /// // With local DB (YAML has local-db-sync) + /// const result = await RaindexClient.new([yamlConfig], undefined, { + /// localDb, + /// statusCallback: updateStatus, + /// }); /// ``` #[wasm_export( js_name = "new", @@ -150,21 +149,12 @@ impl RaindexClient { raindex_yamls: Vec, validate: Option, #[wasm_export( - js_name = "queryCallback", - param_description = "Optional JavaScript function to execute local database queries" - )] - query_callback: Option, - #[wasm_export( - js_name = "wipeCallback", - param_description = "Optional JavaScript function to wipe and recreate the database" + js_name = "options", + param_description = "Optional setup object with localDb and statusCallback" )] - wipe_callback: Option, - #[wasm_export( - js_name = "statusCallback", - param_description = "Optional callback invoked with the current local DB sync status" - )] - status_callback: Option, + options: Option, ) -> Result { + let options = LocalDbClientOptions::parse(options)?; let mut raindex_yaml = RaindexYaml::new( raindex_yamls, match validate { @@ -183,9 +173,10 @@ impl RaindexClient { } let local_db = if has_syncs { - let cb = query_callback - .ok_or_else(|| RaindexError::LocalDbSetupMissing("query_callback".to_string()))?; - Some(LocalDb::from_js_callback(cb, wipe_callback)) + let local_db = options + .local_db + .ok_or_else(|| RaindexError::LocalDbSetupMissing("options.localDb".to_string()))?; + Some(LocalDb::from_js_local_db(local_db)?) } else { None }; @@ -212,7 +203,7 @@ impl RaindexClient { let handle = crate::raindex_client::local_db::pipeline::runner::scheduler::start( settings, db, - status_callback, + options.status_callback, sync_readiness.clone(), sync_status_store.clone(), true, @@ -237,6 +228,56 @@ impl RaindexClient { } } +#[cfg(target_family = "wasm")] +#[derive(Default)] +pub struct LocalDbClientOptions { + pub local_db: Option, + pub status_callback: Option, +} + +#[cfg(target_family = "wasm")] +impl LocalDbClientOptions { + pub fn parse(options: Option) -> Result { + let Some(options) = options else { + return Ok(Self::default()); + }; + + let local_db = optional_field(&options, "localDb")?; + let status_callback = optional_function_field(&options, "statusCallback")?; + + Ok(Self { + local_db, + status_callback, + }) + } +} + +#[cfg(target_family = "wasm")] +fn optional_field(options: &JsValue, name: &str) -> Result, RaindexError> { + let value = Reflect::get(options, &JsValue::from_str(name)).map_err(|e| { + RaindexError::LocalDbQueryError(LocalDbQueryError::database(format!( + "Failed to read options.{name}: {e:?}" + ))) + })?; + Ok((!value.is_undefined() && !value.is_null()).then_some(value)) +} + +#[cfg(target_family = "wasm")] +fn optional_function_field( + options: &JsValue, + name: &str, +) -> Result, RaindexError> { + optional_field(options, name)? + .map(|value| { + value.dyn_into::().map_err(|_| { + RaindexError::LocalDbQueryError(LocalDbQueryError::database(format!( + "options.{name} must be a function" + ))) + }) + }) + .transpose() +} + #[wasm_export] impl RaindexClient { fn resolve_networks( @@ -926,9 +967,30 @@ accounts: } #[cfg(target_family = "wasm")] - pub fn new_test_client_with_db_callback( + pub fn local_db_object_from_query_callback(query_callback: js_sys::Function) -> JsValue { + let local_db = js_sys::Object::new(); + js_sys::Reflect::set(&local_db, &JsValue::from_str("query"), &query_callback).unwrap(); + js_sys::Reflect::set( + &local_db, + &JsValue::from_str("wipeAndRecreate"), + &js_sys::Function::new_no_args( + "return Promise.resolve({ value: undefined, error: null });", + ), + ) + .unwrap(); + js_sys::Reflect::set( + &local_db, + &JsValue::from_str("transaction"), + &js_sys::Function::new_no_args("return Promise.resolve({ value: '', error: null });"), + ) + .unwrap(); + local_db.into() + } + + #[cfg(target_family = "wasm")] + pub fn new_test_client_with_local_db( yamls: Vec, - query_callback: js_sys::Function, + local_db: JsValue, chain_ids: Vec, ) -> RaindexClient { let raindex_yaml = RaindexYaml::new(yamls, RaindexYamlValidation::default()) @@ -942,10 +1004,10 @@ accounts: RaindexClient { raindex_yaml, local_db_state: LocalDbState::new( - Some(super::local_db::LocalDb::from_js_callback( - query_callback, - None, - )), + Some( + super::local_db::LocalDb::from_js_local_db(local_db) + .expect("valid test local DB object"), + ), Rc::new(RefCell::new(None)), sync_readiness, db_chain_ids, @@ -1306,6 +1368,81 @@ accounts: ) } + fn local_db_sync_yaml() -> String { + format!( + r#" +version: {spec_version} +networks: + arbitrum: + rpcs: + - https://arb.example/rpc + chain-id: 42161 + label: Arbitrum + network-id: 42161 + currency: ETH +subgraphs: + arbitrum: https://arb.example/subgraph +local-db-remotes: + remote: https://remote.example/manifest +local-db-sync: + arbitrum: + batch-size: 10 + max-concurrent-batches: 2 + retry-attempts: 1 + retry-delay-ms: 1 + rate-limit-delay-ms: 1 + finality-depth: 12 + bootstrap-block-threshold: 100 + sync-interval-ms: 5000 +raindexes: + arbitrum-raindex: + address: 0x2f209e5b67A33B8fE96E28f24628dF6Da301c8eB + network: arbitrum + subgraph: arbitrum + local-db-remote: remote + deployment-block: 1 +"#, + spec_version = SpecVersion::current() + ) + } + + fn local_db_options(include_transaction: bool) -> JsValue { + let query = js_sys::Function::new_with_args( + "sql", + r#" + var value = '[]'; + if (sql && sql.toLowerCase().includes('quick_check')) { + value = '[{"quick_check":"ok"}]'; + } + return Promise.resolve({ value: value, error: null }); + "#, + ); + let local_db = js_sys::Object::new(); + js_sys::Reflect::set(&local_db, &JsValue::from_str("query"), &query).unwrap(); + js_sys::Reflect::set( + &local_db, + &JsValue::from_str("wipeAndRecreate"), + &js_sys::Function::new_no_args( + "return Promise.resolve({ value: undefined, error: null });", + ), + ) + .unwrap(); + if include_transaction { + js_sys::Reflect::set( + &local_db, + &JsValue::from_str("transaction"), + &js_sys::Function::new_no_args( + "return Promise.resolve({ value: '', error: null });", + ), + ) + .unwrap(); + } + + let options = js_sys::Object::new(); + js_sys::Reflect::set(&options, &JsValue::from_str("localDb"), &local_db).unwrap(); + options.into() + } + #[wasm_bindgen_test] async fn test_raindex_client_new_success() { let client = RaindexClient::new( @@ -1317,8 +1454,6 @@ accounts: )], None, None, - None, - None, ) .await .unwrap(); @@ -1327,7 +1462,7 @@ accounts: #[wasm_bindgen_test] async fn test_raindex_client_new_invalid_yaml() { - let err = RaindexClient::new(vec![get_invalid_yaml()], Some(true), None, None, None) + let err = RaindexClient::new(vec![get_invalid_yaml()], Some(true), None) .await .unwrap_err(); assert!(matches!( @@ -1341,12 +1476,40 @@ accounts: #[wasm_bindgen_test] async fn test_raindex_client_new_empty_yaml() { - let err = RaindexClient::new(vec!["".to_string()], None, None, None, None) + let err = RaindexClient::new(vec!["".to_string()], None, None) .await .unwrap_err(); assert!(matches!(err, RaindexError::YamlError(YamlError::EmptyFile))); } + #[wasm_bindgen_test] + async fn test_raindex_client_new_with_local_db_options() { + let client = RaindexClient::new( + vec![local_db_sync_yaml()], + None, + Some(local_db_options(true)), + ) + .await + .unwrap(); + + assert!(client.local_db_state.local_db().is_some()); + } + + #[wasm_bindgen_test] + async fn test_raindex_client_new_requires_transaction_method() { + let err = RaindexClient::new( + vec![local_db_sync_yaml()], + None, + Some(local_db_options(false)), + ) + .await + .unwrap_err(); + + assert!(err + .to_string() + .contains("localDb.transaction must be a function")); + } + #[wasm_bindgen_test] async fn test_get_multi_subgraph_args_single_chain() { let client = RaindexClient::new( @@ -1358,8 +1521,6 @@ accounts: )], None, None, - None, - None, ) .await .unwrap(); @@ -1384,8 +1545,6 @@ accounts: )], None, None, - None, - None, ) .await .unwrap(); @@ -1413,8 +1572,6 @@ accounts: )], None, None, - None, - None, ) .await .unwrap(); @@ -1438,8 +1595,6 @@ accounts: )], None, None, - None, - None, ) .await .unwrap(); @@ -1475,8 +1630,6 @@ accounts: )], None, None, - None, - None, ) .await .unwrap(); @@ -1526,9 +1679,7 @@ accounts: spec_version = SpecVersion::current() ); - let client = RaindexClient::new(vec![yaml], None, None, None, None) - .await - .unwrap(); + let client = RaindexClient::new(vec![yaml], None, None).await.unwrap(); let err = client.get_multi_subgraph_args(None).unwrap_err(); assert!(matches!( diff --git a/crates/common/src/raindex_client/raindex_yaml.rs b/crates/common/src/raindex_client/raindex_yaml.rs index 55825faf4c..926744f966 100644 --- a/crates/common/src/raindex_client/raindex_yaml.rs +++ b/crates/common/src/raindex_client/raindex_yaml.rs @@ -254,9 +254,7 @@ mod tests { "http://localhost:3003", "http://localhost:3004", ); - let client = RaindexClient::new(vec![yaml], None, None, None, None) - .await - .unwrap(); + let client = RaindexClient::new(vec![yaml], None, None).await.unwrap(); let result = client.get_unique_chain_ids().unwrap(); assert!(!result.is_empty()); @@ -273,9 +271,7 @@ mod tests { "http://localhost:3003", "http://localhost:3004", ); - let client = RaindexClient::new(vec![yaml], None, None, None, None) - .await - .unwrap(); + let client = RaindexClient::new(vec![yaml], None, None).await.unwrap(); let result = client.get_all_networks().unwrap(); assert_eq!(result.len(), 2); @@ -299,9 +295,7 @@ mod tests { "http://localhost:3003", "http://localhost:3004", ); - let client = RaindexClient::new(vec![yaml], None, None, None, None) - .await - .unwrap(); + let client = RaindexClient::new(vec![yaml], None, None).await.unwrap(); let mainnet = client.get_network_by_chain_id(1).unwrap(); assert_eq!(mainnet.chain_id, 1); @@ -323,9 +317,7 @@ mod tests { "http://localhost:3003", "http://localhost:3004", ); - let client = RaindexClient::new(vec![yaml], None, None, None, None) - .await - .unwrap(); + let client = RaindexClient::new(vec![yaml], None, None).await.unwrap(); let mainnet_address = "0x1234567890123456789012345678901234567890".to_string(); let mainnet_raindex = client @@ -362,9 +354,7 @@ mod tests { "http://localhost:3003", "http://localhost:3004", ); - let client = RaindexClient::new(vec![yaml], None, None, None, None) - .await - .unwrap(); + let client = RaindexClient::new(vec![yaml], None, None).await.unwrap(); let result = client.is_sentry_enabled().unwrap(); assert!(!result); @@ -378,9 +368,7 @@ mod tests { "http://localhost:3003", "http://localhost:3004", ); - let client = RaindexClient::new(vec![yaml], None, None, None, None) - .await - .unwrap(); + let client = RaindexClient::new(vec![yaml], None, None).await.unwrap(); let result = client.get_all_accounts().unwrap(); assert_eq!(result.len(), 3); @@ -415,9 +403,7 @@ mod tests { "http://localhost:3003", "http://localhost:3004", ); - let client = RaindexClient::new(vec![yaml], None, None, None, None) - .await - .unwrap(); + let client = RaindexClient::new(vec![yaml], None, None).await.unwrap(); let result = client.get_all_tokens().unwrap(); assert_eq!(result.len(), 2); @@ -451,9 +437,7 @@ mod tests { "http://localhost:3003", "http://localhost:3004", ); - let client = RaindexClient::new(vec![yaml], None, None, None, None) - .await - .unwrap(); + let client = RaindexClient::new(vec![yaml], None, None).await.unwrap(); let result = client.get_all_raindexes().unwrap(); assert_eq!(result.len(), 2); diff --git a/crates/common/src/raindex_client/trades/get_by_owner.rs b/crates/common/src/raindex_client/trades/get_by_owner.rs index 8094f0b668..0b38b927d5 100644 --- a/crates/common/src/raindex_client/trades/get_by_owner.rs +++ b/crates/common/src/raindex_client/trades/get_by_owner.rs @@ -181,7 +181,8 @@ mod tests { #[cfg(target_family = "wasm")] mod wasm { use crate::raindex_client::tests::{ - get_local_db_test_yaml, new_test_client_with_db_callback, + get_local_db_test_yaml, local_db_object_from_query_callback, + new_test_client_with_local_db, }; use crate::raindex_client::trades::test_helpers::{ build_local_trade_fixture, make_local_db_trades_callback, @@ -203,9 +204,9 @@ mod tests { vec![fixture.trade], 4, ); - let client = new_test_client_with_db_callback( + let client = new_test_client_with_local_db( vec![get_local_db_test_yaml()], - callback, + local_db_object_from_query_callback(callback), vec![42161], ); @@ -240,9 +241,9 @@ mod tests { vec![fixture.trade], 4, ); - let client = new_test_client_with_db_callback( + let client = new_test_client_with_local_db( vec![get_local_db_test_yaml()], - callback, + local_db_object_from_query_callback(callback), vec![42161], ); diff --git a/crates/common/src/raindex_client/trades/get_by_tx.rs b/crates/common/src/raindex_client/trades/get_by_tx.rs index 3bedf57c4e..aeb1946191 100644 --- a/crates/common/src/raindex_client/trades/get_by_tx.rs +++ b/crates/common/src/raindex_client/trades/get_by_tx.rs @@ -219,7 +219,8 @@ mod tests { #[cfg(target_family = "wasm")] mod wasm { use crate::raindex_client::tests::{ - get_local_db_test_yaml, new_test_client_with_db_callback, + get_local_db_test_yaml, local_db_object_from_query_callback, + new_test_client_with_local_db, }; use crate::raindex_client::trades::test_helpers::{ build_local_trade_fixture, make_local_db_trades_callback, @@ -242,9 +243,9 @@ mod tests { vec![fixture.trade], 4, ); - let client = new_test_client_with_db_callback( + let client = new_test_client_with_local_db( vec![get_local_db_test_yaml()], - callback, + local_db_object_from_query_callback(callback), vec![42161], ); diff --git a/crates/common/src/raindex_client/trades/mod.rs b/crates/common/src/raindex_client/trades/mod.rs index fa1b565ae2..e91e72173c 100644 --- a/crates/common/src/raindex_client/trades/mod.rs +++ b/crates/common/src/raindex_client/trades/mod.rs @@ -990,7 +990,8 @@ mod test_helpers { mod wasm_tests { use super::*; use crate::raindex_client::tests::{ - get_local_db_test_yaml, new_test_client_with_db_callback, + get_local_db_test_yaml, local_db_object_from_query_callback, + new_test_client_with_local_db, }; use raindex_subgraph_client::utils::float::{F1, F2, F3, NEG2}; use wasm_bindgen_test::wasm_bindgen_test; @@ -1009,9 +1010,9 @@ mod test_helpers { vec![fixture.trade.clone()], 4, ); - let client = new_test_client_with_db_callback( + let client = new_test_client_with_local_db( vec![get_local_db_test_yaml()], - callback, + local_db_object_from_query_callback(callback), vec![42161], ); diff --git a/crates/common/src/raindex_client/vaults.rs b/crates/common/src/raindex_client/vaults.rs index 41237b3168..43d3dfae79 100644 --- a/crates/common/src/raindex_client/vaults.rs +++ b/crates/common/src/raindex_client/vaults.rs @@ -2220,7 +2220,8 @@ mod tests { use crate::local_db::query::fetch_vault_balance_changes::LocalDbVaultBalanceChange; use crate::raindex_client::local_db::executor::tests::create_sql_capturing_callback; use crate::raindex_client::tests::{ - get_local_db_test_yaml, new_test_client_with_db_callback, + get_local_db_test_yaml, local_db_object_from_query_callback, + new_test_client_with_local_db, }; use alloy::primitives::{address, b256, Address, Bytes}; use rain_math_float::Float; @@ -2318,9 +2319,9 @@ mod tests { let callback = make_local_db_vaults_callback(vec![vault]); - let client = new_test_client_with_db_callback( + let client = new_test_client_with_local_db( vec![get_local_db_test_yaml()], - callback, + local_db_object_from_query_callback(callback), vec![42161], ); @@ -2352,9 +2353,9 @@ mod tests { let callback = make_local_db_vaults_callback(vec![local_vault.clone()]); - let client = new_test_client_with_db_callback( + let client = new_test_client_with_local_db( vec![get_local_db_test_yaml()], - callback, + local_db_object_from_query_callback(callback), vec![42161], ); @@ -2421,9 +2422,9 @@ mod tests { vec![balance_change], ); - let client = new_test_client_with_db_callback( + let client = new_test_client_with_local_db( vec![get_local_db_test_yaml()], - callback, + local_db_object_from_query_callback(callback), vec![42161], ); @@ -2479,9 +2480,9 @@ mod tests { let json = serde_json::to_string(&vec![keep_vault]).unwrap(); let callback = create_sql_capturing_callback(&json, captured_sql.clone()); - let client = new_test_client_with_db_callback( + let client = new_test_client_with_local_db( vec![get_local_db_test_yaml()], - callback, + local_db_object_from_query_callback(callback), vec![42161], ); diff --git a/crates/js_api/src/registry.rs b/crates/js_api/src/registry.rs index c06c71894a..2a5f221e78 100644 --- a/crates/js_api/src/registry.rs +++ b/crates/js_api/src/registry.rs @@ -391,11 +391,10 @@ impl DotrainRegistry { /// ## Examples /// /// ```javascript - /// const clientResult = await registry.getRaindexClient( - /// localDb.query.bind(localDb), - /// localDb.wipeAndRecreate.bind(localDb), - /// updateStatus, - /// ); + /// const clientResult = await registry.getRaindexClient({ + /// localDb, + /// statusCallback: updateStatus, + /// }); /// if (clientResult.error) { /// console.error("Failed to get RaindexClient:", clientResult.error.readableMsg); /// return; @@ -411,27 +410,15 @@ impl DotrainRegistry { pub async fn get_raindex_client( &self, #[wasm_export( - js_name = "queryCallback", - param_description = "Optional JavaScript function to execute local database queries" - )] - query_callback: Option, - #[wasm_export( - js_name = "wipeCallback", - param_description = "Optional JavaScript function to wipe and recreate the database" - )] - wipe_callback: Option, - #[wasm_export( - js_name = "statusCallback", - param_description = "Optional callback invoked with the current local DB sync status" + js_name = "options", + param_description = "Optional setup object with localDb and statusCallback" )] - status_callback: Option, + options: Option, ) -> Result { let client = raindex_common::raindex_client::RaindexClient::new( vec![self.inner.settings()], None, - query_callback, - wipe_callback, - status_callback, + options, ) .await .map_err(DotrainRegistryCoreError::from)?; diff --git a/package-lock.json b/package-lock.json index 13ba9e950f..8e29db6dcc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,7 @@ "dependencies": { "@fast-check/vitest": "0.2.0", "@rainlanguage/raindex": "workspace:*", - "@rainlanguage/sqlite-web": "^0.0.2", + "@rainlanguage/sqlite-web": "^0.0.3", "@rainlanguage/ui-components": "workspace:*", "@reown/appkit": "1.6.4", "@reown/appkit-adapter-wagmi": "1.6.4", @@ -2982,9 +2982,9 @@ "link": true }, "node_modules/@rainlanguage/sqlite-web": { - "version": "0.0.2", - "resolved": "https://registry.npmjs.org/@rainlanguage/sqlite-web/-/sqlite-web-0.0.2.tgz", - "integrity": "sha512-blERwVIaim+b0Kn1C9MbBQ1GATqtspcbebOjDsrraVRAXRdjr0k9rOA4MSWoZDZjrHuT9KytTEzqDzd9giBOng==" + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/@rainlanguage/sqlite-web/-/sqlite-web-0.0.3.tgz", + "integrity": "sha512-XcE59X3/WWu2NcAkOPOk/s94LvWVcU6gG/7i4QCBSc+TX7VwtASkTQevjx1JA7QFuJQscHBucQvb6ptnIMAWcg==" }, "node_modules/@rainlanguage/ui-components": { "resolved": "packages/ui-components", diff --git a/package.json b/package.json index 41703dc72c..464e8e4042 100644 --- a/package.json +++ b/package.json @@ -72,7 +72,7 @@ "dependencies": { "@fast-check/vitest": "0.2.0", "@rainlanguage/raindex": "workspace:*", - "@rainlanguage/sqlite-web": "^0.0.2", + "@rainlanguage/sqlite-web": "^0.0.3", "@rainlanguage/ui-components": "workspace:*", "@reown/appkit": "1.6.4", "@reown/appkit-adapter-wagmi": "1.6.4", diff --git a/packages/raindex/ARCHITECTURE.md b/packages/raindex/ARCHITECTURE.md index f0888c90c7..4a45023958 100644 --- a/packages/raindex/ARCHITECTURE.md +++ b/packages/raindex/ARCHITECTURE.md @@ -113,10 +113,8 @@ items: - High-level classes (selected) - `RaindexClient` — raindex queries (orders, trades, vaults, quotes, transactions) across configured networks/subgraphs. Constructor is async - (`await RaindexClient.new(...)`) and accepts optional `queryCallback` (for - applying fetched records to the local DB), `wipeCallback` (for cleaning up - stale data during full re-sync), and `statusCallback` (for reporting sync - progress/errors to the caller) args for local DB sync when the YAML has + (`await RaindexClient.new(...)`) and accepts an optional + `{ localDb, statusCallback }` object for local DB sync when the YAML has `local-db-sync` sections. The sync scheduler starts automatically when configured and shuts down via Drop. - `RaindexOrder`, `RaindexVault`, `RaindexTrade`, `RaindexTransaction`, diff --git a/packages/raindex/README.md b/packages/raindex/README.md index 052b51d6b6..df77f7a62f 100644 --- a/packages/raindex/README.md +++ b/packages/raindex/README.md @@ -232,17 +232,14 @@ const client = clientResult.value; Pass `true` as the second argument to `RaindexClient.new` when you want strict schema validation. -When the YAML includes `local-db-sync` sections, pass optional callbacks to wire +When the YAML includes `local-db-sync` sections, pass a local DB object to wire up a local SQLite cache: ```ts -const clientResult = await RaindexClient.new( - [RAINDEX_SETTINGS], - undefined, - localDb.query.bind(localDb), - localDb.wipeAndRecreate.bind(localDb), - updateStatusCallback, -); +const clientResult = await RaindexClient.new([RAINDEX_SETTINGS], undefined, { + localDb, + statusCallback: updateStatusCallback, +}); ``` The client will automatically start the sync scheduler and route queries to the @@ -882,10 +879,10 @@ if (!postTaskResult.error) console.log(postTaskResult.value); Automatically fetches remote tokens from `using-tokens-from` URLs. - `RaindexClient.getAllAccounts()` / `getAllVaultTokens()` – introspect accounts and ERC20 metadata defined in your YAML or discovered via subgraphs. -- Local DB sync – pass `queryCallback`, `wipeCallback`, and `statusCallback` to - `RaindexClient.new()` when YAML has `local-db-sync` sections to enable an - offline-capable persistent cache. The scheduler starts automatically and - queries route to the local DB once the first sync cycle completes. +- Local DB sync – pass `{ localDb, statusCallback }` to `RaindexClient.new()` + when YAML has `local-db-sync` sections to enable an offline-capable persistent + cache. The scheduler starts automatically and queries route to the local DB + once the first sync cycle completes. - `RaindexVaultsList.getWithdrawCalldata()` – multicall builder that withdraws every vault with a balance. - `RaindexOrder.convertToSgOrder()` – convert WASM order representations back diff --git a/packages/webapp/ARCHITECTURE.md b/packages/webapp/ARCHITECTURE.md index bf444b0457..808e700414 100644 --- a/packages/webapp/ARCHITECTURE.md +++ b/packages/webapp/ARCHITECTURE.md @@ -38,10 +38,9 @@ nix develop -c npm run dev - App bootstrap (`src/routes/+layout.ts`) - Loads the dotrain registry (`REGISTRY_URL` or `?registry=` override) via the WASM `DotrainRegistry` and constructs a `RaindexClient` from the registry’s - shared settings. Construction is a single async call that accepts optional - local DB callbacks (`queryCallback`, `wipeCallback`, `statusCallback`); the - scheduler starts automatically when `local-db-sync` is configured in the - YAML. + shared settings. Construction is a single async call that accepts an + optional `{ localDb, statusCallback }` object; the scheduler starts + automatically when `local-db-sync` is configured in the YAML. - Exposes a set of Svelte stores (selected chains, active accounts, filters, etc.) to child routes. - `export const ssr = false;` — the app renders client‑side only. diff --git a/packages/webapp/src/routes/+layout.ts b/packages/webapp/src/routes/+layout.ts index 4247a5fc34..66098b8350 100644 --- a/packages/webapp/src/routes/+layout.ts +++ b/packages/webapp/src/routes/+layout.ts @@ -72,9 +72,12 @@ export const load: LayoutLoad = async ({ url }) => { try { if (!errorMessage && registry) { const raindexClientRes = await registry.getRaindexClient( - localDb?.query?.bind(localDb), - localDb?.wipeAndRecreate?.bind(localDb), - updateStatus + localDb + ? { + localDb, + statusCallback: updateStatus + } + : undefined ); if (raindexClientRes.error) { errorMessage = raindexClientRes.error.readableMsg; @@ -162,7 +165,7 @@ if (import.meta.vitest) { }; mockInit.mockResolvedValue(undefined); mockLocalDbNew.mockReturnValue({ - value: { db: true, query: vi.fn(), wipeAndRecreate: vi.fn() } + value: { db: true, query: vi.fn(), wipeAndRecreate: vi.fn(), transaction: vi.fn() } }); }); @@ -216,8 +219,9 @@ if (import.meta.vitest) { mockRegistryNew.mockResolvedValueOnce({ value: mockRegistry }); + const localDb = { db: true, query: vi.fn(), wipeAndRecreate: vi.fn(), transaction: vi.fn() }; mockLocalDbNew.mockReturnValue({ - value: { db: true, query: vi.fn(), wipeAndRecreate: vi.fn() } + value: localDb }); // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -226,6 +230,10 @@ if (import.meta.vitest) { expect(result.errorMessage).toBeUndefined(); expect(result.stores).not.toBeNull(); expect(result.registry).toEqual(mockRegistry); + expect(mockGetRaindexClient).toHaveBeenCalledWith({ + localDb, + statusCallback: updateStatus + }); }); }); }