From abf9d80e6f766242c14307b4b05149de56b4e316 Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Mon, 1 Dec 2025 22:18:05 +0100 Subject: [PATCH 1/2] Implemented stdlib macros + unittest + added this to macros demo --- .../common/fct_user_sales_by_country.ff.sql | 35 +++++ .../common/fct_user_sales_partitioned.ff.sql | 33 ++++ .../models/common/stg_orders.ff.sql | 13 +- .../models/common/stg_users.ff.sql | 10 +- examples/macros_demo/models/macros/utils.sql | 3 +- examples/macros_demo/project.yml | 80 +++++++++- src/fastflowtransform/cli/init_cmd.py | 128 +++++++-------- src/fastflowtransform/core.py | 44 +----- src/fastflowtransform/stdlib/__init__.py | 130 +++++++++++++++ src/fastflowtransform/stdlib/casts.py | 77 +++++++++ src/fastflowtransform/stdlib/dates.py | 148 ++++++++++++++++++ src/fastflowtransform/stdlib/engine.py | 77 +++++++++ src/fastflowtransform/stdlib/partitions.py | 110 +++++++++++++ src/fastflowtransform/stdlib/sql.py | 36 +++++ tests/unit/cli/test_cli_init_unit.py | 25 ++- tests/unit/stdlib/test_casts_unit.py | 48 ++++++ tests/unit/stdlib/test_dates_unit.py | 142 +++++++++++++++++ tests/unit/stdlib/test_engine_helpers_unit.py | 52 ++++++ tests/unit/stdlib/test_partitions_unit.py | 74 +++++++++ tests/unit/stdlib/test_register_jinja_unit.py | 59 +++++++ 20 files changed, 1211 insertions(+), 113 deletions(-) create mode 100644 examples/macros_demo/models/common/fct_user_sales_by_country.ff.sql create mode 100644 examples/macros_demo/models/common/fct_user_sales_partitioned.ff.sql create mode 100644 src/fastflowtransform/stdlib/__init__.py create mode 100644 src/fastflowtransform/stdlib/casts.py create mode 100644 src/fastflowtransform/stdlib/dates.py create mode 100644 src/fastflowtransform/stdlib/engine.py create mode 100644 src/fastflowtransform/stdlib/partitions.py create mode 100644 src/fastflowtransform/stdlib/sql.py create mode 100644 tests/unit/stdlib/test_casts_unit.py create mode 100644 tests/unit/stdlib/test_dates_unit.py create mode 100644 tests/unit/stdlib/test_engine_helpers_unit.py create mode 100644 tests/unit/stdlib/test_partitions_unit.py create mode 100644 tests/unit/stdlib/test_register_jinja_unit.py diff --git a/examples/macros_demo/models/common/fct_user_sales_by_country.ff.sql b/examples/macros_demo/models/common/fct_user_sales_by_country.ff.sql new file mode 100644 index 0000000..ae3d924 --- /dev/null +++ b/examples/macros_demo/models/common/fct_user_sales_by_country.ff.sql @@ -0,0 +1,35 @@ +{{ config( + materialized = 'table', + tags = [ + 'example:macros_demo', + 'scope:common', + 'engine:duckdb', + 'engine:postgres', + 'engine:databricks_spark', + 'engine:bigquery', + 'engine:snowflake_snowpark' + ] +) }} + +with sales as ( + select + u.user_id, + u.user_segment, + u.country, + o.amount, + o.order_ts + from {{ ref('stg_orders.ff') }} as o + join {{ ref('dim_users.ff') }} as u + on u.user_id = o.user_id + where + -- demo: stdlib partition IN helper on a dimension + {{ ff_partition_in('u.country', var('partition_countries', ['DE', 'AT'])) }} +) +select + user_id, + user_segment, + country, + count(*) as order_count, + sum(amount) as total_amount +from sales +group by user_id, user_segment, country; diff --git a/examples/macros_demo/models/common/fct_user_sales_partitioned.ff.sql b/examples/macros_demo/models/common/fct_user_sales_partitioned.ff.sql new file mode 100644 index 0000000..56ab8f0 --- /dev/null +++ b/examples/macros_demo/models/common/fct_user_sales_partitioned.ff.sql @@ -0,0 +1,33 @@ +{{ config( + materialized = 'table', + tags = [ + 
'example:macros_demo', + 'scope:common', + 'engine:duckdb', + 'engine:postgres', + 'engine:databricks_spark', + 'engine:bigquery', + 'engine:snowflake_snowpark' + ] +) }} + +with sales as ( + select + u.user_id, + u.user_segment, + o.order_ts, + o.amount + from {{ ref('stg_orders.ff') }} as o + join {{ ref('dim_users.ff') }} as u + on u.user_id = o.user_id + where + -- demo: engine-aware partition predicate using stdlib + {{ ff_partition_filter('o.order_ts', var('from_date', '2025-10-01'), var('to_date', '2025-10-31')) }} +) +select + user_id, + user_segment, + count(*) as order_count, + sum(amount) as total_amount +from sales +group by user_id, user_segment; diff --git a/examples/macros_demo/models/common/stg_orders.ff.sql b/examples/macros_demo/models/common/stg_orders.ff.sql index a041e2f..e46235d 100644 --- a/examples/macros_demo/models/common/stg_orders.ff.sql +++ b/examples/macros_demo/models/common/stg_orders.ff.sql @@ -1,11 +1,22 @@ {{ config( materialized='view', - tags=['example:macros_demo', 'scope:common', 'engine:duckdb', 'engine:postgres', 'engine:databricks_spark', 'engine:bigquery', 'engine:snowflake_snowpark'] + tags=[ + 'example:macros_demo', + 'scope:common', + 'engine:duckdb', + 'engine:postgres', + 'engine:databricks_spark', + 'engine:bigquery', + 'engine:snowflake_snowpark' + ] ) }} select cast(order_id as int) as order_id, cast(customer_id as int) as user_id, {{ safe_cast_amount("amount") }} as amount, + {{ ff_safe_cast("amount", "numeric", default = "0") }} as amount_safe, + {{ ff_date_trunc("order_ts", "day") }} as order_day, + {{ ff_date_add("order_ts", "day", 1) }} as order_ts_plus_1d, cast(order_ts as timestamp) as order_ts from {{ source('sales', 'orders') }}; diff --git a/examples/macros_demo/models/common/stg_users.ff.sql b/examples/macros_demo/models/common/stg_users.ff.sql index f1c75bd..4b877f5 100644 --- a/examples/macros_demo/models/common/stg_users.ff.sql +++ b/examples/macros_demo/models/common/stg_users.ff.sql @@ -1,6 +1,14 @@ {{ config( materialized='view', - tags=['example:macros_demo', 'scope:common', 'engine:duckdb', 'engine:postgres', 'engine:databricks_spark', 'engine:bigquery', 'engine:snowflake_snowpark'] + tags=[ + 'example:macros_demo', + 'scope:common', + 'engine:duckdb', + 'engine:postgres', + 'engine:databricks_spark', + 'engine:bigquery', + 'engine:snowflake_snowpark' + ] ) }} with src as ( diff --git a/examples/macros_demo/models/macros/utils.sql b/examples/macros_demo/models/macros/utils.sql index eb0020b..b7c10f8 100644 --- a/examples/macros_demo/models/macros/utils.sql +++ b/examples/macros_demo/models/macros/utils.sql @@ -1,8 +1,7 @@ {# Reusable SQL helpers #} {%- macro email_domain(expr) -%} -{%- set e = engine('duckdb') -%} -{%- if e == 'bigquery' -%} +{%- if ff_is_engine('bigquery') -%} lower(split({{ expr }}, '@')[SAFE_OFFSET(1)]) {%- else -%} lower(split_part({{ expr }}, '@', 2)) diff --git a/examples/macros_demo/project.yml b/examples/macros_demo/project.yml index bf210b5..69c8182 100644 --- a/examples/macros_demo/project.yml +++ b/examples/macros_demo/project.yml @@ -2,19 +2,95 @@ name: macros_demo version: "0.1" vars: - # used by macros and examples default_country: "DE" models: {} - seeds: {} tests: + # dim_users - type: not_null table: dim_users column: user_id tags: [batch] + + # fct_user_sales (existing) + - type: not_null + table: fct_user_sales + column: user_id + tags: [batch] - type: row_count_between table: fct_user_sales min_rows: 1 tags: [batch] + + # NEW: stg_orders – tests stdlib safe_cast/date helpers + - type: 
not_null + table: stg_orders + column: user_id + tags: [batch] + - type: row_count_between + table: stg_orders + min_rows: 1 + tags: [batch] + + # amount_safe produced via ff_safe_cast should never be NULL and ≥ 0 + - type: not_null + table: stg_orders + column: amount_safe + tags: [batch] + - type: greater_equal + table: stg_orders + column: amount_safe + threshold: 0.0 + tags: [batch] + - type: non_negative_sum + table: stg_orders + column: amount_safe + tags: [batch] + + # order_day produced via ff_date_trunc should never be NULL + - type: not_null + table: stg_orders + column: order_day + tags: [batch] + + # stg_users (your existing checks) + - type: not_null + table: stg_users + column: user_id + tags: [batch] + - type: row_count_between + table: stg_users + min_rows: 1 + tags: [batch] + + # fct_user_sales_by_country – uses ff_partition_in + - type: not_null + table: fct_user_sales_by_country + column: user_id + tags: [batch] + - type: row_count_between + table: fct_user_sales_by_country + min_rows: 1 + tags: [batch] + + # Only DE / AT should appear (given your seed + ff_partition_in) + - type: accepted_values + table: fct_user_sales_by_country + column: country + values: ["DE", "AT"] + tags: [batch] + + # fct_user_sales_partitioned – uses ff_partition_filter + - type: not_null + table: fct_user_sales_partitioned + column: user_id + tags: [batch] + + # We know the date filter gives exactly two users (1 & 2) in the demo seeds + - type: row_count_between + table: fct_user_sales_partitioned + min_rows: 2 + max_rows: 2 + tags: [batch] diff --git a/src/fastflowtransform/cli/init_cmd.py b/src/fastflowtransform/cli/init_cmd.py index b6c42dd..e8e3c6d 100644 --- a/src/fastflowtransform/cli/init_cmd.py +++ b/src/fastflowtransform/cli/init_cmd.py @@ -112,6 +112,25 @@ def _build_project_yaml(ctx: _InitContext) -> str: '# run_date: "2024-01-01"', "vars: {}", "", + "# Optional project-wide hooks that run before/after the pipeline or per model.", + "# See docs/Hooks.md for examples (SQL + Python) and selector usage.", + "hooks:", + " on_run_start: []", + " on_run_end: []", + " before_model: []", + " after_model: []", + "", + "# Optional storage & incremental defaults applied per model name.", + "# See docs/Project_Config.md#models for field meanings.", + "models:", + " storage: {}", + " incremental: {}", + "", + "# Optional seed storage overrides (e.g., external locations per seed).", + "# See docs/Project_Config.md#seeds for supported keys.", + "seeds:", + " storage: {}", + "", "# Declare project-wide data quality checks under `tests`. " "See docs/Data_Quality_Tests.md.", "tests: []", @@ -137,80 +156,52 @@ def _build_sources_yaml() -> str: ) +def _build_packages_yaml() -> str: + return "\n".join( + [ + "# Packages bring in external models and macros. 
See docs/Packages.md.", + "packages:", + " # - name: shared_macros", + ' # path: "../shared_macros"', + ' # models_dir: "models"', + "", + ] + ) + + def _write_file(path: Path, content: str) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") -def _create_directory_notes(target: Path) -> None: - notes = { - "models/README.md": "\n".join( - [ - "# Models directory", - "", - "Place SQL (`*.ff.sql`) and Python (`*.ff.py`) models here.", - "See docs/Config_and_Macros.md for modeling guidance and config options.", - "", - ] - ), - "seeds/README.md": "\n".join( - [ - "# Seeds directory", - "", - "Add CSV or Parquet files for reproducible seeds.", - "Usage examples are covered in docs/Quickstart.md and " - "docs/Config_and_Macros.md#13-seeds-sources-and-dependencies.", - "", - ] - ), - "tests/unit/README.md": "\n".join( - [ - "# Unit tests", - "", - "Define YAML unit specs as described in " - "docs/Config_and_Macros.md#73-model-unit-tests-fft-utest.", - "Invoke them with `fft utest --env `.", - "", - ] - ), - "tests/dq/README.md": "\n".join( - [ - "# Data quality tests", - "", - "Store custom data-quality tests that run via `fft test` " - "(docs/Data_Quality_Tests.md).", - "Use this directory for schema-bound tests separate from unit specs.", - "", - ] - ), - "docs/README.md": "\n".join( - [ - "# Project documentation", - "", - "Write operator or contributor notes here and keep " - "them in sync with generated docs.", - "See docs/Technical_Overview.md#auto-docs-and-lineage " - "for `fft dag` / `fft docgen` guidance.", - "", - ] - ), - } - for rel, text in notes.items(): - _write_file(target / rel, text) - - def _build_root_readme(ctx: _InitContext) -> str: return "\n".join( [ "# FastFlowTransform project scaffold", "", "This project was created with `fft init`.", + "", + "What lives here:", + "- models/: SQL (`*.ff.sql`) and Python (`*.ff.py`) models.", + " - models/macros/: Jinja SQL macros loaded automatically.", + " - models/macros_py/: Python helpers exposed as Jinja globals/filters.", + "- seeds/: CSV/Parquet inputs for reproducible seeds (see docs/Quickstart.md).", + "- sources.yml: External tables for source('group','table').", + "- profiles.yml: Engine connections; defaults come from docs/Profiles.md.", + "- packages.yml: Optional shared models/macros (docs/Packages.md).", + "- tests/unit/: YAML specs for `fft utest` (docs/Unit_Tests.md).", + "- tests/dq/: Custom data-quality tests for `fft test` (docs/Data_Quality_Tests.md).", + "- hooks/: SQL or Python hooks referenced from project.yml (docs/Hooks.md).", + "- docs/: Notes plus generated DAG site when using `fft dag --html`.", + "", "Next steps:", "1. Update `profiles.yml` with real connection details (docs/Profiles.md).", - "2. Add sources in `sources.yml` and author models " - "under `models/` (docs/Config_and_Macros.md).", - "3. Seed sample data with `fft seed` and execute models " - "with `fft run` (docs/Quickstart.md).", + "2. Add sources in `sources.yml` and author models under `models/` " + " (docs/Config_and_Macros.md).", + "3. Wire packages (optional) in `packages.yml` if you reuse shared " + " models/macros (docs/Packages.md).", + "4. 
Seed sample data with `fft seed` and execute models " + " with `fft run` (docs/Quickstart.md).", "", ] ) @@ -274,16 +265,25 @@ def init( engine=resolved_engine, ) - for sub in ("models", "seeds", "tests/unit", "tests/dq", "docs"): + for sub in ( + "models", + "models/macros", + "models/macros_py", + "seeds", + "tests/unit", + "tests/dq", + "hooks", + "docs", + ): (project_dir / sub).mkdir(parents=True, exist_ok=True) _write_file(project_dir / "project.yml", _build_project_yaml(ctx)) _write_file(project_dir / "profiles.yml", _build_profiles_yaml(ctx)) _write_file(project_dir / "sources.yml", _build_sources_yaml()) + _write_file(project_dir / "packages.yml", _build_packages_yaml()) _write_file(project_dir / "README.md", _build_root_readme(ctx)) - _create_directory_notes(project_dir) - typer.secho(f"✓ Project skeleton created at {project_dir}", fg="green") + typer.secho(f"Project skeleton created at {project_dir}", fg="green") def register(app: typer.Typer) -> None: diff --git a/src/fastflowtransform/core.py b/src/fastflowtransform/core.py index e80c7cb..5c31a88 100644 --- a/src/fastflowtransform/core.py +++ b/src/fastflowtransform/core.py @@ -4,7 +4,6 @@ import ast import importlib.util import inspect -import json import os import re import types @@ -16,10 +15,9 @@ import jinja2.runtime from jinja2 import Environment, FileSystemLoader, StrictUndefined -from jinja2.runtime import Undefined as JinjaUndefined from pydantic import ValidationError -from fastflowtransform import storage +from fastflowtransform import stdlib as ff_stdlib, storage from fastflowtransform.config.models import validate_model_meta_strict from fastflowtransform.config.packages import PackageSpec, load_packages_config from fastflowtransform.config.project import HookSpec, parse_project_yaml_config @@ -95,38 +93,6 @@ def _validate_py_model_signature(func: Callable, deps: list[str], *, path: Path, ) -def sql_literal(value: Any) -> str: - """ - Convert a Python value into a SQL literal string. 
-
-    - None -> "NULL"
-    - bool -> "TRUE"/"FALSE"
-    - int/float -> "123" (no quotes)
-    - str -> quoted with single quotes and escaped
-    - other -> JSON-dumped and treated as a string literal
-    """
-    if value is None or isinstance(value, JinjaUndefined):
-        return "NULL"
-
-    if isinstance(value, bool):
-        return "TRUE" if value else "FALSE"
-
-    if isinstance(value, (int, float)):
-        return str(value)
-
-    if isinstance(value, str):
-        # Simple quote-escape for single quotes
-        escaped = value.replace("'", "''")
-        return f"'{escaped}'"
-
-    # Fallback: JSON (or str) and quote it
-    try:
-        json_text = json.dumps(value, separators=(",", ":"), sort_keys=True)
-    except TypeError:
-        json_text = str(value)
-    return "'" + json_text.replace("'", "''") + "'"
-
-
 @dataclass
 class Node:
     name: str
@@ -463,7 +429,13 @@ def _env(name: str, default: Any | None = None) -> Any:
         self.env.filters["env"] = _env
 
         # Export sql_literal as filter as well
-        self.env.filters["sql_literal"] = sql_literal
+        self.env.filters["sql_literal"] = ff_stdlib.sql_literal
+
+        # --- FastFlow stdlib: engine-aware SQL helpers -----------------
+        ff_stdlib.register_jinja(
+            self.env,
+            engine_resolver=self._current_engine,
+        )
 
     def _load_sources_yaml(self, project_dir: Path) -> None:
         """Load sources.yml (version 2) if present."""
diff --git a/src/fastflowtransform/stdlib/__init__.py b/src/fastflowtransform/stdlib/__init__.py
new file mode 100644
index 0000000..3fd6efb
--- /dev/null
+++ b/src/fastflowtransform/stdlib/__init__.py
@@ -0,0 +1,130 @@
+# fastflowtransform/stdlib/__init__.py
+"""
+FastFlowTransform stdlib - engine-aware SQL helper functions.
+
+These are meant to be exposed into Jinja as:
+
+    {{ ff_date_trunc('order_date', 'day') }}
+    {{ ff_date_add('order_date', 'day', 7) }}
+    {{ ff_safe_cast('raw_value', 'INTEGER', default='0') }}
+    {{ ff_partition_filter('ds', var('from_date'), var('to_date')) }}
+    {{ ff_partition_in('ds', var('partitions')) }}
+
+The `register_jinja(...)` helper wires everything into a Jinja Environment
+so that adding new stdlib helpers only requires changes inside this package.
+"""
+from __future__ import annotations
+
+from collections.abc import Callable, Mapping
+
+from jinja2 import Environment
+
+from .casts import sql_safe_cast
+from .dates import sql_date_add, sql_date_trunc
+from .engine import engine_family, is_engine, normalize_engine
+from .partitions import sql_partition_filter, sql_partition_in
+from .sql import sql_literal
+
+__all__ = [
+    "engine_family",
+    "is_engine",
+    "normalize_engine",
+    "register_jinja",
+    "sql_date_add",
+    "sql_date_trunc",
+    "sql_literal",
+    "sql_partition_filter",
+    "sql_partition_in",
+    "sql_safe_cast",
+]
+
+
+# --- Registration metadata -------------------------------------------------
+
+# Functions that should get an implicit `engine=` kwarg when called
+# from templates (but templates can still override engine=... explicitly).
+_STD_FUNCS_ENGINE_DEFAULT: Mapping[str, Callable] = {
+    "ff_date_trunc": sql_date_trunc,
+    "ff_date_add": sql_date_add,
+    "ff_safe_cast": sql_safe_cast,
+    "ff_partition_filter": sql_partition_filter,
+    "ff_partition_in": sql_partition_in,
+}
+
+# Helpers that are engine-agnostic and can be exposed as-is.
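+# (Illustrative sketch, added for clarity: is_engine("databricks_spark", "spark")
+# returns True, since both keys normalize to the same "spark" family.)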
+_STD_FUNCS_RAW: Mapping[str, Callable] = {
+    "normalize_engine": normalize_engine,
+    "engine_family": engine_family,
+    "is_engine": is_engine,
+}
+
+
+def register_jinja(
+    env: Environment,
+    *,
+    engine_resolver: Callable[[], str | None] | None = None,
+    engine: str | None = None,
+) -> None:
+    """
+    Register all stdlib helpers into a Jinja `Environment`.
+
+    Either pass:
+      - engine_resolver: a callable returning the current engine key, OR
+      - engine: a fixed engine key string.
+
+    Example from core:
+        from fastflowtransform import stdlib as ff_stdlib
+        ff_stdlib.register_jinja(env, engine_resolver=self._current_engine)
+    """
+    if engine is None and engine_resolver is not None:
+        try:
+            engine = engine_resolver()
+        except Exception:
+            engine = None
+
+    # normalized current engine (e.g. "duckdb", "bigquery", "postgres", "generic")
+    engine_key = normalize_engine(engine)
+
+    def _bind_engine(fn: Callable) -> Callable:
+        """
+        Wrap a stdlib function so that Jinja templates automatically get
+        the current engine injected as a default kwarg:
+
+            {{ ff_date_trunc('col', 'day') }}
+
+        becomes effectively:
+
+            sql_date_trunc('col', 'day', engine=engine_key)
+        """
+
+        def wrapper(*args, **kwargs):
+            kwargs.setdefault("engine", engine_key)
+            return fn(*args, **kwargs)
+
+        return wrapper
+
+    # Register the engine-bound helpers
+    for name, fn in _STD_FUNCS_ENGINE_DEFAULT.items():
+        env.globals[name] = _bind_engine(fn)
+
+    # Register low-level engine helpers (no auto-engine injection)
+    for name, fn in _STD_FUNCS_RAW.items():
+        env.globals[name] = fn
+
+    # Template-friendly helpers that know the *current* engine:
+    #   {{ ff_engine() }}
+    #   {% if ff_is_engine('bigquery') %} ... {% endif %}
+    def _ff_engine(default: str | None = None) -> str:
+        # Prefer the active engine; fall back to a normalized default or "generic"
+        if engine_key != "generic":
+            return engine_key
+        if default is not None:
+            return normalize_engine(default)
+        return engine_key
+
+    def _ff_is_engine(*candidates: str) -> bool:
+        return is_engine(engine_key, *candidates)
+
+    env.globals["ff_engine"] = _ff_engine
+    env.globals["ff_is_engine"] = _ff_is_engine
diff --git a/src/fastflowtransform/stdlib/casts.py b/src/fastflowtransform/stdlib/casts.py
new file mode 100644
index 0000000..1330f61
--- /dev/null
+++ b/src/fastflowtransform/stdlib/casts.py
@@ -0,0 +1,77 @@
+# fastflowtransform/stdlib/casts.py
+from __future__ import annotations
+
+from .engine import normalize_engine
+
+
+def sql_safe_cast(
+    expr: str,
+    target_type: str,
+    *,
+    default: str | None = None,
+    engine: str | None = None,
+) -> str:
+    """
+    Engine-aware “safe cast” builder.
+
+    Semantics by engine
+    -------------------
+    DuckDB:
+        TRY_CAST(expr AS type)
+    BigQuery:
+        SAFE_CAST(expr AS type)
+    Spark (3.x+):
+        TRY_CAST(expr AS type)
+    Snowflake:
+        CAST(expr AS type)      # TRY_CAST(FLOAT -> NUMBER) is not supported
+    Postgres / Redshift / Generic:
+        CAST(expr AS type)
+
+    If `default` is provided, it is treated as a raw SQL snippet and
+    wrapped via COALESCE(…, default).
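+
+    Examples (illustrative sketch, mirroring the semantics listed above)
+    --------------------------------------------------------------------
+    >>> sql_safe_cast("amount", "numeric", engine="duckdb")
+    'try_cast(amount AS NUMERIC)'
+    >>> sql_safe_cast("amount", "numeric", default="0", engine="bigquery")
+    'COALESCE(SAFE_CAST(amount AS NUMERIC), 0)'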
+    """
+    eng = normalize_engine(engine)
+    expr_sql = expr.strip()
+    raw_type = target_type.strip()
+    if not expr_sql:
+        raise ValueError("expr must be a non-empty SQL expression")
+    if not raw_type:
+        raise ValueError("target_type must be a non-empty SQL type")
+
+    # Normalize logical numeric/decimal types per engine
+    norm = raw_type.lower()
+    if norm in {"numeric", "number", "decimal"}:
+        if eng == "bigquery":
+            # BigQuery fixed-precision decimal
+            type_sql = "NUMERIC"
+        elif eng in {"duckdb", "postgres", "redshift"}:
+            type_sql = "NUMERIC"
+        elif eng == "snowflake":
+            # Use a concrete NUMBER with scale, but via plain CAST (no TRY_CAST)
+            type_sql = "NUMBER(38,10)"
+        else:
+            type_sql = "NUMERIC"
+    else:
+        type_sql = raw_type
+
+    # Engine-specific safe cast core
+    if eng == "bigquery":
+        inner = f"SAFE_CAST({expr_sql} AS {type_sql})"
+    elif eng == "duckdb":
+        inner = f"try_cast({expr_sql} AS {type_sql})"
+    elif eng == "spark":
+        inner = f"TRY_CAST({expr_sql} AS {type_sql})"
+    elif eng == "snowflake":
+        # TRY_CAST(FLOAT -> NUMBER(...)) is not allowed, so we use plain CAST
+        inner = f"CAST({expr_sql} AS {type_sql})"
+    else:
+        # Fallback: no TRY_/SAFE_ variant
+        inner = f"CAST({expr_sql} AS {type_sql})"
+
+    if default is not None:
+        default_sql = default.strip()
+        if not default_sql:
+            return inner
+        return f"COALESCE({inner}, {default_sql})"
+
+    return inner
diff --git a/src/fastflowtransform/stdlib/dates.py b/src/fastflowtransform/stdlib/dates.py
new file mode 100644
index 0000000..07f7231
--- /dev/null
+++ b/src/fastflowtransform/stdlib/dates.py
@@ -0,0 +1,148 @@
+# fastflowtransform/stdlib/dates.py
+from __future__ import annotations
+
+from .engine import normalize_engine
+
+
+def _clean_expr(expr: str) -> str:
+    """
+    Treat `expr` as a raw SQL snippet (column name, expression, etc.)
+    and strip surrounding whitespace.
+    """
+    return expr.strip()
+
+
+def sql_date_trunc(expr: str, part: str = "day", *, engine: str | None = None) -> str:
+    """
+    Build an engine-aware DATE_TRUNC expression.
+
+    Parameters
+    ----------
+    expr:
+        SQL expression / column reference, e.g. "order_date" or "CAST(ts AS TIMESTAMP)".
+    part:
+        Date part like "day", "month", "year", "week", ...
+    engine:
+        Engine key/hint (e.g. "duckdb", "postgres", "bigquery").
+        If omitted, "generic" semantics are used.
+
+    Examples (golden SQL)
+    ---------------------
+    DuckDB / Postgres / Redshift / Snowflake / Spark:
+        sql_date_trunc("order_date", "day", engine="duckdb")
+        -> "date_trunc('day', CAST(order_date AS TIMESTAMP))"
+
+    BigQuery:
+        sql_date_trunc("order_date", "day", engine="bigquery")
+        -> "DATE_TRUNC(CAST(order_date AS TIMESTAMP), DAY)"
+
+    Generic:
+        sql_date_trunc("created_at", "month")
+        -> "date_trunc('month', CAST(created_at AS TIMESTAMP))"
+    """
+    eng = normalize_engine(engine)
+    expr_sql = _clean_expr(expr)
+    part_norm = part.strip().lower()
+    if not part_norm:
+        raise ValueError("date part must be a non-empty string")
+
+    # Engines like DuckDB want date_trunc('<part>', <expr>)
+    if eng in {"duckdb", "postgres", "redshift", "snowflake", "spark", "generic"}:
+        return f"date_trunc('{part_norm}', CAST({expr_sql} AS TIMESTAMP))"
+
+    if eng == "bigquery":
+        # DATE_TRUNC(timestamp_expression, date_part)
+        part_upper = part_norm.upper()
+        return f"DATE_TRUNC(CAST({expr_sql} AS TIMESTAMP), {part_upper})"
+
+    # Fallback: ANSI-ish
+    return f"date_trunc('{part_norm}', CAST({expr_sql} AS TIMESTAMP))"
+
+
+def sql_date_add(
+    expr: str,
+    part: str,
+    amount: int,
+    *,
+    engine: str | None = None,
+) -> str:
+    """
+    Build an engine-aware date / timestamp addition expression.
+
+    Parameters
+    ----------
+    expr:
+        SQL expression / column reference to add to.
+    part:
+        "day", "month", "year", ... (engine-specific support may vary).
+    amount:
+        Integer offset (positive or negative).
+    engine:
+        Engine key/hint ("duckdb", "postgres", "bigquery", "snowflake", "spark", ...).
+
+    Examples (golden SQL)
+    ---------------------
+    DuckDB / Postgres / Redshift / Generic:
+        sql_date_add("order_date", "day", 3, engine="duckdb")
+        -> "CAST(order_date AS TIMESTAMP) + INTERVAL '3 day'"
+
+    Snowflake:
+        sql_date_add("created_at", "month", 1, engine="snowflake")
+        -> "DATEADD(MONTH, 1, TO_TIMESTAMP(created_at))"
+
+    BigQuery:
+        sql_date_add("order_date", "day", -7, engine="bigquery")
+        -> "DATE_ADD(CAST(order_date AS TIMESTAMP), INTERVAL -7 DAY)"
+    """
+    eng = normalize_engine(engine)
+    expr_sql = _clean_expr(expr)
+    part_norm = part.strip().lower()
+    if not part_norm:
+        raise ValueError("date part must be a non-empty string")
+    amt = int(amount)
+
+    if eng in {"duckdb", "postgres", "redshift", "generic"}:
+        # For these engines we usually want TIMESTAMP + INTERVAL.
+        # Heuristic: if the expression already contains a cast, don't wrap it again.
+        lower_expr = expr_sql.lower()
+        already_cast = (
+            "cast(" in lower_expr
+            or "::timestamp" in lower_expr
+            or "::timestamptz" in lower_expr
+            or "::date" in lower_expr
+        )
+        base_expr = expr_sql if already_cast else f"CAST({expr_sql} AS TIMESTAMP)"
+        return f"{base_expr} + INTERVAL '{amt} {part_norm}'"
+
+    if eng == "spark":
+        # Spark has DATE_ADD(date, days) for day-precision, but not all parts.
+        if part_norm == "day":
+            return f"date_add({expr_sql}, {amt})"
+        # fall back to ANSI-ish INTERVAL for other parts
+        return f"{expr_sql} + INTERVAL {amt} {part_norm.upper()}"
+
+    if eng == "snowflake":
+        part_upper = part_norm.upper()
+        # Make sure we're not doing VARCHAR + INTERVAL
+        expr_ts = f"TO_TIMESTAMP({expr_sql})"
+        return f"DATEADD({part_upper}, {amt}, {expr_ts})"
+
+    if eng == "bigquery":
+        part_upper = part_norm.upper()
+
+        # If the user already passed a CAST(...) or SAFE_CAST(...), don't double-wrap.
+        lower_expr = expr_sql.lower().replace(" ", "")
+        already_casted = lower_expr.startswith("cast(") or lower_expr.startswith("safe_cast(")
+
+        expr_for_bq = expr_sql
+        if not already_casted:
+            # Be permissive: coerce to TIMESTAMP so strings like '2025-10-01T12:00:00'
+            # work out of the box.
+            expr_for_bq = f"CAST({expr_sql} AS TIMESTAMP)"
+
+        # For dates/timestamps BigQuery supports this signature:
+        #   DATE_ADD(timestamp_expr, INTERVAL amt PART)
+        return f"DATE_ADD({expr_for_bq}, INTERVAL {amt} {part_upper})"
+
+    # Fallback: ANSI-ish
+    return f"{expr_sql} + INTERVAL '{amt} {part_norm}'"
diff --git a/src/fastflowtransform/stdlib/engine.py b/src/fastflowtransform/stdlib/engine.py
new file mode 100644
index 0000000..23fc7c2
--- /dev/null
+++ b/src/fastflowtransform/stdlib/engine.py
@@ -0,0 +1,77 @@
+# fastflowtransform/stdlib/engine.py
+from __future__ import annotations
+
+from typing import Final
+
+# Canonical engine keys we care about. This is intentionally small and focused.
+# Unknown values will just be normalized to lower-case and treated as-is.
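+# (Illustrative, added for clarity: "postgresql" and "psql" both resolve to
+# "postgres" below, while an unlisted key such as "trino" passes through
+# unchanged as "trino".)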
+_ENGINE_ALIASES: Final[dict[str, str]] = {
+    # DuckDB
+    "duckdb": "duckdb",
+    # Postgres family
+    "postgres": "postgres",
+    "postgresql": "postgres",
+    "psql": "postgres",
+    # BigQuery
+    "bigquery": "bigquery",
+    "bq": "bigquery",
+    # Snowflake
+    "snowflake": "snowflake",
+    "snowflake_snowpark": "snowflake",
+    "sf": "snowflake",
+    # Spark / Databricks
+    "spark": "spark",
+    "databricks": "spark",
+    "databricks_spark": "spark",
+}
+
+
+def normalize_engine(engine: str | None) -> str:
+    """
+    Normalize an engine string into a canonical key.
+
+    - None / empty → "generic"
+    - Known aliases → canonical (e.g. "postgresql" → "postgres")
+    - Other values → lower-case as-is
+
+    Examples
+    --------
+    >>> normalize_engine("Postgres")
+    'postgres'
+    >>> normalize_engine("databricks_spark")
+    'spark'
+    >>> normalize_engine(None)
+    'generic'
+    """
+    if not engine:
+        return "generic"
+    key = engine.strip().lower()
+    if not key:
+        return "generic"
+    return _ENGINE_ALIASES.get(key, key)
+
+
+def engine_family(engine: str | None) -> str:
+    """
+    Return a broad engine *family* key.
+
+    For now this is identical to normalize_engine(), but having a separate
+    function makes it easy to distinguish “exact engine” vs “family” later.
+    """
+    return normalize_engine(engine)
+
+
+def is_engine(engine: str | None, *candidates: str) -> bool:
+    """
+    Convenience helper: check if `engine` matches any of the given candidates.
+
+    Examples
+    --------
+    >>> is_engine("duckdb", "duckdb", "postgres")
+    True
+    >>> is_engine("bigquery", "duckdb", "postgres")
+    False
+    """
+    norm = normalize_engine(engine)
+    cand_norm = {normalize_engine(c) for c in candidates}
+    return norm in cand_norm
diff --git a/src/fastflowtransform/stdlib/partitions.py b/src/fastflowtransform/stdlib/partitions.py
new file mode 100644
index 0000000..8c2b5d8
--- /dev/null
+++ b/src/fastflowtransform/stdlib/partitions.py
@@ -0,0 +1,110 @@
+# fastflowtransform/stdlib/partitions.py
+from __future__ import annotations
+
+from collections.abc import Iterable
+from typing import Any
+
+from .engine import normalize_engine
+from .sql import sql_literal
+
+
+def _lit(value: Any) -> str:
+    """
+    Delegate to stdlib.sql.sql_literal() so Python values become correct SQL literals.
+    """
+    return sql_literal(value)
+
+
+def sql_partition_filter(
+    column: str,
+    start: Any | None = None,
+    end: Any | None = None,
+    *,
+    engine: str | None = None,  # reserved for future engine-specific tweaks
+) -> str:
+    """
+    Build a WHERE predicate for a *range* of partition values.
+
+    Semantics:
+      - start only → col >= <start>
+      - end only   → col <= <end>
+      - both       → col BETWEEN <start> AND <end>
+      - neither    → "1=1" (no-op filter)
+
+    `start` and `end` are Python values and will be converted with sql_literal(),
+    so you can safely pass `datetime.date`, `datetime.datetime`, strings, ints, etc.
+
+    Parameters
+    ----------
+    column:
+        Partition column name / expression, e.g. "ds" or "DATE(event_time)".
+    start, end:
+        Python values interpreted as partition bounds.
+    engine:
+        Currently unused but accepted so callers can pass it consistently.
+
+    Examples (golden SQL)
+    ---------------------
+    Daily date partition:
+        sql_partition_filter("ds", date(2024, 1, 1), date(2024, 1, 31))
+        -> "ds BETWEEN '2024-01-01' AND '2024-01-31'"
+
+    Open interval:
+        sql_partition_filter("ds", start=date(2024, 1, 1), end=None)
+        -> "ds >= '2024-01-01'"
+    """
+    _ = normalize_engine(engine)  # placeholder for future branching
+
+    col = column.strip()
+    if not col:
+        raise ValueError("column must be a non-empty SQL expression")
+
+    if start is None and end is None:
+        return "1=1"
+
+    conds: list[str] = []
+    if start is not None and end is not None:
+        conds.append(f"{col} BETWEEN {_lit(start)} AND {_lit(end)}")
+    else:
+        if start is not None:
+            conds.append(f"{col} >= {_lit(start)}")
+        if end is not None:
+            conds.append(f"{col} <= {_lit(end)}")
+
+    return " AND ".join(conds)
+
+
+def sql_partition_in(
+    column: str,
+    values: Iterable[Any],
+    *,
+    engine: str | None = None,  # reserved for future engine-specific tweaks
+) -> str:
+    """
+    Build an IN() predicate for a set of partition values.
+
+    - Empty values → "1=0" (guaranteed false, useful for guard rails).
+    - Non-empty    → col IN (<v1>, <v2>, ...)
+
+    Examples (golden SQL)
+    ---------------------
+    Daily partitions:
+        sql_partition_in("ds", [date(2024, 1, 1), date(2024, 1, 2)])
+        -> "ds IN ('2024-01-01', '2024-01-02')"
+
+    String partitions:
+        sql_partition_in("region", ["EU", "US"])
+        -> "region IN ('EU', 'US')"
+    """
+    _ = normalize_engine(engine)  # placeholder for future branching
+
+    col = column.strip()
+    if not col:
+        raise ValueError("column must be a non-empty SQL expression")
+
+    vals = list(values or [])
+    if not vals:
+        return "1=0"
+
+    literals = ", ".join(_lit(v) for v in vals)
+    return f"{col} IN ({literals})"
diff --git a/src/fastflowtransform/stdlib/sql.py b/src/fastflowtransform/stdlib/sql.py
new file mode 100644
index 0000000..36ebebe
--- /dev/null
+++ b/src/fastflowtransform/stdlib/sql.py
@@ -0,0 +1,36 @@
+import json
+from typing import Any
+
+from jinja2.runtime import Undefined as JinjaUndefined
+
+
+def sql_literal(value: Any) -> str:
+    """
+    Convert a Python value into a SQL literal string.
+ + - None -> "NULL" + - bool -> "TRUE"/"FALSE" + - int/float -> "123" (no quotes) + - str -> quoted with single quotes and escaped + - other -> JSON-dumped and treated as a string literal + """ + if value is None or isinstance(value, JinjaUndefined): + return "NULL" + + if isinstance(value, bool): + return "TRUE" if value else "FALSE" + + if isinstance(value, (int, float)): + return str(value) + + if isinstance(value, str): + # Simple quote-escape for single quotes + escaped = value.replace("'", "''") + return f"'{escaped}'" + + # Fallback: JSON (or str) and quote it + try: + json_text = json.dumps(value, separators=(",", ":"), sort_keys=True) + except TypeError: + json_text = str(value) + return "'" + json_text.replace("'", "''") + "'" diff --git a/tests/unit/cli/test_cli_init_unit.py b/tests/unit/cli/test_cli_init_unit.py index 1abe819..68a83be 100644 --- a/tests/unit/cli/test_cli_init_unit.py +++ b/tests/unit/cli/test_cli_init_unit.py @@ -21,12 +21,24 @@ def test_init_creates_minimal_skeleton(tmp_path: Path): assert result.exit_code == 0, result.output # Core directories exist - for rel in ("models", "seeds", "tests/unit", "docs"): + for rel in ( + "models", + "models/macros", + "models/macros_py", + "seeds", + "tests/unit", + "tests/dq", + "hooks", + "docs", + ): assert (target / rel).is_dir(), f"missing directory {rel}" # Configuration files contain doc references and comments project_yaml = _read(target / "project.yml") assert "docs/Project_Config.md" in project_yaml + assert "hooks:" in project_yaml + assert "models:" in project_yaml + assert "seeds:" in project_yaml assert "tests: []" in project_yaml profiles_yaml = _read(target / "profiles.yml") @@ -36,14 +48,13 @@ def test_init_creates_minimal_skeleton(tmp_path: Path): sources_yaml = _read(target / "sources.yml") assert "docs/Sources.md" in sources_yaml + packages_yaml = _read(target / "packages.yml") + assert "docs/Packages.md" in packages_yaml + readme = _read(target / "README.md") assert "docs/Quickstart.md" in readme - - models_note = _read(target / "models/README.md") - assert "Config_and_Macros.md" in models_note - - tests_note = _read(target / "tests/unit/README.md") - assert "fft utest" in tests_note + assert "packages.yml" in readme + assert "hooks" in readme @pytest.mark.unit diff --git a/tests/unit/stdlib/test_casts_unit.py b/tests/unit/stdlib/test_casts_unit.py new file mode 100644 index 0000000..20cc2e0 --- /dev/null +++ b/tests/unit/stdlib/test_casts_unit.py @@ -0,0 +1,48 @@ +import pytest + +from fastflowtransform.stdlib.casts import sql_safe_cast + + +@pytest.mark.parametrize( + "engine, type_str, expected_inner", + [ + ("bigquery", "numeric", "SAFE_CAST(amount AS NUMERIC)"), + ("bigquery", "NUMERIC", "SAFE_CAST(amount AS NUMERIC)"), + ("duckdb", "numeric", "try_cast(amount AS NUMERIC)"), + ("duckdb", "INTEGER", "try_cast(amount AS INTEGER)"), + ("postgres", "numeric", "CAST(amount AS NUMERIC)"), + ("redshift", "numeric", "CAST(amount AS NUMERIC)"), + ("snowflake", "numeric", "CAST(amount AS NUMBER(38,10))"), + ("spark", "numeric", "TRY_CAST(amount AS NUMERIC)"), + (None, "numeric", "CAST(amount AS NUMERIC)"), + ], +) +@pytest.mark.unit +def test_sql_safe_cast_numeric_normalization(engine, type_str, expected_inner): + sql = sql_safe_cast("amount", type_str, engine=engine) + assert sql == expected_inner + + +@pytest.mark.unit +def test_sql_safe_cast_default_coalesce(): + sql = sql_safe_cast("raw_col", "INTEGER", default="0", engine="duckdb") + assert sql == "COALESCE(try_cast(raw_col AS INTEGER), 0)" + + 
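+# Illustrative companion check (a sketch, not part of the original patch): the
+# same COALESCE wrapping applies on engines with a SAFE_CAST-style branch.
+@pytest.mark.unit
+def test_sql_safe_cast_default_coalesce_bigquery():
+    sql = sql_safe_cast("amount", "numeric", default="0", engine="bigquery")
+    assert sql == "COALESCE(SAFE_CAST(amount AS NUMERIC), 0)"
+
+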
+@pytest.mark.unit +def test_sql_safe_cast_default_blank_is_ignored(): + sql = sql_safe_cast("raw_col", "INTEGER", default=" ", engine="duckdb") + # same as without default + assert sql == "try_cast(raw_col AS INTEGER)" + + +@pytest.mark.unit +def test_sql_safe_cast_raises_on_empty_expr(): + with pytest.raises(ValueError): + sql_safe_cast(" ", "INTEGER", engine="duckdb") + + +@pytest.mark.unit +def test_sql_safe_cast_raises_on_empty_type(): + with pytest.raises(ValueError): + sql_safe_cast("amount", " ", engine="duckdb") diff --git a/tests/unit/stdlib/test_dates_unit.py b/tests/unit/stdlib/test_dates_unit.py new file mode 100644 index 0000000..5924fd5 --- /dev/null +++ b/tests/unit/stdlib/test_dates_unit.py @@ -0,0 +1,142 @@ +import pytest + +from fastflowtransform.stdlib.dates import sql_date_add, sql_date_trunc + + +@pytest.mark.parametrize( + "engine, expr, part, expected", + [ + ( + "duckdb", + "order_ts", + "day", + "date_trunc('day', CAST(order_ts AS TIMESTAMP))", + ), + ( + "postgres", + "created_at", + "month", + "date_trunc('month', CAST(created_at AS TIMESTAMP))", + ), + ( + "snowflake", + "event_time", + "year", + "date_trunc('year', CAST(event_time AS TIMESTAMP))", + ), + ( + "spark", + "ts", + "day", + "date_trunc('day', CAST(ts AS TIMESTAMP))", + ), + ( + "bigquery", + "order_ts", + "day", + "DATE_TRUNC(CAST(order_ts AS TIMESTAMP), DAY)", + ), + ( + None, # generic + "ts", + "day", + "date_trunc('day', CAST(ts AS TIMESTAMP))", + ), + ], +) +@pytest.mark.unit +def test_sql_date_trunc(engine, expr, part, expected): + assert sql_date_trunc(expr, part, engine=engine) == expected + + +@pytest.mark.unit +def test_sql_date_trunc_default_part_is_day(): + out = sql_date_trunc("order_ts", engine="duckdb") + assert out == "date_trunc('day', CAST(order_ts AS TIMESTAMP))" + + +@pytest.mark.unit +def test_sql_date_trunc_invalid_part_raises(): + with pytest.raises(ValueError): + sql_date_trunc("order_ts", " ", engine="duckdb") + + +@pytest.mark.parametrize( + "engine, expr, part, amount, expected", + [ + ( + "duckdb", + "order_ts", + "day", + 1, + "CAST(order_ts AS TIMESTAMP) + INTERVAL '1 day'", + ), + ( + "duckdb", + "CAST(order_ts AS TIMESTAMP)", + "day", + 3, + # already casted, we don't wrap again + "CAST(order_ts AS TIMESTAMP) + INTERVAL '3 day'", + ), + ( + "postgres", + "created_at", + "month", + -2, + "CAST(created_at AS TIMESTAMP) + INTERVAL '-2 month'", + ), + ( + "spark", + "order_date", + "day", + 7, + "date_add(order_date, 7)", + ), + ( + "spark", + "order_ts", + "month", + 1, + "order_ts + INTERVAL 1 MONTH", + ), + ( + "snowflake", + "order_ts", + "day", + 1, + "DATEADD(DAY, 1, TO_TIMESTAMP(order_ts))", + ), + ( + "bigquery", + "order_ts", + "day", + 1, + "DATE_ADD(CAST(order_ts AS TIMESTAMP), INTERVAL 1 DAY)", + ), + ( + "bigquery", + "CAST(order_ts AS TIMESTAMP)", + "day", + 2, + # already casted -> no extra CAST wrapper + "DATE_ADD(CAST(order_ts AS TIMESTAMP), INTERVAL 2 DAY)", + ), + ( + None, # generic + "ts", + "day", + 1, + "CAST(ts AS TIMESTAMP) + INTERVAL '1 day'", + ), + ], +) +@pytest.mark.unit +def test_sql_date_add(engine, expr, part, amount, expected): + assert sql_date_add(expr, part, amount, engine=engine) == expected + + +@pytest.mark.unit +def test_sql_date_add_invalid_part_raises(): + with pytest.raises(ValueError): + sql_date_add("ts", " ", 1, engine="duckdb") diff --git a/tests/unit/stdlib/test_engine_helpers_unit.py b/tests/unit/stdlib/test_engine_helpers_unit.py new file mode 100644 index 0000000..db63bab --- /dev/null +++ 
b/tests/unit/stdlib/test_engine_helpers_unit.py @@ -0,0 +1,52 @@ +import pytest + +from fastflowtransform.stdlib.engine import engine_family, is_engine, normalize_engine + + +@pytest.mark.parametrize( + "raw, expected", + [ + (None, "generic"), + ("", "generic"), + (" ", "generic"), + ("duckdb", "duckdb"), + ("DuckDB", "duckdb"), + ("postgres", "postgres"), + ("postgresql", "postgres"), + ("psql", "postgres"), + ("bigquery", "bigquery"), + ("bq", "bigquery"), + ("snowflake", "snowflake"), + ("snowflake_snowpark", "snowflake"), + ("sf", "snowflake"), + ("spark", "spark"), + ("databricks", "spark"), + ("databricks_spark", "spark"), + ("MyCustomEngine", "mycustomengine"), + ], +) +@pytest.mark.unit +def test_normalize_engine(raw, expected): + assert normalize_engine(raw) == expected + + +@pytest.mark.unit +def test_engine_family_alias_for_now(): + # Today engine_family == normalize_engine, but kept as separate API. + assert engine_family("Postgres") == normalize_engine("Postgres") + + +@pytest.mark.parametrize( + "engine, candidates, expected", + [ + ("duckdb", ["duckdb", "postgres"], True), + ("postgresql", ["duckdb", "postgres"], True), + ("bigquery", ["duckdb", "postgres"], False), + ("snowflake_snowpark", ["snowflake"], True), + (None, ["generic"], True), + ("unknown", ["duckdb", "postgres"], False), + ], +) +@pytest.mark.unit +def test_is_engine(engine, candidates, expected): + assert is_engine(engine, *candidates) == expected diff --git a/tests/unit/stdlib/test_partitions_unit.py b/tests/unit/stdlib/test_partitions_unit.py new file mode 100644 index 0000000..93a89c9 --- /dev/null +++ b/tests/unit/stdlib/test_partitions_unit.py @@ -0,0 +1,74 @@ +from datetime import date, datetime + +import pytest + +from fastflowtransform.stdlib.partitions import sql_partition_filter, sql_partition_in + + +@pytest.mark.unit +def test_sql_partition_filter_between_dates(): + s = date(2025, 1, 1) + e = date(2025, 1, 31) + sql = sql_partition_filter("ds", s, e) + assert sql == "ds BETWEEN '2025-01-01' AND '2025-01-31'" + + +@pytest.mark.unit +def test_sql_partition_filter_start_only(): + s = date(2025, 1, 1) + sql = sql_partition_filter("ds", start=s, end=None) + assert sql == "ds >= '2025-01-01'" + + +@pytest.mark.unit +def test_sql_partition_filter_end_only(): + e = date(2025, 1, 31) + sql = sql_partition_filter("ds", start=None, end=e) + assert sql == "ds <= '2025-01-31'" + + +@pytest.mark.unit +def test_sql_partition_filter_no_bounds_is_noop(): + sql = sql_partition_filter("ds", None, None) + assert sql == "1=1" + + +@pytest.mark.unit +def test_sql_partition_filter_raises_on_empty_column(): + with pytest.raises(ValueError): + sql_partition_filter(" ", "2025-01-01", "2025-01-31") + + +@pytest.mark.unit +def test_sql_partition_filter_datetime_literal(): + ts = datetime(2025, 1, 1, 12, 30, 45) + sql = sql_partition_filter("event_ts", ts, None) + # default sql_literal(str(dt)) → "YYYY-MM-DD HH:MM:SS" + assert "event_ts >= '" in sql + assert "2025-01-01" in sql + + +@pytest.mark.unit +def test_sql_partition_in_dates(): + vals = [date(2025, 1, 1), date(2025, 1, 2)] + sql = sql_partition_in("ds", vals) + assert sql == "ds IN ('2025-01-01', '2025-01-02')" + + +@pytest.mark.unit +def test_sql_partition_in_strings(): + vals = ["EU", "US"] + sql = sql_partition_in("region", vals) + assert sql == "region IN ('EU', 'US')" + + +@pytest.mark.unit +def test_sql_partition_in_empty_values_is_false_guard(): + sql = sql_partition_in("ds", []) + assert sql == "1=0" + + +@pytest.mark.unit +def 
test_sql_partition_in_raises_on_empty_column(): + with pytest.raises(ValueError): + sql_partition_in(" ", ["a", "b"]) diff --git a/tests/unit/stdlib/test_register_jinja_unit.py b/tests/unit/stdlib/test_register_jinja_unit.py new file mode 100644 index 0000000..f75208a --- /dev/null +++ b/tests/unit/stdlib/test_register_jinja_unit.py @@ -0,0 +1,59 @@ +import pytest +from jinja2 import Environment + +from fastflowtransform.stdlib import register_jinja +from fastflowtransform.stdlib.dates import sql_date_trunc +from fastflowtransform.stdlib.engine import normalize_engine + + +@pytest.mark.unit +def test_register_jinja_registers_all_helpers(): + env = Environment() + # Use a fixed engine key here + register_jinja(env, engine="duckdb") + + # engine-aware helpers + for name in [ + "ff_date_trunc", + "ff_date_add", + "ff_safe_cast", + "ff_partition_filter", + "ff_partition_in", + ]: + assert name in env.globals + + # raw helpers (match actual exported names) + for name in ["ff_engine", "engine_family", "ff_is_engine"]: + assert name in env.globals + + +@pytest.mark.unit +def test_register_jinja_binds_engine_for_date_trunc(): + env = Environment() + register_jinja(env, engine="duckdb") + + fn = env.globals["ff_date_trunc"] + # this should behave like sql_date_trunc(..., engine='duckdb') + out = fn("order_ts", "day") + expected = sql_date_trunc("order_ts", "day", engine="duckdb") + assert out == expected + + +@pytest.mark.unit +def test_register_jinja_with_engine_resolver(): + env = Environment() + + def _resolver(): + return "bigquery" + + register_jinja(env, engine_resolver=_resolver) + + fn = env.globals["ff_date_trunc"] + out = fn("order_ts", "day") + # Should use bigquery semantics internally + assert "DATE_TRUNC(" in out + assert "CAST(order_ts AS TIMESTAMP)" in out + + # ff_engine helper should reflect the normalized key + ff_engine = env.globals["ff_engine"] + assert ff_engine("bigquery") == normalize_engine("bigquery") From 6506c4f72da084b0db219ece84d9e22eca1a3572 Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Mon, 1 Dec 2025 22:37:19 +0100 Subject: [PATCH 2/2] Updated docs for macros stdlibs --- docs/Config_and_Macros.md | 323 +++++++++++++++++++---- docs/examples/Macros_Demo.md | 497 ++++++++++++++++++++++++++--------- 2 files changed, 643 insertions(+), 177 deletions(-) diff --git a/docs/Config_and_Macros.md b/docs/Config_and_Macros.md index 2ceeac7..42608a4 100644 --- a/docs/Config_and_Macros.md +++ b/docs/Config_and_Macros.md @@ -32,6 +32,7 @@ For an operational walkthrough (CLI usage, troubleshooting, pipelines) see the [ - [2. `config()` options](#2-config-options) - [3. Variables with `var()`](#3-variables-with-var) - [4. Template context & helpers](#4-template-context-helpers) + - [4.1 Engine-aware stdlib helpers (`fastflowtransform.stdlib`)](#41-engine-aware-stdlib-helpers-fastflowtransformstdlib) - [5. Macros & reusable Jinja code](#5-macros-reusable-jinja-code) - [6. Materialization semantics](#6-materialization-semantics) - [7. Testing & quality gates](#7-testing-quality-gates) @@ -55,22 +56,23 @@ FastFlowTransform discovers models under `/models/` with two primary fl create or replace table users as select id, email from {{ source('crm', 'users') }}; -``` +```` + ### 1.2 Python models (`*.ff.py`) Use the `@model` decorator from `fastflowtransform.core` to register a callable. The decorator accepts: -- `name` (optional) → overrides the logical name (defaults to stem). -- `deps` → list of dependency nodes (file stems or logical names). 
-- `requires` → column contract per dependency (validated via `validation.validate_required_columns`). -- `materialized` (optional) → `'table' | 'view' | 'ephemeral'`; mirrors `config(materialized=...)` for SQL. -- `tags` (optional) → convenience for attaching selection labels without writing `meta={"tags": ...}`. +* `name` (optional) → overrides the logical name (defaults to stem). +* `deps` → list of dependency nodes (file stems or logical names). +* `requires` → column contract per dependency (validated via `validation.validate_required_columns`). +* `materialized` (optional) → `'table' | 'view' | 'ephemeral'`; mirrors `config(materialized=...)` for SQL. +* `tags` (optional) → convenience for attaching selection labels without writing `meta={"tags": ...}`. Dependencies determine the call signature: -- Single dependency → function receives a single `pandas.DataFrame`. -- Multiple dependencies → function receives `dict[str, pandas.DataFrame]` keyed by physical relation name (e.g. `"users"`). +* Single dependency → function receives a single `pandas.DataFrame`. +* Multiple dependencies → function receives `dict[str, pandas.DataFrame]` keyed by physical relation name (e.g. `"users"`). ```python # models/users_enriched.ff.py @@ -111,16 +113,17 @@ Allowed values are case-insensitive strings or tuples. If the engine does not ma ### 1.3 Seeds, sources, and dependencies -- Declare external tables in `sources.yml`; they become available via `source('group','table')`. -- Provide reproducible inputs with CSV/Parquet seeds in `/seeds/`. -- FastFlowTransform auto-detects dependencies: - - SQL models → parse `ref()` / `source()` calls. - - Python models → use the decorator’s `deps`. - - Additional runtime dependencies can be expressed via `relation_for()`. +* Declare external tables in `sources.yml`; they become available via `source('group','table')`. +* Provide reproducible inputs with CSV/Parquet seeds in `/seeds/`. +* FastFlowTransform auto-detects dependencies: + + * SQL models → parse `ref()` / `source()` calls. + * Python models → use the decorator’s `deps`. + * Additional runtime dependencies can be expressed via `relation_for()`. > **Warning:** SQL dependency detection is static. Only literal calls such as `ref('users.ff')` are registered. When you need to gate a dependency behind a variable, materialise the options in a mapping (`{'foo': ref('foo'), 'bar': ref('bar')}`) and pick from that map at runtime; a bare `ref(variable)` will not show up in the DAG. -- Persistence (e.g. Spark/Databricks): configure default targets under `project.yml → models.storage` (and optionally `seeds.storage`). Example: +* Persistence (e.g. Spark/Databricks): configure default targets under `project.yml → models.storage` (and optionally `seeds.storage`). Example: ```yaml models: @@ -186,19 +189,19 @@ Call `config()` at the top of SQL models. Python models get the same options via Supported keys: -| Key | Type | Description | -|----------------|-----------------|------------------------------------------------------------------------------| -| `materialized` | `"table" \| "view" \| "ephemeral"` | Controls how FastFlowTransform persists the model. See [Materialization semantics](#6-materialization-semantics). | -| `tags` | `list[str]` | Arbitrary labels surfaced in docs / selection tooling. | -| `engines` | `list[str]` or `str` | Restrict registration to the listed engines (case-insensitive). Requires the active engine to be known (profile selection or `FF_ENGINE`). 
| -| (future) | – | Additional metadata is stored under `node.meta[...]` if added later. | +| Key | Type | Description | +| -------------- | ---------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ | +| `materialized` | `"table" \| "view" \| "ephemeral"` | Controls how FastFlowTransform persists the model. See [Materialization semantics](#6-materialization-semantics). | +| `tags` | `list[str]` | Arbitrary labels surfaced in docs / selection tooling. | +| `engines` | `list[str]` or `str` | Restrict registration to the listed engines (case-insensitive). Requires the active engine to be known (profile selection or `FF_ENGINE`). | +| (future) | – | Additional metadata is stored under `node.meta[...]` if added later. | **Tips** -- Place `config()` before any SQL text. -- Use tags to power custom filters in docs or to drive test selection. -- Combine `engines=[...]` with per-engine subfolders to keep one physical file per backend without name clashes. When no engine is active, FastFlowTransform raises a clear error to avoid silent skips. -- Ephemeral models inline into downstream SQL; pick `view` for shareable logic without materializing a table. +* Place `config()` before any SQL text. +* Use tags to power custom filters in docs or to drive test selection. +* Combine `engines=[...]` with per-engine subfolders to keep one physical file per backend without name clashes. When no engine is active, FastFlowTransform raises a clear error to avoid silent skips. +* Ephemeral models inline into downstream SQL; pick `view` for shareable logic without materializing a table. --- @@ -234,13 +237,14 @@ Resolution order: CLI overrides → project vars → default argument. Every model (SQL & Python) gets a rich Jinja context. Key helpers: -| Helper | Purpose | -|--------------------|------------------------------------------------------------------------------------------| -| `this` | Object exposing `name`, `relation`, `materialized`, `schema`, `database`. | -| `ref("model")` | Resolves another model’s physical relation (or inlines ephemeral SQL). | -| `source("group","table")` | Resolves entries defined in `sources.yml`. | -| `relation_for(node)` (Python utility) | Maps logical node names to physical relations (helpful inside UDFs/tests). | -| `var("key", default)` | Retrieves project/CLI variables (see above). | +| Helper | Purpose | +| ------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------- | +| `this` | Object exposing `name`, `relation`, `materialized`, `schema`, `database`. | +| `ref("model")` | Resolves another model’s physical relation (or inlines ephemeral SQL). | +| `source("group","table")` | Resolves entries defined in `sources.yml`. | +| `relation_for(node)` (Python utility) | Maps logical node names to physical relations (helpful inside UDFs/tests). | +| `var("key", default)` | Retrieves project/CLI variables (see above). | +| `engine(default=None)` | Returns the current engine key (`'duckdb'`, `'postgres'`, `'bigquery'`, `'snowflake'`, `'spark'`, …) or the provided `default` when unknown. | Example: @@ -255,6 +259,206 @@ from {{ ref('users.ff') }} as u -- {{ this.relation }} ``` +### 4.1 Engine-aware stdlib helpers (`fastflowtransform.stdlib`) + +FastFlowTransform ships a small “standard library” of **engine-aware SQL helpers** in `fastflowtransform.stdlib`. 
These are registered into Jinja as `ff_*` helpers:

```jinja
{{ ff_date_trunc("order_ts", "day") }}
{{ ff_date_add("order_ts", "day", 1) }}
{{ ff_safe_cast("amount", "numeric", default="0") }}
{{ ff_partition_filter("o.order_ts", var("from"), var("to")) }}
{{ ff_partition_in("ds", var("partitions")) }}
```

They take care of emitting the right SQL for DuckDB, Postgres, BigQuery, Databricks/Spark, and Snowflake, based on the active engine/profile.

#### Engine helpers

| Helper                      | Description                                                                                                                                   | Example                                              |
| --------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------- |
| `ff_engine()`               | Returns the **canonical engine key** for the current run (`'duckdb'`, `'postgres'`, `'bigquery'`, `'snowflake'`, `'spark'`, or `'generic'`). | `{{ ff_engine() }}`                                  |
| `engine_family(engine)`     | Normalizes an engine string to its broad *family*; today identical to `normalize_engine`. Note: registered without the `ff_` prefix.          | `{{ engine_family('databricks_spark') }}`           |
| `ff_is_engine(*candidates)` | Returns `true` if the current engine matches any of the given candidates (case-insensitive).                                                  | `{% if ff_is_engine('bigquery') %} ... {% endif %}`  |

These helpers use the same engine resolution as `config(engines=...)` (primarily `FF_ENGINE` / profile).

#### Date helpers

All date helpers accept a **SQL expression string**, not a Python value. You pass them column names or SQL snippets *as strings*:

```jinja
{{ ff_date_trunc("order_ts", "day") }} -- OK
{{ ff_date_trunc("CAST(order_ts AS TIMESTAMP)", "month") }}
```

Under the hood they emit engine-specific SQL (“golden SQL”):

##### `ff_date_trunc(expr, part="day")`

Build an engine-aware `DATE_TRUNC` expression.

| Engine family                | Emitted SQL example                              |
| ---------------------------- | ------------------------------------------------ |
| DuckDB / Postgres / Redshift | `date_trunc('day', CAST(order_ts AS TIMESTAMP))` |
| Snowflake / Spark            | `date_trunc('day', CAST(order_ts AS TIMESTAMP))` |
| BigQuery                     | `DATE_TRUNC(CAST(order_ts AS TIMESTAMP), DAY)`   |
| Generic / unknown            | `date_trunc('day', CAST(order_ts AS TIMESTAMP))` |

Usage in a model:

```sql
select
  {{ ff_date_trunc("order_ts", "day") }} as order_day,
  cast(order_ts as timestamp) as order_ts
from {{ source('sales', 'orders') }};
```

##### `ff_date_add(expr, part, amount)`

Build an engine-aware “add N units to a date or timestamp” expression.

Examples (per engine):

* DuckDB / Postgres / Redshift:

  ```sql
  {{ ff_date_add("order_ts", "day", 1) }}
  -- → CAST(order_ts AS TIMESTAMP) + INTERVAL '1 day'
  ```

* Spark:

  ```sql
  {{ ff_date_add("order_date", "day", 7) }}
  -- → date_add(order_date, 7)
  ```

* Snowflake:

  ```sql
  {{ ff_date_add("order_ts", "day", 1) }}
  -- → DATEADD(DAY, 1, TO_TIMESTAMP(order_ts))
  ```

* BigQuery:

  ```sql
  {{ ff_date_add("order_ts", "day", 1) }}
  -- → DATE_ADD(CAST(order_ts AS TIMESTAMP), INTERVAL 1 DAY)
  ```

Heuristics:

* If you already passed a `CAST(...)` / `SAFE_CAST(...)` expression as `expr`, the helper will **not** double-wrap it.
* Otherwise, it will coerce strings to `TIMESTAMP` where needed (BigQuery, Snowflake) to avoid “VARCHAR + INTERVAL” errors.

#### Safe cast helper

##### `ff_safe_cast(expr, target_type, default=None)`

Engine-aware “safe cast” builder. It tries to use `TRY_CAST` / `SAFE_CAST` where available and falls back to `CAST(...)` otherwise, with an optional `COALESCE` default.
+
+Semantics by engine:
+
+* DuckDB → `try_cast(expr AS type)`
+* BigQuery → `SAFE_CAST(expr AS type)`
+* Spark → `TRY_CAST(expr AS type)`
+* Snowflake → `CAST(expr AS type)`
+  (Snowflake rejects `TRY_CAST(FLOAT AS NUMBER(...))` in some cases, so we use plain `CAST` for compatibility.)
+* Postgres / Redshift / generic → `CAST(expr AS type)`
+
+Additionally, for logical numeric/decimal types (`"numeric"`, `"number"`, `"decimal"`), it normalizes the target type per engine:
+
+* BigQuery → `NUMERIC`
+* DuckDB / Postgres / Redshift → `NUMERIC`
+* Snowflake → `NUMBER(38,10)` via `CAST(...)`
+
+Examples:
+
+```sql
+select
+  {{ ff_safe_cast("amount", "numeric", default="0") }} as amount_safe
+from {{ source('sales', 'orders') }};
+```
+
+BigQuery:
+
+```sql
+COALESCE(SAFE_CAST(amount AS NUMERIC), 0)
+```
+
+DuckDB:
+
+```sql
+COALESCE(try_cast(amount AS NUMERIC), 0)
+```
+
+Snowflake:
+
+```sql
+COALESCE(CAST(amount AS NUMBER(38,10)), 0)
+```
+
+You can also use engine-specific target types directly:
+
+```sql
+{{ ff_safe_cast("col", "INT64") }}   -- BigQuery integer type
+{{ ff_safe_cast("col", "double") }}  -- DuckDB/Snowflake/Postgres floating type
+```
+
+#### Partition helpers
+
+These work on **Python values** (dates, datetimes, strings, ints, lists) and emit SQL literals using `sql_literal` internally. They’re great for **parameterized partition filters** that stay portable across engines.
+
+##### `ff_partition_filter(column, start=None, end=None)`
+
+Build a `WHERE` predicate for a **range** of partition values.
+
+* `start` only → `col >= <start>`
+* `end` only → `col <= <end>`
+* both → `col BETWEEN <start> AND <end>`
+* neither → `"1=1"` (no-op filter)
+
+Example:
+
+```sql
+where
+  {{ ff_partition_filter(
+      "o.order_ts",
+      var("from_date", "2025-10-01"),
+      var("to_date", "2025-10-31")
+  ) }}
+```
+
+Rendered SQL (DuckDB/Postgres/BigQuery/etc.):
+
+```sql
+o.order_ts BETWEEN '2025-10-01' AND '2025-10-31'
+```
+
+##### `ff_partition_in(column, values)`
+
+Build an `IN (...)` predicate for a set of partition values.
+
+* Empty values → `"1=0"` (always false).
+* Non-empty → `col IN (<v1>, <v2>, ...)`.
+
+Example:
+
+```sql
+where
+  {{ ff_partition_in("ds", var("partitions", ["2025-10-01", "2025-10-02"])) }}
+```
+
+Rendered:
+
+```sql
+ds IN ('2025-10-01', '2025-10-02')
+```
+
+You can pass `list[date]`, `list[datetime]`, or plain strings; all are converted to safe SQL literals.
 
 ---
 
 ## 5. Macros & reusable Jinja code
@@ -285,9 +489,9 @@ from {{ ref('users.ff') }};
 
 **Best practices**
 
-- Keep macros idempotent and side-effect free.
-- Group related macros per file (e.g., string utilities, date helpers).
-- Document macros with inline comments; FastFlowTransform’s generated docs list each macro with its path.
+* Keep macros idempotent and side-effect free.
+* Group related macros per file (e.g., string utilities, date helpers).
+* Document macros with inline comments; FastFlowTransform’s generated docs list each macro with its path.
 
 ---
 
@@ -295,19 +499,19 @@ from {{ ref('users.ff') }};
 
 ### SQL models
 
-| Materialization | Behaviour |
-|-----------------|-----------|
-| `table` | `CREATE OR REPLACE TABLE … AS <select>` |
+| Materialization | Behaviour                                                        |
+| --------------- | ---------------------------------------------------------------- |
+| `table`         | `CREATE OR REPLACE TABLE … AS <select>`                          |
 | `ephemeral` | No object is created; downstream `ref()` expands to a subquery. |
 
 **Postgres-specific:** FastFlowTransform rewrites the “create or replace” pattern into `DROP TABLE IF EXISTS …; CREATE TABLE … AS …` for compatibility. 
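+
+For example, on Postgres a `table` materialization is executed roughly like this (illustrative sketch only; the schema, relation, and model body are assumptions, not the exact generated DDL):
+
+```sql
+-- sketch of the rewritten pattern on Postgres; names are illustrative
+DROP TABLE IF EXISTS analytics.fct_user_sales;
+CREATE TABLE analytics.fct_user_sales AS
+SELECT user_id, count(*) AS order_count   -- the rendered model body goes here
+FROM analytics.stg_orders
+GROUP BY user_id;
+```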
### Python models -- Default → materialized as `table`. -- `materialized='view'` produces an engine-specific temporary table first, then creates/overwrites a view that selects from it. -- Ephemeral Python models are not supported. +* Default → materialized as `table`. +* `materialized='view'` produces an engine-specific temporary table first, then creates/overwrites a view that selects from it. +* Ephemeral Python models are not supported. --- @@ -345,9 +549,9 @@ tests: Keep transformation logic honest with small, engine-agnostic specs: -- Place YAML files under `/tests/unit/`. -- Express inputs via inline rows or CSV paths. -- Declare expected output rows plus comparison options (`order_by`, `any_order`, `ignore_columns`, `approx`). +* Place YAML files under `/tests/unit/`. +* Express inputs via inline rows or CSV paths. +* Declare expected output rows plus comparison options (`order_by`, `any_order`, `ignore_columns`, `approx`). ```yaml # tests/unit/users_enriched.yml @@ -383,18 +587,23 @@ See the [Model Unit Tests guide](./Unit_Tests.md) for an exhaustive walkthrough ## 8. Quick cheat sheet -| Task | Snippet / Pointer | -|------|-------------------| -| Set materialization | `{{ config(materialized='view') }}` | -| Add tags | `{{ config(tags=['mart','daily']) }}` | -| Read project variable | `{{ var('run_date', '1970-01-01') }}` | -| Current relation name | `{{ this.relation }}` | -| Reference another model | `{{ ref('users.ff') }}` | -| Reference source | `{{ source('crm','users') }}` | -| Macro definition | `models/macros/*.sql` | -| Guarantee columns (Python) | `@model(..., requires={'users': {'id','email'}})` | -| Data-quality test | `project.yml → tests` + `fft test …` | -| Unit test | `tests/unit/*.yml` + `fft utest …` | +| Task | Snippet / Pointer | +| -------------------------- | --------------------------------------------------------- | +| Set materialization | `{{ config(materialized='view') }}` | +| Add tags | `{{ config(tags=['mart','daily']) }}` | +| Read project variable | `{{ var('run_date', '1970-01-01') }}` | +| Current relation name | `{{ this.relation }}` | +| Reference another model | `{{ ref('users.ff') }}` | +| Reference source | `{{ source('crm','users') }}` | +| Engine check (BigQuery) | `{% if ff_is_engine('bigquery') %} ... {% endif %}` | +| Engine-aware truncate | `{{ ff_date_trunc("order_ts", "day") }}` | +| Engine-aware date add | `{{ ff_date_add("order_ts", "day", 1) }}` | +| Safe numeric cast | `{{ ff_safe_cast("amount", "numeric", default="0") }}` | +| Partition range filter | `{{ ff_partition_filter("ds", var("from"), var("to")) }}` | +| Partition IN filter | `{{ ff_partition_in("ds", var("parts", [])) }}` | +| Guarantee columns (Python) | `@model(..., requires={'users': {'id','email'}})` | +| Data-quality test | `project.yml → tests` + `fft test …` | +| Unit test | `tests/unit/*.yml` + `fft utest …` | --- diff --git a/docs/examples/Macros_Demo.md b/docs/examples/Macros_Demo.md index 7fcb5a8..09e7758 100644 --- a/docs/examples/Macros_Demo.md +++ b/docs/examples/Macros_Demo.md @@ -1,6 +1,19 @@ # Macros Demo -**Goal:** Showcase **SQL Jinja macros** and **Python render-time macros** working together across engines (DuckDB, Postgres, Databricks Spark, BigQuery, Snowflake Snowpark). 
+**Goal:** Showcase + +- **SQL Jinja macros** (`models/macros/*.sql`) +- **Python render-time macros** (`models/macros_py/*.py`) +- **Engine-aware stdlib helpers** (`fastflowtransform.stdlib.*` wired in as `ff_*` Jinja globals) + +working together across engines: + +- DuckDB +- Postgres +- Databricks Spark +- BigQuery (pandas & BigFrames) +- Snowflake Snowpark + You’ll see reusable SQL helpers, engine-aware SQL generation, and Python functions exposed as Jinja globals/filters. --- @@ -9,12 +22,11 @@ You’ll see reusable SQL helpers, engine-aware SQL generation, and Python funct ```text examples/macros_demo/ - .env + .env.dev_bigquery_bigframes + .env.dev_bigquery_pandas .env.dev_databricks .env.dev_duckdb .env.dev_postgres - .env.dev_bigquery_pandas - .env.dev_bigquery_bigframes .env.dev_snowflake Makefile profiles.yml @@ -23,64 +35,160 @@ examples/macros_demo/ seeds/ seed_users.csv seed_orders.csv - models/ - macros/ - utils.sql - star.sql - macros_py/ - helpers.py - common/ - stg_users.ff.sql - stg_orders.ff.sql - dim_users.ff.sql - fct_user_sales.ff.sql - engines/ - duckdb/ - py_example.ff.py - postgres/ + models/ + macros/ + utils.sql + star.sql + macros_py/ + helpers.py + common/ + stg_users.ff.sql + stg_orders.ff.sql + dim_users.ff.sql + fct_user_sales.ff.sql + fct_user_sales_by_country.ff.sql + fct_user_sales_partitioned.ff.sql + engines/ + duckdb/ + py_example.ff.py + postgres/ + py_example.ff.py + databricks_spark/ + py_example.ff.py + bigquery/ + bigframes/ py_example.ff.py - databricks_spark/ + pandas/ py_example.ff.py - bigquery/ - bigframes/ - py_example.ff.py - pandas/ - py_example.ff.py + snowflake_snowpark/ + py_example.ff.py + tests/ + unit/ + ... ``` +*(Exact engine folders may vary slightly depending on your checkout, but conceptually this is the layout.)* + --- ## What this demo shows -* **SQL Jinja macros** (`models/macros/*.sql`) +### 1. SQL Jinja macros (`models/macros/*.sql`) + +#### `utils.sql` + +High-level helpers used by `stg_users` / `stg_orders`: + +* **`email_domain(expr)`** + Engine-aware extraction of the domain part of an email address. + + * BigQuery: uses `split(...)[SAFE_OFFSET(1)]` + * Other engines: uses `split_part(..., '@', 2)` + +* **`safe_cast_amount(expr)`** + A convenience wrapper around the stdlib **`ff_safe_cast`** to cast arbitrary expressions into a numeric type safely and consistently across engines. + +* **`coalesce_any(expr, default)`** + Tiny convenience macro wrapping `coalesce(...)`. + +* **`default_country()`** + Reads a default country from `project.yml → vars.default_country`. + +#### `star.sql` + +* **`star_except(relation, exclude_cols)`** + Selects all columns of `relation` except a list of exclusions. + + * Uses `adapter_columns(relation)` if the executor can describe the table. + * Falls back to `*` if columns are unknown. + +--- + +### 2. Python macros (`models/macros_py/helpers.py`) + +Render-time Python helpers exposed as Jinja globals/filters: + +* **`slugify(value: str) -> str`** + Simple URL-friendly slug: lower-case, replace non-alphanumerics with `-`, squash duplicates. + +* **`mask_email(email: str) -> str`** + Redacts the local part of the email, e.g. `a@example.com` → `a***@example.com`. + +* **`csv_values(rows: list[dict[str, Any]], cols: list[str]) -> str`** + Renders a `VALUES(...)` list for small inline lookup tables in SQL. + +These run at **render time** (inside the Jinja phase), not as SQL UDFs. + +--- + +### 3. 
Stdlib helpers (`fastflowtransform.stdlib` → `ff_*` Jinja globals)
+
+The demo also showcases the built-in, engine-aware SQL stdlib. These are wired into Jinja as globals like `ff_safe_cast(...)`.
+
+Key helpers used in this demo:
+
+* **`ff_engine()` / `ff_engine_family()`**
+  Return the normalized engine name / family (`"duckdb"`, `"postgres"`, `"bigquery"`, `"snowflake"`, `"spark"`, `"generic"`).
+
+* **`ff_is_engine(*candidates)`**
+  Convenience predicate to branch on engine in Jinja:
+
+  ```jinja
+  {% if ff_is_engine('bigquery') %}
+    ...
+  {% endif %}
+  ```
+
+* **`ff_safe_cast(expr, target_type, default=None)`**
+  Engine-aware safe casting, used in this demo to safely cast `amount` to a numeric type:
+
+  * DuckDB: `try_cast(expr AS NUMERIC)`
+  * BigQuery: `SAFE_CAST(expr AS NUMERIC)`
+  * Spark: `TRY_CAST(expr AS NUMERIC)`
+  * Snowflake / Postgres / others: `CAST(expr AS NUMERIC)`
+  * If `default` is given → wrapped as `COALESCE(..., default)`.
+
+* **`ff_date_trunc(expr, part="day")`**
+  Engine-aware `DATE_TRUNC`, used to derive `order_day` from `order_ts`:
+
+  * DuckDB/Postgres/Snowflake/Spark: `date_trunc('<part>', CAST(expr AS TIMESTAMP))`
+  * BigQuery: `DATE_TRUNC(CAST(expr AS TIMESTAMP), PART)`
+
+* **`ff_date_add(expr, part, amount)`**
+  Engine-aware date/timestamp arithmetic, used to get `order_ts_plus_1d`:
+
+  * DuckDB/Postgres: `CAST(expr AS TIMESTAMP) + INTERVAL 'amount part'`
+  * Snowflake: `DATEADD(PART, amount, TO_TIMESTAMP(expr))`
+  * BigQuery: `DATE_ADD(CAST(expr AS TIMESTAMP), INTERVAL amount PART)`
+  * Spark: `date_add(expr, amount)` when `part == "day"`, else falls back to `INTERVAL`.
+
+* **`ff_partition_filter(column, start, end)`**
+  Builds a range predicate for partitions; the demo uses it in `fct_user_sales_partitioned`:
+
+  * `start` & `end`: `column BETWEEN <start> AND <end>`
+  * Only `start`: `column >= <start>`
+  * Only `end`: `column <= <end>`
+  * Both `None`: `1=1` (no-op)
+
+* **`ff_partition_in(column, values)`**
+  Builds an `IN (...)` predicate from Python values (`date`, `datetime`, strings, ints, etc.) via the stdlib’s `sql_literal`:
+
+  * Empty values → `1=0` (safe “match nothing” guard).
+  * Non-empty → `column IN (<v1>, <v2>, ...)`.
+
+Only some of these appear in the demo models, but all of them are available to your own models and macros.
+
 ---
 
 ## Prerequisites
 
-* A working FFT installation (CLI `fft` available)
-* For Postgres/Databricks: valid local env and drivers
-* The core must expose these Jinja globals (already done in the FFT core):
+* A working FastFlowTransform installation (CLI `fft` available).
+* For Postgres / Databricks / BigQuery / Snowflake: drivers and credentials configured via the `.env.dev_*` files. 
+* The FFT core already wires Jinja with: - * `var(name, default)`, `env(name, default)`, `engine(default)` - (Used by profiles/macros to read vars and detect engine.) + * `var(name, default)` + * `env(name, default)` + * `engine(default)` (legacy in macros; new code uses `ff_engine` / `ff_is_engine` from stdlib). --- @@ -88,16 +196,16 @@ examples/macros_demo/ Two tiny CSVs materialized via `fft seed`: -* `seed_users.csv` — `id,email,country` -* `seed_orders.csv` — `order_id,customer_id,amount,order_ts` +* `seeds/seed_users.csv` — `id,email,country` +* `seeds/seed_orders.csv` — `order_id,customer_id,amount,order_ts` -`profiles.yml` and `project.yml` give minimal storage and connection configs. +`profiles.yml` and `project.yml` provide minimal connection config; `.env.dev_*` files bind environment variables like `FF_DUCKDB_PATH`, `FF_PG_DSN`, `FF_BQ_PROJECT`, `FF_SF_*`, etc. --- ## How to run -From repo root: +From the repo root: ```bash cd examples/macros_demo @@ -114,15 +222,15 @@ make ENGINE=bigquery BQ_FRAME=pandas demo # or bigframes make ENGINE=snowflake_snowpark demo ``` -The `demo` target runs: +The `demo` target: -1. `fft seed` — loads CSVs -2. `fft run` — builds models using macros -3. `fft dag --html` — writes DAG HTML to `site/dag/index.html` -4. `fft test` — runs example tests -5. Prints artifact paths and tries to open the DAG +1. **`fft seed`** — loads the CSV seeds. +2. **`fft run`** — builds all tagged models using macros & stdlib. +3. **`fft dag --html`** — renders a DAG HTML to `site/dag/index.html`. +4. **`fft test`** — executes example tests from `project.yml`. +5. Prints artifact paths and (if possible) opens the DAG in your browser. -> For Snowflake, copy `.env.dev_snowflake` to `.env`, fill in the `FF_SF_*` values, and install `fastflowtransform[snowflake]` so the Snowpark executor is available. +> For Snowflake, copy `.env.dev_snowflake` to `.env` or export the `FF_SF_*` variables yourself, and install `fastflowtransform[snowflake]` so the Snowpark executor is available. --- @@ -130,66 +238,80 @@ The `demo` target runs: ### SQL macros – `models/macros/utils.sql` +Conceptually: + ```jinja +{# Engine-aware email domain extraction using stdlib helpers #} {%- macro email_domain(expr) -%} - lower(split_part({{ expr }}, '@', 2)) -{%- endmacro -%} - + {%- if ff_is_engine('bigquery') -%} + lower(split({{ expr }}, '@')[SAFE_OFFSET(1)]) + {%- else -%} + lower(split_part({{ expr }}, '@', 2)) + {%- endif -%} +{%- endmacro %} + +{# Convenience wrapper on top of ff_safe_cast #} {%- macro safe_cast_amount(expr) -%} -{%- set e = engine('duckdb') -%} -{%- if e in ['duckdb', 'postgres', 'databricks_spark'] -%} - cast({{ expr }} as double) -{%- else -%} - cast({{ expr }} as double) -{%- endif -%} -{%- endmacro -%} + {{ ff_safe_cast(expr, "numeric", default="0") }} +{%- endmacro %} {%- macro coalesce_any(expr, default) -%} coalesce({{ expr }}, {{ default }}) -{%- endmacro -%} +{%- endmacro %} {%- macro default_country() -%} '{{ var("default_country", "DE") }}' -{%- endmacro -%} +{%- endmacro %} ``` +> Exact implementation may differ slightly in your tree, but the *idea* is: +> +> * Use stdlib (`ff_safe_cast`, `ff_is_engine`) for the heavy lifting. +> * Keep project macros thin and readable. + ### SQL macros – `models/macros/star.sql` ```jinja +{# Select * except some columns. Works across engines. 
#} {%- macro star_except(relation, exclude_cols) -%} -{%- set excl = exclude_cols | map('lower') | list -%} -{%- set cols = adapter_columns(relation) -%} -{%- if cols and cols|length > 0 -%} - {{- (cols | reject('in', excl) | map('string') | join(', ')) -}} -{%- else -%} - * -{%- endif -%} -{%- endmacro -%} -``` - -> Note: If the executor can’t describe columns for `relation`, this macro falls back to `*`. - -### Python macros – `models/macros_py/helpers.py` - -```python -def slugify(value: str) -> str: ... -def mask_email(email: str) -> str: ... -def csv_values(rows: list[dict], cols: list[str]) -> str: ... + {%- set excl = exclude_cols | map('lower') | list -%} + {%- set cols = adapter_columns(relation) -%} + {# adapter_columns is provided by FFT executors' catalog/describe (if available). + To keep demo simple, fall back to literal star if unknown. #} + {%- if cols and cols|length > 0 -%} + {{- (cols | reject('in', excl) | map('string') | join(', ')) -}} + {%- else -%} + * + {%- endif -%} +{%- endmacro %} ``` -Exposed as Jinja globals/filters at **render time** (not runtime SQL UDFs). - --- -## Models using macros +## Models using macros & stdlib + +### `stg_users.ff.sql` — SQL + Python macros -### `stg_users.ff.sql` (Jinja + Python macro usage) +* Uses `coalesce_any(...)` + `default_country()` to fill missing countries. +* Uses `email_domain(...)` for engine-aware domain extraction. +* Injects a Python macro result via `slugify(var("site_name", "My Site"))`. -* Coalesces missing country with `default_country()` -* Adds `email_domain(...)` -* Embeds a `slugify(var('site_name', ...))` literal into SQL +Conceptually: ```jinja +{{ config( + materialized='view', + tags=[ + 'example:macros_demo', + 'scope:common', + 'engine:duckdb', + 'engine:postgres', + 'engine:databricks_spark', + 'engine:bigquery', + 'engine:snowflake_snowpark' + ] +) }} + with src as ( select cast(id as int) as user_id, @@ -202,49 +324,176 @@ select email, {{ email_domain("email") }} as email_domain, country, + -- Render-time Python macro usage (literal in SQL) '{{ slugify(var("site_name", "My Site")) }}' as site_slug from src; ``` -### `stg_orders.ff.sql` (engine-aware types) +### `stg_orders.ff.sql` — stdlib date & cast helpers + +This model shows: + +* `safe_cast_amount(...)` and `ff_safe_cast(...)` for robust numeric casting. +* `ff_date_trunc(...)` for engine-aware `DATE_TRUNC`. +* `ff_date_add(...)` for portable “+ 1 day” logic. + +Conceptually: ```jinja +{{ config( + materialized='view', + tags=[ + 'example:macros_demo', + 'scope:common', + 'engine:duckdb', + 'engine:postgres', + 'engine:databricks_spark', + 'engine:bigquery', + 'engine:snowflake_snowpark' + ] +) }} + select cast(order_id as int) as order_id, cast(customer_id as int) as user_id, {{ safe_cast_amount("amount") }} as amount, + {{ ff_safe_cast("amount", "numeric", default="0") }} as amount_safe, + {{ ff_date_trunc("order_ts", "day") }} as order_day, + {{ ff_date_add("order_ts", "day", 1) }} as order_ts_plus_1d, cast(order_ts as timestamp) as order_ts from {{ source('sales', 'orders') }}; ``` -### `dim_users.ff.sql` (inline lookup via Python macro) +### `dim_users.ff.sql` — inline lookup (can use Python or stdlib) + +The demo joins staged users with a tiny lookup table that maps domains to labels (e.g. `"example.com" → "internal"`). In your current project this may be expressed either: + +* via a Python macro (`csv_values(...)` → `VALUES(...)`), or +* via an explicit SQL snippet using literal `VALUES`. 
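+
+A minimal sketch of the Python-macro variant (the rows and lookup columns here are illustrative):
+
+```jinja
+labels as (
+  select * from (values {{ csv_values(
+    [
+      {"domain": "example.com", "label": "internal"},
+      {"domain": "gmail.com",  "label": "consumer"},
+    ],
+    ["domain", "label"]
+  ) }}) as t(domain, label)
+)
+```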
+ +Either way, it shows how to generate small lookups at render time and join them in the model. + +### `fct_user_sales.ff.sql` — base fact table + +Joins `stg_orders` with `dim_users` and aggregates: + +* `order_count` +* `total_amount` +* `first_order_ts` +* `last_order_ts` + +This is the “plain” fact model without partitioning. + +### `fct_user_sales_by_country.ff.sql` — grouping example + +Demonstrates using the staged and dim models to build a simple grouped fact table, usually aggregating at `(country, user_segment)` or similar. + +### `fct_user_sales_partitioned.ff.sql` — partition range filter with stdlib + +This model demonstrates **`ff_partition_filter`** in action. It filters a date/timestamp column with a portable predicate built from variables: ```jinja -labels as ( - select * from (values {{ csv_values( - [ - {"domain":"example.com", "label":"internal"}, - {"domain":"gmail.com", "label":"consumer"}, - ], - ["domain","label"] - ) }}) as t(domain, label) +{{ config( + materialized='table', + tags=[ + 'example:macros_demo', + 'scope:common', + 'engine:duckdb', + 'engine:postgres', + 'engine:databricks_spark', + 'engine:bigquery', + 'engine:snowflake_snowpark' + ] +) }} + +with sales as ( + select + u.user_id, + u.user_segment, + o.order_ts, + o.amount + from {{ ref('stg_orders.ff') }} as o + join {{ ref('dim_users.ff') }} as u + on u.user_id = o.user_id + where + -- demo: engine-aware partition predicate using stdlib + {{ ff_partition_filter( + 'o.order_ts', + var('from_date', '2025-10-01'), + var('to_date', '2025-10-31') + ) }} ) +select + user_id, + user_segment, + count(*) as order_count, + sum(amount) as total_amount +from sales +group by user_id, user_segment; ``` -### `fct_user_sales.ff.sql` (final aggregation) - -Joins `stg_orders` with `dim_users` and aggregates. +You can use the same pattern with **`ff_partition_in`** if you prefer a discrete partition list driven by `var("partitions", [...])`. 
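+
+A minimal sketch of that variant (assuming a string partition column `ds`; adjust the column and default list to your model):
+
+```jinja
+where
+  {{ ff_partition_in("ds", var("partitions", ["2025-10-01", "2025-10-02"])) }}
+```
+
+which renders to `ds IN ('2025-10-01', '2025-10-02')`.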
--- ## Tests (examples) -Declared in `project.yml`: - -* `not_null(dim_users.user_id)` -* `row_count_between(fct_user_sales, min_rows=1)` +In `project.yml` you’ll see example tests like: + +```yaml +tests: + - type: not_null + table: dim_users + column: user_id + tags: [batch] + + - type: row_count_between + table: fct_user_sales + min_rows: 1 + tags: [batch] + + - type: not_null + table: fct_user_sales_by_country + column: user_id + tags: [batch] + + - type: row_count_between + table: fct_user_sales_by_country + min_rows: 1 + tags: [batch] + + - type: not_null + table: fct_user_sales_partitioned + column: user_id + tags: [batch] + + - type: row_count_between + table: fct_user_sales_partitioned + min_rows: 1 + tags: [batch] + + - type: not_null + table: stg_orders + column: user_id + tags: [batch] + + - type: row_count_between + table: stg_orders + min_rows: 1 + tags: [batch] + + - type: not_null + table: stg_users + column: user_id + tags: [batch] + + - type: row_count_between + table: stg_users + min_rows: 1 + tags: [batch] +``` -Run with: +Run them using the appropriate profile, e.g.: ```bash fft test examples/macros_demo --env dev_duckdb --select tag:example:macros_demo @@ -255,24 +504,32 @@ fft test examples/macros_demo --env dev_duckdb --select tag:example:macros_demo ## Troubleshooting * **`jinja2.exceptions.UndefinedError: 'var'/'env'/'engine' is undefined`** - Ensure your core’s Jinja environment registers these globals before loading templates: + + Ensure your FFT core registers these Jinja globals before rendering models: ```python env.globals.update(var=..., env=..., engine=...) ``` -* **Engine differences (types & functions):** - Always branch in macros (`engine(...)`) when types or functions differ. -* **`adapter_columns(...)` returns none:** - The `star_except` macro will fallback to `*`. For strict behavior, replace with static column lists per engine. + + and that `fastflowtransform.stdlib.register_jinja(...)` is called to inject `ff_*` helpers. + +* **Engine differences (types & functions)** + + Use `ff_engine()` / `ff_is_engine(...)` or your own macros to branch on engine where syntax differs (e.g. `split_part` vs `split()[SAFE_OFFSET(...)]`, `SAFE_CAST` vs `TRY_CAST`). + +* **`adapter_columns(...)` returns None** + + In that case `star_except` falls back to `*`. If you need strict column lists for some engines, replace that macro with explicit column sets or configure your executor to provide catalog metadata. --- ## Extending this demo -* Add more helpers to `helpers.py` (e.g., `render_json(obj)`, `join_csv(list)`). -* Create reusable macro libraries under `models/macros/` (date handling, SCD helpers, etc.). -* Use `var(...)` to parameterize behavior per environment or profile. +* Add more helpers to `helpers.py` (e.g. JSON formatting, list formatting). +* Create more engine-aware macros for date handling or SCDs, potentially layered on top of stdlib (`ff_date_trunc`, `ff_date_add`). +* Add new models that use `ff_partition_in` or more elaborate `ff_safe_cast` combinations. +* Use `var(...)` to parameterize from/to dates, partition lists, or feature flags per environment. --- -Happy macro-ing! +Happy macro-ing 🚀