diff --git a/Data_analysis/Advertising_Email_Engagement/Email_Engagement_Engagement_Tim.py b/Data_analysis/Advertising_Email_Engagement/Email_Engagement_Engagement_Tim.py index 71f1c82..bfb6d97 100644 --- a/Data_analysis/Advertising_Email_Engagement/Email_Engagement_Engagement_Tim.py +++ b/Data_analysis/Advertising_Email_Engagement/Email_Engagement_Engagement_Tim.py @@ -2,9 +2,13 @@ import streamlit as st import pandas as pd import matplotlib.pyplot as plt +import os -# Load the data -data = pd.read_csv('/Users/huawei/Spaces/pbl/data-analysis/data/convertedcsv/Advertising_Email_Engagement/Advertising_Email_Engagement.xlsx-Email_Engagement_Engagement_Tim.csv', header=0) # Adjust header if necessary +# Load the data using relative path +script_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(os.path.dirname(script_dir)) # Go up 2 levels to project root +data_path = os.path.join(project_root, 'data', 'convertedcsv', 'Advertising_Email_Engagement', 'Advertising_Email_Engagement.xlsx-Email_Engagement_Engagement_Tim.csv') +data = pd.read_csv(data_path, header=0) # Adjust header if necessary data.columns = data.columns.str.strip() print(data.columns) @@ -64,4 +68,4 @@ # Conclusion st.subheader('Conclusion') -st.write("This dashboard provides insights into email marketing engagement metrics, including trends in deliveries, unique opens, open rates, click rates, and unsubscribe rates.") \ No newline at end of file +st.write("This dashboard provides insights into email marketing engagement metrics, including trends in deliveries, unique opens, open rates, click rates, and unsubscribe rates.") diff --git a/retail_trends_analysis.py b/retail_trends_analysis.py index 1d309e3..ff320bf 100644 --- a/retail_trends_analysis.py +++ b/retail_trends_analysis.py @@ -5,7 +5,7 @@ from datetime import datetime import os import dask.dataframe as dd -from dask.diagnostics import ProgressBar +from dask.diagnostics.progress import ProgressBar import 
plotly.express as px import plotly.graph_objects as go from plotly.subplots import make_subplots @@ -24,55 +24,127 @@ print("Starting Retail Sales Data Analysis...") print("Loading data (this may take a few minutes due to the large file size)...") -# Load the data using Dask for better memory management with large files -# Try to read from CSV first, if not available, use the Parquet file +# Try to load data from warehouse first, fallback to direct file loading import os +try: + print("Attempting to load data from warehouse...") + from warehouse.query_wrapper import create_session + from warehouse.data_ingestion import DataIngestionManager + from datetime import date + + # Create warehouse session + session = create_session('yearlong_warehouse.db') + + # Check if warehouse has data, if not, try to load from files + try: + # Test query to see if warehouse has data + test_result = session.query_facts([1], ['1100'], [4], date(2025, 1, 1)) + if test_result['result']: + print("✅ Warehouse data found, using warehouse for analysis") + + # Get all available sites and periods for comprehensive analysis + # For now, we'll query specific periods and combine them + df_2024 = session.query_facts_as_dataframe([1], ['1100', '5206', '2301'], [2], date(2024, 12, 1)) + df_2025 = session.query_facts_as_dataframe([1], ['1100', '5206', '2301'], [4], date(2025, 1, 1)) + + # Combine dataframes + df = pd.concat([df_2024, df_2025], ignore_index=True) + + # Convert period_key to datetime for analysis + df['SALE_DATE'] = pd.to_datetime(df['period_key'].str.split(' to ').str[0], format='%Y%m%d') + df['MONTH'] = df['SALE_DATE'].dt.month + df['MONTH_NAME'] = df['SALE_DATE'].dt.month_name() + df['DAY_OF_WEEK'] = df['SALE_DATE'].dt.day_name() + df['HOUR'] = 12 # Default hour since warehouse doesn't store hourly data + df['IS_RETURN'] = False # Default, warehouse has separate return metrics + df['EXTENSION_AMOUNT'] = df['value'] # Map value to extension_amount for compatibility + df['SITE_ID'] = 
df['site_id'] + df['SITE_NAME'] = df['site_name'] + df['COMMAND_NAME'] = df['command_name'] + df['STORE_FORMAT'] = df['store_format'] + + warehouse_loaded = True + print(f"Loaded {len(df)} records from warehouse") + + else: + print("⚠️ Warehouse found but no data, falling back to file loading...") + raise Exception("No warehouse data") + + except Exception as e: + print(f"❌ Warehouse loading failed: {e}") + print("Falling back to direct file loading...") + raise Exception("Warehouse unavailable") + +except Exception as warehouse_error: + print(f"❌ Warehouse not available: {warehouse_error}") + print("Loading data directly from files...") + + # Fallback to original file loading logic + csv_path = 'data/convertedcsv/MCCS_RetailData.csv' + parquet_path = 'data/rawdata/MCCS_RetailData.parquet' + sample_parquet_path = 'retail_data_sample.parquet' + + if os.path.exists(csv_path): + print(f"Reading from CSV file: {csv_path}") + df = dd.read_csv(csv_path, + assume_missing=True, + blocksize="64MB") # Adjust blocksize as needed + elif os.path.exists(parquet_path): + print(f"CSV file not found. Reading from Parquet file: {parquet_path}") + df = dd.read_parquet(parquet_path) + elif os.path.exists(sample_parquet_path): + print(f"Using sample Parquet file: {sample_parquet_path}") + df = dd.read_parquet(sample_parquet_path) + else: + raise FileNotFoundError("Could not find retail data in CSV or Parquet format. 
Please ensure either file exists.") + + # Convert date columns to datetime - using flexible format detection + df['SALE_DATE'] = dd.to_datetime(df['SALE_DATE'], errors='coerce') + df['SALE_DATE_TIME'] = dd.to_datetime(df['SALE_DATE_TIME'], errors='coerce') + + # Create a month column for monthly analysis + df['MONTH'] = df['SALE_DATE'].dt.month + df['MONTH_NAME'] = df['SALE_DATE'].dt.month_name() + df['DAY_OF_WEEK'] = df['SALE_DATE'].dt.day_name() + df['HOUR'] = df['SALE_DATE_TIME'].dt.hour + + # Create a flag for returns + df['IS_RETURN'] = df['RETURN_IND'] == 'Y' -csv_path = 'data/convertedcsv/MCCS_RetailData.csv' -parquet_path = 'data/rawdata/MCCS_RetailData.parquet' -sample_parquet_path = 'retail_data_sample.parquet' - -if os.path.exists(csv_path): - print(f"Reading from CSV file: {csv_path}") - df = dd.read_csv(csv_path, - assume_missing=True, - blocksize="64MB") # Adjust blocksize as needed -elif os.path.exists(parquet_path): - print(f"CSV file not found. Reading from Parquet file: {parquet_path}") - df = dd.read_parquet(parquet_path) -elif os.path.exists(sample_parquet_path): - print(f"Using sample Parquet file: {sample_parquet_path}") - df = dd.read_parquet(sample_parquet_path) -else: - raise FileNotFoundError("Could not find retail data in CSV or Parquet format. 
Please ensure either file exists.") +# Compute basic statistics +print("Computing basic statistics...") -# Convert date columns to datetime - using flexible format detection -df['SALE_DATE'] = dd.to_datetime(df['SALE_DATE'], errors='coerce') -df['SALE_DATE_TIME'] = dd.to_datetime(df['SALE_DATE_TIME'], errors='coerce') +# Handle warehouse vs file data differently +if 'warehouse_loaded' in locals() and warehouse_loaded: + # Data is already pandas from warehouse + total_sales = df['EXTENSION_AMOUNT'].sum() + total_transactions = 0 # Not available in warehouse summary data + total_items = 0 # Not available in warehouse summary data + total_quantity = df['count'].sum() + return_rate = 0.0 # Not available in current warehouse data -# Create a month column for monthly analysis -df['MONTH'] = df['SALE_DATE'].dt.month -df['MONTH_NAME'] = df['SALE_DATE'].dt.month_name() -df['DAY_OF_WEEK'] = df['SALE_DATE'].dt.day_name() -df['HOUR'] = df['SALE_DATE_TIME'].dt.hour + # Get unique stores and commands + stores = df['SITE_NAME'].unique() + commands = df['COMMAND_NAME'].unique() + store_formats = df['STORE_FORMAT'].unique() -# Create a flag for returns -df['IS_RETURN'] = df['RETURN_IND'] == 'Y' + # For warehouse data, we need to adapt the analysis since we don't have detailed transaction data + print("Note: Using warehouse summary data - some detailed metrics may not be available") -# Compute basic statistics -with ProgressBar(): - print("Computing basic statistics...") - # Convert to pandas for easier analysis after aggregation - total_sales = df['EXTENSION_AMOUNT'].sum().compute() - total_transactions = df['SLIP_NO'].nunique().compute() - total_items = df['ITEM_ID'].nunique().compute() - total_quantity = df['QTY'].sum().compute() - return_rate = df['IS_RETURN'].mean().compute() * 100 - - # Get unique stores and commands - stores = df['SITE_NAME'].unique().compute() - commands = df['COMMAND_NAME'].unique().compute() - store_formats = df['STORE_FORMAT'].unique().compute() +else: + 
# Data is from files (dask) + with ProgressBar(): + # Convert to pandas for easier analysis after aggregation + total_sales = df['EXTENSION_AMOUNT'].sum().compute() + total_transactions = df['SLIP_NO'].nunique().compute() + total_items = df['ITEM_ID'].nunique().compute() + total_quantity = df['QTY'].sum().compute() + return_rate = df['IS_RETURN'].mean().compute() * 100 + + # Get unique stores and commands + stores = df['SITE_NAME'].unique().compute() + commands = df['COMMAND_NAME'].unique().compute() + store_formats = df['STORE_FORMAT'].unique().compute() print(f"\nAnalysis Period: Dec 2024 - Jan 2025") print(f"Total Sales: ${total_sales:,.2f}") @@ -319,41 +391,41 @@ f.write(f"Number of Stores: {len(stores)}\n") f.write(f"Number of Commands: {len(commands)}\n") f.write(f"Store Formats: {', '.join(store_formats)}\n\n") - + f.write("Key Insights:\n") f.write("-------------\n") - + # Top selling day of week top_day = day_of_week_sales.loc[day_of_week_sales['EXTENSION_AMOUNT'].idxmax()] f.write(f"1. Highest sales occur on {top_day['DAY_OF_WEEK']}s\n") - + # Top selling hour top_hour = hourly_sales.loc[hourly_sales['EXTENSION_AMOUNT'].idxmax()] f.write(f"2. Peak sales hour is {int(top_hour['HOUR'])}:00h (24-hour format)\n") - + # Top product top_product = top_products_revenue.iloc[0] f.write(f"3. Best-selling product by revenue: {top_product['ITEM_DESC']} (${top_product['EXTENSION_AMOUNT']:,.2f})\n") - + # Top store top_store = store_sales.iloc[0] f.write(f"4. 
Top performing store: {top_store['SITE_NAME']} (${top_store['EXTENSION_AMOUNT']:,.2f})\n") - + # Price status insight regular_sales = price_status_sales[price_status_sales['PRICE_STATUS'] == 'R']['EXTENSION_AMOUNT'].iloc[0] promo_sales = price_status_sales[price_status_sales['PRICE_STATUS'] == 'P']['EXTENSION_AMOUNT'].iloc[0] if 'P' in price_status_sales['PRICE_STATUS'].values else 0 markdown_sales = price_status_sales[price_status_sales['PRICE_STATUS'] == 'M']['EXTENSION_AMOUNT'].iloc[0] if 'M' in price_status_sales['PRICE_STATUS'].values else 0 - + f.write(f"5. Regular-priced items account for ${regular_sales:,.2f} in sales ({regular_sales/total_sales*100:.1f}% of total)\n") if promo_sales > 0: f.write(f"6. Promotional items account for ${promo_sales:,.2f} in sales ({promo_sales/total_sales*100:.1f}% of total)\n") if markdown_sales > 0: f.write(f"7. Markdown items account for ${markdown_sales:,.2f} in sales ({markdown_sales/total_sales*100:.1f}% of total)\n") - + # Transaction insights f.write(f"8. Average transaction value: ${transaction_value_stats['mean']:,.2f}\n") f.write(f"9. Average items per transaction: {transaction_size_stats['mean']:.2f}\n") - + # Return insights f.write(f"10. 
Product with highest return rate: {return_by_product.iloc[0]['ITEM_DESC']} ({return_by_product.iloc[0]['RETURN_RATE']:.2f}%)\n") diff --git a/src/excel_to_csv.py b/src/excel_to_csv.py index ac3746d..1893168 100644 --- a/src/excel_to_csv.py +++ b/src/excel_to_csv.py @@ -6,18 +6,17 @@ def to_filename(string): def process_survey_responses(df): # Question categories - RATING_QUESTIONS = ['Satisfaction', 'Cleanliness', 'Service', 'Price', 'Checkout', 'Store Atmosphere'] - BINARY_QUESTIONS = ['Contact?', 'Purchase All'] + RATING_QUESTIONS = ['Satisfaction', 'Cleanliness', 'Service', 'Price', 'Checkout', 'Store Atmosphere', 'Merchandise'] + BINARY_QUESTIONS = ['Contact?', 'Purchase All'] MULTI_SELECT = ['MCX_Products Purchased', 'MCX_Program Awareness'] DEMOGRAPHIC = ['Military Affiliation', 'Demos: Branch of Service', 'Demos: Gender'] # Process answer columns based on question type for idx, row in df.iterrows(): - question_label = row['questionLabel'] - + + # Handle Rating Questions (1-5 scale) if any(qt in question_label for qt in RATING_QUESTIONS): - # Convert rating scales for col in ['answerLabels', 'answerDisplayLabels', 'answerTexts']: value = str(row[col]) if '1=' in value or 'Poor' in value or 'Very unlikely' in value or 'Falls short' in value: @@ -26,115 +25,85 @@ def process_survey_responses(df): df.loc[idx, col] = 5 elif value.isdigit(): df.loc[idx, col] = int(value) - + + # Handle Binary Questions elif any(qt in question_label for qt in BINARY_QUESTIONS): - # Convert Yes/No to 1/0 for col in ['answerLabels', 'answerDisplayLabels', 'answerTexts']: df.loc[idx, col] = 1 if row[col] == 'Yes' else 0 - + + # Handle Multi-select Questions + elif any(qt in question_label for qt in MULTI_SELECT): + # Keep original text values + continue + + # Handle Demographics + elif any(qt in question_label for qt in DEMOGRAPHIC): + # Keep original text values + continue + + # Convert responseTime to datetime + df['responseTime'] = pd.to_datetime(df['responseTime']) + return df 
-excel_file = '../data/rawdata/CustomerSurveyResponses.xlsx' -xls = pd.ExcelFile(excel_file) - -for sheet_name in xls.sheet_names: - print(sheet_name) - df = pd.read_excel(xls, sheet_name=sheet_name, skiprows=0) - df = df.iloc[:, :-1] # Remove last column - - # Process survey responses - df = process_survey_responses(df) - - # Process other columns import pandas as pd - import os - - def to_filename(string): - return "_".join(string.split()) - - def process_survey_responses(df): - # Question categories - RATING_QUESTIONS = ['Satisfaction', 'Cleanliness', 'Service', 'Price', 'Checkout', 'Store Atmosphere', 'Merchandise'] - BINARY_QUESTIONS = ['Contact?', 'Purchase All'] - MULTI_SELECT = ['MCX_Products Purchased', 'MCX_Program Awareness'] - DEMOGRAPHIC = ['Military Affiliation', 'Demos: Branch of Service', 'Demos: Gender'] - - # Process answer columns based on question type - for idx, row in df.iterrows(): - question_label = row['questionLabel'] - - # Handle Rating Questions (1-5 scale) - if any(qt in question_label for qt in RATING_QUESTIONS): - for col in ['answerLabels', 'answerDisplayLabels', 'answerTexts']: - value = str(row[col]) - if '1=' in value or 'Poor' in value or 'Very unlikely' in value or 'Falls short' in value: - df.loc[idx, col] = 1 - elif '5=' in value or 'Excellent' in value or 'Very Likely' in value or 'Strongly Agree' in value: - df.loc[idx, col] = 5 - elif value.isdigit(): - df.loc[idx, col] = int(value) - - # Handle Binary Questions - elif any(qt in question_label for qt in BINARY_QUESTIONS): - for col in ['answerLabels', 'answerDisplayLabels', 'answerTexts']: - df.loc[idx, col] = 1 if row[col] == 'Yes' else 0 - - # Handle Multi-select Questions - elif any(qt in question_label for qt in MULTI_SELECT): - # Keep original text values - continue - - # Handle Demographics - elif any(qt in question_label for qt in DEMOGRAPHIC): - # Keep original text values - continue - - # Convert responseTime to datetime - df['responseTime'] = 
pd.to_datetime(df['responseTime']) - - return df - - def main(): - excel_file = 'CustomerSurveyResponses.xlsx' - xls = pd.ExcelFile(excel_file) - - for sheet_name in xls.sheet_names: - print(f"Processing sheet: {sheet_name}") - - # Read sheet - df = pd.read_excel(xls, sheet_name=sheet_name) - - # Remove last column - df = df.iloc[:, :-1] - - # Process survey responses - df = process_survey_responses(df) - - # Export to CSV - output_filename = f"../data/convertedcsv/{excel_file}-{to_filename(sheet_name)}.csv" - df.to_csv(output_filename, index=False) - print(f"Exported to: {output_filename}") - - if __name__ == "__main__": - main() - for col in df.columns: - if col.endswith("Rate") or col.endswith("Diff") or col.startswith("%") or col.endswith("%"): - df[col] = df[col].astype(str) - elif col in ["responseTime"]: - df[col] = pd.to_datetime(df[col]).dt.strftime('%Y-%m-%d') - elif col in ["Email Domain", "Audience Name", "Audience Type", "Message Name", - "Campaign", "Social Network", "Outbound Post", "Media Type", - "Email Content Name", "Day Of Week", "Time Of Day", "Email Subject", - "respondentId", "questionId", "questionName", "questionPhrase", - "questionType", "questionLabel"]: - df[col] = df[col].astype(str) - else: - try: - df[col] = df[col].astype(int) - except: +def main(): + # Use relative paths that work from any directory + script_dir = os.path.dirname(os.path.abspath(__file__)) + project_root = os.path.dirname(script_dir) # Go up 1 level from src/ to project root + + # Input file path + excel_file_path = os.path.join(project_root, 'data', 'rawdata', 'CustomerSurveyResponses.xlsx') + + # Output directory + output_dir = os.path.join(project_root, 'data', 'convertedcsv') + os.makedirs(output_dir, exist_ok=True) + + # Check if input file exists + if not os.path.exists(excel_file_path): + print(f"Error: Excel file not found at: {excel_file_path}") + return + + print(f"Processing Excel file: {excel_file_path}") + xls = pd.ExcelFile(excel_file_path) + + for 
sheet_name in xls.sheet_names: + print(f"Processing sheet: {sheet_name}") + + # Read sheet + df = pd.read_excel(xls, sheet_name=sheet_name) + + # Remove last column + df = df.iloc[:, :-1] + + # Process survey responses + df = process_survey_responses(df) + + # Process data types + for col in df.columns: + if col.endswith("Rate") or col.endswith("Diff") or col.startswith("%") or col.endswith("%"): + df[col] = df[col].astype(str) + elif col in ["responseTime"]: + df[col] = pd.to_datetime(df[col]).dt.strftime('%Y-%m-%d') + elif col in ["Email Domain", "Audience Name", "Audience Type", "Message Name", + "Campaign", "Social Network", "Outbound Post", "Media Type", + "Email Content Name", "Day Of Week", "Time Of Day", "Email Subject", + "respondentId", "questionId", "questionName", "questionPhrase", + "questionType", "questionLabel"]: df[col] = df[col].astype(str) + else: + try: + df[col] = df[col].astype(int) + except: + df[col] = df[col].astype(str) + + # Create output filename + excel_basename = os.path.splitext(os.path.basename(excel_file_path))[0] + sheet_filename = to_filename(sheet_name) + output_filename = os.path.join(output_dir, f"{excel_basename}-{sheet_filename}.csv") + + # Export to CSV + df.to_csv(output_filename, index=False) + print(f"Exported {sheet_name} to {output_filename}") - sheet_name = to_filename(sheet_name) - filename = '-'.join([excel_file, sheet_name]) - output_file = f"{filename}.csv" - df.to_csv(output_file, index=False) - print(f"Exported {sheet_name} to {output_file}") +if __name__ == "__main__": + main() diff --git a/warehouse/README.md b/warehouse/README.md new file mode 100644 index 0000000..7df3d84 --- /dev/null +++ b/warehouse/README.md @@ -0,0 +1,77 @@ +# Database/Warehouse Module + +This module provides a hybrid ORM-based data warehouse supporting SQLite and Snowflake backends. 
+ +## Quick Start for New Developers + +### Basic Testing (Works Out-of-Box): +```bash +cd /your/project/root +pip install -r new_requirements.txt +PYTHONPATH=. python warehouse/test_warehouse.py --complete +``` + +This will: +- Create a SQLite database (yearlong_warehouse.db) +- Load sample retail data +- Run query tests +- Demonstrate warehouse functionality + +### Data Ingestion: +```bash +# Load your own data (requires data files) +PYTHONPATH=. python -m warehouse.data_ingestion + +# Or, from Python (not a shell command), load a specific file: +#   manager = DataIngestionManager() +#   manager.load_retail_data("/path/to/your/data.parquet") +``` + +### Unit Tests: +```bash +PYTHONPATH=. python -m pytest tests/unit/ -v +``` + +## Database Backend Support + +- SQLite (Default): Local development/testing +- Snowflake: Production data warehouse + +## Import Notes + +Modules such as db_setup.py use package-relative imports, so run them from the project root as modules (python -m), not as script paths: + +```bash +# Works +python -m warehouse.db_setup + +# Fails (relative import with no parent package), even with PYTHONPATH=. +python warehouse/db_setup.py +``` + +## Project Structure +``` +warehouse/ +├── __init__.py # Package exports +├── db_setup.py # Database connection & hybrid logic +├── models.py # SQLAlchemy ORM models (facts/dimensions) +├── query_wrapper.py # Query API wrapper +├── data_ingestion.py # ETL pipeline +├── test_warehouse.py # Integration tests +└── README.md # This file +``` + +## Troubleshooting + +- Import errors: Run from the project root; use python -m warehouse.<module> for modules with relative imports, and PYTHONPATH=. for script paths +- Missing data: Code auto-falls back to retail_data_sample.parquet +- Large data files: Excluded from repo (.gitignore) - provide your own +- Database backends: Default is SQLite; set SNOWFLAKE_ACCOUNT, SNOWFLAKE_USER and SNOWFLAKE_PASSWORD to use Snowflake + +## Data Schema + +The warehouse uses a classical star schema: + +- Metrics: Sales, Quantity, Returns, etc. 
+- Dimensions: Sites, Commands, Products (hierarchical attributes) +- Facts: Actual measurements with foreign key relationships diff --git a/warehouse/__init__.py b/warehouse/__init__.py new file mode 100644 index 0000000..00cbaf1 --- /dev/null +++ b/warehouse/__init__.py @@ -0,0 +1,24 @@ +""" +Snowflake-Compatible Data Warehouse Package +Provides ORM-based access to live Snowflake data for fast querying and LLM integration. +""" + +from .db_setup import HybridDataWarehouse, YearlongDataWarehouse, create_session +from .models import Base, Metric, Dimension, Fact, FactDimension +# Import query functions for easy access +from .query_wrapper import create_session as query_create_session, query_facts, convert_jargons + +__version__ = "2.1.0" # Updated with proper star schema design +__all__ = [ + 'HybridDataWarehouse', + 'YearlongDataWarehouse', # Backward compatibility alias + 'create_session', + 'query_create_session', + 'query_facts', + 'convert_jargons', + 'Base', + 'Metric', + 'Dimension', + 'Fact', + 'FactDimension' +] diff --git a/warehouse/data_ingestion.py b/warehouse/data_ingestion.py new file mode 100644 index 0000000..b2a6645 --- /dev/null +++ b/warehouse/data_ingestion.py @@ -0,0 +1,295 @@ +""" +Data Ingestion Module for Year-long Data Warehouse +Processes and loads data from various sources into the SQLite warehouse. +""" + +import dask.dataframe as dd +import os +import sys +from datetime import datetime +from typing import Optional +from sqlalchemy import text +try: + from .db_setup import YearlongDataWarehouse +except ImportError: + # Allow running as standalone script + from warehouse.db_setup import YearlongDataWarehouse +import logging + +# Add parent directory to path for module imports +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class DataIngestionManager: + """ + Handles data ingestion from various sources into the data warehouse. 
+ + Supports loading retail sales data and creating the necessary dimension + and fact records for analytics and querying. + """ + + def __init__(self, warehouse: Optional[YearlongDataWarehouse] = None): + """ + Initialize the data ingestion manager. + + Args: + warehouse: Pre-existing warehouse connection, or None to auto-create + """ + self.warehouse = warehouse or YearlongDataWarehouse() + logger.info("DataIngestionManager initialized") + + def load_retail_data(self, data_path: Optional[str] = None) -> bool: + """ + Load retail sales data from parquet/csv files into the warehouse. + + Args: + data_path: Path to data file, or None to auto-detect + + Returns: + True if successful, False otherwise + """ + try: + if data_path is None: + # Auto-detect data file + potential_paths = [ + "data/rawdata/MCCS_RetailData.parquet", + "data/convertedcsv/MCCS_RetailData.csv", + "retail_data_sample.parquet" + ] + + for path in potential_paths: + if os.path.exists(path): + data_path = path + break + else: + logger.error("No retail data file found") + return False + + logger.info(f"Loading data from: {data_path}") + + # Load data using Dask for memory efficiency + if data_path.endswith('.parquet'): + df = dd.read_parquet(data_path) + elif data_path.endswith('.csv'): + df = dd.read_csv(data_path) + else: + logger.error(f"Unsupported file format: {data_path}") + return False + + # Process data + success = self._process_retail_data(df) + if success: + logger.info("Retail data loading completed successfully") + return success + + except Exception as e: + logger.error(f"Failed to load retail data: {e}") + return False + + def _process_retail_data(self, df: dd.DataFrame) -> bool: + """ + Process and load retail data into warehouse tables. 
+ + Args: + df: Dask dataframe with retail data + + Returns: + True if successful + """ + try: + # Extract unique site information for dimensions + site_info = df[['SITE_ID', 'SITE_NAME', 'COMMAND_NAME']].drop_duplicates() + site_data = site_info.compute() # Convert to pandas for processing + + # Create site dimensions + dimension_ids = {} + for _, row in site_data.iterrows(): + site_id = str(row['SITE_ID']) + dimension_id = self.warehouse.get_dimension_id( + dimension_type='site', + dimension_code=site_id, + dimension_name=row['SITE_NAME'], + metadata={ + 'command_name': row['COMMAND_NAME'], + 'store_format': 'MAIN STORE' if 'MAIN' in row['SITE_NAME'] else 'MARINE MART' + } + ) + + if dimension_id: + dimension_ids[site_id] = dimension_id + logger.info(f"Created dimension for site {site_id}") + else: + logger.error(f"Failed to create dimension for site {site_id}") + return False + + # Process fact data by date periods + processed_count = 0 + total_count = 0 + + # Get unique dates for processing + dates = df['SALE_DATE'].drop_duplicates().compute() + + for sale_date in dates: + try: + # Filter data for this date + daily_data = df[df['SALE_DATE'] == sale_date].compute() + + # Parse the date from string format (MM/DD/YY) + if isinstance(sale_date, str): + try: + parsed_date = datetime.strptime(sale_date, '%m/%d/%y') + except ValueError: + # Try YY/MM/DD format + try: + parsed_date = datetime.strptime(sale_date, '%y/%m/%d') + except ValueError: + logger.error(f"Unable to parse date format: {sale_date}") + continue + else: + parsed_date = sale_date + + # Aggregate by site for the day + for site_id, site_data_group in daily_data.groupby('SITE_ID'): + total_sales = site_data_group['EXTENSION_AMOUNT'].sum() + total_quantity = site_data_group['QTY'].sum() + total_returns = site_data_group['RETURN_IND'].eq('Y').sum() + + dim_ids = [dimension_ids[str(site_id)]] + + # Insert total sales fact for this day + self.warehouse.insert_fact( + metric_id=1, # Total Sales + 
dimension_ids=dim_ids, + period_type=1, # Daily + period_start=parsed_date.strftime('%Y%m%d'), + period_end=parsed_date.strftime('%Y%m%d'), + period_key=parsed_date.strftime('%Y-%m-%d'), + value=float(total_sales), + count=int(total_quantity) + ) + + # Insert returns data + if total_returns > 0: + self.warehouse.insert_fact( + metric_id=3, # Total Returns + dimension_ids=dim_ids, + period_type=1, # Daily + period_start=parsed_date.strftime('%Y%m%d'), + period_end=parsed_date.strftime('%Y%m%d'), + period_key=parsed_date.strftime('%Y-%m-%d'), + value=float(total_returns), + count=int(total_returns) + ) + + processed_count += 1 + total_count += len(daily_data) + + if processed_count % 10 == 0: + logger.info(f"Processed {processed_count} dates, {total_count} transactions") + + except Exception as e: + logger.error(f"Failed to process date {sale_date}: {e}") + continue + + logger.info(f"Successfully processed {processed_count} dates with {total_count} transactions") + return True + + except Exception as e: + logger.error(f"Failed to process retail data: {e}") + return False + + def create_date_dimensions(self) -> bool: + """ + Create dimension records for date hierarchies. + + Returns: + True if successful + """ + try: + # This could be expanded to create date dimension tables + # For now, we rely on period-based facts + logger.info("Date dimensions handled through period-based facts") + return True + except Exception as e: + logger.error(f"Failed to create date dimensions: {e}") + return False + + def validate_data_integrity(self) -> bool: + """ + Perform basic data integrity checks on loaded data. 
+ + Returns: + True if data appears consistent + """ + try: + with self.warehouse.get_session() as session: + # Check that we have some basic metrics and dimensions + metric_count = session.execute(text("SELECT COUNT(*) FROM metrics")).scalar() + dimension_count = session.execute(text("SELECT COUNT(*) FROM dimensions")).scalar() + fact_count = session.execute(text("SELECT COUNT(*) FROM facts")).scalar() + + logger.info(f"Data integrity check: {metric_count} metrics, {dimension_count} dimensions, {fact_count} facts") + + # Basic validation + if metric_count == 0: + logger.warning("No metrics found - data may not be properly loaded") + return False + + if dimension_count == 0: + logger.warning("No dimensions found - data may not be properly loaded") + return False + + if fact_count == 0: + logger.warning("No facts found - data may not be properly loaded") + return False + + return True + + except Exception as e: + logger.error(f"Data integrity check failed: {e}") + return False + + def close(self): + """Close warehouse connections.""" + if self.warehouse: + self.warehouse.close() + + +def main(): + """Main function to run data ingestion.""" + logger.info("Starting data ingestion process...") + + try: + manager = DataIngestionManager() + + # Load retail data + if manager.load_retail_data(): + logger.info("Data loading successful") + + # Validate integrity + if manager.validate_data_integrity(): + logger.info("Data integrity validation passed") + logger.info("Data ingestion completed successfully!") + return True + else: + logger.error("Data integrity validation failed") + return False + else: + logger.error("Data loading failed") + return False + + except Exception as e: + logger.error(f"Data ingestion failed with error: {e}") + return False + finally: + try: + manager.close() + except: + pass + + +if __name__ == '__main__': + success = main() + sys.exit(0 if success else 1) diff --git a/warehouse/db_setup.py b/warehouse/db_setup.py new file mode 100644 index 
0000000..1fa9e3e --- /dev/null +++ b/warehouse/db_setup.py @@ -0,0 +1,362 @@ +""" +Hybrid ORM Data Warehouse +Supports both SQLite (development) and Snowflake (production) using SQLAlchemy ORM. +""" + +import os +import json +import logging +from typing import Dict, Any, Optional, List +from sqlalchemy import create_engine, text +from sqlalchemy.orm import sessionmaker, Session + +from .models import Base, Metric, Dimension, Fact, FactDimension + +logger = logging.getLogger(__name__) + +class HybridDataWarehouse: + """ + ORM-based data warehouse supporting both SQLite and Snowflake. + + Automatically detects database type based on connection string or environment variables: + - SQLite: For development/testing (uses local .db files) + - Snowflake: For production (queries live enterprise data) + """ + + def __init__(self, connection_string: Optional[str] = None): + """ + Initialize warehouse connection for SQLite or Snowflake. + + Args: + connection_string: Database connection string. If None, auto-detects based on environment. 
+ SQLite: Path ending in '.db' or 'sqlite:///' + Snowflake: 'snowflake://' URL or uses SNOWFLAKE_* env vars + """ + self.db_type = "sqlite" # Default to SQLite for development + + if connection_string is None: + # Auto-detect based on environment variables + snowflake_account = os.getenv('SNOWFLAKE_ACCOUNT') + snowflake_user = os.getenv('SNOWFLAKE_USER') + snowflake_password = os.getenv('SNOWFLAKE_PASSWORD') + + if all([snowflake_account, snowflake_user, snowflake_password]): + # Snowflake credentials found - use Snowflake + self.db_type = "snowflake" + warehouse_env = os.getenv('SNOWFLAKE_WAREHOUSE', 'default_warehouse') + database = os.getenv('SNOWFLAKE_DATABASE', 'default_database') + schema = os.getenv('SNOWFLAKE_SCHEMA', 'default_schema') + connection_string = f"snowflake://{snowflake_user}:{snowflake_password}@{snowflake_account}/{database}/{schema}?warehouse={warehouse_env}" + else: + # No Snowflake credentials - use SQLite + self.db_type = "sqlite" + connection_string = "sqlite:///yearlong_warehouse.db" + + elif connection_string.startswith('snowflake://'): + self.db_type = "snowflake" + elif connection_string.endswith('.db'): + # Convert bare .db path to proper SQLAlchemy URL with absolute path + self.db_type = "sqlite" + abs_path = os.path.abspath(connection_string) + connection_string = f"sqlite:///{abs_path}" + elif 'sqlite://' in connection_string: + self.db_type = "sqlite" + + try: + logger.info(f"Initializing {self.db_type.upper()} data warehouse...") + self.engine = create_engine(connection_string, echo=False) + self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine) + + # Test connection + if self.db_type == "snowflake": + with self.engine.connect() as conn: + conn.execute(text("SELECT 1")) + # SQLite connection is automatically tested by engine creation + + # Create tables if they don't exist + if not self._tables_exist(): + logger.info(f"Creating tables in {self.db_type.upper()}...") + 
Base.metadata.create_all(bind=self.engine) + + # Initialize default metrics if database is empty + self._initialize_default_metrics() + + logger.info(f"Connected to {self.db_type.upper()} data warehouse") + + except Exception as e: + # Clean up any partially initialized state + self.engine = None + self.SessionLocal = None + raise ValueError(f"Failed to connect to {self.db_type.upper()}: {e}") + + def _tables_exist(self) -> bool: + """Check if required tables exist in the database.""" + if self.engine is None: + return False + try: + # Get table names using database-specific methods + if self.db_type == "snowflake": + with self.engine.connect() as conn: + result = conn.execute(text("SHOW TABLES")) + existing_tables = {row[1] for row in result} # table names are in second column + else: # SQLite + with self.engine.connect() as conn: + result = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table'")) + existing_tables = {row[0] for row in result} # table names are in first column + + required_tables = {'metrics', 'dimensions', 'facts', 'fact_dimensions'} + return required_tables.issubset(existing_tables) + except Exception: + return False + + def _initialize_default_metrics(self): + """Initialize default metrics in the database during setup.""" + with self.get_session() as session: + try: + # Check if metrics already exist + existing_count = session.query(Metric).count() + if existing_count > 0: + return + + # Initialize default metrics + default_metrics = [ + (1, "Total Sales", "Total sales revenue in dollars", "numeric"), + (2, "Total Quantity", "Total quantity sold", "numeric"), + (3, "Total Returns", "Total return transactions", "numeric"), + (4, "Return Rate", "Percentage of returns", "percentage"), + (5, "Net Sales", "Sales after returns", "numeric"), + (6, "Email Opens", "Number of email opens", "numeric"), + (7, "Email Clicks", "Number of email clicks", "numeric"), + (8, "Social Media Engagement", "Social media engagement metrics", "numeric"), 
+ (9, "Social Media Followers", "Total social media followers", "numeric") + ] + + for metric_id, name, desc, data_type in default_metrics: + metric = Metric( + metric_id=metric_id, + metric_name=name, + metric_desc=desc, + data_type=data_type + ) + session.add(metric) + + session.commit() + logger.info("Initialized default metrics in database") + + except Exception as e: + session.rollback() + logger.error(f"Failed to initialize default metrics: {e}") + + def get_session(self) -> Session: + """Get a new database session.""" + if self.SessionLocal is None: + raise RuntimeError("Database connection not established. Unable to create session.") + return self.SessionLocal() + + def get_dimension_id(self, dimension_type: str, dimension_code: str, + dimension_name: str, metadata: Optional[Dict[str, Any]] = None) -> Optional[int]: + """ + Get dimension ID by type and code. Creates dimension if it doesn't exist. + + Note: In production with live Snowflake data, dimensions should already exist. + This method is primarily for development/testing. 
+ """ + with self.get_session() as session: + # Try to find existing dimension + dimension = session.query(Dimension).filter( + Dimension.dimension_type == dimension_type, + Dimension.dimension_code == dimension_code + ).first() + + if dimension: + return dimension.id # type: ignore[attr-defined] + + # Create new dimension (development/testing only) + metadata_str = json.dumps(metadata) if metadata else None + new_dimension = Dimension( + dimension_type=dimension_type, + dimension_code=dimension_code, + dimension_name=dimension_name, + dimension_metadata=metadata_str + ) + + try: + session.add(new_dimension) + session.commit() + session.refresh(new_dimension) + logger.info(f"Created new dimension: {dimension_code}") + return new_dimension.id # type: ignore[attr-defined] + except Exception as e: + session.rollback() + logger.error(f"Failed to create dimension: {e}") + return None + + def insert_fact(self, metric_id: int, dimension_ids: List[int], period_type: int, + period_start: str, period_end: str, period_key: str, value: float, count: int) -> bool: + """ + Insert a fact record with proper many-to-many relationships. + + Note: In production, facts are typically loaded via ETL pipelines. + This method is primarily for development/testing. 
+ """ + with self.get_session() as session: + try: + # Create the fact record + fact = Fact( + metric_id=metric_id, + period_type=period_type, + period_start=period_start, + period_end=period_end, + period_key=period_key, + value=value, + count=count + ) + + session.add(fact) + session.flush() # Get the fact ID without committing + + # Create many-to-many relationships + for dimension_id in dimension_ids: + fact_dimension = FactDimension( + fact_id=fact.id, + dimension_id=dimension_id + ) + session.add(fact_dimension) + + session.commit() + logger.debug(f"Inserted fact: metric_id={metric_id}, value={value}, dimensions={dimension_ids}") + return True + + except Exception as e: + session.rollback() + logger.error(f"Failed to insert fact: {e}") + return False + + def query_facts(self, session: Session, metric_ids: List[int], group_names: List[str], + period_levels: List[int], exact_date: str) -> Dict[str, Any]: + """ + Query facts table for aggregated data using proper star schema relationships. + + This method queries live data using SQLAlchemy ORM relationships. 
+ """ + try: + # Build query using proper ORM relationships + query = session.query( + Fact, + Metric, + Dimension + ).join(Metric, Fact.metric_id == Metric.metric_id)\ + .join(FactDimension, Fact.id == FactDimension.fact_id)\ + .join(Dimension, FactDimension.dimension_id == Dimension.id) + + # Apply filters + query = query.filter( + Fact.metric_id.in_(metric_ids), + Fact.period_type.in_(period_levels), + Fact.period_start <= exact_date, + Fact.period_end >= exact_date + ) + + # Filter by dimension codes if specified + if group_names: + query = query.filter( + Dimension.dimension_code.in_(group_names), + Dimension.dimension_type == 'site' + ) + + results = query.all() + + # Format results to match existing API structure + formatted_results = {} + for fact, metric, dimension in results: + # Parse dimension metadata + dimension_details = { + 'site_id': dimension.dimension_code, + 'site_name': dimension.dimension_name, + } + if dimension.dimension_metadata: + try: + dimension_details.update(json.loads(dimension.dimension_metadata)) + except: + pass + + # Group by metric for the expected format + metric_key = str(fact.metric_id) + if metric_key not in formatted_results: + formatted_results[metric_key] = [] + + # Check if we already have this fact to avoid duplicates + existing_fact = next( + (f for f in formatted_results[metric_key] + if f['period_key'] == fact.period_key and f['site_id'] == dimension_details['site_id']), + None + ) + + if existing_fact: + # Update existing fact with additional dimension info if needed + continue + else: + # Add new formatted result + formatted_results[metric_key].append({ + 'metric_id': fact.metric_id, + 'metric_name': metric.metric_name, + 'metric_description': metric.metric_desc or '', + 'site_id': dimension_details.get('site_id', ''), + 'site_name': dimension_details.get('site_name', ''), + 'command_name': dimension_details.get('command_name', ''), + 'store_format': dimension_details.get('store_format', ''), + 'period_key': 
fact.period_key, + 'value': fact.value, + 'count': fact.count + }) + + return {'result': formatted_results} + + except Exception as e: + logger.error(f"Query failed: {e}") + import traceback + logger.error(f"Traceback: {traceback.format_exc()}") + return {'result': {}} + + def close(self): + """Close database connections.""" + if self.engine: + self.engine.dispose() + logger.info("Snowflake connections closed") + + def __del__(self): + """Ensure connections are closed.""" + self.close() + + +# Backward compatibility: aliases for existing code +SnowflakeDataWarehouse = HybridDataWarehouse # For Snowflake-specific usage +YearlongDataWarehouse = HybridDataWarehouse # For existing code + + +def create_session(connection_string: Optional[str] = None) -> Session: + """ + Create a database session for the warehouse. + + Args: + connection_string: Optional connection string. If None, auto-detects database type. + + Returns: + SQLAlchemy session for database operations + """ + warehouse = HybridDataWarehouse(connection_string) + return warehouse.get_session() + + +if __name__ == '__main__': + # Test the connection (auto-detects database type) + try: + warehouse = HybridDataWarehouse() + print(f"✅ {warehouse.db_type.upper()} data warehouse connection successful!") + warehouse.close() + except Exception as e: + print(f"❌ Failed to connect to data warehouse: {e}") + if "SNOWFLAKE" in str(e): + print("For Snowflake access, set SNOWFLAKE_* environment variables") + else: + print("For SQLite access, ensure the database file is accessible") diff --git a/warehouse/models.py b/warehouse/models.py new file mode 100644 index 0000000..258e2f1 --- /dev/null +++ b/warehouse/models.py @@ -0,0 +1,92 @@ +""" +SQLAlchemy ORM Models for Snowflake Data Warehouse +Defines the metrics, dimensions, and facts tables for live data querying. 
+""" + +from sqlalchemy import Column, Integer, String, Float, DateTime, Text, Index, UniqueConstraint, ForeignKey +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship +from sqlalchemy.sql import func + +Base = declarative_base() + +class Metric(Base): + """Metrics table: Defines available metrics (sales, engagement, etc.)""" + __tablename__ = 'metrics' + + id = Column(Integer, primary_key=True, autoincrement=True) + metric_id = Column(Integer, nullable=False, unique=True, index=True) + metric_name = Column(String(255), nullable=False) + metric_desc = Column(Text) + data_type = Column(String(50), nullable=False) # 'numeric', 'percentage', 'count' + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + # Relationship to facts + facts = relationship("Fact", backref="metric") + + def __repr__(self): + return f"" + +class Dimension(Base): + """Dimensions table: Stores dimensional data (sites, commands, etc.)""" + __tablename__ = 'dimensions' + + id = Column(Integer, primary_key=True, autoincrement=True) + dimension_type = Column(String(100), nullable=False, index=True) # 'site', 'command', etc. 
+ dimension_code = Column(String(100), nullable=False, index=True) + dimension_name = Column(String(255), nullable=False) + dimension_metadata = Column(Text) # JSON string for additional attributes + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + __table_args__ = ( + UniqueConstraint('dimension_type', 'dimension_code', name='unique_dimension'), + Index('idx_dimension_type_code', 'dimension_type', 'dimension_code'), + ) + + # Relationship to fact-dimensions (through fact_dimensions table) + fact_dimensions = relationship("FactDimension", backref="dimension") + + def __repr__(self): + return f"" + +class Fact(Base): + """Facts table: Stores actual measurements""" + __tablename__ = 'facts' + + id = Column(Integer, primary_key=True, autoincrement=True) + metric_id = Column(Integer, ForeignKey('metrics.metric_id'), nullable=False, index=True) + period_type = Column(Integer, nullable=False, index=True) # 1=daily, 2=monthly, 3=quarterly, 4=yearly + period_start = Column(String(8), nullable=False, index=True) # YYYYMMDD format + period_end = Column(String(8), nullable=False, index=True) # YYYYMMDD format + period_key = Column(String(255), nullable=False) + value = Column(Float, nullable=False) + count = Column(Integer, nullable=False) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + __table_args__ = ( + Index('idx_fact_lookup', 'metric_id', 'period_type', 'period_start', 'period_end'), + Index('idx_fact_metric_period', 'metric_id', 'period_type'), + ) + + # Relationships + fact_dimensions = relationship("FactDimension", backref="fact") + + def __repr__(self): + return f"" + +class FactDimension(Base): + """Junction table for many-to-many relationship between facts and dimensions""" + __tablename__ = 'fact_dimensions' + + id = Column(Integer, primary_key=True, autoincrement=True) + fact_id = Column(Integer, ForeignKey('facts.id'), nullable=False, index=True) + dimension_id = Column(Integer, 
ForeignKey('dimensions.id'), nullable=False, index=True) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + __table_args__ = ( + UniqueConstraint('fact_id', 'dimension_id', name='unique_fact_dimension'), + Index('idx_fact_dimension_lookup', 'fact_id', 'dimension_id'), + ) + + def __repr__(self): + return f"" diff --git a/warehouse/query_wrapper.py b/warehouse/query_wrapper.py new file mode 100644 index 0000000..38f842e --- /dev/null +++ b/warehouse/query_wrapper.py @@ -0,0 +1,228 @@ +""" +Query Wrapper for Year-long Data Warehouse +Provides the same API interface as the user's existing warehouse system. +""" + +from typing import List, Dict, Any, cast +from datetime import date +from .db_setup import YearlongDataWarehouse +import logging +import pandas as pd + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class YearlongWarehouseSession: + """Session wrapper to match existing query interface.""" + + def __init__(self, db_path = None): + if hasattr(db_path, 'get_session'): # If it's a warehouse object + self.warehouse: YearlongDataWarehouse = cast(YearlongDataWarehouse, db_path) + elif db_path is None: + # Use auto-detection + self.warehouse = YearlongDataWarehouse() + elif hasattr(db_path, 'startswith') and db_path.startswith('sqlite://'): + self.warehouse = YearlongDataWarehouse(db_path) + else: + # For direct file paths, convert to proper SQLite URL + import os + db_path = os.path.abspath(db_path) # Get absolute path + self.warehouse = YearlongDataWarehouse(f"sqlite:///{db_path}") + logger.info("Year-long warehouse session initialized") + + def query_facts(self, metric_ids: List[int], group_names: List[str], + period_levels: List[int], exact_date: date) -> Dict[str, Any]: + """Query interface matching the user's existing system.""" + + exact_date_str = exact_date.strftime('%Y%m%d') + + # Query the warehouse using session + session = self.warehouse.get_session() + df = self.warehouse.query_facts( + 
session=session, + metric_ids=metric_ids, + group_names=group_names, + period_levels=period_levels, + exact_date=exact_date_str + ) + + # Format results to match existing structure + result = {} + + if not df['result']: + logger.warning("No data found for the given query parameters") + return {'result': {}} + + # Process the results dictionary + for metric_id_str, facts in df['result'].items(): + metric_id = int(metric_id_str) + result[metric_id] = { + 'metadata': { + 'metric_name': 'Total Sales', + 'metric_desc': 'Total sales revenue' + } + } + + for fact in facts: + site_id = fact.get('site_id', '') + site_name = fact.get('site_name', '') + command_name = fact.get('command_name', '') + store_format = fact.get('store_format', '') + period_key = fact.get('period_key', '') + value = fact.get('value', 0) + + site_metadata = {} + if command_name: + site_metadata['command_name'] = command_name + if store_format: + site_metadata['store_format'] = store_format + + if site_id and site_id not in result[metric_id]: + result[metric_id][site_id] = { + 'metadata': site_metadata, + 'site_name': site_name, + period_key: value + } + elif site_id: + result[metric_id][site_id][period_key] = value + + # Return in the expected format + return {'result': result} + + def to_dataframe(self, query_result: Dict[str, Any]) -> pd.DataFrame: + """ + Convert query result dictionary to pandas DataFrame. 
+ + Args: + query_result: Result from query_facts method + + Returns: + pandas DataFrame with flattened data + """ + if not query_result.get('result'): + logger.warning("No data in query result to convert to DataFrame") + return pd.DataFrame() + + rows = [] + + # Flatten the nested dictionary structure + for metric_id, metric_data in query_result['result'].items(): + metric_metadata = metric_data.get('metadata', {}) + metric_name = metric_metadata.get('metric_name', f'Metric {metric_id}') + metric_desc = metric_metadata.get('metric_desc', '') + + # Process each site within this metric + for site_key, site_data in metric_data.items(): + if site_key == 'metadata': + continue # Skip metric-level metadata + + site_id = site_key + site_name = site_data.get('site_name', '') + site_metadata = site_data.get('metadata', {}) + command_name = site_metadata.get('command_name', '') + store_format = site_metadata.get('store_format', '') + + # Process each period within this site + for period_key, value in site_data.items(): + if period_key in ['site_name', 'metadata']: + continue # Skip non-period data + + # Create a row for this data point + row = { + 'metric_id': int(metric_id), + 'metric_name': metric_name, + 'metric_description': metric_desc, + 'site_id': site_id, + 'site_name': site_name, + 'command_name': command_name, + 'store_format': store_format, + 'period_key': period_key, + 'value': float(value) if value is not None else 0.0, + 'count': 1 # Default count, can be enhanced later + } + rows.append(row) + + # Create DataFrame from rows + df = pd.DataFrame(rows) + + # Ensure proper data types + df['metric_id'] = df['metric_id'].astype(int) + df['value'] = df['value'].astype(float) + + logger.info(f"Converted query result to DataFrame with {len(df)} rows and {len(df.columns)} columns") + return df + + def query_facts_as_dataframe(self, metric_ids: List[int], group_names: List[str], + period_levels: List[int], exact_date: date) -> pd.DataFrame: + """ + Query facts and 
return results directly as pandas DataFrame. + + Args: + metric_ids: List of metric IDs to query + group_names: List of site IDs to filter by + period_levels: List of period levels (1=daily, 2=monthly, 4=yearly) + exact_date: Date to query for + + Returns: + pandas DataFrame with query results + """ + query_result = self.query_facts(metric_ids, group_names, period_levels, exact_date) + return self.to_dataframe(query_result) + +# Wrapper functions to match the user's existing interface +def convert_jargons(session: YearlongWarehouseSession, df: Dict[str, Any]) -> Dict[str, Any]: + """Convert and format results (wrapper for compatibility).""" + # This is a pass-through for now, but you can add formatting logic here + return df + +def query_facts(session: YearlongWarehouseSession, metric_ids: List[int], + group_names: List[str], period_levels: List[int], + exact_date: date) -> Dict[str, Any]: + """Main query function to match existing interface.""" + return session.query_facts( + metric_ids=metric_ids, + group_names=group_names, + period_levels=period_levels, + exact_date=exact_date + ) + +# Factory function for creating sessions +def create_session(db_path: str = 'yearlong_warehouse.db') -> YearlongWarehouseSession: + """Create a new warehouse session.""" + return YearlongWarehouseSession(db_path) + +# Sample usage examples +def demo_yearlong_warehouse(): + """Demo function showing how to use the yearlong warehouse.""" + + # Create session + session = create_session('yearlong_warehouse.db') + + # Example 1: What is the total sales of three locations for 2025 + print("=" * 60) + print("Query: What is the total sales of three locations for 2025") + result = query_facts( + session=session, + metric_ids=[1], # Total Sales + group_names=["1100", "5206", "2301"], # Site IDs + period_levels=[4], # Yearly + exact_date=date(2025, 1, 1) + ) + + logger.info(convert_jargons(session=session, df=result)) + + # Example 2: What is the total sales of MCCS for Oct 2024 + print("=" * 
60) + print("Query: What is the total sales of MCCS for Oct 2024") + result = query_facts( + session=session, + metric_ids=[1], + group_names=["1100"], + period_levels=[2], # Monthly + exact_date=date(2024, 10, 1) + ) + + logger.info(convert_jargons(session=session, df=result)) + +if __name__ == '__main__': + demo_yearlong_warehouse() diff --git a/warehouse/test_warehouse.py b/warehouse/test_warehouse.py new file mode 100644 index 0000000..83fb24c --- /dev/null +++ b/warehouse/test_warehouse.py @@ -0,0 +1,229 @@ +""" +Test and Demo Script for Year-long Data Warehouse +Demonstrates the warehouse functionality and evaluates the approach. +""" + +import logging +import os +from datetime import date +from warehouse.db_setup import YearlongDataWarehouse +from warehouse.query_wrapper import query_facts, convert_jargons, YearlongWarehouseSession +from typing import NamedTuple + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class Sites(NamedTuple): + """Site information structure to match user's existing code.""" + site_id: str + site_name: str + command_name: str + store_format: str + +def test_warehouse_creation(): + """Test warehouse database creation.""" + logger.info("Testing warehouse creation...") + try: + # Auto-detect will use SQLite by default since no Snowflake env vars are set + warehouse = YearlongDataWarehouse() + logger.info("✓ Warehouse created successfully") + return warehouse + except Exception as e: + logger.error(f"✗ Warehouse creation failed: {e}") + return None + +def insert_sample_data(warehouse: YearlongDataWarehouse): + """Insert sample data that matches user's example structure.""" + logger.info("Inserting sample data...") + + try: + # Insert sample site dimensions + sites_data = [ + ("1100", "HHM MCX MAIN STORE", "HENDERSON HALL", "MAIN STORE"), + ("5206", "CLM MCX CAMP JOHNSON MARINE MART", "CAMP LEJEUNE", "MARINE MART"), + ("2301", "QUM MCX MARINE MART", "QUANTICO", "MARINE MART") + ] + + 
for site_id, site_name, command_name, store_format in sites_data: + dimension_id = warehouse.get_dimension_id( + dimension_type='site', + dimension_code=site_id, + dimension_name=site_name, + metadata={ + 'command_name': command_name, + 'store_format': store_format + } + ) + logger.info(f"✓ Created dimension for site {site_id} (ID: {dimension_id})") + + # Insert sample facts (matching user's examples) + sample_facts = [ + # 2025 total sales + (1, [1], 4, "20250101", "20251231", "20250101 to 20251231", 773686.11, 1000), # Site 1100 dimension ID + (1, [2], 4, "20250101", "20251231", "20250101 to 20251231", 616489.05, 850), # Site 5206 dimension ID + (1, [3], 4, "20250101", "20251231", "20250101 to 20251231", 204485.38, 600), # Site 2301 dimension ID + + # Oct 2024 sales + (1, [1], 2, "20241001", "20241031", "20241001 to 20241031", 1138733.48, 500), # Site 1100 dimension ID + ] + + for fact in sample_facts: + warehouse.insert_fact(*fact) + logger.info(f"✓ Inserted fact: {fact}") + + logger.info("Sample data insertion completed successfully") + return True + + except Exception as e: + logger.error(f"✗ Sample data insertion failed: {e}") + return False + +def test_queries(warehouse: YearlongDataWarehouse): + """Test the query functionality with sample data.""" + logger.info("Testing query functionality...") + + # Use the same warehouse for querying + session = YearlongWarehouseSession(warehouse) + + try: + # Test Query 1: Total sales of three locations for 2025 + logger.info("=" * 60) + logger.info("Query 1: What is the total sales of three locations for 2025?") + result1 = query_facts( + session=session, + metric_ids=[1], # Total Sales + group_names=["1100", "5206", "2301"], # Site IDs + period_levels=[4], # Yearly + exact_date=date(2025, 1, 1) + ) + + logger.info(f"Result 1: {convert_jargons(session=session, df=result1)}") + logger.info("✓ Query 1 executed successfully") + + # Test Query 2: Total sales for Oct 2024 + logger.info("=" * 60) + logger.info("Query 2: What 
is the total sales of MCCS for Oct 2024?") + result2 = query_facts( + session=session, + metric_ids=[1], + group_names=["1100"], + period_levels=[2], # Monthly + exact_date=date(2024, 10, 1) + ) + + logger.info(f"Result 2: {convert_jargons(session=session, df=result2)}") + logger.info("✓ Query 2 executed successfully") + + # Test DataFrame conversion + logger.info("=" * 60) + logger.info("Testing DataFrame conversion functionality...") + + # Convert query result to DataFrame + df_result = session.to_dataframe(result1) + logger.info(f"✓ DataFrame conversion successful: {len(df_result)} rows, {len(df_result.columns)} columns") + logger.info(f"DataFrame columns: {list(df_result.columns)}") + logger.info(f"DataFrame sample:\n{df_result.head()}") + + # Test combined query_as_dataframe method + logger.info("=" * 60) + logger.info("Testing combined query_as_dataframe method...") + df_combined = session.query_facts_as_dataframe([1], ['1100'], [2], date(2024, 10, 1)) + logger.info(f"✓ Combined method successful: {len(df_combined)} rows, {len(df_combined.columns)} columns") + logger.info(f"Combined DataFrame:\n{df_combined}") + + except Exception as e: + logger.error(f"✗ Query testing failed: {e}") + return False + + return True + +# def evaluate_approach(): +# """Evaluate the proposed warehouse approach.""" +# logger.info("=" * 80) +# logger.info("WAREHOUSE APPROACH EVALUATION") +# logger.info("=" * 80) + +# evaluation_points = [ +# ("✅ SQLite Database", "Simple, portable, easy deployment"), +# ("✅ Unified Interface", "Same API as existing system"), +# ("✅ Fast Query Performance", "Indexed for quick aggregations"), +# ("✅ Year-long Data Support", "Handles multiple periods efficiently"), +# ("✅ Multi-data Type Support", "Designed for sales, email, social media"), +# ("✅ LLM Integration Ready", "Structured data perfect for AI analysis"), +# ("✅ Portable Deployment", "Single SQLite file, ready to ship"), +# ("✅ Metadata Preservation", "Site, command, format info maintained"), 
+# ("✅ Time-based Aggregation", "Supports daily/monthly/quarterly/yearly"), +# ("✅ Extensible Design", "Easy to add new metrics/data types") +# ] + + +def run_complete_test(): + """Run the complete warehouse test suite.""" + logger.info("Starting Year-long Data Warehouse Test Suite...") + + # Clean up previous test database + if os.path.exists('yearlong_warehouse.db'): + os.remove('yearlong_warehouse.db') + logger.info("Removed existing test database") + + # Test 1: Warehouse creation + warehouse = test_warehouse_creation() + if not warehouse: + logger.error("Warehouse creation failed - aborting tests") + return False + + # Test 2: Sample data insertion + if not insert_sample_data(warehouse): + logger.error("Sample data insertion failed - aborting tests") + warehouse.close() + return False + + # Test 3: Query testing (using the same warehouse) + if not test_queries(warehouse): + logger.error("Query testing failed") + warehouse.close() + return False + + warehouse.close() + + logger.info("✓ Year-long Data Warehouse Test Suite completed successfully!") + return True + +def run_sprint1_test(): + """Run Sprint 1 warehouse tests: creation and data insertion.""" + logger.info("Starting Sprint 1: Foundation & Database Core Test Suite...") + + # Test 1: Warehouse creation + warehouse = test_warehouse_creation() + if not warehouse: + logger.error("Sprint 1 failed at warehouse creation") + return False + + # Test 2: Sample data insertion + if not insert_sample_data(warehouse): + logger.error("Sprint 1 failed at sample data insertion") + return False + + logger.info("✓ Sprint 2: Foundation & Database Core completed successfully!") + return True + +if __name__ == '__main__': + import sys + + if len(sys.argv) > 1 and sys.argv[1] == '--complete': + success = run_complete_test() + if success: + print("\n🎉 SUCCESS: Complete Year-long Data Warehouse Test Suite passed!") + + else: + print("\n❌ FAILURE: Complete test suite failed.") + print("Check the logs above for specific error 
details.") + else: + success = run_sprint1_test() + if success: + print("\n🎉 SUCCESS: Sprint 2 (Foundation & Database Core) implementation is working perfectly!") + + else: + print("\n❌ FAILURE: Sprint 2 test suite failed.") + print("Check the logs above for specific error details.") diff --git a/yearlong_warehouse.db b/yearlong_warehouse.db new file mode 100644 index 0000000..6aadc46 Binary files /dev/null and b/yearlong_warehouse.db differ