diff --git a/.github/workflows/rustdoc.yml b/.github/workflows/rustdoc.yml new file mode 100644 index 0000000..1e84ecb --- /dev/null +++ b/.github/workflows/rustdoc.yml @@ -0,0 +1,57 @@ +name: rustdoc-gate + +on: + push: + branches: ["**"] + pull_request: + +jobs: + rustdoc: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache cargo + uses: Swatinem/rust-cache@v2 + + - name: Rustdoc warnings as errors (selected crates) + env: + RUSTDOCFLAGS: -D warnings + run: | + cargo doc --no-deps -p ffq-common + cargo doc --no-deps -p ffq-planner + cargo doc --no-deps -p ffq-execution + cargo doc --no-deps -p ffq-storage + cargo doc --no-deps -p ffq-client + cargo doc --no-deps -p ffq-distributed + + - name: Missing docs gate - ffq-common + run: cargo rustc -p ffq-common --lib -- -D missing-docs + + - name: Missing docs gate - ffq-storage + run: cargo rustc -p ffq-storage --lib -- -D missing-docs + + - name: Missing docs gate - ffq-planner + run: cargo rustc -p ffq-planner --lib -- -D missing-docs + + - name: Missing docs gate - ffq-execution + run: cargo rustc -p ffq-execution --lib -- -D missing-docs + + - name: Missing docs gate - ffq-shuffle + run: cargo rustc -p ffq-shuffle --lib -- -D missing-docs + + - name: Missing docs gate - ffq-sql + run: cargo rustc -p ffq-sql --lib -- -D missing-docs + + - name: Missing docs gate - ffq-client + run: cargo rustc -p ffq-client --lib -- -D missing-docs + + - name: Missing docs gate - ffq-distributed (no grpc) + run: cargo rustc -p ffq-distributed --lib -- -D missing-docs + + - name: Missing docs gate - ffq-distributed (grpc) + run: cargo rustc -p ffq-distributed --features grpc --lib -- -D missing-docs diff --git a/Cargo.lock b/Cargo.lock index aec117e..3befbdb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -737,7 +737,7 @@ dependencies = [ [[package]] name = "ffq-client" -version = "1.0.1" +version = "1.0.2" dependencies = [ 
"arrow", "arrow-schema", @@ -761,7 +761,7 @@ dependencies = [ [[package]] name = "ffq-common" -version = "1.0.1" +version = "1.0.2" dependencies = [ "axum", "prometheus", @@ -773,7 +773,7 @@ dependencies = [ [[package]] name = "ffq-distributed" -version = "1.0.1" +version = "1.0.2" dependencies = [ "arrow", "arrow-schema", @@ -798,7 +798,7 @@ dependencies = [ [[package]] name = "ffq-execution" -version = "1.0.1" +version = "1.0.2" dependencies = [ "arrow", "arrow-schema", @@ -811,7 +811,7 @@ dependencies = [ [[package]] name = "ffq-planner" -version = "1.0.1" +version = "1.0.2" dependencies = [ "arrow-schema", "ffq-common", @@ -823,7 +823,7 @@ dependencies = [ [[package]] name = "ffq-shuffle" -version = "1.0.1" +version = "1.0.2" dependencies = [ "arrow", "ffq-common", @@ -834,7 +834,7 @@ dependencies = [ [[package]] name = "ffq-sql" -version = "1.0.1" +version = "1.0.2" dependencies = [ "ffq-common", "sqlparser", @@ -842,7 +842,7 @@ dependencies = [ [[package]] name = "ffq-storage" -version = "1.0.1" +version = "1.0.2" dependencies = [ "arrow", "arrow-schema", diff --git a/Cargo.toml b/Cargo.toml index 756a9b7..49668b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ default-members = ["crates/client"] [workspace.package] edition = "2024" license = "Apache-2.0" -version = "1.0.1" +version = "1.0.2" repository = "https://example.invalid/ffq" # TODO [workspace.dependencies] diff --git a/Contributing.md b/Contributing.md index 229412d..6182e8a 100644 --- a/Contributing.md +++ b/Contributing.md @@ -27,6 +27,9 @@ Open an issue describing: - Update docs/README if you change usage. - Be respectful in review discussions. 
+Source-level Rust documentation standard: +- `docs/dev/rustdoc-style.md` + ## Distributed Compose Smoke Test Use the v1 coordinator + 2 worker topology: diff --git a/crates/client/examples/run_bench_13_3.rs b/crates/client/examples/run_bench_13_3.rs index f86fd20..abf9761 100644 --- a/crates/client/examples/run_bench_13_3.rs +++ b/crates/client/examples/run_bench_13_3.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeMap, HashMap}; use std::env; -use std::fs::File; use std::fs; +use std::fs::File; #[cfg(feature = "distributed")] use std::net::{TcpStream, ToSocketAddrs}; use std::path::{Path, PathBuf}; @@ -12,11 +12,11 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH}; use arrow::array::{Float64Array, Int64Array, StringArray}; use arrow::record_batch::RecordBatch; use arrow_schema::{DataType, Field, Schema}; +use ffq_client::Engine; use ffq_client::bench_fixtures::{ default_benchmark_fixture_root, generate_default_benchmark_fixtures, }; -use ffq_client::bench_queries::{load_benchmark_query_from_root, BenchmarkQueryId}; -use ffq_client::Engine; +use ffq_client::bench_queries::{BenchmarkQueryId, load_benchmark_query_from_root}; use ffq_common::{EngineConfig, FfqError, Result}; use ffq_planner::LiteralValue; use ffq_storage::{TableDef, TableStats}; @@ -1262,7 +1262,10 @@ fn maybe_verify_official_tpch_correctness( if !is_official { return Ok(()); } - if !matches!(query_id, BenchmarkQueryId::TpchQ1 | BenchmarkQueryId::TpchQ3) { + if !matches!( + query_id, + BenchmarkQueryId::TpchQ1 | BenchmarkQueryId::TpchQ3 + ) { return Ok(()); } @@ -1417,9 +1420,12 @@ fn read_parquet_batches(path: &Path) -> Result> { let file = File::open(path)?; let builder = ParquetRecordBatchReaderBuilder::try_new(file) .map_err(|e| FfqError::Execution(format!("open parquet {} failed: {e}", path.display())))?; - let reader = builder - .build() - .map_err(|e| FfqError::Execution(format!("build parquet reader {} failed: {e}", path.display())))?; + let reader = builder.build().map_err(|e| { + 
FfqError::Execution(format!( + "build parquet reader {} failed: {e}", + path.display() + )) + })?; let mut out = Vec::new(); for batch in reader { out.push(batch.map_err(|e| { diff --git a/crates/client/src/bench_fixtures.rs b/crates/client/src/bench_fixtures.rs index 82daee1..4ca022d 100644 --- a/crates/client/src/bench_fixtures.rs +++ b/crates/client/src/bench_fixtures.rs @@ -18,27 +18,48 @@ const RAG_DOC_ROWS: i64 = 10_000; const RAG_EMBED_DIM: i32 = 64; const RAG_SYNTH_SEED: u64 = 42; +/// Per-file manifest entry for generated benchmark fixtures. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct FixtureFileManifest { + /// File name relative to fixture directory. pub file: String, + /// Number of rows in the parquet file. pub rows: i64, + /// Flattened field descriptors in `name:type:nullable` format. pub schema: Vec, } +/// Manifest for one generated fixture set. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct FixtureManifest { + /// Fixture identifier. pub fixture: String, + /// Human-readable fixture description. pub description: String, + /// Seed that makes generation deterministic. pub deterministic_seed: u64, + /// Files produced for this fixture. pub files: Vec, } +/// Top-level index file for all generated fixture sets. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct FixtureIndex { + /// Stable fixture-set id. pub fixture_set: String, + /// Subdirectories included in this fixture set. pub fixtures: Vec, } +/// Generates deterministic default benchmark fixtures under `root`. +/// +/// Creates: +/// - `tpch_sf1` (synthetic TPCH-shaped data) +/// - `rag_synth` (synthetic vector data) +/// - `index.json` and per-fixture manifests +/// +/// # Errors +/// Returns an error if directory creation, parquet writing, or manifest writes fail. 
pub fn generate_default_benchmark_fixtures(root: &Path) -> Result<()> { std::fs::create_dir_all(root)?; let tpch_root = root.join("tpch_sf1"); @@ -350,6 +371,7 @@ fn write_json(path: &Path, value: &T, err_prefix: &str) -> Result< Ok(()) } +/// Returns default directory for benchmark fixtures used by benchmark runners. pub fn default_benchmark_fixture_root() -> PathBuf { PathBuf::from("./tests/bench/fixtures") } diff --git a/crates/client/src/bench_queries.rs b/crates/client/src/bench_queries.rs index e0d9986..f1385e6 100644 --- a/crates/client/src/bench_queries.rs +++ b/crates/client/src/bench_queries.rs @@ -3,15 +3,21 @@ use std::path::{Path, PathBuf}; use ffq_common::{FfqError, Result}; +/// Identifier for canonical benchmark SQL files. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum BenchmarkQueryId { + /// TPC-H Q1 aggregate workload. TpchQ1, + /// TPC-H Q3 join + filter workload. TpchQ3, + /// Vector brute-force top-k benchmark query. RagTopkBruteforce, + /// Optional qdrant-backed vector top-k benchmark query. RagTopkQdrant, } impl BenchmarkQueryId { + /// Stable machine-readable identifier used in result artifacts. pub fn stable_id(self) -> &'static str { match self { Self::TpchQ1 => "tpch_q1", @@ -21,6 +27,7 @@ impl BenchmarkQueryId { } } + /// Relative SQL file location under the benchmark query root. pub fn file_name(self) -> &'static str { match self { Self::TpchQ1 => "canonical/tpch_q1.sql", @@ -31,6 +38,7 @@ impl BenchmarkQueryId { } } +/// Ordered list of benchmark queries expected by the benchmark runner. pub const CANONICAL_BENCHMARK_QUERIES: [BenchmarkQueryId; 4] = [ BenchmarkQueryId::TpchQ1, BenchmarkQueryId::TpchQ3, @@ -38,16 +46,25 @@ pub const CANONICAL_BENCHMARK_QUERIES: [BenchmarkQueryId; 4] = [ BenchmarkQueryId::RagTopkQdrant, ]; +/// Returns the default benchmark query directory. 
pub fn default_benchmark_query_root() -> PathBuf { Path::new(env!("CARGO_MANIFEST_DIR")) .join("../../tests/bench/queries") .to_path_buf() } +/// Loads one benchmark SQL file from the default query root. +/// +/// # Errors +/// Returns an error when file loading fails or query content is empty. pub fn load_benchmark_query(id: BenchmarkQueryId) -> Result { load_benchmark_query_from_root(&default_benchmark_query_root(), id) } +/// Loads one benchmark SQL file from an explicit query root. +/// +/// # Errors +/// Returns an error when file loading fails or query content is empty. pub fn load_benchmark_query_from_root(root: &Path, id: BenchmarkQueryId) -> Result { let path = root.join(id.file_name()); let query = fs::read_to_string(&path).map_err(|e| { @@ -71,6 +88,10 @@ pub fn load_benchmark_query_from_root(root: &Path, id: BenchmarkQueryId) -> Resu Ok(trimmed) } +/// Loads all canonical benchmark SQL files. +/// +/// # Errors +/// Returns an error if any canonical query fails to load. pub fn load_all_benchmark_queries() -> Result> { let root = default_benchmark_query_root(); CANONICAL_BENCHMARK_QUERIES diff --git a/crates/client/src/dataframe.rs b/crates/client/src/dataframe.rs index 8f795d0..aebfbc3 100644 --- a/crates/client/src/dataframe.rs +++ b/crates/client/src/dataframe.rs @@ -44,12 +44,18 @@ impl<'a> ffq_planner::OptimizerContext for CatalogProvider<'a> { } } +/// Write behavior for parquet outputs. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum WriteMode { + /// Replace destination data atomically. Overwrite, + /// Append a new deterministic part file to destination directory. Append, } +/// Query plan handle used for relational transformations and execution. +/// +/// `DataFrame` is immutable. Each transformation returns a new plan. #[derive(Debug, Clone)] pub struct DataFrame { session: SharedSession, @@ -64,6 +70,7 @@ impl DataFrame { } } + /// Returns the current logical plan. 
pub fn logical_plan(&self) -> &LogicalPlan { &self.logical_plan } @@ -78,6 +85,7 @@ impl DataFrame { Self::new(session, plan) } + /// Adds a filter predicate. /// df.filter(expr) pub fn filter(self, predicate: Expr) -> Self { let plan = LogicalPlan::Filter { @@ -87,6 +95,10 @@ impl DataFrame { Self::new(self.session, plan) } + /// Adds an inner join between two dataframes. + /// + /// # Errors + /// Returns an error when joining dataframes from different engine sessions. /// df.join(df2, on) /// on = vec![("left_key", "right_key"), ...] pub fn join(self, right: DataFrame, on: Vec<(String, String)>) -> Result { @@ -107,6 +119,7 @@ impl DataFrame { Ok(Self::new(self.session, plan)) } + /// Starts grouped aggregation builder for the given grouping keys. /// df.groupby(keys) pub fn groupby(self, keys: Vec) -> GroupedDataFrame { GroupedDataFrame { @@ -116,6 +129,10 @@ impl DataFrame { } } + /// Returns optimized logical plan text. + /// + /// # Errors + /// Returns an error when schema inference, optimization, or catalog lookup fails. pub fn explain(&self) -> Result { self.ensure_inferred_parquet_schemas()?; let cat = self.session.catalog.read().expect("catalog lock poisoned"); @@ -131,16 +148,55 @@ impl DataFrame { } /// df.collect() (async) + /// + /// # Examples + /// ```no_run + /// use ffq_client::Engine; + /// use ffq_common::EngineConfig; + /// + /// let engine = Engine::new(EngineConfig::default())?; + /// let df = engine.sql("SELECT 1 as one")?; + /// let batches = futures::executor::block_on(df.collect())?; + /// assert!(!batches.is_empty()); + /// # Ok::<(), ffq_common::FfqError>(()) + /// ``` + /// + /// # Errors + /// Returns an error when planning or execution fails. pub async fn collect(&self) -> Result> { let (_schema, batches) = self.execute_with_schema().await?; Ok(batches) } + /// Executes this plan and writes output to parquet, replacing destination by default. + /// + /// If `path` ends with `.parquet`, output is written to that file. 
+ /// Otherwise, `path` is treated as output directory. + /// + /// # Examples + /// ```no_run + /// use ffq_client::Engine; + /// use ffq_common::EngineConfig; + /// + /// let engine = Engine::new(EngineConfig::default())?; + /// let df = engine.sql("SELECT 1 as id")?; + /// futures::executor::block_on(df.write_parquet("/tmp/ffq_out"))?; + /// # Ok::<(), ffq_common::FfqError>(()) + /// ``` + /// + /// # Errors + /// Returns an error when planning, execution, or file commit fails. pub async fn write_parquet>(&self, path: P) -> Result<()> { self.write_parquet_with_mode(path, WriteMode::Overwrite) .await } + /// Executes this plan and writes output to parquet with explicit write mode. + /// + /// Append is only supported for directory paths. + /// + /// # Errors + /// Returns an error for unsupported file append mode or write/commit failures. pub async fn write_parquet_with_mode>( &self, path: P, @@ -167,11 +223,34 @@ impl DataFrame { Ok(()) } + /// Executes this plan and saves output as a managed table with overwrite mode. + /// + /// # Errors + /// Returns an error when execution, persistence, or catalog update fails. pub async fn save_as_table(&self, name: &str) -> Result<()> { self.save_as_table_with_mode(name, WriteMode::Overwrite) .await } + /// Executes this plan and saves output as a managed table with explicit write mode. + /// + /// Overwrite replaces table data paths; append adds new parts and deduplicates paths. 
+ /// + /// # Examples + /// ```no_run + /// use ffq_client::Engine; + /// use ffq_common::EngineConfig; + /// + /// let engine = Engine::new(EngineConfig::default())?; + /// let df = engine.sql("SELECT 42 as answer")?; + /// futures::executor::block_on(df.save_as_table("answers"))?; + /// let rows = futures::executor::block_on(engine.sql("SELECT answer FROM answers")?.collect())?; + /// assert!(!rows.is_empty()); + /// # Ok::<(), ffq_common::FfqError>(()) + /// ``` + /// + /// # Errors + /// Returns an error when name validation, execution, write, or catalog persistence fails. pub async fn save_as_table_with_mode(&self, name: &str, mode: WriteMode) -> Result<()> { if name.trim().is_empty() { return Err(FfqError::Planning("table name cannot be empty".to_string())); @@ -273,7 +352,8 @@ impl DataFrame { if !table.format.eq_ignore_ascii_case("parquet") { continue; } - if !self.session.config.schema_inference.allows_inference() && table.schema.is_none() + if !self.session.config.schema_inference.allows_inference() + && table.schema.is_none() { continue; } @@ -380,6 +460,7 @@ impl DataFrame { } } +/// Builder for grouped aggregations produced by [`DataFrame::groupby`]. #[derive(Debug, Clone)] pub struct GroupedDataFrame { session: SharedSession, @@ -388,6 +469,7 @@ pub struct GroupedDataFrame { } impl GroupedDataFrame { + /// Adds aggregate expressions and returns the resulting query dataframe. /// df.groupby(keys).agg(...) 
pub fn agg(self, aggs: Vec<(AggExpr, String)>) -> DataFrame { let plan = LogicalPlan::Aggregate { diff --git a/crates/client/src/engine.rs b/crates/client/src/engine.rs index b5d7a9d..b781470 100644 --- a/crates/client/src/engine.rs +++ b/crates/client/src/engine.rs @@ -7,24 +7,54 @@ use std::time::{SystemTime, UNIX_EPOCH}; use arrow_schema::Schema; use ffq_common::{EngineConfig, Result, SchemaInferencePolicy}; use ffq_planner::LiteralValue; -use ffq_storage::parquet_provider::{FileFingerprint, ParquetProvider}; use ffq_storage::TableDef; +use ffq_storage::parquet_provider::{FileFingerprint, ParquetProvider}; -use crate::session::{Session, SharedSession}; use crate::DataFrame; +use crate::session::{Session, SharedSession}; +/// Primary entry point for planning and executing queries. +/// +/// `Engine` owns a shared session containing planner, catalog, runtime, and metrics state. +/// Clone is cheap and shares the same underlying session. #[derive(Clone)] pub struct Engine { session: SharedSession, } +/// Source of a table schema returned by [`Engine::table_schema_with_origin`]. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum TableSchemaOrigin { + /// Schema came directly from catalog definition. CatalogDefined, + /// Schema was inferred from parquet files and cached/persisted as metadata. Inferred, } impl Engine { + /// Constructs a new engine with the provided configuration. + /// + /// Runtime selection behavior: + /// - when built **without** `distributed` feature: always embedded runtime + /// - when built **with** `distributed` feature: + /// - uses distributed runtime if `config.coordinator_endpoint` is set, or + /// - uses `FFQ_COORDINATOR_ENDPOINT` when set, otherwise falls back to embedded. + /// + /// Schema policy env overrides are also applied from session bootstrap: + /// `FFQ_SCHEMA_INFERENCE`, `FFQ_SCHEMA_WRITEBACK`, `FFQ_SCHEMA_DRIFT_POLICY`. 
+ /// + /// # Examples + /// ```no_run + /// use ffq_client::Engine; + /// use ffq_common::EngineConfig; + /// + /// let engine = Engine::new(EngineConfig::default())?; + /// # let _ = engine; + /// # Ok::<(), ffq_common::FfqError>(()) + /// ``` + /// + /// # Errors + /// Returns an error if session initialization fails (for example catalog load or invalid config). pub fn new(config: EngineConfig) -> Result { let session = Arc::new(Session::new(config)?); Ok(Self { session }) @@ -37,7 +67,45 @@ impl Engine { .expect("table registration failed"); } - pub fn register_table_checked(&self, name: impl Into, mut table: TableDef) -> Result<()> { + /// Registers a table and returns a fallible result. + /// + /// For parquet tables with inference enabled and no explicit schema, + /// registration may infer schema immediately. + /// + /// # Examples + /// ```no_run + /// use arrow_schema::{DataType, Field, Schema}; + /// use ffq_client::Engine; + /// use ffq_common::EngineConfig; + /// use ffq_storage::{TableDef, TableStats}; + /// use std::collections::HashMap; + /// + /// let engine = Engine::new(EngineConfig::default())?; + /// engine.register_table_checked( + /// "lineitem", + /// TableDef { + /// name: "lineitem".to_string(), + /// uri: "tests/fixtures/parquet/lineitem.parquet".to_string(), + /// paths: vec![], + /// format: "parquet".to_string(), + /// schema: Some(Schema::new(vec![ + /// Field::new("l_orderkey", DataType::Int64, false), + /// Field::new("l_quantity", DataType::Float64, false), + /// ])), + /// stats: TableStats::default(), + /// options: HashMap::new(), + /// }, + /// )?; + /// # Ok::<(), ffq_common::FfqError>(()) + /// ``` + /// + /// # Errors + /// Returns an error when schema inference/validation fails or table metadata is invalid. 
+ pub fn register_table_checked( + &self, + name: impl Into, + mut table: TableDef, + ) -> Result<()> { table.name = name.into(); maybe_infer_table_schema_on_register(self.session.config.schema_inference, &mut table)?; self.session @@ -48,11 +116,33 @@ impl Engine { Ok(()) } + /// Parses SQL into a query [`DataFrame`]. + /// + /// The query is planned/analyzed during execution (`collect`, write methods, etc.). + /// + /// # Examples + /// ```no_run + /// use ffq_client::Engine; + /// use ffq_common::EngineConfig; + /// + /// let engine = Engine::new(EngineConfig::default())?; + /// let df = engine.sql("SELECT 1")?; + /// let batches = futures::executor::block_on(df.collect())?; + /// # let _ = batches; + /// # Ok::<(), ffq_common::FfqError>(()) + /// ``` + /// + /// # Errors + /// Returns an error when SQL parsing fails. pub fn sql(&self, query: &str) -> Result { let logical = self.session.planner.plan_sql(query)?; Ok(DataFrame::new(self.session.clone(), logical)) } + /// Same as [`Engine::sql`] but binds named parameters. + /// + /// # Errors + /// Returns an error when SQL parsing/binding fails. pub fn sql_with_params( &self, query: &str, @@ -62,10 +152,15 @@ impl Engine { Ok(DataFrame::new(self.session.clone(), logical)) } + /// Returns a [`DataFrame`] that scans a registered table. + /// + /// # Errors + /// Returns an error if table lookup/planning fails. pub fn table(&self, name: &str) -> Result { Ok(DataFrame::table(self.session.clone(), name)) } + /// Lists all currently registered table names. pub fn list_tables(&self) -> Vec { self.session .catalog @@ -77,13 +172,24 @@ impl Engine { .collect() } + /// Returns schema for a registered table when available. + /// + /// # Errors + /// Returns an error if table lookup fails. 
pub fn table_schema(&self, name: &str) -> Result> { let cat = self.session.catalog.read().expect("catalog lock poisoned"); let table = cat.get(name)?; Ok(table.schema.clone()) } - pub fn table_schema_with_origin(&self, name: &str) -> Result> { + /// Returns schema together with origin metadata. + /// + /// # Errors + /// Returns an error if table lookup fails. + pub fn table_schema_with_origin( + &self, + name: &str, + ) -> Result> { let cat = self.session.catalog.read().expect("catalog lock poisoned"); let table = cat.get(name)?; let Some(schema) = table.schema.clone() else { @@ -99,15 +205,21 @@ impl Engine { Ok(Some((schema, origin))) } + /// Gracefully shuts down runtime resources. pub async fn shutdown(&self) -> Result<()> { self.session.runtime.shutdown().await } + /// Renders current Prometheus metrics exposition text. pub fn prometheus_metrics(&self) -> String { self.session.prometheus_metrics() } #[cfg(feature = "profiling")] + /// Serves metrics exporter endpoint for profiling/observability workflows. + /// + /// # Errors + /// Returns an error if binding or serving fails. pub async fn serve_metrics_exporter(&self, addr: SocketAddr) -> Result<()> { self.session.serve_metrics_exporter(addr).await } @@ -147,10 +259,9 @@ pub(crate) fn annotate_schema_inference_metadata( let now_secs = SystemTime::now() .duration_since(UNIX_EPOCH) .map_or(0, |d| d.as_secs()); - table.options.insert( - "schema.inferred_at".to_string(), - now_secs.to_string(), - ); + table + .options + .insert("schema.inferred_at".to_string(), now_secs.to_string()); table.options.insert( "schema.fingerprint".to_string(), serde_json::to_string(fingerprint).map_err(|e| { diff --git a/crates/client/src/expr.rs b/crates/client/src/expr.rs index 2441664..dbe179a 100644 --- a/crates/client/src/expr.rs +++ b/crates/client/src/expr.rs @@ -1,25 +1,31 @@ use ffq_planner::{BinaryOp, Expr, LiteralValue}; +/// Builds a column-reference expression. 
pub fn col(name: &str) -> Expr { Expr::Column(name.to_string()) } +/// Builds an `Int64` literal expression. pub fn lit_i64(v: i64) -> Expr { Expr::Literal(LiteralValue::Int64(v)) } +/// Builds a `Float64` literal expression. pub fn lit_f64(v: f64) -> Expr { Expr::Literal(LiteralValue::Float64(v)) } +/// Builds a boolean literal expression. pub fn lit_bool(v: bool) -> Expr { Expr::Literal(LiteralValue::Boolean(v)) } +/// Builds a UTF-8 string literal expression. pub fn lit_str(v: &str) -> Expr { Expr::Literal(LiteralValue::Utf8(v.to_string())) } +/// Builds an equality expression (`left = right`). pub fn eq(left: Expr, right: Expr) -> Expr { Expr::BinaryOp { left: Box::new(left), @@ -28,10 +34,12 @@ pub fn eq(left: Expr, right: Expr) -> Expr { } } +/// Builds a boolean AND expression. pub fn and(left: Expr, right: Expr) -> Expr { Expr::And(Box::new(left), Box::new(right)) } +/// Builds a boolean OR expression. pub fn or(left: Expr, right: Expr) -> Expr { Expr::Or(Box::new(left), Box::new(right)) } diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 6bf1945..910eb2f 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -1,13 +1,42 @@ +#![deny(missing_docs)] + +//! End-user client surface for FFQ. +//! +//! Architecture role: +//! - exposes [`Engine`] and [`DataFrame`] APIs +//! - wires planner, catalog/session state, and runtime execution +//! - provides REPL/benchmark/fixture helper modules used in tests and tooling +//! +//! Key modules: +//! - [`engine`] +//! - [`dataframe`] +//! - [`expr`] +//! - [`repl`] +//! - [`tpch_tbl`] +//! - [`bench_queries`] +//! - [`bench_fixtures`] +//! +//! Feature flags: +//! - `distributed`: enables coordinator-backed runtime path +//! - `vector` / `qdrant` / `profiling`: enable optional vector and observability paths. + mod planner_facade; mod runtime; mod session; +/// Benchmark fixture generation and loading helpers. 
pub mod bench_fixtures; +/// Canonical benchmark SQL query definitions and loaders. pub mod bench_queries; +/// DataFrame API and write/query execution helpers. pub mod dataframe; +/// Engine/session entrypoints and table registration APIs. pub mod engine; +/// Expression builder helpers for DataFrame plans. pub mod expr; +/// Interactive SQL REPL implementation. pub mod repl; +/// TPC-H `.tbl` fixture conversion and validation helpers. pub mod tpch_tbl; pub use dataframe::{DataFrame, WriteMode}; diff --git a/crates/client/src/main.rs b/crates/client/src/main.rs index f873676..9ec6acd 100644 --- a/crates/client/src/main.rs +++ b/crates/client/src/main.rs @@ -1,6 +1,20 @@ +//! `ffq-client` command-line entrypoint. +//! +//! Architecture role: +//! - provides query mode and interactive REPL mode +//! - maps CLI/env configuration into [`ffq_common::EngineConfig`] +//! - executes SQL via [`ffq_client::Engine`] and renders result batches +//! +//! Key flows: +//! - `query`: one-shot SQL execution +//! - `repl`: interactive SQL and shell commands +//! +//! Feature flags: +//! - runtime behavior follows features enabled in `ffq-client` (for example `distributed`). + use arrow::util::pretty::pretty_format_batches; use ffq_client::Engine; -use ffq_client::repl::{run_repl, ReplOptions}; +use ffq_client::repl::{ReplOptions, run_repl}; use ffq_common::{EngineConfig, FfqError, SchemaDriftPolicy, SchemaInferencePolicy}; use ffq_storage::Catalog; @@ -25,7 +39,9 @@ fn run() -> Result<(), Box> { if args.first().map(|a| a.as_str()) == Some("repl") { let opts = parse_repl_opts(&args)?; - return run_repl(ReplOptions { config: opts.config }); + return run_repl(ReplOptions { + config: opts.config, + }); } let opts = parse_query_opts(&args)?; @@ -56,6 +72,7 @@ fn run() -> Result<(), Box> { } #[derive(Debug, Clone)] +/// Parsed one-shot query mode CLI options. 
struct QueryOpts { sql: String, plan_only: bool, @@ -63,10 +80,15 @@ struct QueryOpts { } #[derive(Debug, Clone)] +/// Parsed REPL mode CLI options. struct ReplOpts { config: EngineConfig, } +/// Parse query-mode CLI arguments. +/// +/// Supports both legacy form (`ffq-client "SELECT 1"`) and subcommand form +/// (`ffq-client query --sql ...`). fn parse_query_opts(args: &[String]) -> Result> { // Backward-compatible forms: // ffq-client "SELECT 1" @@ -99,18 +121,11 @@ fn parse_query_opts(args: &[String]) -> Result { i += 1; - sql = args - .get(i) - .cloned() - .ok_or("missing value for --sql")?; + sql = args.get(i).cloned().ok_or("missing value for --sql")?; } "--catalog" => { i += 1; - catalog = Some( - args.get(i) - .cloned() - .ok_or("missing value for --catalog")?, - ); + catalog = Some(args.get(i).cloned().ok_or("missing value for --catalog")?); } "--plan" => { plan_only = true; @@ -133,6 +148,7 @@ fn parse_query_opts(args: &[String]) -> Result Result> { let mut config = EngineConfig::default(); let mut i = 1usize; @@ -140,11 +156,8 @@ fn parse_repl_opts(args: &[String]) -> Result { i += 1; - config.catalog_path = Some( - args.get(i) - .cloned() - .ok_or("missing value for --catalog")?, - ); + config.catalog_path = + Some(args.get(i).cloned().ok_or("missing value for --catalog")?); } "--coordinator-endpoint" => { i += 1; @@ -221,6 +234,7 @@ fn parse_repl_opts(args: &[String]) -> Result\""); @@ -231,7 +245,10 @@ fn print_usage() { ); } -fn parse_schema_inference_policy(raw: &str) -> Result> { +/// Parse schema inference policy option value. 
+fn parse_schema_inference_policy( + raw: &str, +) -> Result> { match raw.trim().to_ascii_lowercase().as_str() { "off" => Ok(SchemaInferencePolicy::Off), "on" => Ok(SchemaInferencePolicy::On), @@ -244,6 +261,7 @@ fn parse_schema_inference_policy(raw: &str) -> Result Result> { match raw.trim().to_ascii_lowercase().as_str() { "fail" => Ok(SchemaDriftPolicy::Fail), @@ -255,17 +273,16 @@ fn parse_schema_drift_policy(raw: &str) -> Result Result> { match raw.trim().to_ascii_lowercase().as_str() { "1" | "true" | "yes" | "on" => Ok(true), "0" | "false" | "no" | "off" => Ok(false), - other => Err(format!( - "invalid value for {flag}: {other} (expected true|false)" - ) - .into()), + other => Err(format!("invalid value for {flag}: {other} (expected true|false)").into()), } } +/// Print categorized CLI errors with recovery hints. fn print_cli_error(err: &(dyn std::error::Error + 'static)) { if let Some(ffq) = err.downcast_ref::() { let (category, hint) = classify_ffq_error(ffq); @@ -277,11 +294,24 @@ fn print_cli_error(err: &(dyn std::error::Error + 'static)) { } let msg = err.to_string(); eprintln!("[error] {msg}"); - if msg.to_ascii_lowercase().contains("incompatible parquet files") { - eprintln!("hint: table points to parquet files with incompatible schemas; align schemas or split into separate tables"); + if msg + .to_ascii_lowercase() + .contains("incompatible parquet files") + { + eprintln!( + "hint: table points to parquet files with incompatible schemas; align schemas or split into separate tables" + ); } } +/// Map internal error type to user-facing category + hint. 
+/// +/// Taxonomy mapping: +/// - `Planning` -> `[planning]` +/// - `Execution` -> `[execution]` +/// - `InvalidConfig` -> `[config]` +/// - `Unsupported` -> `[unsupported]` +/// - `Io` -> `[io]` fn classify_ffq_error(err: &FfqError) -> (&'static str, Option<&'static str>) { match err { FfqError::Planning(msg) => ("planning", planning_hint(msg)), @@ -327,7 +357,9 @@ fn execution_hint(msg: &str) -> Option<&'static str> { || m.contains("transport error") || m.contains("coordinator") { - return Some("check FFQ_COORDINATOR_ENDPOINT and ensure coordinator/worker services are reachable"); + return Some( + "check FFQ_COORDINATOR_ENDPOINT and ensure coordinator/worker services are reachable", + ); } None } @@ -335,16 +367,22 @@ fn execution_hint(msg: &str) -> Option<&'static str> { fn config_hint(msg: &str) -> Option<&'static str> { let m = msg.to_ascii_lowercase(); if m.contains("schema inference failed") { - return Some("verify parquet files exist and are readable; or define schema explicitly in catalog"); + return Some( + "verify parquet files exist and are readable; or define schema explicitly in catalog", + ); } if m.contains("schema drift detected") { return Some("set FFQ_SCHEMA_DRIFT_POLICY=refresh to auto-refresh schema on file changes"); } if m.contains("incompatible parquet files") { - return Some("all parquet files in one table must have compatible schema; split mismatched files into separate tables"); + return Some( + "all parquet files in one table must have compatible schema; split mismatched files into separate tables", + ); } if m.contains("has no schema") { - return Some("define table schema in catalog or enable inference with FFQ_SCHEMA_INFERENCE=on|strict|permissive"); + return Some( + "define table schema in catalog or enable inference with FFQ_SCHEMA_INFERENCE=on|strict|permissive", + ); } if m.contains("catalog") { return Some("verify --catalog path exists and has .json/.toml extension"); @@ -355,7 +393,9 @@ fn config_hint(msg: &str) -> 
Option<&'static str> { fn unsupported_hint(msg: &str) -> Option<&'static str> { let m = msg.to_ascii_lowercase(); if m.contains("qdrant") { - return Some("enable required feature flags (vector/qdrant) or use brute-force fallback shape"); + return Some( + "enable required feature flags (vector/qdrant) or use brute-force fallback shape", + ); } None } diff --git a/crates/client/src/repl.rs b/crates/client/src/repl.rs index b4d3ce5..ba1991f 100644 --- a/crates/client/src/repl.rs +++ b/crates/client/src/repl.rs @@ -1,29 +1,54 @@ +//! Interactive FFQ SQL REPL. +//! +//! UX behavior: +//! - accepts SQL statements terminated by `;` +//! - supports shell commands when not in multiline SQL mode: +//! - `\help`, `\q`, `\tables`, `\schema ` +//! - `\plan on|off`, `\timing on|off`, `\mode table|csv|json` +//! - multiline SQL prompt switches from `ffq> ` to ` ...> ` until a trailing +//! semicolon is seen +//! - comment-only lines (`-- ...`) are ignored +//! - write queries (`INSERT INTO ... SELECT ...`) print `OK` when sink output +//! is empty instead of rendering an empty table +//! +//! Error taxonomy: +//! - planning/config/unsupported/io/execution errors are classified and +//! rendered with short hints for common recovery paths. + use std::path::PathBuf; use std::time::Instant; -use arrow::util::pretty::pretty_format_batches; -use arrow::util::display::array_value_to_string; use arrow::record_batch::RecordBatch; +use arrow::util::display::array_value_to_string; +use arrow::util::pretty::pretty_format_batches; use ffq_common::{EngineConfig, FfqError}; -use rustyline::error::ReadlineError; use rustyline::DefaultEditor; +use rustyline::error::ReadlineError; use serde_json::{Map, Value}; -use crate::engine::TableSchemaOrigin; use crate::Engine; +use crate::engine::TableSchemaOrigin; +/// REPL startup options. #[derive(Debug, Clone)] pub struct ReplOptions { + /// Engine configuration used for the session. pub config: EngineConfig, } +/// Runs the interactive FFQ SQL REPL. 
+/// +/// The REPL supports SQL statements and shell-style commands (for example `\help`, `\tables`, +/// `\schema`, `\mode`), with persistent history via `~/.ffq_history`. +/// +/// # Errors +/// Returns an error if engine bootstrap or line-editor initialization fails. pub fn run_repl(opts: ReplOptions) -> Result<(), Box> { let engine = Engine::new(opts.config)?; let mut rl = DefaultEditor::new()?; let history_path = repl_history_path(); if let Err(err) = rl.load_history(&history_path) { - if !matches!(err, ReadlineError::Io(ref io) if io.kind() == std::io::ErrorKind::NotFound) - { + if !matches!(err, ReadlineError::Io(ref io) if io.kind() == std::io::ErrorKind::NotFound) { eprintln!( "warning: failed to load history '{}': {err}", history_path.display() @@ -38,7 +63,11 @@ pub fn run_repl(opts: ReplOptions) -> Result<(), Box> { eprintln!("FFQ REPL (type \\q to quit)"); loop { - let prompt = if sql_buffer.is_empty() { "ffq> " } else { " ...> " }; + let prompt = if sql_buffer.is_empty() { + "ffq> " + } else { + " ...> " + }; let line = match rl.readline(prompt) { Ok(line) => { if !line.trim().is_empty() { @@ -95,7 +124,11 @@ pub fn run_repl(opts: ReplOptions) -> Result<(), Box> { continue; } - let sql = sql_buffer.trim_end().trim_end_matches(';').trim().to_string(); + let sql = sql_buffer + .trim_end() + .trim_end_matches(';') + .trim() + .to_string(); sql_buffer.clear(); if sql.is_empty() { continue; @@ -150,6 +183,7 @@ fn statement_terminated(sql: &str) -> bool { sql.trim_end().ends_with(';') } +/// Append one user input line to multiline SQL buffer. fn append_sql_line(sql_buffer: &mut String, raw: &str) { if !sql_buffer.is_empty() { sql_buffer.push('\n'); @@ -174,12 +208,16 @@ enum CommandResult { } #[derive(Debug, Clone, Copy, Eq, PartialEq)] +/// Output rendering mode for query result batches. enum OutputMode { Table, Csv, Json, } +/// Handle one REPL shell command. +/// +/// Command parsing is intentionally strict and usage errors are reported inline. 
fn handle_command( raw: &str, engine: &Engine, @@ -288,10 +326,8 @@ fn print_help() { println!(" \\mode table|csv|json set output rendering mode"); } -fn print_batches( - batches: &[RecordBatch], - mode: OutputMode, -) -> Result<(), FfqError> { +/// Render batches according to selected output mode. +fn print_batches(batches: &[RecordBatch], mode: OutputMode) -> Result<(), FfqError> { match mode { OutputMode::Table => { let rendered = pretty_format_batches(batches) @@ -308,6 +344,7 @@ fn print_batches( Ok(()) } +/// Render batches as CSV with header. fn print_batches_csv(batches: &[RecordBatch]) -> Result<(), FfqError> { if batches.is_empty() { return Ok(()); @@ -335,6 +372,7 @@ fn print_batches_csv(batches: &[RecordBatch]) -> Result<(), FfqError> { Ok(()) } +/// Render batches as JSON array of row objects. fn print_batches_json(batches: &[RecordBatch]) -> Result<(), FfqError> { let mut rows = Vec::::new(); for batch in batches { @@ -363,6 +401,7 @@ fn print_batches_json(batches: &[RecordBatch]) -> Result<(), FfqError> { Ok(()) } +/// Escape one CSV field. fn csv_escape(s: &str) -> String { if s.contains([',', '"', '\n']) { format!("\"{}\"", s.replace('"', "\"\"")) @@ -371,6 +410,7 @@ fn csv_escape(s: &str) -> String { } } +/// Print classified REPL error with optional hint. fn print_repl_error(stage: &str, err: &FfqError) { let (category, hint) = classify_error(err); eprintln!("[{category}] {stage}: {err}"); @@ -379,6 +419,14 @@ fn print_repl_error(stage: &str, err: &FfqError) { } } +/// Classify engine errors into user-facing REPL categories. 
+/// +/// Taxonomy mapping: +/// - `Planning` -> `[planning]` +/// - `Execution` -> `[execution]` +/// - `InvalidConfig` -> `[config]` +/// - `Unsupported` -> `[unsupported]` +/// - `Io` -> `[io]` fn classify_error(err: &FfqError) -> (&'static str, Option<&'static str>) { match err { FfqError::Planning(msg) => ("planning", planning_hint(msg)), @@ -429,7 +477,9 @@ fn execution_hint(msg: &str) -> Option<&'static str> { ); } if m.contains("query vector dim") { - return Some("ensure query vector length matches embedding column fixed-size list dimension"); + return Some( + "ensure query vector length matches embedding column fixed-size list dimension", + ); } None } @@ -474,7 +524,9 @@ fn unsupported_hint(msg: &str) -> Option<&'static str> { return Some("v1 supports ORDER BY only for cosine_similarity(...) DESC LIMIT k pattern"); } if m.contains("qdrant") { - return Some("enable required feature flags (vector/qdrant) or use brute-force fallback shape"); + return Some( + "enable required feature flags (vector/qdrant) or use brute-force fallback shape", + ); } None } @@ -616,13 +668,9 @@ mod tests { }, ); - let batches = futures::executor::block_on( - engine - .sql("SELECT a FROM t") - .expect("sql") - .collect(), - ) - .expect("collect"); + let batches = + futures::executor::block_on(engine.sql("SELECT a FROM t").expect("sql").collect()) + .expect("collect"); assert_eq!(batches.len(), 1); assert_eq!(batches[0].num_rows(), 3); diff --git a/crates/client/src/runtime.rs b/crates/client/src/runtime.rs index 2180385..669cd8a 100644 --- a/crates/client/src/runtime.rs +++ b/crates/client/src/runtime.rs @@ -1,6 +1,16 @@ +//! Query runtime implementations and operator execution helpers. +//! +//! This module executes physical plans in embedded and distributed modes. +//! Operator contracts in v1: +//! - scan/filter/project preserve row alignment per batch; +//! - join/aggregate may reorder rows, but preserve logical SQL semantics; +//! 
- sink operators return empty result batches and persist side effects; +//! - memory budget (`QueryContext.mem_budget_bytes`) triggers spill paths for +//! hash join/hash aggregate when estimates exceed the budget. + use std::cmp::{Ordering, Reverse}; use std::collections::BinaryHeap; -use std::collections::{hash_map::DefaultHasher, HashMap}; +use std::collections::{HashMap, hash_map::DefaultHasher}; use std::fmt::Debug; use std::fs::{self, File}; use std::hash::{Hash, Hasher}; @@ -18,7 +28,7 @@ use arrow::record_batch::RecordBatch; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use ffq_common::metrics::global_metrics; use ffq_common::{FfqError, Result}; -use ffq_execution::{compile_expr, SendableRecordBatchStream, StreamAdapter, TaskContext}; +use ffq_execution::{SendableRecordBatchStream, StreamAdapter, TaskContext, compile_expr}; use ffq_planner::{AggExpr, BuildSide, ExchangeExec, Expr, PhysicalPlan}; use ffq_storage::parquet_provider::ParquetProvider; #[cfg(feature = "qdrant")] @@ -30,11 +40,15 @@ use futures::future::BoxFuture; use futures::{FutureExt, TryStreamExt}; use parquet::arrow::ArrowWriter; use serde::{Deserialize, Serialize}; +use tracing::{Instrument, info, info_span}; #[cfg(feature = "distributed")] use tracing::{debug, error}; -use tracing::{info, info_span, Instrument}; #[derive(Debug, Clone)] +/// Per-query runtime controls. +/// +/// `mem_budget_bytes` is a soft threshold used by join/aggregate operators to +/// choose in-memory vs spill-enabled execution. pub struct QueryContext { pub batch_size_rows: usize, pub mem_budget_bytes: usize, @@ -43,6 +57,10 @@ pub struct QueryContext { /// Runtime = something that can execute a PhysicalPlan and return a stream of RecordBatches. pub trait Runtime: Send + Sync + Debug { + /// Execute one physical plan and return its output stream. + /// + /// Errors are surfaced as `FfqError::Planning`, `FfqError::Execution`, + /// `FfqError::Io`, or `FfqError::InvalidConfig` depending on failure stage. 
fn execute( &self, plan: PhysicalPlan, @@ -56,6 +74,7 @@ pub trait Runtime: Send + Sync + Debug { } #[derive(Debug, Default)] +/// In-process runtime that executes all operators locally. pub struct EmbeddedRuntime; impl EmbeddedRuntime { @@ -112,6 +131,16 @@ struct TraceIds { task_id: u64, } +/// Recursively execute one physical-plan subtree and materialize output batches. +/// +/// This is the central operator dispatcher for scan/filter/project/limit/top-k, +/// exchange, hash join, hash aggregate, and parquet sink paths. +/// +/// Error taxonomy at call sites: +/// - `Execution`: operator evaluation, encode/decode, spill, or batch-shape failures +/// - `InvalidConfig`: missing table sink path/schema contracts discovered at runtime +/// - `Unsupported`: physical node or runtime feature path not supported in current build +/// - `Io`: filesystem failures surfaced through std io conversions fn execute_plan( plan: PhysicalPlan, ctx: QueryContext, @@ -529,6 +558,11 @@ impl Ord for TopKEntry { } #[cfg_attr(feature = "profiling", inline(never))] +/// Evaluate `TopKByScoreExec`. +/// +/// Input: arbitrary batches + numeric score expression (`Float32`/`Float64`). +/// Output: one batch containing top-k rows in descending score order. +/// Errors: non-numeric score expression evaluation or batch concat failures. fn run_topk_by_score(child: ExecOutput, score_expr: Expr, k: usize) -> Result { #[cfg(feature = "profiling")] let _profile_span = info_span!("profile_topk_by_score").entered(); @@ -737,6 +771,13 @@ enum JoinExecSide { } #[cfg_attr(feature = "profiling", inline(never))] +/// Execute `HashJoinExec` with optional spill to grace-hash mode. +/// +/// Input: fully materialized left/right child outputs and equi-join keys. +/// Output: one joined batch with schema `left ++ right`. +/// Spill behavior: when estimated build-side bytes exceed +/// `ctx.mem_budget_bytes`, join partitions are spilled to JSONL and joined +/// partition-wise. 
fn run_hash_join( left: ExecOutput, right: ExecOutput, @@ -943,6 +984,10 @@ fn estimate_join_rows_bytes(rows: &[Vec]) -> usize { } #[cfg_attr(feature = "profiling", inline(never))] +/// Grace hash-join spill path. +/// +/// Both build/probe rows are partitioned to disk by join key hash, then joined +/// partition-wise to keep peak in-memory state bounded. fn grace_hash_join( build_rows: &[Vec], probe_rows: &[Vec], @@ -1066,6 +1111,12 @@ fn hash_key(key: &[ScalarValue]) -> u64 { } #[cfg_attr(feature = "profiling", inline(never))] +/// Execute two-phase hash aggregation (partial or final mode). +/// +/// Input: child rows, group expressions, aggregate expressions. +/// Output: one batch with grouping columns followed by aggregate outputs. +/// Spill behavior: group state spills to JSONL files when estimated memory +/// usage exceeds `ctx.mem_budget_bytes`. fn run_hash_aggregate( child: ExecOutput, group_exprs: Vec, @@ -1435,9 +1486,7 @@ fn group_field( match &group_exprs[idx] { Expr::ColumnRef { name, index } => Ok(( name.clone(), - if *index < input_schema.fields().len() - && input_schema.field(*index).name() == name - { + if *index < input_schema.fields().len() && input_schema.field(*index).name() == name { input_schema.field(*index).data_type().clone() } else { // In final aggregate mode, group columns are physically laid out by position, @@ -1477,6 +1526,7 @@ fn state_to_scalar(state: &AggState, expr: &AggExpr, mode: AggregateMode) -> Sca } } +/// Spill aggregate state to disk when memory budget is exceeded. fn maybe_spill( groups: &mut HashMap, Vec>, spills: &mut Vec, @@ -1528,6 +1578,7 @@ fn maybe_spill( Ok(()) } +/// Merge one spilled aggregate state file back into in-memory groups. fn merge_spill_file( path: &PathBuf, groups: &mut HashMap, Vec>, @@ -1857,6 +1908,9 @@ fn scalar_gt(a: &ScalarValue, b: &ScalarValue) -> Result { scalar_lt(b, a) } +/// Execute parquet sink write with temp-file + atomic-commit semantics. 
+/// +/// Side effect: writes/overwrites target parquet object and returns no rows. fn write_parquet_sink(table: &ffq_storage::TableDef, child: &ExecOutput) -> Result<()> { ensure_sink_parent_layout(table)?; let out_path = resolve_sink_output_path(table)?; @@ -1995,14 +2049,16 @@ fn replace_file_atomically(staged: &PathBuf, target: &PathBuf) -> Result<()> { } #[cfg(feature = "distributed")] -use ffq_distributed::grpc::v1::QueryState as DistQueryState; +use ffq_distributed::DistributedRuntime as InnerDistributedRuntime; #[cfg(feature = "distributed")] use ffq_distributed::grpc::ControlPlaneClient; #[cfg(feature = "distributed")] -use ffq_distributed::DistributedRuntime as InnerDistributedRuntime; +use ffq_distributed::grpc::v1::QueryState as DistQueryState; #[cfg(feature = "distributed")] #[derive(Debug)] +/// gRPC-backed runtime that submits and fetches query work/results from the +/// distributed coordinator. pub struct DistributedRuntime { _inner: InnerDistributedRuntime, coordinator_endpoint: String, @@ -2010,6 +2066,7 @@ pub struct DistributedRuntime { #[cfg(feature = "distributed")] impl DistributedRuntime { + /// Create a distributed runtime targeting a coordinator endpoint. 
pub fn new(endpoint: impl Into) -> Self { Self { _inner: InnerDistributedRuntime::default(), @@ -2197,9 +2254,9 @@ mod tests { use ffq_storage::vector_index::{VectorIndexProvider, VectorTopKRow}; use futures::future::BoxFuture; - use super::{rows_to_vector_topk_output, run_vector_topk_with_provider}; #[cfg(feature = "vector")] - use super::{run_topk_by_score, ExecOutput}; + use super::{ExecOutput, run_topk_by_score}; + use super::{rows_to_vector_topk_output, run_vector_topk_with_provider}; struct MockVectorProvider; diff --git a/crates/client/src/session.rs b/crates/client/src/session.rs index ee640f1..6787cd0 100644 --- a/crates/client/src/session.rs +++ b/crates/client/src/session.rs @@ -1,13 +1,11 @@ +use std::collections::HashMap; #[cfg(feature = "profiling")] use std::net::SocketAddr; -use std::collections::HashMap; use std::sync::{Arc, RwLock}; use std::{env, path::Path, path::PathBuf}; use arrow_schema::Schema; -use ffq_common::{ - EngineConfig, MetricsRegistry, Result, SchemaDriftPolicy, SchemaInferencePolicy, -}; +use ffq_common::{EngineConfig, MetricsRegistry, Result, SchemaDriftPolicy, SchemaInferencePolicy}; use ffq_storage::Catalog; use ffq_storage::parquet_provider::FileFingerprint; diff --git a/crates/client/src/tpch_tbl.rs b/crates/client/src/tpch_tbl.rs index 2d606c1..f43329b 100644 --- a/crates/client/src/tpch_tbl.rs +++ b/crates/client/src/tpch_tbl.rs @@ -12,20 +12,34 @@ use parquet::basic::Compression; use parquet::file::properties::WriterProperties; use serde::{Deserialize, Serialize}; +/// Per-file manifest entry for converted TPCH parquet outputs. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct TpchParquetFileManifest { + /// File name relative to output directory. pub file: String, + /// Number of rows in the output file. pub rows: i64, + /// Flattened field descriptors in `name:type:nullable` format. pub schema: Vec, } +/// Manifest emitted after converting TPCH `.tbl` files to parquet. 
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct TpchParquetManifest { + /// Fixture identifier. pub fixture: String, + /// Input source format identifier. pub source_format: String, + /// Converted files. pub files: Vec, } +/// Converts TPCH dbgen SF1 `.tbl` files (`customer`, `orders`, `lineitem`) to parquet. +/// +/// Writes parquet files and a `manifest.json` into `output_dir`. +/// +/// # Errors +/// Returns an error when input files are missing or parsing/writing fails. pub fn convert_tpch_sf1_tbl_to_parquet( input_dir: &Path, output_dir: &Path, @@ -57,10 +71,12 @@ pub fn convert_tpch_sf1_tbl_to_parquet( Ok(manifest) } +/// Default location for TPCH dbgen `.tbl` source files. pub fn default_tpch_dbgen_tbl_input_dir() -> PathBuf { PathBuf::from("./tests/bench/fixtures/tpch_dbgen_sf1") } +/// Default location for converted TPCH parquet fixture files. pub fn default_tpch_dbgen_parquet_output_dir() -> PathBuf { PathBuf::from("./tests/bench/fixtures/tpch_dbgen_sf1_parquet") } diff --git a/crates/client/tests/benchmark_fixtures_deterministic.rs b/crates/client/tests/benchmark_fixtures_deterministic.rs index fa852b4..7f377e8 100644 --- a/crates/client/tests/benchmark_fixtures_deterministic.rs +++ b/crates/client/tests/benchmark_fixtures_deterministic.rs @@ -3,7 +3,7 @@ use std::path::{Path, PathBuf}; use std::time::{SystemTime, UNIX_EPOCH}; use ffq_client::bench_fixtures::{ - generate_default_benchmark_fixtures, FixtureIndex, FixtureManifest, + FixtureIndex, FixtureManifest, generate_default_benchmark_fixtures, }; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; diff --git a/crates/client/tests/dataframe_write_api.rs b/crates/client/tests/dataframe_write_api.rs index 798d79f..18d8bb8 100644 --- a/crates/client/tests/dataframe_write_api.rs +++ b/crates/client/tests/dataframe_write_api.rs @@ -10,8 +10,8 @@ use arrow_schema::{DataType, Field, Schema}; use ffq_client::{Engine, WriteMode}; use ffq_common::EngineConfig; use 
ffq_storage::TableDef; -use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; fn unique_path(prefix: &str, ext: &str) -> std::path::PathBuf { let nanos = SystemTime::now() diff --git a/crates/client/tests/distributed_runtime_roundtrip.rs b/crates/client/tests/distributed_runtime_roundtrip.rs index 3959ee3..36abeba 100644 --- a/crates/client/tests/distributed_runtime_roundtrip.rs +++ b/crates/client/tests/distributed_runtime_roundtrip.rs @@ -3,8 +3,8 @@ use std::collections::HashMap; #[cfg(feature = "vector")] use std::fs::File; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use arrow::array::Int64Array; diff --git a/crates/client/tests/embedded_hash_aggregate.rs b/crates/client/tests/embedded_hash_aggregate.rs index aa9c346..f219925 100644 --- a/crates/client/tests/embedded_hash_aggregate.rs +++ b/crates/client/tests/embedded_hash_aggregate.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use arrow::array::{Float64Array, Int64Array, StringArray}; use arrow_schema::{DataType, Field, Schema}; -use ffq_client::expr::col; use ffq_client::Engine; +use ffq_client::expr::col; use ffq_common::EngineConfig; use ffq_planner::AggExpr; use ffq_storage::TableDef; @@ -163,7 +163,9 @@ fn hash_aggregate_deterministic_with_spill_and_non_spill_parity() { groups.sort(); assert_eq!( groups, - vec!["group_0", "group_1", "group_2", "group_3", "group_4", "group_5", "group_6"] + vec![ + "group_0", "group_1", "group_2", "group_3", "group_4", "group_5", "group_6" + ] ); assert_eq!(seen, 840); diff --git a/crates/client/tests/embedded_parquet_scan.rs b/crates/client/tests/embedded_parquet_scan.rs index 0b1fc67..09c00c9 100644 --- a/crates/client/tests/embedded_parquet_scan.rs +++ b/crates/client/tests/embedded_parquet_scan.rs @@ -1,13 +1,13 @@ use std::collections::HashMap; use std::fs::File; -use 
std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::thread::sleep; -use std::time::{SystemTime, UNIX_EPOCH}; use std::time::Duration; +use std::time::{SystemTime, UNIX_EPOCH}; -use arrow::array::{Int64Array, StringArray}; use arrow::array::Int32Array; +use arrow::array::{Int64Array, StringArray}; use arrow::record_batch::RecordBatch; use arrow_schema::{DataType, Field, Schema}; use ffq_client::Engine; @@ -106,13 +106,9 @@ fn sql_collect_works_when_parquet_schema_is_missing_in_catalog() { }, ); - let batches = futures::executor::block_on( - engine - .sql("SELECT id, name FROM t") - .expect("sql") - .collect(), - ) - .expect("collect"); + let batches = + futures::executor::block_on(engine.sql("SELECT id, name FROM t").expect("sql").collect()) + .expect("collect"); let rows: usize = batches.iter().map(|b| b.num_rows()).sum(); assert_eq!(rows, 3); @@ -218,10 +214,8 @@ fn schema_cache_refreshes_on_drift_when_policy_allows_refresh() { }, ); - let first = futures::executor::block_on( - engine.sql("SELECT id FROM t").expect("sql").collect(), - ) - .expect("collect first"); + let first = futures::executor::block_on(engine.sql("SELECT id FROM t").expect("sql").collect()) + .expect("collect first"); assert_eq!(first.iter().map(|b| b.num_rows()).sum::(), 3); sleep(Duration::from_millis(2)); @@ -261,10 +255,8 @@ fn schema_cache_can_fail_on_drift_when_configured() { }, ); - let _ = futures::executor::block_on( - engine.sql("SELECT id FROM t").expect("sql").collect(), - ) - .expect("collect first"); + let _ = futures::executor::block_on(engine.sql("SELECT id FROM t").expect("sql").collect()) + .expect("collect first"); sleep(Duration::from_millis(2)); write_id_name_city_parquet(&parquet_path); @@ -307,23 +299,28 @@ fn inferred_schema_writeback_persists_across_restart() { cfg.schema_writeback = true; let engine = Engine::new(cfg.clone()).expect("engine"); - let rows = futures::executor::block_on( - engine - 
.sql("SELECT id, name FROM t") - .expect("sql") - .collect(), - ) - .expect("collect"); + let rows = + futures::executor::block_on(engine.sql("SELECT id, name FROM t").expect("sql").collect()) + .expect("collect"); assert_eq!(rows.iter().map(|b| b.num_rows()).sum::(), 3); let saved = std::fs::read_to_string(&catalog_path).expect("read catalog"); - assert!(saved.contains("schema.inferred_at"), "missing inferred_at marker"); - assert!(saved.contains("schema.fingerprint"), "missing fingerprint marker"); + assert!( + saved.contains("schema.inferred_at"), + "missing inferred_at marker" + ); + assert!( + saved.contains("schema.fingerprint"), + "missing fingerprint marker" + ); assert!(saved.contains("\"schema\""), "missing persisted schema"); let restarted = Engine::new(cfg).expect("restart engine"); let persisted_schema = restarted.table_schema("t").expect("table schema"); - assert!(persisted_schema.is_some(), "schema should be loaded from writeback"); + assert!( + persisted_schema.is_some(), + "schema should be loaded from writeback" + ); let _ = std::fs::remove_file(parquet_path); let _ = std::fs::remove_file(catalog_path); @@ -350,13 +347,8 @@ fn schema_inference_off_requires_predeclared_schema() { }, ); - let err = futures::executor::block_on( - engine - .sql("SELECT id FROM t") - .expect("sql") - .collect(), - ) - .expect_err("must fail without schema inference"); + let err = futures::executor::block_on(engine.sql("SELECT id FROM t").expect("sql").collect()) + .expect_err("must fail without schema inference"); assert!(format!("{err}").contains("has no schema")); let _ = std::fs::remove_file(parquet_path); @@ -374,20 +366,20 @@ fn schema_inference_strict_rejects_numeric_widening_across_files() { let engine = Engine::new(cfg).expect("engine"); let err = engine .register_table_checked( - "t", - TableDef { - name: "ignored".to_string(), - uri: String::new(), - paths: vec![ - p1.to_string_lossy().to_string(), - p2.to_string_lossy().to_string(), - ], - format: 
"parquet".to_string(), - schema: None, - stats: ffq_storage::TableStats::default(), - options: HashMap::new(), - }, - ) + "t", + TableDef { + name: "ignored".to_string(), + uri: String::new(), + paths: vec![ + p1.to_string_lossy().to_string(), + p2.to_string_lossy().to_string(), + ], + format: "parquet".to_string(), + schema: None, + stats: ffq_storage::TableStats::default(), + options: HashMap::new(), + }, + ) .expect_err("strict should fail registration"); assert!(format!("{err}").contains("strict policy")); @@ -479,8 +471,11 @@ fn write_id_name_city_parquet(path: &std::path::Path) { fn write_single_numeric_parquet_i32(path: &std::path::Path) { let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)])); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1_i32, 2]))]) - .expect("build batch"); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1_i32, 2]))], + ) + .expect("build batch"); let file = File::create(path).expect("create parquet file"); let mut writer = ArrowWriter::try_new(file, schema, None).expect("create parquet writer"); writer.write(&batch).expect("write parquet batch"); @@ -489,8 +484,11 @@ fn write_single_numeric_parquet_i32(path: &std::path::Path) { fn write_single_numeric_parquet_i64(path: &std::path::Path) { let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)])); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(vec![3_i64, 4]))]) - .expect("build batch"); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int64Array::from(vec![3_i64, 4]))], + ) + .expect("build batch"); let file = File::create(path).expect("create parquet file"); let mut writer = ArrowWriter::try_new(file, schema, None).expect("create parquet writer"); writer.write(&batch).expect("write parquet batch"); diff --git a/crates/client/tests/embedded_parquet_sink.rs 
b/crates/client/tests/embedded_parquet_sink.rs index ea0e08a..f745e63 100644 --- a/crates/client/tests/embedded_parquet_sink.rs +++ b/crates/client/tests/embedded_parquet_sink.rs @@ -9,8 +9,8 @@ use arrow_schema::{DataType, Field, Schema}; use ffq_client::Engine; use ffq_common::EngineConfig; use ffq_storage::TableDef; -use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; fn unique_path(prefix: &str, ext: &str) -> std::path::PathBuf { let nanos = SystemTime::now() diff --git a/crates/client/tests/tpch_catalog_profiles.rs b/crates/client/tests/tpch_catalog_profiles.rs index 2c0ff3f..4abec1f 100644 --- a/crates/client/tests/tpch_catalog_profiles.rs +++ b/crates/client/tests/tpch_catalog_profiles.rs @@ -2,8 +2,8 @@ use std::env; use std::path::{Path, PathBuf}; use std::sync::Mutex; -use ffq_client::bench_queries::{load_benchmark_query, BenchmarkQueryId}; use ffq_client::Engine; +use ffq_client::bench_queries::{BenchmarkQueryId, load_benchmark_query}; use ffq_common::EngineConfig; use ffq_storage::Catalog; @@ -80,15 +80,13 @@ fn run_catalog_profile_query_checks(profile_path: &Path) { #[test] #[ignore = "slow official SF1 Q1/Q3 profile validation"] fn tpch_official_catalog_profile_json_runs_q1_q3_without_manual_registration() { - let profile = - repo_root().join("tests/fixtures/catalog/tpch_dbgen_sf1_parquet.tables.json"); + let profile = repo_root().join("tests/fixtures/catalog/tpch_dbgen_sf1_parquet.tables.json"); run_catalog_profile_query_checks(&profile); } #[test] #[ignore = "slow official SF1 Q1/Q3 profile validation"] fn tpch_official_catalog_profile_toml_runs_q1_q3_without_manual_registration() { - let profile = - repo_root().join("tests/fixtures/catalog/tpch_dbgen_sf1_parquet.tables.toml"); + let profile = repo_root().join("tests/fixtures/catalog/tpch_dbgen_sf1_parquet.tables.toml"); run_catalog_profile_query_checks(&profile); } diff --git 
a/crates/common/src/config.rs b/crates/common/src/config.rs index 1463b4b..0a9d7a2 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -1,11 +1,16 @@ use serde::{Deserialize, Serialize}; +/// Schema inference policy for parquet-like tables with optional schema. #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum SchemaInferencePolicy { + /// Never infer schema; explicit schema is required. Off, + /// Infer schema when missing; fail on incompatible multi-file schema merges. On, + /// Infer schema with strict compatibility checks. Strict, + /// Infer schema with permissive numeric widening where supported. Permissive, } @@ -16,19 +21,24 @@ impl Default for SchemaInferencePolicy { } impl SchemaInferencePolicy { + /// Returns whether schema inference is permitted. pub fn allows_inference(self) -> bool { !matches!(self, Self::Off) } + /// Returns whether permissive schema merge behavior is enabled. pub fn is_permissive_merge(self) -> bool { matches!(self, Self::On | Self::Permissive) } } +/// Behavior when observed file fingerprint no longer matches cached schema metadata. #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum SchemaDriftPolicy { + /// Fail query/registration when schema drift is detected. Fail, + /// Refresh inferred schema from current files when drift is detected. Refresh, } @@ -38,21 +48,32 @@ impl Default for SchemaDriftPolicy { } } +/// Global engine/session configuration shared across planner/runtime layers. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct EngineConfig { + /// Target rows per output/input batch for operators. pub batch_size_rows: usize, + /// Soft per-task memory budget used by spill decisions. pub mem_budget_bytes: usize, + /// Shuffle partition count used by distributed and physical planning paths. 
pub shuffle_partitions: usize, + /// Broadcast join threshold in bytes for optimizer join hinting. pub broadcast_threshold_bytes: u64, + /// Directory used for spill files. pub spill_dir: String, + /// Optional catalog file path (`.json` or `.toml`). pub catalog_path: Option, + /// Optional distributed coordinator endpoint (for example `http://127.0.0.1:50051`). pub coordinator_endpoint: Option, + /// Schema inference policy. #[serde(default)] pub schema_inference: SchemaInferencePolicy, + /// Schema drift policy. #[serde(default)] pub schema_drift_policy: SchemaDriftPolicy, + /// Whether inferred schema/fingerprint metadata should be persisted back to catalog. #[serde(default)] pub schema_writeback: bool, } diff --git a/crates/common/src/error.rs b/crates/common/src/error.rs index 0912eea..b515cb6 100644 --- a/crates/common/src/error.rs +++ b/crates/common/src/error.rs @@ -1,21 +1,54 @@ use thiserror::Error; +/// Canonical FFQ error taxonomy used across crates. +/// +/// Classification guidance: +/// - [`FfqError::Planning`]: query shape/name/type issues discovered before execution +/// - [`FfqError::Execution`]: runtime operator evaluation, decode/encode, or data-shape failures +/// - [`FfqError::InvalidConfig`]: catalog/config/environment/path contract violations +/// - [`FfqError::Unsupported`]: syntactically valid but intentionally unimplemented behavior +/// - [`FfqError::Io`]: raw filesystem/network IO failures from std APIs #[derive(Debug, Error)] pub enum FfqError { + /// Invalid or inconsistent configuration/catalog state. + /// + /// Examples: + /// - missing required table location (`uri`/`paths`) + /// - schema inference disabled for schema-less parquet table + /// - invalid policy/env/CLI option values #[error("invalid configuration: {0}")] InvalidConfig(String), + /// Query planning/analyzer/optimizer failures. 
+ /// + /// Examples: + /// - unknown table/column + /// - type mismatch in expressions or join keys + /// - invalid LIMIT/TOP-K values #[error("planning error: {0}")] Planning(String), + /// Runtime execution failures after planning succeeded. + /// + /// Examples: + /// - expression evaluation/type mismatch at runtime + /// - parquet/shuffle decode failures + /// - spill/merge state shape mismatches #[error("execution error: {0}")] Execution(String), + /// Transparent std IO failures. #[error("io error: {0}")] Io(#[from] std::io::Error), + /// Valid request for a feature/shape not implemented in current version. + /// + /// Examples: + /// - SQL constructs outside supported subset + /// - provider/feature-flag-gated functionality unavailable at build/runtime #[error("unsupported: {0}")] Unsupported(String), } +/// Standard FFQ result alias. pub type Result = std::result::Result; diff --git a/crates/common/src/ids.rs b/crates/common/src/ids.rs index 16f7e09..b9e7278 100644 --- a/crates/common/src/ids.rs +++ b/crates/common/src/ids.rs @@ -1,19 +1,43 @@ +//! Typed identifiers shared across coordinator/runtime components. + use serde::{Deserialize, Serialize}; use std::fmt; -macro_rules! id_type { - ($name:ident) => { - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] - pub struct $name(pub u64); +/// Stable query identifier. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct QueryId( + /// Raw numeric id value. + pub u64, +); + +impl fmt::Display for QueryId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +/// Stable stage identifier within a query DAG. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct StageId( + /// Raw numeric id value. 
+ pub u64, +); - impl fmt::Display for $name { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } - } - }; +impl fmt::Display for StageId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } } -id_type!(QueryId); -id_type!(StageId); -id_type!(TaskId); +/// Stable task identifier within a stage. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct TaskId( + /// Raw numeric id value. + pub u64, +); + +impl fmt::Display for TaskId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index d1e4a79..375c3fd 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,8 +1,32 @@ +#![deny(missing_docs)] + +//! Shared configuration, error types, IDs, and observability primitives for FFQ crates. +//! +//! Architecture role: +//! - defines engine/runtime configuration passed across layers +//! - provides common [`FfqError`] / [`Result`] contracts +//! - hosts metrics and optional exporter utilities +//! +//! Key modules: +//! - [`config`] +//! - [`error`] +//! - [`ids`] +//! - [`metrics`] +//! - `metrics_exporter` (feature-gated) +//! +//! Feature flags: +//! - `profiling`: enables the metrics HTTP exporter helpers. + +/// Shared engine/runtime configuration types. pub mod config; +/// Shared error taxonomy. pub mod error; +/// Strongly-typed identifier wrappers. pub mod ids; +/// Metrics registry and Prometheus rendering helpers. pub mod metrics; #[cfg(feature = "profiling")] +/// Optional HTTP metrics exporter. 
pub mod metrics_exporter; pub use config::{EngineConfig, SchemaDriftPolicy, SchemaInferencePolicy}; diff --git a/crates/common/src/metrics.rs b/crates/common/src/metrics.rs index 67c1b8e..ee0bc3a 100644 --- a/crates/common/src/metrics.rs +++ b/crates/common/src/metrics.rs @@ -4,6 +4,7 @@ use prometheus::{ CounterVec, Encoder, GaugeVec, HistogramOpts, HistogramVec, Opts, Registry, TextEncoder, }; +/// Shared metrics registry for query/operator/shuffle/spill/scheduler telemetry. #[derive(Clone, Debug)] pub struct MetricsRegistry { inner: Arc, @@ -32,12 +33,14 @@ struct MetricsInner { } impl MetricsRegistry { + /// Create a new in-memory Prometheus registry with FFQ metric families. pub fn new() -> Self { Self { inner: Arc::new(MetricsInner::new()), } } + /// Record per-operator throughput and latency metrics. pub fn record_operator( &self, query_id: &str, @@ -88,6 +91,7 @@ impl MetricsRegistry { .observe(secs.max(0.0)); } + /// Record shuffle write metrics for one task. pub fn record_shuffle_write( &self, query_id: &str, @@ -112,6 +116,7 @@ impl MetricsRegistry { .observe(secs.max(0.0)); } + /// Record shuffle read metrics for one task. pub fn record_shuffle_read( &self, query_id: &str, @@ -136,6 +141,7 @@ impl MetricsRegistry { .observe(secs.max(0.0)); } + /// Record spill bytes/time metrics for one task and spill kind. pub fn record_spill( &self, query_id: &str, @@ -156,6 +162,7 @@ impl MetricsRegistry { .observe(secs.max(0.0)); } + /// Set current scheduler queued-task gauge for one stage. pub fn set_scheduler_queued_tasks(&self, query_id: &str, stage_id: u64, queued: u64) { let labels = [query_id, &stage_id.to_string()]; self.inner @@ -164,6 +171,7 @@ impl MetricsRegistry { .set(queued as f64); } + /// Set current scheduler running-task gauge for one stage. 
pub fn set_scheduler_running_tasks(&self, query_id: &str, stage_id: u64, running: u64) { let labels = [query_id, &stage_id.to_string()]; self.inner @@ -172,6 +180,7 @@ impl MetricsRegistry { .set(running as f64); } + /// Increment scheduler retry counter for one stage. pub fn inc_scheduler_retries(&self, query_id: &str, stage_id: u64) { let labels = [query_id, &stage_id.to_string()]; self.inner @@ -180,6 +189,7 @@ impl MetricsRegistry { .inc(); } + /// Render current metrics in Prometheus text exposition format. pub fn render_prometheus(&self) -> String { let metric_families = self.inner.registry.gather(); let mut out = Vec::new(); @@ -356,6 +366,7 @@ fn histogram_vec(registry: &Registry, name: &str, help: &str, labels: &[&str]) - static GLOBAL_METRICS: OnceLock = OnceLock::new(); +/// Returns process-global shared metrics registry singleton. pub fn global_metrics() -> &'static MetricsRegistry { GLOBAL_METRICS.get_or_init(MetricsRegistry::new) } diff --git a/crates/common/src/metrics_exporter.rs b/crates/common/src/metrics_exporter.rs index a927b34..21ae735 100644 --- a/crates/common/src/metrics_exporter.rs +++ b/crates/common/src/metrics_exporter.rs @@ -1,7 +1,7 @@ use std::io; use std::net::SocketAddr; -use axum::{routing::get, Router}; +use axum::{Router, routing::get}; use tokio::net::TcpListener; use crate::metrics::global_metrics; diff --git a/crates/distributed/build.rs b/crates/distributed/build.rs index 399a3ba..5e7d9d5 100644 --- a/crates/distributed/build.rs +++ b/crates/distributed/build.rs @@ -6,15 +6,14 @@ fn main() -> Result<(), Box> { } let protoc = protoc_bin_vendored::protoc_bin_path()?; - tonic_build::configure() - .compile_protos_with_config( - { - let mut cfg = prost_build::Config::new(); - cfg.protoc_executable(protoc); - cfg - }, - &["proto/ffq_distributed.proto"], - &["proto"], - )?; + tonic_build::configure().compile_protos_with_config( + { + let mut cfg = prost_build::Config::new(); + cfg.protoc_executable(protoc); + cfg + }, + 
&["proto/ffq_distributed.proto"], + &["proto"], + )?; Ok(()) } diff --git a/crates/distributed/src/coordinator.rs b/crates/distributed/src/coordinator.rs index 109c475..fae2496 100644 --- a/crates/distributed/src/coordinator.rs +++ b/crates/distributed/src/coordinator.rs @@ -1,3 +1,17 @@ +//! Coordinator state machine and scheduling logic. +//! +//! Responsibilities: +//! - accept submitted physical plans and cut stage DAGs; +//! - materialize task attempts and serve pull-based task assignment; +//! - track query/task status transitions and aggregate stage metrics; +//! - maintain map-output registry keyed by `(query, stage, map_task, attempt)`; +//! - enforce basic worker blacklisting/retry behavior. +//! +//! Retry semantics: +//! - attempts are explicit in task and map-output keys; +//! - fetches must request the intended attempt; stale attempts are not used +//! unless caller asks for latest-attempt read path. + use std::collections::{HashMap, HashSet}; use std::path::PathBuf; use std::time::{SystemTime, UNIX_EPOCH}; @@ -6,16 +20,20 @@ use ffq_common::metrics::global_metrics; use ffq_common::{FfqError, Result, SchemaInferencePolicy}; use ffq_planner::{ExchangeExec, PhysicalPlan}; use ffq_shuffle::ShuffleReader; -use ffq_storage::parquet_provider::ParquetProvider; use ffq_storage::Catalog; +use ffq_storage::parquet_provider::ParquetProvider; use tracing::{debug, info, warn}; -use crate::stage::{build_stage_dag, StageDag}; +use crate::stage::{StageDag, build_stage_dag}; #[derive(Debug, Clone)] +/// Coordinator behavior/configuration knobs. pub struct CoordinatorConfig { + /// Consecutive task failures before a worker is blacklisted. pub blacklist_failure_threshold: u32, + /// Root directory containing shuffle files and indexes. pub shuffle_root: PathBuf, + /// Coordinator-side schema inference policy for schema-less parquet scans. 
pub schema_inference: SchemaInferencePolicy, } @@ -30,63 +48,106 @@ impl Default for CoordinatorConfig { } #[derive(Debug, Clone, Copy, PartialEq, Eq)] +/// Query lifecycle states tracked by the coordinator. pub enum QueryState { + /// Query is accepted but not yet running. Queued, + /// At least one task attempt is currently running. Running, + /// All tasks completed successfully. Succeeded, + /// At least one task failed and query cannot recover. Failed, + /// Query was canceled by user or system request. Canceled, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] +/// Task lifecycle states tracked by the coordinator. pub enum TaskState { + /// Task is pending scheduling. Queued, + /// Task is currently executing. Running, + /// Task completed successfully. Succeeded, + /// Task execution failed. Failed, } #[derive(Debug, Clone)] +/// One schedulable task assignment returned to workers. pub struct TaskAssignment { + /// Stable query identifier. pub query_id: String, + /// Stage identifier within query DAG. pub stage_id: u64, + /// Task identifier within stage. pub task_id: u64, + /// Attempt number for retries. pub attempt: u32, + /// Serialized physical-plan fragment for this task. pub plan_fragment_json: Vec, } #[derive(Debug, Clone, Default)] +/// Aggregated per-stage progress and map-output metrics. pub struct StageMetrics { + /// Number of queued tasks in the stage. pub queued_tasks: u32, + /// Number of running tasks in the stage. pub running_tasks: u32, + /// Number of succeeded tasks in the stage. pub succeeded_tasks: u32, + /// Number of failed tasks in the stage. pub failed_tasks: u32, + /// Total rows written by map outputs in this stage. pub map_output_rows: u64, + /// Total bytes written by map outputs in this stage. pub map_output_bytes: u64, + /// Total batches written by map outputs in this stage. pub map_output_batches: u64, } #[derive(Debug, Clone)] +/// Map output metadata for one reduce partition. 
pub struct MapOutputPartitionMeta { + /// Reduce partition id this map output belongs to. pub reduce_partition: u32, + /// Bytes produced for the partition. pub bytes: u64, + /// Rows produced for the partition. pub rows: u64, + /// Batches produced for the partition. pub batches: u64, } #[derive(Debug, Clone)] +/// Public query status snapshot returned by control-plane APIs. pub struct QueryStatus { + /// Stable query identifier. pub query_id: String, + /// Current query state. pub state: QueryState, + /// Submission timestamp in unix milliseconds. pub submitted_at_ms: u64, + /// First-start timestamp in unix milliseconds, or 0 if not started. pub started_at_ms: u64, + /// Finish timestamp in unix milliseconds, or 0 if unfinished. pub finished_at_ms: u64, + /// Human-readable status message. pub message: String, + /// Total number of task attempts tracked for the query. pub total_tasks: u32, + /// Number of queued tasks across all stages. pub queued_tasks: u32, + /// Number of running tasks across all stages. pub running_tasks: u32, + /// Number of succeeded tasks across all stages. pub succeeded_tasks: u32, + /// Number of failed tasks across all stages. pub failed_tasks: u32, + /// Per-stage metrics keyed by stage id. pub stage_metrics: HashMap, } @@ -121,6 +182,7 @@ struct QueryRuntime { } #[derive(Debug, Default)] +/// In-memory coordinator runtime for query/task orchestration. pub struct Coordinator { config: CoordinatorConfig, catalog: Catalog, @@ -132,6 +194,7 @@ pub struct Coordinator { } impl Coordinator { + /// Construct coordinator with an empty catalog. pub fn new(config: CoordinatorConfig) -> Self { Self { config, @@ -140,6 +203,7 @@ impl Coordinator { } } + /// Construct coordinator with a preloaded catalog. pub fn with_catalog(config: CoordinatorConfig, catalog: Catalog) -> Self { Self { config, @@ -148,6 +212,7 @@ impl Coordinator { } } + /// Submit a physical plan and initialize query runtime/task attempts. 
pub fn submit_query( &mut self, query_id: String, @@ -161,8 +226,9 @@ impl Coordinator { let mut plan: PhysicalPlan = serde_json::from_slice(physical_plan_json) .map_err(|e| FfqError::Planning(format!("invalid physical plan json: {e}")))?; self.resolve_parquet_scan_schemas(&mut plan)?; - let resolved_plan_json = serde_json::to_vec(&plan) - .map_err(|e| FfqError::Planning(format!("encode resolved physical plan failed: {e}")))?; + let resolved_plan_json = serde_json::to_vec(&plan).map_err(|e| { + FfqError::Planning(format!("encode resolved physical plan failed: {e}")) + })?; let dag = build_stage_dag(&plan); info!( query_id = %query_id, @@ -219,7 +285,9 @@ impl Coordinator { PhysicalPlan::Filter(x) => self.resolve_parquet_scan_schemas(&mut x.input), PhysicalPlan::Project(x) => self.resolve_parquet_scan_schemas(&mut x.input), PhysicalPlan::CoalesceBatches(x) => self.resolve_parquet_scan_schemas(&mut x.input), - PhysicalPlan::PartialHashAggregate(x) => self.resolve_parquet_scan_schemas(&mut x.input), + PhysicalPlan::PartialHashAggregate(x) => { + self.resolve_parquet_scan_schemas(&mut x.input) + } PhysicalPlan::FinalHashAggregate(x) => self.resolve_parquet_scan_schemas(&mut x.input), PhysicalPlan::HashJoin(x) => { self.resolve_parquet_scan_schemas(&mut x.left)?; @@ -236,6 +304,10 @@ impl Coordinator { } } + /// Worker pull-scheduling API. + /// + /// Returns up to `capacity` runnable task attempts for the requesting + /// worker, skipping blacklisted workers. pub fn get_task(&mut self, worker_id: &str, capacity: u32) -> Result> { if self.blacklisted_workers.contains(worker_id) || capacity == 0 { debug!( @@ -303,6 +375,7 @@ impl Coordinator { Ok(out) } + /// Record a task attempt status transition and update query/stage metrics. pub fn report_task_status( &mut self, query_id: &str, @@ -381,6 +454,7 @@ impl Coordinator { Ok(()) } + /// Cancel a running/queued query. 
pub fn cancel_query(&mut self, query_id: &str, reason: &str) -> Result { let query = self .queries @@ -392,6 +466,7 @@ impl Coordinator { Ok(QueryState::Canceled) } + /// Read current query status snapshot. pub fn get_query_status(&self, query_id: &str) -> Result { let query = self .queries @@ -400,6 +475,7 @@ impl Coordinator { Ok(build_query_status(query_id, query)) } + /// Register map output metadata for one `(query, stage, map_task, attempt)`. pub fn register_map_output( &mut self, query_id: String, @@ -429,10 +505,12 @@ impl Coordinator { Ok(()) } + /// Number of registered map-output entries. pub fn map_output_registry_size(&self) -> usize { self.map_outputs.len() } + /// Store final query result payload (Arrow IPC bytes). pub fn register_query_results(&mut self, query_id: String, ipc_payload: Vec) -> Result<()> { if !self.queries.contains_key(&query_id) { return Err(FfqError::Planning(format!("unknown query: {query_id}"))); @@ -441,6 +519,7 @@ impl Coordinator { Ok(()) } + /// Fetch final query result payload. pub fn fetch_query_results(&self, query_id: &str) -> Result> { if !self.queries.contains_key(query_id) { return Err(FfqError::Planning(format!("unknown query: {query_id}"))); @@ -451,10 +530,12 @@ impl Coordinator { .ok_or_else(|| FfqError::Execution("query results not ready".to_string())) } + /// Returns whether worker is currently blacklisted. pub fn is_worker_blacklisted(&self, worker_id: &str) -> bool { self.blacklisted_workers.contains(worker_id) } + /// Read shuffle partition bytes for the requested map attempt. pub fn fetch_shuffle_partition_chunks( &self, query_id: &str, diff --git a/crates/distributed/src/grpc.rs b/crates/distributed/src/grpc.rs index c274139..0924e91 100644 --- a/crates/distributed/src/grpc.rs +++ b/crates/distributed/src/grpc.rs @@ -1,3 +1,23 @@ +//! gRPC service/client glue for coordinator and worker shuffle services. +//! +//! RPC schema source: `proto/ffq_distributed.proto`. +//! +//! 
Key control-plane RPCs (generated under [`v1`]): +//! - `SubmitQuery`, `GetTask`, `ReportTaskStatus` +//! - `GetQueryStatus`, `CancelQuery` +//! - `RegisterQueryResults`, `FetchQueryResults` +//! +//! Key shuffle/data RPCs: +//! - `RegisterMapOutput` +//! - `FetchShufflePartition` (stream) +//! - `Heartbeat` +//! +//! Useful generated request/response types: +//! [`v1::SubmitQueryRequest`], [`v1::GetTaskRequest`], +//! [`v1::ReportTaskStatusRequest`], [`v1::GetQueryStatusRequest`], +//! [`v1::RegisterMapOutputRequest`], [`v1::FetchShufflePartitionRequest`], +//! [`v1::FetchQueryResultsRequest`]. + use std::sync::Arc; use std::{collections::HashMap, path::PathBuf}; @@ -12,6 +32,7 @@ use crate::coordinator::{ TaskState as CoreTaskState, }; +#[allow(missing_docs)] pub mod v1 { tonic::include_proto!("ffq.distributed.v1"); } @@ -24,21 +45,25 @@ pub use v1::shuffle_service_client::ShuffleServiceClient; pub use v1::shuffle_service_server::{ShuffleService, ShuffleServiceServer}; #[derive(Clone)] +/// Combined gRPC service implementation backed by shared [`Coordinator`]. pub struct CoordinatorServices { coordinator: Arc>, } impl CoordinatorServices { + /// Build services from an owned coordinator instance. pub fn new(coordinator: Coordinator) -> Self { Self { coordinator: Arc::new(Mutex::new(coordinator)), } } + /// Build services from shared coordinator state. pub fn from_shared(coordinator: Arc>) -> Self { Self { coordinator } } + /// Access shared coordinator state. pub fn coordinator(&self) -> Arc> { Arc::clone(&self.coordinator) } @@ -285,12 +310,14 @@ fn to_status(err: ffq_common::FfqError) -> Status { } #[derive(Clone)] +/// Worker-local shuffle service that reads shuffle data from local filesystem. pub struct WorkerShuffleService { shuffle_root: PathBuf, map_outputs: Arc>>>, } impl WorkerShuffleService { + /// Create service bound to a shuffle root directory. 
pub fn new(shuffle_root: impl Into) -> Self { Self { shuffle_root: shuffle_root.into(), diff --git a/crates/distributed/src/lib.rs b/crates/distributed/src/lib.rs index 5d8db29..4c37b90 100644 --- a/crates/distributed/src/lib.rs +++ b/crates/distributed/src/lib.rs @@ -1,8 +1,42 @@ +#![deny(missing_docs)] + +//! Distributed coordinator/worker runtime building blocks. +//! +//! Architecture role: +//! - coordinator state machine and scheduling APIs +//! - worker execution/control-plane integration (feature-gated) +//! - stage DAG construction from physical plans +//! - gRPC control/shuffle/query-result RPC bindings (feature-gated) +//! +//! Key modules: +//! - [`coordinator`] +//! - [`stage`] +//! - `worker` (feature-gated) +//! - `grpc` (feature-gated) +//! +//! Feature flags: +//! - `grpc`: enables tonic-generated RPC services and client/server glue. +//! +//! End-to-end flow: +//! 1. client submits a serialized physical plan (`SubmitQuery`) +//! 2. coordinator cuts stages, creates task attempts, and serves `GetTask` +//! 3. workers execute stage fragments and report `ReportTaskStatus` +//! 4. map-stage workers publish shuffle metadata (`RegisterMapOutput`) +//! 5. downstream stages read shuffle partitions (`FetchShufflePartition`) +//! 6. sink/final stage publishes query results (`RegisterQueryResults`), client reads via `FetchQueryResults` +//! +//! RPC type definitions live in `proto/ffq_distributed.proto` and are generated +//! under `grpc::v1` when `grpc` feature is enabled. + +/// Coordinator state machine and scheduling APIs. pub mod coordinator; #[cfg(feature = "grpc")] +/// gRPC services/clients and protobuf-generated bindings. pub mod grpc; +/// Stage DAG modeling and stage-cut construction. pub mod stage; #[cfg(feature = "grpc")] +/// Worker runtime and control-plane adapters. 
pub mod worker; pub use coordinator::{ @@ -19,9 +53,11 @@ pub use worker::{ }; #[derive(Debug, Default)] +/// Thin facade for stage-DAG construction used by client/runtime integration. pub struct DistributedRuntime; impl DistributedRuntime { + /// Build a stage DAG by cutting the physical plan at shuffle-read boundaries. pub fn build_stage_dag(&self, plan: &PhysicalPlan) -> Result { Ok(stage::build_stage_dag(plan)) } diff --git a/crates/distributed/src/stage.rs b/crates/distributed/src/stage.rs index e101212..091872b 100644 --- a/crates/distributed/src/stage.rs +++ b/crates/distributed/src/stage.rs @@ -1,32 +1,51 @@ +//! Stage DAG construction from physical plans. +//! +//! Contract: +//! - stage boundaries are cut at `ShuffleRead` exchanges; +//! - each stage contains operator names reachable without crossing another +//! `ShuffleRead`; +//! - edges represent upstream (map) -> downstream (reduce) dependencies. + use ffq_planner::{ExchangeExec, PhysicalPlan}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +/// Stable stage identifier inside one query DAG. pub struct StageId(pub usize); #[derive(Debug, Clone, Serialize, Deserialize)] +/// One stage node and its dependency links. pub struct StageNode { + /// Stage identifier. pub id: StageId, + /// Physical operator names assigned to this stage. pub operators: Vec, + /// Upstream dependencies that must complete before this stage. pub parents: Vec, + /// Downstream stages that consume this stage outputs. pub children: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] +/// Query stage DAG used by coordinator task scheduling. pub struct StageDag { + /// All stage nodes in this query DAG. pub stages: Vec, } impl StageDag { + /// Create an empty DAG. pub fn new() -> Self { Self { stages: vec![] } } + /// Returns root stage id if present. 
pub fn root_id(&self) -> Option { self.stages.first().map(|s| s.id) } } +/// Build stage DAG by traversing a physical plan and cutting at shuffle reads. pub fn build_stage_dag(plan: &PhysicalPlan) -> StageDag { let mut dag = StageDag::new(); let root = new_stage(&mut dag); @@ -118,7 +137,7 @@ mod tests { use std::collections::HashSet; use ffq_planner::{ - create_physical_plan, AggExpr, Expr, LogicalPlan, PhysicalPlan, PhysicalPlannerConfig, + AggExpr, Expr, LogicalPlan, PhysicalPlan, PhysicalPlannerConfig, create_physical_plan, }; #[test] diff --git a/crates/distributed/src/worker.rs b/crates/distributed/src/worker.rs index 90afeb0..4dbe09f 100644 --- a/crates/distributed/src/worker.rs +++ b/crates/distributed/src/worker.rs @@ -1,5 +1,20 @@ +//! Worker runtime and task execution loop. +//! +//! Responsibilities: +//! - pull task attempts from coordinator (`GetTask`); +//! - execute stage fragments with shared planner/runtime semantics; +//! - write/register shuffle outputs for map stages; +//! - publish final query results for sink stages; +//! - report task state transitions and heartbeat. +//! +//! Retry/attempt semantics: +//! - each assignment carries an explicit `attempt`; +//! - map outputs are keyed by `(query, stage, map_task, attempt)`; +//! - coordinator-side status updates use the same attempt key so stale +//! attempts are not mistaken for current progress. 
+ use std::cmp::{Ordering, Reverse}; -use std::collections::{hash_map::DefaultHasher, BinaryHeap, HashMap}; +use std::collections::{BinaryHeap, HashMap, hash_map::DefaultHasher}; use std::fs::{self, File}; use std::hash::{Hash, Hasher}; use std::io::{BufRead, BufReader, BufWriter, Write}; @@ -16,7 +31,7 @@ use arrow::record_batch::RecordBatch; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use ffq_common::metrics::global_metrics; use ffq_common::{FfqError, Result}; -use ffq_execution::{compile_expr, TaskContext as ExecTaskContext}; +use ffq_execution::{TaskContext as ExecTaskContext, compile_expr}; use ffq_planner::{AggExpr, BuildSide, ExchangeExec, Expr, PartitioningSpec, PhysicalPlan}; use ffq_shuffle::{ShuffleReader, ShuffleWriter}; use ffq_storage::parquet_provider::ParquetProvider; @@ -35,11 +50,17 @@ use crate::coordinator::{Coordinator, MapOutputPartitionMeta, TaskAssignment, Ta use crate::grpc::v1; #[derive(Debug, Clone)] +/// Worker resource/configuration controls. pub struct WorkerConfig { + /// Stable worker id used in scheduling and heartbeats. pub worker_id: String, + /// Max concurrent task executions. pub cpu_slots: usize, + /// Per-task soft memory budget. pub per_task_memory_budget_bytes: usize, + /// Local spill directory for memory-pressure fallback paths. pub spill_dir: PathBuf, + /// Root directory containing shuffle data. pub shuffle_root: PathBuf, } @@ -56,27 +77,43 @@ impl Default for WorkerConfig { } #[derive(Debug, Clone)] +/// Task-scoped execution context provided to task executors. pub struct TaskContext { + /// Query id for this task attempt. pub query_id: String, + /// Stage id for this task attempt. pub stage_id: u64, + /// Task id within stage. pub task_id: u64, + /// Attempt number for retries. pub attempt: u32, + /// Per-task soft memory budget. pub per_task_memory_budget_bytes: usize, + /// Local spill directory. pub spill_dir: PathBuf, + /// Root directory containing shuffle data. 
pub shuffle_root: PathBuf, } #[derive(Debug, Clone, Default)] +/// Task execution outputs returned by [`TaskExecutor`]. pub struct TaskExecutionResult { + /// Map output partition metadata emitted by map stages. pub map_output_partitions: Vec, + /// Output batches emitted by sink/final stages. pub output_batches: Vec, + /// Whether result batches should be published to coordinator. pub publish_results: bool, + /// Human-readable completion message. pub message: String, } #[async_trait] +/// Control-plane contract used by worker runtime. pub trait WorkerControlPlane: Send + Sync { + /// Pull up to `capacity` task assignments for `worker_id`. async fn get_task(&self, worker_id: &str, capacity: u32) -> Result>; + /// Report a task state transition and status message. async fn report_task_status( &self, worker_id: &str, @@ -84,17 +121,22 @@ pub trait WorkerControlPlane: Send + Sync { state: TaskState, message: String, ) -> Result<()>; + /// Register map output partition metadata for a completed map task. async fn register_map_output( &self, assignment: &TaskAssignment, partitions: Vec, ) -> Result<()>; + /// Publish final query results payload for client fetching. async fn register_query_results(&self, query_id: &str, ipc_payload: Vec) -> Result<()>; + /// Send periodic heartbeat with currently running task count. async fn heartbeat(&self, worker_id: &str, running_tasks: u32) -> Result<()>; } #[async_trait] +/// Task execution contract for worker-assigned plan fragments. pub trait TaskExecutor: Send + Sync { + /// Execute one task assignment and return map/sink outputs. async fn execute( &self, assignment: &TaskAssignment, @@ -103,6 +145,7 @@ pub trait TaskExecutor: Send + Sync { } #[derive(Clone, Default)] +/// Default task executor that evaluates physical plan fragments in-process. 
pub struct DefaultTaskExecutor { catalog: Arc, sink_outputs: Arc>>>, @@ -115,6 +158,7 @@ impl std::fmt::Debug for DefaultTaskExecutor { } impl DefaultTaskExecutor { + /// Construct executor backed by provided catalog. pub fn new(catalog: Arc) -> Self { Self { catalog, @@ -122,6 +166,7 @@ impl DefaultTaskExecutor { } } + /// Take and clear sink output batches for a query (test helper). pub async fn take_query_output(&self, query_id: &str) -> Option> { self.sink_outputs.lock().await.remove(query_id) } @@ -203,6 +248,7 @@ impl TaskExecutor for DefaultTaskExecutor { } #[derive(Clone)] +/// Worker runtime that orchestrates pull scheduling and task execution. pub struct Worker where C: WorkerControlPlane + 'static, @@ -219,6 +265,7 @@ where C: WorkerControlPlane + 'static, E: TaskExecutor + 'static, { + /// Build worker runtime with control plane and task executor. pub fn new(config: WorkerConfig, control_plane: Arc, task_executor: Arc) -> Self { let slots = config.cpu_slots.max(1); Self { @@ -229,6 +276,10 @@ where } } + /// Perform one poll cycle: + /// - pull assignments + /// - execute up to available CPU slots + /// - report status/map outputs/results pub async fn poll_once(&self) -> Result { let capacity = self.cpu_slots.available_permits() as u32; if capacity == 0 { @@ -342,17 +393,20 @@ where } #[derive(Clone)] +/// In-process control-plane adapter for embedded/distributed tests. pub struct InProcessControlPlane { coordinator: Arc>, } impl InProcessControlPlane { + /// Create adapter backed by shared in-memory coordinator. pub fn new(coordinator: Arc>) -> Self { Self { coordinator } } } #[derive(Debug)] +/// gRPC-based control-plane adapter for remote coordinator connectivity. pub struct GrpcControlPlane { control: Mutex>, shuffle: Mutex>, @@ -360,6 +414,7 @@ pub struct GrpcControlPlane { } impl GrpcControlPlane { + /// Connect gRPC control/shuffle/heartbeat clients to a coordinator endpoint. 
pub async fn connect(endpoint: &str) -> Result { let control = crate::grpc::ControlPlaneClient::connect(endpoint.to_string()) .await @@ -549,6 +604,7 @@ fn proto_task_state(state: TaskState) -> v1::TaskState { } } +/// Encode a set of record batches as Arrow IPC stream bytes. pub fn encode_record_batches_ipc(batches: &[RecordBatch]) -> Result> { if batches.is_empty() { return Ok(Vec::new()); @@ -2593,8 +2649,8 @@ mod tests { use super::*; use crate::coordinator::CoordinatorConfig; use ffq_planner::{ - create_physical_plan, AggExpr, Expr, JoinStrategyHint, JoinType, LogicalPlan, - ParquetScanExec, ParquetWriteExec, PhysicalPlan, PhysicalPlannerConfig, + AggExpr, Expr, JoinStrategyHint, JoinType, LogicalPlan, ParquetScanExec, ParquetWriteExec, + PhysicalPlan, PhysicalPlannerConfig, create_physical_plan, }; use ffq_storage::{TableDef, TableStats}; use parquet::arrow::ArrowWriter; diff --git a/crates/execution/src/context.rs b/crates/execution/src/context.rs index b7051eb..5a837c8 100644 --- a/crates/execution/src/context.rs +++ b/crates/execution/src/context.rs @@ -1,6 +1,9 @@ +//! Execution task context shared by physical operators. + use std::sync::Arc; #[derive(Debug, Clone)] +/// Runtime-level limits and sizing hints for one task. pub struct TaskContext { /// Target batch size for operators that coalesce/split. pub batch_size_rows: usize, @@ -9,4 +12,5 @@ pub struct TaskContext { pub mem_budget_bytes: usize, } +/// Shared task context handle passed across operator boundaries. pub type SharedTaskContext = Arc; diff --git a/crates/execution/src/exec.rs b/crates/execution/src/exec.rs index 12054f1..990ec3b 100644 --- a/crates/execution/src/exec.rs +++ b/crates/execution/src/exec.rs @@ -1,15 +1,22 @@ +//! Legacy execution-plan trait used by early v1 scaffolding. +//! +//! New operator code should generally use [`crate::exec_node::ExecNode`]. 
+ use arrow::record_batch::RecordBatch; use ffq_common::Result; -use futures::stream::BoxStream; use futures::StreamExt; +use futures::stream::BoxStream; +/// Boxed stream used by [`ExecutionPlan`]. pub type SendableRecordBatchStream = BoxStream<'static, Result>; #[derive(Debug, Clone)] +/// Minimal execution context for legacy `ExecutionPlan`. pub struct ExecContext { pub batch_size_rows: usize, } +/// Legacy plan trait. pub trait ExecutionPlan: Send + Sync { fn name(&self) -> &'static str; fn execute(&self, ctx: ExecContext) -> SendableRecordBatchStream; diff --git a/crates/execution/src/exec_node.rs b/crates/execution/src/exec_node.rs index 70e859c..f94ef14 100644 --- a/crates/execution/src/exec_node.rs +++ b/crates/execution/src/exec_node.rs @@ -1,3 +1,5 @@ +//! Physical operator execution-node contract. + use std::sync::Arc; use arrow_schema::SchemaRef; @@ -10,9 +12,16 @@ use crate::stream::SendableRecordBatchStream; /// Operators are pull-based (consumer polls the stream), /// but can also use bounded channels internally for push-based parts (shuffle, etc.). pub trait ExecNode: Send + Sync { + /// Stable operator name for explain/logging. fn name(&self) -> &'static str; + /// Output schema for all batches emitted by this node. fn schema(&self) -> SchemaRef; + /// Start execution and return a stream of output batches. + /// + /// Implementations should surface deterministic operator failures as + /// `FfqError::Execution` and configuration/state failures via appropriate + /// error variants. fn execute(&self, ctx: Arc) -> Result; } diff --git a/crates/execution/src/expressions/mod.rs b/crates/execution/src/expressions/mod.rs index ee8cc47..afa63d8 100644 --- a/crates/execution/src/expressions/mod.rs +++ b/crates/execution/src/expressions/mod.rs @@ -1,3 +1,12 @@ +//! Expression compilation and evaluation for execution operators. +//! +//! Input contract: +//! - analyzer has resolved/typed expressions (primarily `ColumnRef`); +//! 
- execution may still accept unresolved `Column` as a compatibility fallback. +//! +//! Output contract: +//! - each evaluation returns an `ArrayRef` aligned to input batch row count. + use std::sync::Arc; use arrow::array::{ @@ -23,7 +32,9 @@ use ffq_planner::{BinaryOp, Expr, LiteralValue}; /// - execution compiles Expr -> PhysicalExpr /// - evaluation returns Arrow ArrayRef aligned with the input RecordBatch length pub trait PhysicalExpr: Send + Sync { + /// Static output data type of this expression. fn data_type(&self) -> DataType; + /// Evaluate the expression for every row in `batch`. fn evaluate(&self, batch: &RecordBatch) -> Result; } @@ -462,7 +473,7 @@ fn eval_cmp(op: BinaryOp, l: &ArrayRef, r: &ArrayRef) -> Result { _ => { return Err(FfqError::Unsupported( "ordering comparisons not supported for boolean in v1".to_string(), - )) + )); } } .map_err(|e| FfqError::Execution(format!("cmp kernel failed: {e}")))?; diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index cb96aed..092da07 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -1,3 +1,21 @@ +#![deny(missing_docs)] + +//! Execution-layer primitives used by runtimes and physical operators. +//! +//! Architecture role: +//! - task context and execution node contracts +//! - expression compilation/evaluation +//! - batch stream abstractions and channels +//! +//! Key modules: +//! - [`context`] +//! - [`exec_node`] +//! - [`expressions`] +//! - [`stream`] +//! +//! Feature flags: +//! - no crate-level flags; vector expression support is coordinated with planner/runtime features. + pub mod context; pub mod exec_node; pub mod expressions; @@ -6,8 +24,8 @@ pub mod stream; // Re-export only what you want at the crate root (no globs). 
pub use context::{SharedTaskContext, TaskContext}; pub use exec_node::ExecNode; -pub use expressions::{compile_expr, PhysicalExpr}; +pub use expressions::{PhysicalExpr, compile_expr}; pub use stream::{ - bounded_batch_channel, empty_stream, BatchSender, RecordBatchStream, SendableRecordBatchStream, - StreamAdapter, + BatchSender, RecordBatchStream, SendableRecordBatchStream, StreamAdapter, + bounded_batch_channel, empty_stream, }; diff --git a/crates/execution/src/stream.rs b/crates/execution/src/stream.rs index cb45fdf..edaf601 100644 --- a/crates/execution/src/stream.rs +++ b/crates/execution/src/stream.rs @@ -1,27 +1,31 @@ +//! Record-batch stream abstractions and channel adapters. + use std::pin::Pin; use std::task::{Context, Poll}; use arrow::record_batch::RecordBatch; use arrow_schema::SchemaRef; use ffq_common::Result; -use futures::channel::mpsc; use futures::Stream; +use futures::channel::mpsc; /// A stream of RecordBatches that also knows its output schema. pub trait RecordBatchStream: Stream> + Send { + /// Output schema for every batch yielded by this stream. fn schema(&self) -> SchemaRef; } /// The standard "stream you can return from operators". pub type SendableRecordBatchStream = Pin>; -/// Adapter that attaches a schema to any Stream>. +/// Adapter that attaches a schema to any `Stream>`. pub struct StreamAdapter { schema: SchemaRef, inner: S, } impl StreamAdapter { + /// Create a new schema-attached stream adapter. pub fn new(schema: SchemaRef, inner: S) -> Self { Self { schema, inner } } diff --git a/crates/planner/src/analyzer.rs b/crates/planner/src/analyzer.rs index b97de86..dcbd9b1 100644 --- a/crates/planner/src/analyzer.rs +++ b/crates/planner/src/analyzer.rs @@ -8,21 +8,31 @@ use crate::logical_plan::{AggExpr, BinaryOp, Expr, JoinType, LiteralValue, Logic /// The analyzer needs schemas to resolve columns. /// The client (Engine) will provide this from its Catalog. pub trait SchemaProvider { + /// Return schema for a table by name. 
fn table_schema(&self, table: &str) -> Result; } #[derive(Debug, Default)] +/// Logical-plan semantic analyzer. pub struct Analyzer; impl Analyzer { + /// Create a new analyzer. pub fn new() -> Self { Self } - /// Analyze: - /// - resolve columns -> ColumnRef { index } - /// - infer types - /// - insert minimal casts + /// Analyze a logical plan and return a semantically validated plan. + /// + /// Guarantees: + /// - unresolved `Expr::Column` references become `Expr::ColumnRef`; + /// - expression/aggregate types are inferred and checked; + /// - required casts are inserted for supported coercions; + /// - join and insert contracts are validated early. + /// + /// Error taxonomy: + /// - `Planning`: semantic/type/name resolution failures + /// - `Unsupported`: valid SQL shape that analyzer intentionally does not support in v1 pub fn analyze(&self, plan: LogicalPlan, provider: &dyn SchemaProvider) -> Result { let (p, _schema, _resolver) = self.analyze_plan(plan, provider)?; Ok(p) diff --git a/crates/planner/src/explain.rs b/crates/planner/src/explain.rs index e53842d..003a7bb 100644 --- a/crates/planner/src/explain.rs +++ b/crates/planner/src/explain.rs @@ -1,5 +1,6 @@ use crate::logical_plan::{Expr, JoinStrategyHint, LogicalPlan}; +/// Render logical plan as human-readable multiline text. pub fn explain_logical(plan: &LogicalPlan) -> String { let mut s = String::new(); fmt_plan(plan, 0, &mut s); diff --git a/crates/planner/src/lib.rs b/crates/planner/src/lib.rs index d5713d1..a3eac3c 100644 --- a/crates/planner/src/lib.rs +++ b/crates/planner/src/lib.rs @@ -1,9 +1,36 @@ +#![deny(missing_docs)] + +//! Logical/physical planning stack for FFQ SQL and DataFrame execution. +//! +//! Architecture role: +//! - SQL frontend translation into logical plans +//! - analysis (name/type resolution) and optimizer rewrites +//! - physical plan model and lowering +//! +//! Key modules: +//! - [`sql_frontend`] +//! - [`analyzer`] +//! - [`optimizer`] +//! - [`physical_plan`] +//! 
- [`physical_planner`] +//! - [`explain`] +//! +//! Feature flags: +//! - vector and qdrant-related rewrites are conditionally compiled via crate features. + +/// Semantic analyzer and schema resolution. pub mod analyzer; +/// Logical plan explain/pretty formatting helpers. pub mod explain; +/// Logical plan and expression model. pub mod logical_plan; +/// Rule-based logical optimizer. pub mod optimizer; +/// Physical operator plan model. pub mod physical_plan; +/// Logical-to-physical lowering. pub mod physical_planner; +/// SQL frontend translation into logical plans. pub mod sql_frontend; pub use analyzer::*; diff --git a/crates/planner/src/logical_plan.rs b/crates/planner/src/logical_plan.rs index 929723f..98c7156 100644 --- a/crates/planner/src/logical_plan.rs +++ b/crates/planner/src/logical_plan.rs @@ -1,137 +1,251 @@ use arrow_schema::DataType; use serde::{Deserialize, Serialize}; +/// Join semantics supported by the logical planner. +/// +/// v1 currently only supports [`JoinType::Inner`]. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum JoinType { + /// Keep only rows where join keys match on both sides. Inner, } +/// Optimizer hint controlling join distribution strategy. +/// +/// This is a hint, not a hard promise. Physical planning may still choose a +/// safe fallback shape when constraints are not met. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum JoinStrategyHint { + /// Let optimizer/physical planner pick the strategy. Auto, + /// Broadcast left side and build hash table from left. BroadcastLeft, + /// Broadcast right side and build hash table from right. BroadcastRight, + /// Shuffle both sides by join key and join partition-wise. Shuffle, } +/// Scalar expression used by logical and physical planning. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Expr { + /// Unresolved column name (before analysis). Column(String), + /// Resolved column binding emitted by analyzer. 
ColumnRef { + /// Resolved display name. name: String, + /// Resolved column index in input schema. index: usize, }, + /// Scalar literal. Literal(LiteralValue), + /// Binary operator expression. BinaryOp { + /// Left operand. left: Box, + /// Binary operator. op: BinaryOp, + /// Right operand. right: Box, }, + /// Explicit type cast. Cast { + /// Input expression. expr: Box, + /// Target type. to_type: DataType, }, + /// Boolean conjunction. And(Box, Box), + /// Boolean disjunction. Or(Box, Box), + /// Boolean negation. Not(Box), #[cfg(feature = "vector")] + /// Cosine similarity between a vector expression and query vector literal. CosineSimilarity { + /// Vector-valued input expression. vector: Box, + /// Query vector expression (typically a literal). query: Box, }, #[cfg(feature = "vector")] + /// L2 distance between a vector expression and query vector literal. L2Distance { + /// Vector-valued input expression. vector: Box, + /// Query vector expression (typically a literal). query: Box, }, #[cfg(feature = "vector")] + /// Dot product between a vector expression and query vector literal. DotProduct { + /// Vector-valued input expression. vector: Box, + /// Query vector expression (typically a literal). query: Box, }, } +/// Literal values supported by the v1 planner. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum LiteralValue { + /// 64-bit integer literal. Int64(i64), + /// 64-bit floating literal. Float64(f64), + /// UTF-8 string literal. Utf8(String), + /// Boolean literal. Boolean(bool), + /// Null literal. Null, #[cfg(feature = "vector")] + /// `f32` vector literal (feature `vector`). VectorF32(Vec), } +/// Binary operators supported by v1 expression evaluation. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum BinaryOp { + /// Equality. Eq, + /// Inequality. NotEq, + /// Less-than. Lt, + /// Less-than or equal. LtEq, + /// Greater-than. Gt, + /// Greater-than or equal. GtEq, + /// Addition. Plus, + /// Subtraction. 
Minus, + /// Multiplication. Multiply, + /// Division. Divide, } +/// Logical plan tree produced by SQL/DataFrame frontend and rewritten by +/// analyzer/optimizer passes. +/// +/// Contracts: +/// - `TableScan.projection` is best-effort pushdown and may be widened later. +/// - `Join.on` uses `(left_col, right_col)` column names. +/// - `Aggregate` uses SQL grouped-aggregate semantics. +/// - `TopKByScore` is the safe fallback path when vector index rewrite cannot +/// be applied. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum LogicalPlan { + /// Scan a catalog table. TableScan { + /// Catalog table name. table: String, + /// Optional projected column names. projection: Option>, + /// Best-effort pushdown filters. filters: Vec, }, + /// Compute named expressions from input rows. Projection { + /// `(expr, output_name)` pairs. exprs: Vec<(Expr, String)>, + /// Input plan. input: Box, }, + /// Keep rows matching predicate. Filter { + /// Boolean predicate. predicate: Expr, + /// Input plan. input: Box, }, + /// Equi-join two inputs using `on` key pairs. Join { + /// Left input. left: Box, + /// Right input. right: Box, + /// Join key pairs `(left_col, right_col)`. on: Vec<(String, String)>, + /// Join type. join_type: JoinType, + /// Distribution strategy hint. strategy_hint: JoinStrategyHint, }, + /// Grouped aggregate. + /// + /// `group_exprs` define grouping keys; `aggr_exprs` define aggregate + /// outputs and aliases. Aggregate { + /// Grouping expressions. group_exprs: Vec, + /// Aggregate expressions and aliases. aggr_exprs: Vec<(AggExpr, String)>, + /// Input plan. input: Box, }, + /// Return at most `n` rows. Limit { + /// Maximum number of rows. n: usize, + /// Input plan. input: Box, }, + /// Return top `k` rows by score expression. + /// + /// This is used for brute-force vector reranking and remains the fallback + /// when index-backed rewrite preconditions fail. TopKByScore { + /// Score expression. 
score_expr: Expr, + /// Number of rows to keep. k: usize, + /// Input plan. input: Box, }, + /// Index-backed vector top-k logical operator. + /// + /// Rewritten from `TopKByScore` only when optimizer preconditions are met. VectorTopK { + /// Table name. table: String, + /// Query vector literal. query_vector: Vec, + /// Number of rows to keep. k: usize, + /// Optional provider-specific filter payload. filter: Option, }, + /// Insert query result into a target table. InsertInto { + /// Target table. table: String, + /// Target column list. columns: Vec, + /// Input plan. input: Box, }, } +/// Aggregate expression kinds supported by v1. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum AggExpr { + /// Count non-null input rows. Count(Expr), + /// Sum numeric input. Sum(Expr), + /// Minimum input value. Min(Expr), + /// Maximum input value. Max(Expr), + /// Average numeric input. Avg(Expr), } diff --git a/crates/planner/src/optimizer.rs b/crates/planner/src/optimizer.rs index 6bbc134..1d6a398 100644 --- a/crates/planner/src/optimizer.rs +++ b/crates/planner/src/optimizer.rs @@ -4,8 +4,10 @@ use std::collections::{HashMap, HashSet}; use crate::analyzer::SchemaProvider; use crate::logical_plan::{BinaryOp, Expr, JoinStrategyHint, JoinType, LiteralValue, LogicalPlan}; +/// Configuration knobs for rule-based optimization. #[derive(Debug, Clone, Copy)] pub struct OptimizerConfig { + /// Max table byte size eligible for broadcast join hinting. pub broadcast_threshold_bytes: u64, } @@ -18,36 +20,62 @@ impl Default for OptimizerConfig { } #[derive(Debug, Clone, PartialEq, Eq, Default)] +/// Table metadata exposed to optimizer rewrite rules. pub struct TableMetadata { + /// Storage format (for example `parquet`, `qdrant`). pub format: String, + /// Provider-specific options used by rewrite rules. pub options: HashMap, } /// Provide table stats for join hinting + schemas for pushdown decisions. 
pub trait OptimizerContext: SchemaProvider { + /// Return `(bytes, rows)` estimates for a table. fn table_stats(&self, table: &str) -> Result<(Option, Option)>; // (bytes, rows) + /// Return table metadata used by rewrite rules. fn table_metadata(&self, _table: &str) -> Result> { Ok(None) } + /// Convenience getter for table format. fn table_format(&self, table: &str) -> Result> { Ok(self.table_metadata(table)?.map(|m| m.format)) } + /// Convenience getter for table options map. fn table_options(&self, table: &str) -> Result>> { Ok(self.table_metadata(table)?.map(|m| m.options)) } } #[derive(Debug, Default)] +/// Rule-based optimizer for v1 logical plans. +/// +/// The implementation is intentionally conservative: pushdowns and rewrites are +/// applied only when correctness preconditions are satisfied; otherwise, the +/// original logical behavior is preserved. pub struct Optimizer; impl Optimizer { + /// Create a new optimizer. pub fn new() -> Self { Self } + /// Apply v1 rule pipeline to a logical plan. + /// + /// Pass order is fixed and intentionally conservative: + /// 1. constant folding + /// 2. filter merge + /// 3. projection pushdown + /// 4. predicate pushdown + /// 5. join strategy hinting + /// 6. vector index rewrite + /// + /// Rewrite contract: + /// - When rewrite preconditions are not met, optimizer must preserve a + /// valid fallback plan (for example `TopKByScore`). pub fn optimize( &self, plan: LogicalPlan, @@ -965,7 +993,7 @@ fn evaluate_vector_topk_rewrite( Err(_) => { return Ok(VectorRewriteDecision::Fallback { _reason: "filter translation unsupported", - }) + }); } }; diff --git a/crates/planner/src/physical_plan.rs b/crates/planner/src/physical_plan.rs index 7d85c89..18c6fdc 100644 --- a/crates/planner/src/physical_plan.rs +++ b/crates/planner/src/physical_plan.rs @@ -8,25 +8,41 @@ use serde::{Deserialize, Serialize}; /// Later we'll split "physical expr" vs "logical expr" more strictly. 
#[derive(Debug, Clone, Serialize, Deserialize)] pub enum PhysicalPlan { + /// Parquet table scan. ParquetScan(ParquetScanExec), + /// Parquet sink write. ParquetWrite(ParquetWriteExec), + /// Row filter. Filter(FilterExec), + /// Projection. Project(ProjectExec), + /// Batch coalescing. CoalesceBatches(CoalesceBatchesExec), + /// Partial aggregate. PartialHashAggregate(PartialHashAggregateExec), + /// Final aggregate. FinalHashAggregate(FinalHashAggregateExec), + /// Hash join. HashJoin(HashJoinExec), + /// Data exchange boundary. Exchange(ExchangeExec), + /// Limit. Limit(LimitExec), + /// Brute-force top-k. TopKByScore(TopKByScoreExec), + /// Index-backed vector top-k. VectorTopK(VectorTopKExec), } impl PhysicalPlan { + /// Returns direct child operators. + /// + /// This is used by explain/inspection code and assumes `VectorTopK` is a + /// leaf and exchange operators have exactly one child. pub fn children(&self) -> Vec<&PhysicalPlan> { match self { PhysicalPlan::ParquetScan(_) => vec![], @@ -49,6 +65,7 @@ impl PhysicalPlan { } } +/// Physical parquet scan operator. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ParquetScanExec { /// Table name from the catalog (v1). @@ -62,56 +79,95 @@ pub struct ParquetScanExec { pub filters: Vec, } +/// Physical parquet sink operator. +/// +/// The execution runtime uses `table` to resolve target path and commit +/// semantics from catalog/table options. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ParquetWriteExec { + /// Target table. pub table: String, + /// Input plan. pub input: Box, } +/// Row filter operator. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FilterExec { + /// Predicate. pub predicate: Expr, + /// Input plan. pub input: Box, } +/// Projection operator. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProjectExec { /// (expr, output_name) pub exprs: Vec<(Expr, String)>, + /// Input plan. pub input: Box, } +/// Batch coalescing operator. 
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct CoalesceBatchesExec { + /// Desired row count per output batch. pub target_batch_rows: usize, + /// Input plan. pub input: Box, } +/// Phase-1 hash aggregate over local/shuffle partitions. +/// +/// Must be followed by compatible repartition + final aggregate for global SQL +/// aggregate semantics. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PartialHashAggregateExec { + /// Grouping expressions. pub group_exprs: Vec, + /// Aggregate expressions and aliases. pub aggr_exprs: Vec<(AggExpr, String)>, + /// Input plan. pub input: Box, } +/// Phase-2 hash aggregate merging partial states after shuffle. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FinalHashAggregateExec { + /// Grouping expressions. pub group_exprs: Vec, + /// Aggregate expressions and aliases. pub aggr_exprs: Vec<(AggExpr, String)>, + /// Input plan. pub input: Box, } +/// Side chosen to build the hash table for [`HashJoinExec`]. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub enum BuildSide { + /// Build hash table from left input. Left, + /// Build hash table from right input. Right, } +/// Hash join physical operator. +/// +/// Contract: +/// - `on` is positional key mapping `(left_key, right_key)`. +/// - `strategy_hint` records optimizer intent; exchange nodes define actual +/// data movement. +/// - `build_side` must match the side expected to be in-memory hash build. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HashJoinExec { + /// Left input. pub left: Box, + /// Right input. pub right: Box, + /// Join key pairs `(left_key, right_key)`. pub on: Vec<(String, String)>, + /// Join type. pub join_type: JoinType, /// From optimizer (broadcast/shuffle hint). Physical planner inserts exchanges accordingly. pub strategy_hint: JoinStrategyHint, @@ -119,58 +175,88 @@ pub struct HashJoinExec { pub build_side: BuildSide, } +/// Stage-boundary exchange operators. 
#[derive(Debug, Clone, Serialize, Deserialize)] pub enum ExchangeExec { + /// Shuffle write boundary. ShuffleWrite(ShuffleWriteExchange), + /// Shuffle read boundary. ShuffleRead(ShuffleReadExchange), + /// Broadcast boundary. Broadcast(BroadcastExchange), } +/// Shuffle write boundary. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ShuffleWriteExchange { + /// Input plan. pub input: Box, + /// Partitioning specification. pub partitioning: PartitioningSpec, } +/// Shuffle read boundary. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ShuffleReadExchange { + /// Input plan. pub input: Box, + /// Partitioning specification. pub partitioning: PartitioningSpec, } +/// Broadcast boundary. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BroadcastExchange { + /// Input plan. pub input: Box, } +/// Partitioning contract used by exchanges and distributed stage planner. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum PartitioningSpec { /// Hash partition by expressions into N partitions. HashKeys { + /// Partition key names. keys: Vec, + /// Partition count. partitions: usize, }, /// Single partition. Single, } +/// Limit operator. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LimitExec { + /// Maximum number of rows. pub n: usize, + /// Input plan. pub input: Box, } +/// Brute-force top-k by score expression. +/// +/// Used both as explicit SQL top-k execution path and as fallback when vector +/// index rewrite does not apply. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TopKByScoreExec { + /// Score expression. pub score_expr: Expr, + /// Number of rows to keep. pub k: usize, + /// Input plan. pub input: Box, } +/// Index-backed vector top-k physical operator. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct VectorTopKExec { + /// Table name. pub table: String, + /// Query vector literal. pub query_vector: Vec, + /// Number of rows to keep. 
pub k: usize, + /// Optional provider-specific filter payload. pub filter: Option, } diff --git a/crates/planner/src/physical_planner.rs b/crates/planner/src/physical_planner.rs index 099300b..860d9c6 100644 --- a/crates/planner/src/physical_planner.rs +++ b/crates/planner/src/physical_planner.rs @@ -8,8 +8,11 @@ use crate::physical_plan::{ }; #[derive(Debug, Clone)] +/// Physical planning knobs that control exchange fanout and output batch sizing. pub struct PhysicalPlannerConfig { + /// Number of hash partitions used by shuffle exchanges. pub shuffle_partitions: usize, + /// Target row count for coalescing output batches. pub target_batch_rows: usize, } @@ -22,6 +25,14 @@ impl Default for PhysicalPlannerConfig { } } +/// Lower analyzed/optimized logical plan to executable physical operators. +/// +/// Contracts: +/// - logical semantics are preserved; +/// - aggregate lowers to `PartialHashAggregate -> Exchange -> FinalHashAggregate`; +/// - join hints are honored where possible, with `Auto` safely falling back to +/// shuffle shape; +/// - unsupported logical shapes return a planning error. pub fn create_physical_plan( logical: &LogicalPlan, cfg: &PhysicalPlannerConfig, diff --git a/crates/planner/src/sql_frontend.rs b/crates/planner/src/sql_frontend.rs index 99a863e..ea8da2d 100644 --- a/crates/planner/src/sql_frontend.rs +++ b/crates/planner/src/sql_frontend.rs @@ -9,7 +9,16 @@ use sqlparser::ast::{ use crate::logical_plan::{AggExpr, BinaryOp, Expr, JoinStrategyHint, LiteralValue, LogicalPlan}; -/// Convert a SQL string into a LogicalPlan, binding named parameters (like :k, :query). +/// Convert a SQL string into a [`LogicalPlan`], binding named parameters (for +/// example `:k`, `:query`). +/// +/// Contract: +/// - exactly one statement must be present; +/// - supported statements are delegated to [`statement_to_logical`]. 
+/// +/// Error taxonomy: +/// - `Unsupported`: SQL construct is outside v1 supported subset +/// - `Planning`: parse/parameter literal shape issues (for example bad LIMIT literal) pub fn sql_to_logical(sql: &str, params: &HashMap) -> Result { let stmts = ffq_sql::parse_sql(sql)?; if stmts.len() != 1 { @@ -20,6 +29,13 @@ pub fn sql_to_logical(sql: &str, params: &HashMap) -> Resu statement_to_logical(&stmts[0], params) } +/// Convert one parsed SQL statement into a [`LogicalPlan`]. +/// +/// v1 supports `SELECT` and `INSERT INTO ... SELECT ...` only. +/// +/// Error taxonomy: +/// - `Unsupported`: statement kind not supported in v1 +/// - `Planning`: invalid statement arguments/literals where applicable pub fn statement_to_logical( stmt: &Statement, params: &HashMap, @@ -62,7 +78,7 @@ fn query_to_logical(q: &Query, params: &HashMap) -> Result _ => { return Err(FfqError::Unsupported( "only simple SELECT is supported (no UNION/EXCEPT/INTERSECT)".to_string(), - )) + )); } }; @@ -113,7 +129,7 @@ fn query_to_logical(q: &Query, params: &HashMap) -> Result SelectItem::Wildcard(_) | SelectItem::QualifiedWildcard(_, _) => { return Err(FfqError::Unsupported( "SELECT * is not supported in v1 subset (use explicit columns)".to_string(), - )) + )); } } } @@ -221,7 +237,7 @@ fn from_to_plan( _ => { return Err(FfqError::Unsupported( "only INNER JOIN is supported in v1".to_string(), - )) + )); } } } @@ -518,7 +534,7 @@ fn sql_binop_to_binop(op: &SqlBinaryOp) -> Result { _ => { return Err(FfqError::Unsupported(format!( "unsupported binary operator in v1: {op}" - ))) + ))); } }) } diff --git a/crates/planner/tests/optimizer_golden.rs b/crates/planner/tests/optimizer_golden.rs index 65c5d95..68fd772 100644 --- a/crates/planner/tests/optimizer_golden.rs +++ b/crates/planner/tests/optimizer_golden.rs @@ -5,8 +5,8 @@ use std::sync::Arc; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use ffq_planner::{ - explain_logical, BinaryOp, Expr, JoinStrategyHint, LogicalPlan, Optimizer, 
OptimizerConfig, - OptimizerContext, SchemaProvider, TableMetadata, + BinaryOp, Expr, JoinStrategyHint, LogicalPlan, Optimizer, OptimizerConfig, OptimizerContext, + SchemaProvider, TableMetadata, explain_logical, }; #[derive(Clone)] diff --git a/crates/planner/tests/physical_plan_serde.rs b/crates/planner/tests/physical_plan_serde.rs index 67bf6f0..3d9f252 100644 --- a/crates/planner/tests/physical_plan_serde.rs +++ b/crates/planner/tests/physical_plan_serde.rs @@ -1,4 +1,4 @@ -use ffq_planner::{create_physical_plan, LogicalPlan, PhysicalPlan, PhysicalPlannerConfig}; +use ffq_planner::{LogicalPlan, PhysicalPlan, PhysicalPlannerConfig, create_physical_plan}; #[test] fn physical_plan_is_serializable() { diff --git a/crates/shuffle/src/layout.rs b/crates/shuffle/src/layout.rs index f8c1fc8..b9a7ebe 100644 --- a/crates/shuffle/src/layout.rs +++ b/crates/shuffle/src/layout.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; +/// Build relative path to one shuffle partition IPC file. pub fn shuffle_path( query_id: u64, stage_id: u64, @@ -10,14 +11,17 @@ pub fn shuffle_path( format!("shuffle/{query_id}/{stage_id}/{map_task}/{attempt}/part-{reduce_partition}.ipc") } +/// Build relative directory path for one map-task attempt. pub fn map_task_dir(query_id: u64, stage_id: u64, map_task: u64, attempt: u32) -> String { format!("shuffle/{query_id}/{stage_id}/{map_task}/{attempt}") } +/// Build relative base directory for a map task containing all attempts. pub fn map_task_base_dir(query_id: u64, stage_id: u64, map_task: u64) -> String { format!("shuffle/{query_id}/{stage_id}/{map_task}") } +/// Build relative path to JSON index metadata for one map-task attempt. pub fn index_json_path(query_id: u64, stage_id: u64, map_task: u64, attempt: u32) -> String { format!( "{}/index.json", @@ -25,6 +29,7 @@ pub fn index_json_path(query_id: u64, stage_id: u64, map_task: u64, attempt: u32 ) } +/// Build relative path to binary index metadata for one map-task attempt. 
pub fn index_bin_path(query_id: u64, stage_id: u64, map_task: u64, attempt: u32) -> String { format!( "{}/index.bin", @@ -33,21 +38,34 @@ pub fn index_bin_path(query_id: u64, stage_id: u64, map_task: u64, attempt: u32) } #[derive(Debug, Clone, Serialize, Deserialize)] +/// Metadata describing one map-output partition artifact. pub struct ShufflePartitionMeta { + /// Reduce partition id. pub reduce_partition: u32, + /// Relative file path of partition payload. pub file: String, + /// Payload size in bytes. pub bytes: u64, + /// Row count in payload. pub rows: u64, + /// Batch count in payload. pub batches: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] +/// Per-attempt index metadata describing all produced partitions. pub struct MapTaskIndex { + /// Query id. pub query_id: u64, + /// Stage id. pub stage_id: u64, + /// Map task id. pub map_task: u64, + /// Attempt number. pub attempt: u32, + /// Creation time in unix milliseconds. #[serde(default)] pub created_at_ms: u64, + /// Partition metadata entries for this attempt. pub partitions: Vec, } diff --git a/crates/shuffle/src/lib.rs b/crates/shuffle/src/lib.rs index db9d41a..cc57b3b 100644 --- a/crates/shuffle/src/lib.rs +++ b/crates/shuffle/src/lib.rs @@ -1,5 +1,25 @@ +#![deny(missing_docs)] + +//! Shuffle file layout and read/write utilities. +//! +//! Architecture role: +//! - defines deterministic shuffle path contracts +//! - writes partitioned Arrow IPC data and index metadata +//! - reads shuffle partitions for downstream stages +//! +//! Key modules: +//! - [`layout`] +//! - [`writer`] +//! - [`reader`] +//! +//! Feature flags: +//! - none. + +/// Deterministic path and index metadata contracts for shuffle files. pub mod layout; +/// Shuffle readers for partition/index payloads. pub mod reader; +/// Shuffle writers for partition/index payloads and TTL cleanup. 
pub mod writer; pub use layout::*; diff --git a/crates/shuffle/src/reader.rs b/crates/shuffle/src/reader.rs index facc7c0..a692255 100644 --- a/crates/shuffle/src/reader.rs +++ b/crates/shuffle/src/reader.rs @@ -6,19 +6,21 @@ use arrow::record_batch::RecordBatch; use ffq_common::{FfqError, Result}; use crate::layout::{ - index_bin_path, index_json_path, map_task_base_dir, shuffle_path, MapTaskIndex, - ShufflePartitionMeta, + MapTaskIndex, ShufflePartitionMeta, index_bin_path, index_json_path, map_task_base_dir, + shuffle_path, }; const INDEX_BIN_MAGIC: &[u8; 4] = b"FFQI"; const INDEX_BIN_HEADER_LEN: usize = 12; +/// Reads shuffle partitions and index metadata from local storage. pub struct ShuffleReader { root_dir: PathBuf, fetch_chunk_bytes: usize, } impl ShuffleReader { + /// Create a reader rooted at `root_dir`. pub fn new(root_dir: impl Into) -> Self { Self { root_dir: root_dir.into(), @@ -26,11 +28,13 @@ impl ShuffleReader { } } + /// Configure maximum chunk size used by streamed partition fetch simulation. pub fn with_fetch_chunk_bytes(mut self, bytes: usize) -> Self { self.fetch_chunk_bytes = bytes.max(1); self } + /// Read map-task index metadata, preferring binary index when present. pub fn read_map_task_index( &self, query_id: u64, @@ -54,6 +58,7 @@ impl ShuffleReader { .map_err(|e| FfqError::Execution(format!("index json decode failed: {e}"))) } + /// List available attempt ids for a given `(query, stage, map_task)`. pub fn available_attempts( &self, query_id: u64, @@ -81,6 +86,7 @@ impl ShuffleReader { Ok(attempts) } + /// Return highest available attempt id for a map task, if any exists. pub fn latest_attempt( &self, query_id: u64, @@ -93,6 +99,7 @@ impl ShuffleReader { .max()) } + /// Return partition metadata for one reduce partition in one attempt. pub fn partition_meta( &self, query_id: u64, @@ -112,6 +119,7 @@ impl ShuffleReader { }) } + /// Read one partition payload and decode as Arrow record batches. 
pub fn read_partition( &self, query_id: u64, @@ -125,6 +133,7 @@ impl ShuffleReader { decode_ipc_bytes(&bytes) } + /// Read partition payload using the newest available attempt. pub fn read_partition_latest( &self, query_id: u64, @@ -143,6 +152,7 @@ impl ShuffleReader { } // Simulates FetchShufflePartition as server-streamed byte chunks. + /// Read one partition payload and split bytes into fetch-sized chunks. pub fn fetch_partition_chunks( &self, query_id: u64, @@ -163,6 +173,7 @@ impl ShuffleReader { Ok(out) } + /// Fetch partition chunks for the newest available attempt. pub fn fetch_partition_chunks_latest( &self, query_id: u64, @@ -180,6 +191,7 @@ impl ShuffleReader { Ok((attempt, chunks)) } + /// Decode record batches from previously streamed byte chunks. pub fn read_partition_from_streamed_chunks( &self, chunks: impl IntoIterator>, diff --git a/crates/shuffle/src/writer.rs b/crates/shuffle/src/writer.rs index 5057c1f..01be988 100644 --- a/crates/shuffle/src/writer.rs +++ b/crates/shuffle/src/writer.rs @@ -7,23 +7,26 @@ use arrow::record_batch::RecordBatch; use ffq_common::{FfqError, Result}; use crate::layout::{ - index_bin_path, index_json_path, map_task_dir, shuffle_path, MapTaskIndex, ShufflePartitionMeta, + MapTaskIndex, ShufflePartitionMeta, index_bin_path, index_json_path, map_task_dir, shuffle_path, }; const INDEX_BIN_MAGIC: &[u8; 4] = b"FFQI"; const INDEX_BIN_VERSION: u32 = 1; +/// Writes shuffle partition payloads and map-task index metadata. pub struct ShuffleWriter { root_dir: PathBuf, } impl ShuffleWriter { + /// Create a writer rooted at `root_dir`. pub fn new(root_dir: impl Into) -> Self { Self { root_dir: root_dir.into(), } } + /// Write one reduce partition payload as Arrow IPC and return its metadata. pub fn write_partition( &self, query_id: u64, @@ -71,6 +74,7 @@ impl ShuffleWriter { }) } + /// Write JSON and binary index files for one map-task attempt. 
pub fn write_map_task_index( &self, query_id: u64, @@ -119,6 +123,7 @@ impl ShuffleWriter { Ok(()) } + /// Remove expired non-latest attempts based on `ttl`. pub fn cleanup_expired_attempts(&self, ttl: Duration, now: SystemTime) -> Result { let shuffle_root = self.root_dir.join("shuffle"); if !shuffle_root.exists() { @@ -213,7 +218,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; - use crate::layout::{index_json_path, MapTaskIndex}; + use crate::layout::{MapTaskIndex, index_json_path}; use crate::reader::ShuffleReader; use super::ShuffleWriter; diff --git a/crates/sql/src/lib.rs b/crates/sql/src/lib.rs index f7e1ffe..151f0a2 100644 --- a/crates/sql/src/lib.rs +++ b/crates/sql/src/lib.rs @@ -1,8 +1,25 @@ +#![deny(missing_docs)] + +//! Minimal SQL parsing facade used by planner/frontend code. +//! +//! Architecture role: +//! - wraps `sqlparser` invocation and normalizes parse errors into FFQ error types +//! - keeps parser dependency details out of higher-level crates +//! +//! Key API: +//! - [`parse_sql`] +//! +//! Feature flags: +//! - none. + use ffq_common::{FfqError, Result}; use sqlparser::ast::Statement; use sqlparser::dialect::GenericDialect; use sqlparser::parser::Parser; +/// Parse one SQL string into `sqlparser` statements using the generic dialect. +/// +/// Returns [`ffq_common::FfqError::Planning`] when parsing fails. pub fn parse_sql(sql: &str) -> Result> { let dialect = GenericDialect {}; Parser::parse_sql(&dialect, sql).map_err(|e| FfqError::Planning(e.to_string())) diff --git a/crates/storage/src/catalog.rs b/crates/storage/src/catalog.rs index 23323ce..d7f7c81 100644 --- a/crates/storage/src/catalog.rs +++ b/crates/storage/src/catalog.rs @@ -7,26 +7,54 @@ use std::path::Path; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; +/// Logical table definition persisted in catalog files (`tables.json` / `tables.toml`). 
+/// +/// Contract: +/// - a table must provide data locations via either [`TableDef::uri`] or [`TableDef::paths`] +/// - `schema` may be omitted only for inferable formats (currently parquet and qdrant) +/// - format-specific options are carried in `options` +/// +/// Persistence: +/// - this struct is serialized directly by [`Catalog::save`] and loaded by [`Catalog::load`] +/// - when schema inference/writeback is enabled at client layer, inferred schema and +/// fingerprint metadata are stored in `schema` and `options` #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TableDef { + /// Registry key for this table. pub name: String, + /// Single path/URI location for table data. + /// + /// When set and [`TableDef::paths`] is empty, providers read from this location. #[serde(default)] pub uri: String, + /// Multi-file explicit data paths. + /// + /// Takes precedence over [`TableDef::uri`] when non-empty. #[serde(default)] pub paths: Vec, + /// Storage format identifier (for example `parquet`, `qdrant`). pub format: String, + /// Optional table schema. + /// + /// For parquet, schema can be omitted and inferred by client/storage inference policies. #[serde(default)] pub schema: Option, + /// Optional table statistics used by optimizer heuristics. #[serde(default)] pub stats: crate::TableStats, + /// Provider- and feature-specific options map. #[serde(default)] pub options: HashMap, } impl TableDef { + /// Returns schema as [`SchemaRef`] or an error if missing. + /// + /// # Errors + /// Returns an error when schema is absent. pub fn schema_ref(&self) -> Result { match &self.schema { Some(s) => Ok(Arc::new(s.clone())), @@ -37,6 +65,14 @@ impl TableDef { } } + /// Resolves data locations in provider-consumable order. + /// + /// Resolution: + /// - returns `paths` when non-empty + /// - otherwise returns `uri` as a single entry + /// + /// # Errors + /// Returns an error when neither `paths` nor `uri` is configured. 
pub fn data_paths(&self) -> Result> { if !self.paths.is_empty() { return Ok(self.paths.clone()); @@ -51,40 +87,59 @@ impl TableDef { } } +/// In-memory table catalog with JSON/TOML persistence helpers. #[derive(Debug, Default, Clone)] pub struct Catalog { tables: HashMap, } impl Catalog { + /// Creates an empty catalog. pub fn new() -> Self { Self { tables: HashMap::new(), } } + /// Registers or replaces a table by name. pub fn register_table(&mut self, table: TableDef) { self.tables.insert(table.name.clone(), table); } + /// Retrieves a table definition by name. + /// + /// # Errors + /// Returns planning error for unknown table names. pub fn get(&self, name: &str) -> Result<&TableDef> { self.tables .get(name) .ok_or_else(|| FfqError::Planning(format!("unknown table: {name}"))) } + /// Loads catalog from a JSON file. + /// + /// # Errors + /// Returns an error on read/parse/validation failures. pub fn load_from_json(path: &str) -> Result { let s = fs::read_to_string(path)?; let tables = parse_tables_json(&s)?; Self::from_tables(tables) } + /// Loads catalog from a TOML file. + /// + /// # Errors + /// Returns an error on read/parse/validation failures. pub fn load_from_toml(path: &str) -> Result { let s = fs::read_to_string(path)?; let tables = parse_tables_toml(&s)?; Self::from_tables(tables) } + /// Loads catalog from JSON or TOML based on file extension. + /// + /// # Errors + /// Returns an error for unsupported extensions or load/validation failures. pub fn load(path: &str) -> Result { match Path::new(path).extension().and_then(|ext| ext.to_str()) { Some("json") => Self::load_from_json(path), @@ -107,12 +162,17 @@ impl Catalog { Ok(cat) } + /// Returns all table definitions sorted by table name. pub fn tables(&self) -> Vec { let mut v = self.tables.values().cloned().collect::>(); v.sort_by(|a, b| a.name.cmp(&b.name)); v } + /// Persists catalog atomically to JSON. + /// + /// # Errors + /// Returns an error on encode/write/commit failures. 
pub fn save_to_json(&self, path: &str) -> Result<()> { if let Some(parent) = Path::new(path).parent() { fs::create_dir_all(parent)?; @@ -125,6 +185,10 @@ impl Catalog { Ok(()) } + /// Persists catalog atomically to TOML. + /// + /// # Errors + /// Returns an error on encode/write/commit failures. pub fn save_to_toml(&self, path: &str) -> Result<()> { if let Some(parent) = Path::new(path).parent() { fs::create_dir_all(parent)?; @@ -137,6 +201,10 @@ impl Catalog { Ok(()) } + /// Persists catalog to JSON/TOML based on file extension. + /// + /// # Errors + /// Returns an error for unsupported extension or save failures. pub fn save(&self, path: &str) -> Result<()> { match Path::new(path).extension().and_then(|ext| ext.to_str()) { Some("json") => self.save_to_json(path), @@ -196,6 +264,15 @@ fn format_supports_schema_inference(format: &str) -> bool { matches!(format.to_ascii_lowercase().as_str(), "parquet" | "qdrant") } +/// Writes bytes to `path` using stage-then-rename atomic commit semantics. +/// +/// Behavior: +/// - writes staged file under the same parent directory +/// - if target exists, renames existing target to backup, then commits staged file +/// - on commit failure, best-effort rollback restores backup +/// +/// # Errors +/// Returns an error when stage/write/rename steps fail. fn write_atomically(path: &str, content: &[u8]) -> Result<()> { let target = Path::new(path); let parent = target diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index b594d94..6b2602a 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -1,13 +1,42 @@ +#![deny(missing_docs)] + +//! Storage providers, catalog model, and table metadata APIs. +//! +//! Architecture role: +//! - table definition/catalog load-save contracts +//! - provider abstraction for scan/stat estimation +//! - parquet and optional external/vector backends +//! +//! Key modules: +//! - [`catalog`] +//! - [`provider`] +//! - [`parquet_provider`] +//! - [`stats`] +//! 
- [`vector_index`] +//! - `object_store_provider` (feature-gated) +//! - `qdrant_provider` (feature-gated) +//! +//! Feature flags: +//! - `s3`: enables object-store provider implementation +//! - `qdrant`: enables qdrant-backed vector index provider. + +/// Table/catalog model and persistence. pub mod catalog; +/// Parquet-backed storage provider and schema inference helpers. pub mod parquet_provider; +/// Provider traits and scan/stats abstractions. pub mod provider; +/// Table statistics model. pub mod stats; +/// Vector index provider abstraction. pub mod vector_index; #[cfg(feature = "s3")] +/// Experimental object-store storage provider. pub mod object_store_provider; #[cfg(feature = "qdrant")] +/// Qdrant-backed vector index provider. pub mod qdrant_provider; pub use catalog::*; diff --git a/crates/storage/src/parquet_provider.rs b/crates/storage/src/parquet_provider.rs index 65369d8..c899664 100644 --- a/crates/storage/src/parquet_provider.rs +++ b/crates/storage/src/parquet_provider.rs @@ -1,6 +1,6 @@ use std::fs::File; -use std::time::UNIX_EPOCH; use std::sync::Arc; +use std::time::UNIX_EPOCH; use arrow::record_batch::RecordBatch; use arrow_schema::{DataType, Field, Schema, SchemaRef}; @@ -12,24 +12,55 @@ use serde::{Deserialize, Serialize}; use crate::catalog::TableDef; use crate::provider::{Stats, StorageExecNode, StorageProvider}; +/// Local parquet-backed [`StorageProvider`] implementation. +/// +/// Supports: +/// - schema inference from parquet footers +/// - deterministic multi-file schema merge with strict/permissive policy +/// - basic projection pushdown by column selection +/// +/// Drift semantics: +/// - drift detection itself is handled by client-side schema-fingerprint policy +/// - this provider exposes [`FileFingerprint`] helpers used by that logic pub struct ParquetProvider; +/// Stable per-file fingerprint used for schema drift detection. 
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct FileFingerprint { + /// File path. pub path: String, + /// File size in bytes. pub size_bytes: u64, + /// File modification timestamp (nanoseconds since Unix epoch). pub mtime_ns: u128, } impl ParquetProvider { + /// Creates a parquet provider instance. pub fn new() -> Self { Self } + /// Infers schema for a parquet path list using permissive merge policy. + /// + /// Equivalent to [`ParquetProvider::infer_parquet_schema_with_policy`] with + /// `permissive_merge = true`. + /// + /// # Errors + /// Returns an error for empty path lists, read/decode failures, or incompatible schemas. pub fn infer_parquet_schema(paths: &[String]) -> Result { Self::infer_parquet_schema_with_policy(paths, true) } + /// Infers schema for one or more parquet files and merges them deterministically. + /// + /// Policy: + /// - strict mode (`permissive_merge = false`): requires exact type compatibility + /// - permissive mode (`permissive_merge = true`): allows nullable widening and + /// limited numeric widening (for example int32 + int64 => int64) + /// + /// # Errors + /// Returns an error for empty path list, parquet read failures, or incompatible schemas. pub fn infer_parquet_schema_with_policy( paths: &[String], permissive_merge: bool, @@ -63,28 +94,23 @@ impl ParquetProvider { }) } + /// Builds per-file fingerprints for schema drift checks. + /// + /// # Errors + /// Returns an error when file metadata cannot be read. 
pub fn fingerprint_paths(paths: &[String]) -> Result> { let mut out = Vec::with_capacity(paths.len()); for path in paths { let md = std::fs::metadata(path).map_err(|e| { - FfqError::InvalidConfig(format!( - "failed to stat parquet path '{}': {e}", - path - )) + FfqError::InvalidConfig(format!("failed to stat parquet path '{}': {e}", path)) })?; let modified = md.modified().map_err(|e| { - FfqError::InvalidConfig(format!( - "failed to read modified time for '{}': {e}", - path - )) + FfqError::InvalidConfig(format!("failed to read modified time for '{}': {e}", path)) })?; let mtime_ns = modified .duration_since(UNIX_EPOCH) .map_err(|e| { - FfqError::InvalidConfig(format!( - "invalid modified time for '{}': {e}", - path - )) + FfqError::InvalidConfig(format!("invalid modified time for '{}': {e}", path)) })? .as_nanos(); out.push(FileFingerprint { @@ -97,7 +123,12 @@ impl ParquetProvider { } } -fn merge_schemas(base: &Schema, next: &Schema, path: &str, permissive_merge: bool) -> Result { +fn merge_schemas( + base: &Schema, + next: &Schema, + path: &str, + permissive_merge: bool, +) -> Result { if base.fields().len() != next.fields().len() { return Err(FfqError::InvalidConfig(format!( "incompatible parquet files: schema mismatch across table paths; '{}' has {} fields but expected {}", @@ -307,6 +338,7 @@ impl StorageProvider for ParquetProvider { } } +/// Execution node that scans parquet files and emits Arrow record batches. 
pub struct ParquetScanNode { paths: Vec, schema: SchemaRef, @@ -338,8 +370,8 @@ impl ExecNode for ParquetScanNode { .map_err(|e| FfqError::Execution(format!("parquet reader open failed: {e}")))?; for batch in reader { - let batch = - batch.map_err(|e| FfqError::Execution(format!("parquet decode failed: {e}")))?; + let batch = batch + .map_err(|e| FfqError::Execution(format!("parquet decode failed: {e}")))?; if batch.schema().fields().len() != self.source_schema.fields().len() { return Err(FfqError::Execution(format!( "parquet scan schema mismatch for '{}': expected {} columns, got {}", @@ -426,7 +458,10 @@ mod tests { #[test] fn infer_parquet_schema_rejects_incompatible_files() { - let paths = vec![fixture_path("lineitem.parquet"), fixture_path("orders.parquet")]; + let paths = vec![ + fixture_path("lineitem.parquet"), + fixture_path("orders.parquet"), + ]; let err = ParquetProvider::infer_parquet_schema(&paths).expect_err("must reject"); let msg = format!("{err}"); assert!(msg.contains("incompatible parquet files")); diff --git a/crates/storage/src/provider.rs b/crates/storage/src/provider.rs index 5d37280..c7090b8 100644 --- a/crates/storage/src/provider.rs +++ b/crates/storage/src/provider.rs @@ -3,17 +3,32 @@ use std::sync::Arc; use ffq_common::Result; use ffq_execution::ExecNode; +/// Lightweight statistics used by planner/optimizer. #[derive(Debug, Clone, Default)] pub struct Stats { + /// Estimated output row count, if known. pub estimated_rows: Option, + /// Estimated scanned bytes, if known. pub estimated_bytes: Option, } +/// Type-erased storage execution node. pub type StorageExecNode = Arc; +/// Storage abstraction for table scanning and basic stats estimation. +/// +/// Implementations are format/backend-specific (for example parquet, object-store, qdrant). pub trait StorageProvider: Send + Sync { + /// Estimates table-level stats for optimizer decisions. 
fn estimate_stats(&self, table: &crate::catalog::TableDef) -> Stats; + /// Builds scan node for a table with optional projection/filter pushdown hints. + /// + /// `projection` and `filters` are best-effort pushdown inputs; providers may partially + /// apply or ignore unsupported predicates. + /// + /// # Errors + /// Returns an error for invalid table configuration or unsupported format/provider behavior. fn scan( &self, table: &crate::catalog::TableDef, diff --git a/crates/storage/src/qdrant_provider.rs b/crates/storage/src/qdrant_provider.rs index 71d6ea5..ed81cea 100644 --- a/crates/storage/src/qdrant_provider.rs +++ b/crates/storage/src/qdrant_provider.rs @@ -2,8 +2,8 @@ use std::collections::HashMap; use ffq_common::{FfqError, Result}; use futures::future::{BoxFuture, FutureExt}; -use qdrant_client::qdrant::{point_id, Condition, Filter, SearchPointsBuilder, Value}; use qdrant_client::Qdrant; +use qdrant_client::qdrant::{Condition, Filter, SearchPointsBuilder, Value, point_id}; use crate::vector_index::{VectorIndexProvider, VectorTopKRow}; diff --git a/crates/storage/src/stats.rs b/crates/storage/src/stats.rs index 7fdc150..b54d99b 100644 --- a/crates/storage/src/stats.rs +++ b/crates/storage/src/stats.rs @@ -1,7 +1,10 @@ use serde::{Deserialize, Serialize}; +/// Lightweight table statistics used by optimizer heuristics. #[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] pub struct TableStats { + /// Estimated row count if known. pub rows: Option, + /// Estimated bytes if known. pub bytes: Option, } diff --git a/crates/storage/src/vector_index.rs b/crates/storage/src/vector_index.rs index 6fef9ba..3ed1d39 100644 --- a/crates/storage/src/vector_index.rs +++ b/crates/storage/src/vector_index.rs @@ -2,14 +2,20 @@ use futures::future::BoxFuture; use ffq_common::Result; +/// One vector top-k result row returned by index providers. #[derive(Debug, Clone, PartialEq)] pub struct VectorTopKRow { + /// Document identifier. 
pub id: i64, + /// Similarity/distance score as returned by provider. pub score: f32, + /// Optional payload serialized as JSON text. pub payload_json: Option, } +/// Vector index abstraction used by `VectorTopKExec`. pub trait VectorIndexProvider: Send + Sync { + /// Fetch top-k rows for `query_vec`, optionally applying provider-specific filter. fn topk<'a>( &'a self, query_vec: Vec, diff --git a/docs/dev/missing-docs-backlog.md b/docs/dev/missing-docs-backlog.md new file mode 100644 index 0000000..c897e62 --- /dev/null +++ b/docs/dev/missing-docs-backlog.md @@ -0,0 +1,45 @@ +# Missing Docs Rollout Backlog + +This tracks incremental `#![warn(missing_docs)]` rollout across crates. + +## Enabled now + +- `ffq-client` + - status: `deny(missing_docs)` in `crates/client/src/lib.rs` + - current gate: `cargo check -p ffq-client` clean for `missing_docs` +- `ffq-common` + - status: `deny(missing_docs)` in `crates/common/src/lib.rs` + - current gate: `cargo rustc -p ffq-common --lib -- -D missing-docs` clean +- `ffq-storage` + - status: `deny(missing_docs)` in `crates/storage/src/lib.rs` + - current gate: `cargo rustc -p ffq-storage --lib -- -D missing-docs` clean +- `ffq-planner` + - status: `deny(missing_docs)` in `crates/planner/src/lib.rs` + - current gate: `cargo rustc -p ffq-planner --lib -- -D missing-docs` clean +- `ffq-execution` + - status: `deny(missing_docs)` in `crates/execution/src/lib.rs` + - current gate: `cargo rustc -p ffq-execution --lib -- -D missing-docs` clean +- `ffq-distributed` + - status: `deny(missing_docs)` in `crates/distributed/src/lib.rs` + - current gate: `cargo rustc -p ffq-distributed --lib -- -D missing-docs` clean +- `ffq-shuffle` + - status: `deny(missing_docs)` in `crates/shuffle/src/lib.rs` + - current gate: `cargo rustc -p ffq-shuffle --lib -- -D missing-docs` clean +- `ffq-sql` + - status: `deny(missing_docs)` in `crates/sql/src/lib.rs` + - current gate: `cargo rustc -p ffq-sql --lib -- -D missing-docs` clean + +## Next crates + +No 
additional crates are currently queued in this backlog.
+
+## Rollout command pattern
+
+Use this per crate:
+
+```bash
+cargo check -p <crate-name>
+```
+
+All crates above are already at `deny(missing_docs)`; apply the same
+`deny(missing_docs)` strength to any crate added to this backlog in the future.
diff --git a/docs/dev/rustdoc-style.md b/docs/dev/rustdoc-style.md
new file mode 100644
index 0000000..cf0b971
--- /dev/null
+++ b/docs/dev/rustdoc-style.md
@@ -0,0 +1,113 @@
+# Rustdoc Style Guide
+
+This document defines source-level Rust documentation standards for FastFlowQuery.
+
+## Scope
+
+Use this guide for:
+- crate-level docs (`//!`)
+- module-level docs (`//!`)
+- item-level docs (`///`) on public APIs
+
+Goals:
+- make public behavior and constraints explicit
+- keep docs accurate and reviewable
+- keep doc examples minimal and runnable where practical
+
+## `//!` vs `///`
+
+Use `//!` for:
+- crate overviews (`lib.rs`, `main.rs`)
+- module architecture notes
+- cross-cutting contracts that apply to multiple items
+
+Use `///` for:
+- public structs, enums, traits, functions, type aliases, constants
+- externally relevant behavior for non-public items when it materially reduces ambiguity
+
+Rule of thumb:
+- if the information is about a namespace/module, use `//!`
+- if the information is about one item, use `///`
+
+## Required Sections (when applicable)
+
+For public functions/methods with fallible behavior:
+- include `# Errors`
+- describe expected error classes and triggering conditions
+
+For functions with panics that are part of contract:
+- include `# Panics`
+
+For unsafe APIs:
+- include `# Safety`
+
+For performance-sensitive or non-obvious semantics:
+- include `# Notes` or `# Invariants`
+
+## Examples
+
+Use `# Examples` for key public entry points.
+
+Guidelines:
+- keep examples short and focused
+- prefer compile-checked examples
+- if runtime setup is heavy, use `no_run`
+- avoid stale pseudo-code
+
+Example shape:
+
+```rust
+/// Executes a query and collects all batches.
+///
+/// # Errors
+/// Returns an error if planning or execution fails.
+///
+/// # Examples
+/// ```no_run
+/// # use ffq_client::Engine;
+/// # use ffq_common::EngineConfig;
+/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
+/// let engine = Engine::new(EngineConfig::default())?;
+/// let batches = engine.sql("SELECT 1")?.collect().await?;
+/// assert!(!batches.is_empty());
+/// # Ok(())
+/// # }
+/// ```
+```
+
+## Invariants and Contracts
+
+Document invariants where violations can cause subtle bugs:
+- schema assumptions (column order/type expectations)
+- planner/optimizer preconditions
+- runtime semantics (determinism, spill behavior, retry semantics)
+- sink commit semantics and idempotency expectations
+
+When possible, tie invariants to corresponding tests by file name.
+
+## Intra-doc Links
+
+Prefer intra-doc links for navigation:
+- types: [`Engine`], [`DataFrame`]
+- methods: [`Engine::sql`]
+- modules: [`crate::runtime`]
+
+Guidelines:
+- link to canonical items instead of repeating full explanations
+- avoid broken links by running `cargo doc` in CI/dev checks
+
+## Tone and Style
+
+- be precise and concrete
+- avoid marketing language
+- avoid duplicating implementation details that change frequently
+- document behavior/contracts, not line-by-line code flow
+
+## Review Checklist
+
+When reviewing doc changes, verify:
+- correct placement of `//!` vs `///`
+- `# Errors` exists where needed
+- examples compile or are explicitly `no_run`
+- invariants and constraints are explicitly stated
+- links resolve and docs reflect current behavior