diff --git a/warehouse/__init__.py b/warehouse/__init__.py new file mode 100644 index 0000000..00cbaf1 --- /dev/null +++ b/warehouse/__init__.py @@ -0,0 +1,24 @@ +""" +Snowflake-Compatible Data Warehouse Package +Provides ORM-based access to live Snowflake data for fast querying and LLM integration. +""" + +from .db_setup import HybridDataWarehouse, YearlongDataWarehouse, create_session +from .models import Base, Metric, Dimension, Fact, FactDimension +# Import query functions for easy access +from .query_wrapper import create_session as query_create_session, query_facts, convert_jargons + +__version__ = "2.1.0" # Updated with proper star schema design +__all__ = [ + 'HybridDataWarehouse', + 'YearlongDataWarehouse', # Backward compatibility alias + 'create_session', + 'query_create_session', + 'query_facts', + 'convert_jargons', + 'Base', + 'Metric', + 'Dimension', + 'Fact', + 'FactDimension' +] diff --git a/warehouse/data_ingestion.py b/warehouse/data_ingestion.py new file mode 100644 index 0000000..b36595f --- /dev/null +++ b/warehouse/data_ingestion.py @@ -0,0 +1,22 @@ +""" +Data Ingestion Module for Year-long Data Warehouse +Processes and loads data from various sources into the SQLite warehouse. +""" + +import pandas as pd +import dask.dataframe as dd +import os +import json +import sys +from datetime import datetime, date +from typing import List, Dict, Any, Optional +from .db_setup import YearlongDataWarehouse +import logging + +# Add parent directory to path for module imports +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class DataIngestionManager: diff --git a/warehouse/db_setup.py b/warehouse/db_setup.py new file mode 100644 index 0000000..7362c0f --- /dev/null +++ b/warehouse/db_setup.py @@ -0,0 +1,364 @@ +""" +Hybrid ORM Data Warehouse +Supports both SQLite (development) and Snowflake (production) using SQLAlchemy ORM. +""" + +import os +import json +import logging +from typing import Dict, Any, Optional, List +from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text, Index, UniqueConstraint, text +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker, Session +from sqlalchemy.sql import func + +from .models import Base, Metric, Dimension, Fact, FactDimension + +logger = logging.getLogger(__name__) + +class HybridDataWarehouse: + """ + ORM-based data warehouse supporting both SQLite and Snowflake. + + Automatically detects database type based on connection string or environment variables: + - SQLite: For development/testing (uses local .db files) + - Snowflake: For production (queries live enterprise data) + """ + + def __init__(self, connection_string: Optional[str] = None): + """ + Initialize warehouse connection for SQLite or Snowflake. + + Args: + connection_string: Database connection string. If None, auto-detects based on environment. + SQLite: Path ending in '.db' or 'sqlite:///' + Snowflake: 'snowflake://' URL or uses SNOWFLAKE_* env vars + """ + self.db_type = "sqlite" # Default to SQLite for development + + if connection_string is None: + # Auto-detect based on environment variables + snowflake_account = os.getenv('SNOWFLAKE_ACCOUNT') + snowflake_user = os.getenv('SNOWFLAKE_USER') + snowflake_password = os.getenv('SNOWFLAKE_PASSWORD') + + if all([snowflake_account, snowflake_user, snowflake_password]): + # Snowflake credentials found - use Snowflake + self.db_type = "snowflake" + warehouse_env = os.getenv('SNOWFLAKE_WAREHOUSE', 'default_warehouse') + database = os.getenv('SNOWFLAKE_DATABASE', 'default_database') + schema = os.getenv('SNOWFLAKE_SCHEMA', 'default_schema') + connection_string = f"snowflake://{snowflake_user}:{snowflake_password}@{snowflake_account}/{database}/{schema}?warehouse={warehouse_env}" + else: + # No Snowflake credentials - use SQLite + self.db_type = "sqlite" + connection_string = "sqlite:///yearlong_warehouse.db" + + elif connection_string.startswith('snowflake://'): + self.db_type = "snowflake" + elif connection_string.endswith('.db'): + # Convert bare .db path to proper SQLAlchemy URL with absolute path + self.db_type = "sqlite" + abs_path = os.path.abspath(connection_string) + connection_string = f"sqlite:///{abs_path}" + elif 'sqlite://' in connection_string: + self.db_type = "sqlite" + + try: + logger.info(f"Initializing {self.db_type.upper()} data warehouse...") + self.engine = create_engine(connection_string, echo=False) + self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine) + + # Test connection + if self.db_type == "snowflake": + with self.engine.connect() as conn: + conn.execute(text("SELECT 1")) + # SQLite connection is automatically tested by engine creation + + # Create tables if they don't exist + if not self._tables_exist(): + logger.info(f"Creating tables in {self.db_type.upper()}...") + Base.metadata.create_all(bind=self.engine) + + # Initialize default metrics if database is empty + self._initialize_default_metrics() + + logger.info(f"Connected to {self.db_type.upper()} data warehouse") + + except Exception as e: + # Clean up any partially initialized state + self.engine = None + self.SessionLocal = None + raise ValueError(f"Failed to connect to {self.db_type.upper()}: {e}") + + def _tables_exist(self) -> bool: + """Check if required tables exist in the database.""" + if self.engine is None: + return False + try: + # Get table names using database-specific methods + if self.db_type == "snowflake": + with self.engine.connect() as conn: + result = conn.execute(text("SHOW TABLES")) + existing_tables = {row[1] for row in result} # table names are in second column + else: # SQLite + with self.engine.connect() as conn: + result = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table'")) + existing_tables = {row[0] for row in result} # table names are in first column + + required_tables = {'metrics', 'dimensions', 'facts', 'fact_dimensions'} + return required_tables.issubset(existing_tables) + except Exception: + return False + + def _initialize_default_metrics(self): + """Initialize default metrics in the database during setup.""" + with self.get_session() as session: + try: + # Check if metrics already exist + existing_count = session.query(Metric).count() + if existing_count > 0: + return + + # Initialize default metrics + default_metrics = [ + (1, "Total Sales", "Total sales revenue in dollars", "numeric"), + (2, "Total Quantity", "Total quantity sold", "numeric"), + (3, "Total Returns", "Total return transactions", "numeric"), + (4, "Return Rate", "Percentage of returns", "percentage"), + (5, "Net Sales", "Sales after returns", "numeric"), + (6, "Email Opens", "Number of email opens", "numeric"), + (7, "Email Clicks", "Number of email clicks", "numeric"), + (8, "Social Media Engagement", "Social media engagement metrics", "numeric"), + (9, "Social Media Followers", "Total social media followers", "numeric") + ] + + for metric_id, name, desc, data_type in default_metrics: + metric = Metric( + metric_id=metric_id, + metric_name=name, + metric_desc=desc, + data_type=data_type + ) + session.add(metric) + + session.commit() + logger.info("Initialized default metrics in database") + + except Exception as e: + session.rollback() + logger.error(f"Failed to initialize default metrics: {e}") + + def get_session(self) -> Session: + """Get a new database session.""" + if self.SessionLocal is None: + raise RuntimeError("Database connection not established. Unable to create session.") + return self.SessionLocal() + + def get_dimension_id(self, dimension_type: str, dimension_code: str, + dimension_name: str, metadata: Optional[Dict[str, Any]] = None) -> Optional[int]: + """ + Get dimension ID by type and code. Creates dimension if it doesn't exist. + + Note: In production with live Snowflake data, dimensions should already exist. + This method is primarily for development/testing. + """ + with self.get_session() as session: + # Try to find existing dimension + dimension = session.query(Dimension).filter( + Dimension.dimension_type == dimension_type, + Dimension.dimension_code == dimension_code + ).first() + + if dimension: + return dimension.id # type: ignore[attr-defined] + + # Create new dimension (development/testing only) + metadata_str = json.dumps(metadata) if metadata else None + new_dimension = Dimension( + dimension_type=dimension_type, + dimension_code=dimension_code, + dimension_name=dimension_name, + dimension_metadata=metadata_str + ) + + try: + session.add(new_dimension) + session.commit() + session.refresh(new_dimension) + logger.info(f"Created new dimension: {dimension_code}") + return new_dimension.id # type: ignore[attr-defined] + except Exception as e: + session.rollback() + logger.error(f"Failed to create dimension: {e}") + return None + + def insert_fact(self, metric_id: int, dimension_ids: List[int], period_type: int, + period_start: str, period_end: str, period_key: str, value: float, count: int) -> bool: + """ + Insert a fact record with proper many-to-many relationships. + + Note: In production, facts are typically loaded via ETL pipelines. + This method is primarily for development/testing. + """ + with self.get_session() as session: + try: + # Create the fact record + fact = Fact( + metric_id=metric_id, + period_type=period_type, + period_start=period_start, + period_end=period_end, + period_key=period_key, + value=value, + count=count + ) + + session.add(fact) + session.flush() # Get the fact ID without committing + + # Create many-to-many relationships + for dimension_id in dimension_ids: + fact_dimension = FactDimension( + fact_id=fact.id, + dimension_id=dimension_id + ) + session.add(fact_dimension) + + session.commit() + logger.debug(f"Inserted fact: metric_id={metric_id}, value={value}, dimensions={dimension_ids}") + return True + + except Exception as e: + session.rollback() + logger.error(f"Failed to insert fact: {e}") + return False + + def query_facts(self, session: Session, metric_ids: List[int], group_names: List[str], + period_levels: List[int], exact_date: str) -> Dict[str, Any]: + """ + Query facts table for aggregated data using proper star schema relationships. + + This method queries live data using SQLAlchemy ORM relationships. + """ + try: + # Build query using proper ORM relationships + query = session.query( + Fact, + Metric, + Dimension + ).join(Metric, Fact.metric_id == Metric.metric_id)\ + .join(FactDimension, Fact.id == FactDimension.fact_id)\ + .join(Dimension, FactDimension.dimension_id == Dimension.id) + + # Apply filters + query = query.filter( + Fact.metric_id.in_(metric_ids), + Fact.period_type.in_(period_levels), + Fact.period_start <= exact_date, + Fact.period_end >= exact_date + ) + + # Filter by dimension codes if specified + if group_names: + query = query.filter( + Dimension.dimension_code.in_(group_names), + Dimension.dimension_type == 'site' + ) + + results = query.all() + + # Format results to match existing API structure + formatted_results = {} + for fact, metric, dimension in results: + # Parse dimension metadata + dimension_details = { + 'site_id': dimension.dimension_code, + 'site_name': dimension.dimension_name, + } + if dimension.dimension_metadata: + try: + dimension_details.update(json.loads(dimension.dimension_metadata)) + except: + pass + + # Group by metric for the expected format + metric_key = str(fact.metric_id) + if metric_key not in formatted_results: + formatted_results[metric_key] = [] + + # Check if we already have this fact to avoid duplicates + existing_fact = next( + (f for f in formatted_results[metric_key] + if f['period_key'] == fact.period_key and f['site_id'] == dimension_details['site_id']), + None + ) + + if existing_fact: + # Update existing fact with additional dimension info if needed + continue + else: + # Add new formatted result + formatted_results[metric_key].append({ + 'metric_id': fact.metric_id, + 'metric_name': metric.metric_name, + 'metric_description': metric.metric_desc or '', + 'site_id': dimension_details.get('site_id', ''), + 'site_name': dimension_details.get('site_name', ''), + 'command_name': dimension_details.get('command_name', ''), + 'store_format': dimension_details.get('store_format', ''), + 'period_key': fact.period_key, + 'value': fact.value, + 'count': fact.count + }) + + return {'result': formatted_results} + + except Exception as e: + logger.error(f"Query failed: {e}") + import traceback + logger.error(f"Traceback: {traceback.format_exc()}") + return {'result': {}} + + def close(self): + """Close database connections.""" + if self.engine: + self.engine.dispose() + logger.info("Snowflake connections closed") + + def __del__(self): + """Ensure connections are closed.""" + self.close() + + +# Backward compatibility: aliases for existing code +SnowflakeDataWarehouse = HybridDataWarehouse # For Snowflake-specific usage +YearlongDataWarehouse = HybridDataWarehouse # For existing code + + +def create_session(connection_string: Optional[str] = None) -> Session: + """ + Create a database session for the warehouse. + + Args: + connection_string: Optional connection string. If None, auto-detects database type. + + Returns: + SQLAlchemy session for database operations + """ + warehouse = HybridDataWarehouse(connection_string) + return warehouse.get_session() + + +if __name__ == '__main__': + # Test the connection (auto-detects database type) + try: + warehouse = HybridDataWarehouse() + print(f"✅ {warehouse.db_type.upper()} data warehouse connection successful!") + warehouse.close() + except Exception as e: + print(f"❌ Failed to connect to data warehouse: {e}") + if "SNOWFLAKE" in str(e): + print("For Snowflake access, set SNOWFLAKE_* environment variables") + else: + print("For SQLite access, ensure the database file is accessible") diff --git a/warehouse/models.py b/warehouse/models.py new file mode 100644 index 0000000..258e2f1 --- /dev/null +++ b/warehouse/models.py @@ -0,0 +1,92 @@ +""" +SQLAlchemy ORM Models for Snowflake Data Warehouse +Defines the metrics, dimensions, and facts tables for live data querying. +""" + +from sqlalchemy import Column, Integer, String, Float, DateTime, Text, Index, UniqueConstraint, ForeignKey +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship +from sqlalchemy.sql import func + +Base = declarative_base() + +class Metric(Base): + """Metrics table: Defines available metrics (sales, engagement, etc.)""" + __tablename__ = 'metrics' + + id = Column(Integer, primary_key=True, autoincrement=True) + metric_id = Column(Integer, nullable=False, unique=True, index=True) + metric_name = Column(String(255), nullable=False) + metric_desc = Column(Text) + data_type = Column(String(50), nullable=False) # 'numeric', 'percentage', 'count' + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + # Relationship to facts + facts = relationship("Fact", backref="metric") + + def __repr__(self): + return f"" + +class Dimension(Base): + """Dimensions table: Stores dimensional data (sites, commands, etc.)""" + __tablename__ = 'dimensions' + + id = Column(Integer, primary_key=True, autoincrement=True) + dimension_type = Column(String(100), nullable=False, index=True) # 'site', 'command', etc. + dimension_code = Column(String(100), nullable=False, index=True) + dimension_name = Column(String(255), nullable=False) + dimension_metadata = Column(Text) # JSON string for additional attributes + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + __table_args__ = ( + UniqueConstraint('dimension_type', 'dimension_code', name='unique_dimension'), + Index('idx_dimension_type_code', 'dimension_type', 'dimension_code'), + ) + + # Relationship to fact-dimensions (through fact_dimensions table) + fact_dimensions = relationship("FactDimension", backref="dimension") + + def __repr__(self): + return f"" + +class Fact(Base): + """Facts table: Stores actual measurements""" + __tablename__ = 'facts' + + id = Column(Integer, primary_key=True, autoincrement=True) + metric_id = Column(Integer, ForeignKey('metrics.metric_id'), nullable=False, index=True) + period_type = Column(Integer, nullable=False, index=True) # 1=daily, 2=monthly, 3=quarterly, 4=yearly + period_start = Column(String(8), nullable=False, index=True) # YYYYMMDD format + period_end = Column(String(8), nullable=False, index=True) # YYYYMMDD format + period_key = Column(String(255), nullable=False) + value = Column(Float, nullable=False) + count = Column(Integer, nullable=False) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + __table_args__ = ( + Index('idx_fact_lookup', 'metric_id', 'period_type', 'period_start', 'period_end'), + Index('idx_fact_metric_period', 'metric_id', 'period_type'), + ) + + # Relationships + fact_dimensions = relationship("FactDimension", backref="fact") + + def __repr__(self): + return f"" + +class FactDimension(Base): + """Junction table for many-to-many relationship between facts and dimensions""" + __tablename__ = 'fact_dimensions' + + id = Column(Integer, primary_key=True, autoincrement=True) + fact_id = Column(Integer, ForeignKey('facts.id'), nullable=False, index=True) + dimension_id = Column(Integer, ForeignKey('dimensions.id'), nullable=False, index=True) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + __table_args__ = ( + UniqueConstraint('fact_id', 'dimension_id', name='unique_fact_dimension'), + Index('idx_fact_dimension_lookup', 'fact_id', 'dimension_id'), + ) + + def __repr__(self): + return f"" diff --git a/warehouse/query_wrapper.py b/warehouse/query_wrapper.py new file mode 100644 index 0000000..aed1a17 --- /dev/null +++ b/warehouse/query_wrapper.py @@ -0,0 +1,148 @@ +""" +Query Wrapper for Year-long Data Warehouse +Provides the same API interface as the user's existing warehouse system. +""" + +import pandas as pd +from typing import List, Dict, Any, cast +from datetime import datetime, date +from .db_setup import YearlongDataWarehouse +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class YearlongWarehouseSession: + """Session wrapper to match existing query interface.""" + + def __init__(self, db_path = None): + if hasattr(db_path, 'get_session'): # If it's a warehouse object + self.warehouse: YearlongDataWarehouse = cast(YearlongDataWarehouse, db_path) + elif db_path is None: + # Use auto-detection + self.warehouse = YearlongDataWarehouse() + elif hasattr(db_path, 'startswith') and db_path.startswith('sqlite://'): + self.warehouse = YearlongDataWarehouse(db_path) + else: + # For direct file paths, convert to proper SQLite URL + import os + db_path = os.path.abspath(db_path) # Get absolute path + self.warehouse = YearlongDataWarehouse(f"sqlite:///{db_path}") + logger.info("Year-long warehouse session initialized") + + def query_facts(self, metric_ids: List[int], group_names: List[str], + period_levels: List[int], exact_date: date) -> Dict[str, Any]: + """Query interface matching the user's existing system.""" + + exact_date_str = exact_date.strftime('%Y%m%d') + + # Query the warehouse using session + session = self.warehouse.get_session() + df = self.warehouse.query_facts( + session=session, + metric_ids=metric_ids, + group_names=group_names, + period_levels=period_levels, + exact_date=exact_date_str + ) + + # Format results to match existing structure + result = {} + + if not df['result']: + logger.warning("No data found for the given query parameters") + return {'result': {}} + + # Process the results dictionary + for metric_id_str, facts in df['result'].items(): + metric_id = int(metric_id_str) + result[metric_id] = { + 'metadata': { + 'metric_name': 'Total Sales', + 'metric_desc': 'Total sales revenue' + } + } + + for fact in facts: + site_id = fact.get('site_id', '') + site_name = fact.get('site_name', '') + command_name = fact.get('command_name', '') + store_format = fact.get('store_format', '') + period_key = fact.get('period_key', '') + value = fact.get('value', 0) + + site_metadata = {} + if command_name: + site_metadata['command_name'] = command_name + if store_format: + site_metadata['store_format'] = store_format + + if site_id and site_id not in result[metric_id]: + result[metric_id][site_id] = { + 'metadata': site_metadata, + 'site_name': site_name, + period_key: value + } + elif site_id: + result[metric_id][site_id][period_key] = value + + # Return in the expected format + return {'result': result} + +# Wrapper functions to match the user's existing interface +def convert_jargons(session: YearlongWarehouseSession, df: Dict[str, Any]) -> Dict[str, Any]: + """Convert and format results (wrapper for compatibility).""" + # This is a pass-through for now, but you can add formatting logic here + return df + +def query_facts(session: YearlongWarehouseSession, metric_ids: List[int], + group_names: List[str], period_levels: List[int], + exact_date: date) -> Dict[str, Any]: + """Main query function to match existing interface.""" + return session.query_facts( + metric_ids=metric_ids, + group_names=group_names, + period_levels=period_levels, + exact_date=exact_date + ) + +# Factory function for creating sessions +def create_session(db_path: str = 'yearlong_warehouse.db') -> YearlongWarehouseSession: + """Create a new warehouse session.""" + return YearlongWarehouseSession(db_path) + +# Sample usage examples +def demo_yearlong_warehouse(): + """Demo function showing how to use the yearlong warehouse.""" + + # Create session + session = create_session('yearlong_warehouse.db') + + # Example 1: What is the total sales of three locations for 2025 + print("=" * 60) + print("Query: What is the total sales of three locations for 2025") + result = query_facts( + session=session, + metric_ids=[1], # Total Sales + group_names=["1100", "5206", "2301"], # Site IDs + period_levels=[4], # Yearly + exact_date=date(2025, 1, 1) + ) + + logger.info(convert_jargons(session=session, df=result)) + + # Example 2: What is the total sales of MCCS for Oct 2024 + print("=" * 60) + print("Query: What is the total sales of MCCS for Oct 2024") + result = query_facts( + session=session, + metric_ids=[1], + group_names=["1100"], + period_levels=[2], # Monthly + exact_date=date(2024, 10, 1) + ) + + logger.info(convert_jargons(session=session, df=result)) + +if __name__ == '__main__': + demo_yearlong_warehouse() diff --git a/warehouse/test_warehouse.py b/warehouse/test_warehouse.py new file mode 100644 index 0000000..f7d3fdf --- /dev/null +++ b/warehouse/test_warehouse.py @@ -0,0 +1,212 @@ +""" +Test and Demo Script for Year-long Data Warehouse +Demonstrates the warehouse functionality and evaluates the approach. +""" + +import logging +import os +from datetime import date +from warehouse.db_setup import YearlongDataWarehouse +from warehouse.query_wrapper import create_session, query_facts, convert_jargons, YearlongWarehouseSession +from typing import NamedTuple + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class Sites(NamedTuple): + """Site information structure to match user's existing code.""" + site_id: str + site_name: str + command_name: str + store_format: str + +def test_warehouse_creation(): + """Test warehouse database creation.""" + logger.info("Testing warehouse creation...") + try: + # Auto-detect will use SQLite by default since no Snowflake env vars are set + warehouse = YearlongDataWarehouse() + logger.info("✓ Warehouse created successfully") + return warehouse + except Exception as e: + logger.error(f"✗ Warehouse creation failed: {e}") + return None + +def insert_sample_data(warehouse: YearlongDataWarehouse): + """Insert sample data that matches user's example structure.""" + logger.info("Inserting sample data...") + + try: + # Insert sample site dimensions + sites_data = [ + ("1100", "HHM MCX MAIN STORE", "HENDERSON HALL", "MAIN STORE"), + ("5206", "CLM MCX CAMP JOHNSON MARINE MART", "CAMP LEJEUNE", "MARINE MART"), + ("2301", "QUM MCX MARINE MART", "QUANTICO", "MARINE MART") + ] + + for site_id, site_name, command_name, store_format in sites_data: + dimension_id = warehouse.get_dimension_id( + dimension_type='site', + dimension_code=site_id, + dimension_name=site_name, + metadata={ + 'command_name': command_name, + 'store_format': store_format + } + ) + logger.info(f"✓ Created dimension for site {site_id} (ID: {dimension_id})") + + # Insert sample facts (matching user's examples) + sample_facts = [ + # 2025 total sales + (1, [1], 4, "20250101", "20251231", "20250101 to 20251231", 773686.11, 1000), # Site 1100 dimension ID + (1, [2], 4, "20250101", "20251231", "20250101 to 20251231", 616489.05, 850), # Site 5206 dimension ID + (1, [3], 4, "20250101", "20251231", "20250101 to 20251231", 204485.38, 600), # Site 2301 dimension ID + + # Oct 2024 sales + (1, [1], 2, "20241001", "20241031", "20241001 to 20241031", 1138733.48, 500), # Site 1100 dimension ID + ] + + for fact in sample_facts: + warehouse.insert_fact(*fact) + logger.info(f"✓ Inserted fact: {fact}") + + logger.info("Sample data insertion completed successfully") + return True + + except Exception as e: + logger.error(f"✗ Sample data insertion failed: {e}") + return False + +def test_queries(warehouse: YearlongDataWarehouse): + """Test the query functionality with sample data.""" + logger.info("Testing query functionality...") + + # Use the same warehouse for querying + session = YearlongWarehouseSession(warehouse) + + try: + # Test Query 1: Total sales of three locations for 2025 + logger.info("=" * 60) + logger.info("Query 1: What is the total sales of three locations for 2025?") + result1 = query_facts( + session=session, + metric_ids=[1], # Total Sales + group_names=["1100", "5206", "2301"], # Site IDs + period_levels=[4], # Yearly + exact_date=date(2025, 1, 1) + ) + + logger.info(f"Result 1: {convert_jargons(session=session, df=result1)}") + logger.info("✓ Query 1 executed successfully") + + # Test Query 2: Total sales for Oct 2024 + logger.info("=" * 60) + logger.info("Query 2: What is the total sales of MCCS for Oct 2024?") + result2 = query_facts( + session=session, + metric_ids=[1], + group_names=["1100"], + period_levels=[2], # Monthly + exact_date=date(2024, 10, 1) + ) + + logger.info(f"Result 2: {convert_jargons(session=session, df=result2)}") + logger.info("✓ Query 2 executed successfully") + + return True + + except Exception as e: + logger.error(f"✗ Query testing failed: {e}") + return False + +# def evaluate_approach(): +# """Evaluate the proposed warehouse approach.""" +# logger.info("=" * 80) +# logger.info("WAREHOUSE APPROACH EVALUATION") +# logger.info("=" * 80) + +# evaluation_points = [ +# ("✅ SQLite Database", "Simple, portable, easy deployment"), +# ("✅ Unified Interface", "Same API as existing system"), +# ("✅ Fast Query Performance", "Indexed for quick aggregations"), +# ("✅ Year-long Data Support", "Handles multiple periods efficiently"), +# ("✅ Multi-data Type Support", "Designed for sales, email, social media"), +# ("✅ LLM Integration Ready", "Structured data perfect for AI analysis"), +# ("✅ Portable Deployment", "Single SQLite file, ready to ship"), +# ("✅ Metadata Preservation", "Site, command, format info maintained"), +# ("✅ Time-based Aggregation", "Supports daily/monthly/quarterly/yearly"), +# ("✅ Extensible Design", "Easy to add new metrics/data types") +# ] + + +def run_complete_test(): + """Run the complete warehouse test suite.""" + logger.info("Starting Year-long Data Warehouse Test Suite...") + + # Clean up previous test database + if os.path.exists('yearlong_warehouse.db'): + os.remove('yearlong_warehouse.db') + logger.info("Removed existing test database") + + # Test 1: Warehouse creation + warehouse = test_warehouse_creation() + if not warehouse: + logger.error("Warehouse creation failed - aborting tests") + return False + + # Test 2: Sample data insertion + if not insert_sample_data(warehouse): + logger.error("Sample data insertion failed - aborting tests") + warehouse.close() + return False + + # Test 3: Query testing (using the same warehouse) + if not test_queries(warehouse): + logger.error("Query testing failed") + warehouse.close() + return False + + warehouse.close() + + logger.info("✓ Year-long Data Warehouse Test Suite completed successfully!") + return True + +def run_sprint1_test(): + """Run Sprint 1 warehouse tests: creation and data insertion.""" + logger.info("Starting Sprint 1: Foundation & Database Core Test Suite...") + + # Test 1: Warehouse creation + warehouse = test_warehouse_creation() + if not warehouse: + logger.error("Sprint 1 failed at warehouse creation") + return False + + # Test 2: Sample data insertion + if not insert_sample_data(warehouse): + logger.error("Sprint 1 failed at sample data insertion") + return False + + logger.info("✓ Sprint 2: Foundation & Database Core completed successfully!") + return True + +if __name__ == '__main__': + import sys + + if len(sys.argv) > 1 and sys.argv[1] == '--complete': + success = run_complete_test() + if success: + print("\n🎉 SUCCESS: Complete Year-long Data Warehouse Test Suite passed!") + + else: + print("\n❌ FAILURE: Complete test suite failed.") + print("Check the logs above for specific error details.") + else: + success = run_sprint1_test() + if success: + print("\n🎉 SUCCESS: Sprint 2 (Foundation & Database Core) implementation is working perfectly!") + + else: + print("\n❌ FAILURE: Sprint 2 test suite failed.") + print("Check the logs above for specific error details.") diff --git a/yearlong_warehouse.db b/yearlong_warehouse.db new file mode 100644 index 0000000..54b9abe Binary files /dev/null and b/yearlong_warehouse.db differ