Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
import streamlit as st
import pandas as pd
import matplotlib.pyplot as plt
import os

# Load the data
data = pd.read_csv('/Users/huawei/Spaces/pbl/data-analysis/data/convertedcsv/Advertising_Email_Engagement/Advertising_Email_Engagement.xlsx-Email_Engagement_Engagement_Tim.csv', header=0) # Adjust header if necessary
# Load the data using relative path
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(script_dir)) # Go up 2 levels to project root
data_path = os.path.join(project_root, 'data', 'convertedcsv', 'Advertising_Email_Engagement', 'Advertising_Email_Engagement.xlsx-Email_Engagement_Engagement_Tim.csv')
data = pd.read_csv(data_path, header=0) # Adjust header if necessary
data.columns = data.columns.str.strip()
print(data.columns)

Expand Down Expand Up @@ -64,4 +68,4 @@

# Conclusion
st.subheader('Conclusion')
st.write("This dashboard provides insights into email marketing engagement metrics, including trends in deliveries, unique opens, open rates, click rates, and unsubscribe rates.")
st.write("This dashboard provides insights into email marketing engagement metrics, including trends in deliveries, unique opens, open rates, click rates, and unsubscribe rates.")
178 changes: 125 additions & 53 deletions retail_trends_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datetime import datetime
import os
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from dask.diagnostics.progress import ProgressBar
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
Expand All @@ -24,55 +24,127 @@
print("Starting Retail Sales Data Analysis...")
print("Loading data (this may take a few minutes due to the large file size)...")

# Load the data using Dask for better memory management with large files
# Try to read from CSV first, if not available, use the Parquet file
# Try to load data from warehouse first, fallback to direct file loading
import os
try:
print("Attempting to load data from warehouse...")
from warehouse.query_wrapper import create_session
from warehouse.data_ingestion import DataIngestionManager
from datetime import date

# Create warehouse session
session = create_session('yearlong_warehouse.db')

# Check if warehouse has data, if not, try to load from files
try:
# Test query to see if warehouse has data
test_result = session.query_facts([1], ['1100'], [4], date(2025, 1, 1))
if test_result['result']:
print("✅ Warehouse data found, using warehouse for analysis")

# Get all available sites and periods for comprehensive analysis
# For now, we'll query specific periods and combine them
df_2024 = session.query_facts_as_dataframe([1], ['1100', '5206', '2301'], [2], date(2024, 12, 1))
df_2025 = session.query_facts_as_dataframe([1], ['1100', '5206', '2301'], [4], date(2025, 1, 1))

# Combine dataframes
df = pd.concat([df_2024, df_2025], ignore_index=True)

# Convert period_key to datetime for analysis
df['SALE_DATE'] = pd.to_datetime(df['period_key'].str.split(' to ').str[0], format='%Y%m%d')
df['MONTH'] = df['SALE_DATE'].dt.month
df['MONTH_NAME'] = df['SALE_DATE'].dt.month_name()
df['DAY_OF_WEEK'] = df['SALE_DATE'].dt.day_name()
df['HOUR'] = 12 # Default hour since warehouse doesn't store hourly data
df['IS_RETURN'] = False # Default, warehouse has separate return metrics
df['EXTENSION_AMOUNT'] = df['value'] # Map value to extension_amount for compatibility
df['SITE_ID'] = df['site_id']
df['SITE_NAME'] = df['site_name']
df['COMMAND_NAME'] = df['command_name']
df['STORE_FORMAT'] = df['store_format']

warehouse_loaded = True
print(f"Loaded {len(df)} records from warehouse")

else:
print("⚠️ Warehouse found but no data, falling back to file loading...")
raise Exception("No warehouse data")

except Exception as e:
print(f"❌ Warehouse loading failed: {e}")
print("Falling back to direct file loading...")
raise Exception("Warehouse unavailable")

except Exception as warehouse_error:
print(f"❌ Warehouse not available: {warehouse_error}")
print("Loading data directly from files...")

# Fallback to original file loading logic
csv_path = 'data/convertedcsv/MCCS_RetailData.csv'
parquet_path = 'data/rawdata/MCCS_RetailData.parquet'
sample_parquet_path = 'retail_data_sample.parquet'

if os.path.exists(csv_path):
print(f"Reading from CSV file: {csv_path}")
df = dd.read_csv(csv_path,
assume_missing=True,
blocksize="64MB") # Adjust blocksize as needed
elif os.path.exists(parquet_path):
print(f"CSV file not found. Reading from Parquet file: {parquet_path}")
df = dd.read_parquet(parquet_path)
elif os.path.exists(sample_parquet_path):
print(f"Using sample Parquet file: {sample_parquet_path}")
df = dd.read_parquet(sample_parquet_path)
else:
raise FileNotFoundError("Could not find retail data in CSV or Parquet format. Please ensure either file exists.")

# Convert date columns to datetime - using flexible format detection
df['SALE_DATE'] = dd.to_datetime(df['SALE_DATE'], errors='coerce')
df['SALE_DATE_TIME'] = dd.to_datetime(df['SALE_DATE_TIME'], errors='coerce')

# Create a month column for monthly analysis
df['MONTH'] = df['SALE_DATE'].dt.month
df['MONTH_NAME'] = df['SALE_DATE'].dt.month_name()
df['DAY_OF_WEEK'] = df['SALE_DATE'].dt.day_name()
df['HOUR'] = df['SALE_DATE_TIME'].dt.hour

# Create a flag for returns
df['IS_RETURN'] = df['RETURN_IND'] == 'Y'

csv_path = 'data/convertedcsv/MCCS_RetailData.csv'
parquet_path = 'data/rawdata/MCCS_RetailData.parquet'
sample_parquet_path = 'retail_data_sample.parquet'

if os.path.exists(csv_path):
print(f"Reading from CSV file: {csv_path}")
df = dd.read_csv(csv_path,
assume_missing=True,
blocksize="64MB") # Adjust blocksize as needed
elif os.path.exists(parquet_path):
print(f"CSV file not found. Reading from Parquet file: {parquet_path}")
df = dd.read_parquet(parquet_path)
elif os.path.exists(sample_parquet_path):
print(f"Using sample Parquet file: {sample_parquet_path}")
df = dd.read_parquet(sample_parquet_path)
else:
raise FileNotFoundError("Could not find retail data in CSV or Parquet format. Please ensure either file exists.")
# Compute basic statistics
print("Computing basic statistics...")

# Convert date columns to datetime - using flexible format detection
df['SALE_DATE'] = dd.to_datetime(df['SALE_DATE'], errors='coerce')
df['SALE_DATE_TIME'] = dd.to_datetime(df['SALE_DATE_TIME'], errors='coerce')
# Handle warehouse vs file data differently
if 'warehouse_loaded' in locals() and warehouse_loaded:
# Data is already pandas from warehouse
total_sales = df['EXTENSION_AMOUNT'].sum()
total_transactions = 0 # Not available in warehouse summary data
total_items = 0 # Not available in warehouse summary data
total_quantity = df['count'].sum()
return_rate = 0.0 # Not available in current warehouse data

# Create a month column for monthly analysis
df['MONTH'] = df['SALE_DATE'].dt.month
df['MONTH_NAME'] = df['SALE_DATE'].dt.month_name()
df['DAY_OF_WEEK'] = df['SALE_DATE'].dt.day_name()
df['HOUR'] = df['SALE_DATE_TIME'].dt.hour
# Get unique stores and commands
stores = df['SITE_NAME'].unique()
commands = df['COMMAND_NAME'].unique()
store_formats = df['STORE_FORMAT'].unique()

# Create a flag for returns
df['IS_RETURN'] = df['RETURN_IND'] == 'Y'
# For warehouse data, we need to adapt the analysis since we don't have detailed transaction data
print("Note: Using warehouse summary data - some detailed metrics may not be available")

# Compute basic statistics
with ProgressBar():
print("Computing basic statistics...")
# Convert to pandas for easier analysis after aggregation
total_sales = df['EXTENSION_AMOUNT'].sum().compute()
total_transactions = df['SLIP_NO'].nunique().compute()
total_items = df['ITEM_ID'].nunique().compute()
total_quantity = df['QTY'].sum().compute()
return_rate = df['IS_RETURN'].mean().compute() * 100
# Get unique stores and commands
stores = df['SITE_NAME'].unique().compute()
commands = df['COMMAND_NAME'].unique().compute()
store_formats = df['STORE_FORMAT'].unique().compute()
else:
# Data is from files (dask)
with ProgressBar():
# Convert to pandas for easier analysis after aggregation
total_sales = df['EXTENSION_AMOUNT'].sum().compute()
total_transactions = df['SLIP_NO'].nunique().compute()
total_items = df['ITEM_ID'].nunique().compute()
total_quantity = df['QTY'].sum().compute()
return_rate = df['IS_RETURN'].mean().compute() * 100

# Get unique stores and commands
stores = df['SITE_NAME'].unique().compute()
commands = df['COMMAND_NAME'].unique().compute()
store_formats = df['STORE_FORMAT'].unique().compute()

print(f"\nAnalysis Period: Dec 2024 - Jan 2025")
print(f"Total Sales: ${total_sales:,.2f}")
Expand Down Expand Up @@ -319,41 +391,41 @@
f.write(f"Number of Stores: {len(stores)}\n")
f.write(f"Number of Commands: {len(commands)}\n")
f.write(f"Store Formats: {', '.join(store_formats)}\n\n")

f.write("Key Insights:\n")
f.write("-------------\n")

# Top selling day of week
top_day = day_of_week_sales.loc[day_of_week_sales['EXTENSION_AMOUNT'].idxmax()]
f.write(f"1. Highest sales occur on {top_day['DAY_OF_WEEK']}s\n")

# Top selling hour
top_hour = hourly_sales.loc[hourly_sales['EXTENSION_AMOUNT'].idxmax()]
f.write(f"2. Peak sales hour is {int(top_hour['HOUR'])}:00h (24-hour format)\n")

# Top product
top_product = top_products_revenue.iloc[0]
f.write(f"3. Best-selling product by revenue: {top_product['ITEM_DESC']} (${top_product['EXTENSION_AMOUNT']:,.2f})\n")

# Top store
top_store = store_sales.iloc[0]
f.write(f"4. Top performing store: {top_store['SITE_NAME']} (${top_store['EXTENSION_AMOUNT']:,.2f})\n")

# Price status insight
regular_sales = price_status_sales[price_status_sales['PRICE_STATUS'] == 'R']['EXTENSION_AMOUNT'].iloc[0]
promo_sales = price_status_sales[price_status_sales['PRICE_STATUS'] == 'P']['EXTENSION_AMOUNT'].iloc[0] if 'P' in price_status_sales['PRICE_STATUS'].values else 0
markdown_sales = price_status_sales[price_status_sales['PRICE_STATUS'] == 'M']['EXTENSION_AMOUNT'].iloc[0] if 'M' in price_status_sales['PRICE_STATUS'].values else 0

f.write(f"5. Regular-priced items account for ${regular_sales:,.2f} in sales ({regular_sales/total_sales*100:.1f}% of total)\n")
if promo_sales > 0:
f.write(f"6. Promotional items account for ${promo_sales:,.2f} in sales ({promo_sales/total_sales*100:.1f}% of total)\n")
if markdown_sales > 0:
f.write(f"7. Markdown items account for ${markdown_sales:,.2f} in sales ({markdown_sales/total_sales*100:.1f}% of total)\n")

# Transaction insights
f.write(f"8. Average transaction value: ${transaction_value_stats['mean']:,.2f}\n")
f.write(f"9. Average items per transaction: {transaction_size_stats['mean']:.2f}\n")

# Return insights
f.write(f"10. Product with highest return rate: {return_by_product.iloc[0]['ITEM_DESC']} ({return_by_product.iloc[0]['RETURN_RATE']:.2f}%)\n")

Expand Down
Loading