-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_processor.py
More file actions
156 lines (124 loc) · 5.64 KB
/
data_processor.py
File metadata and controls
156 lines (124 loc) · 5.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import pandas as pd
import io
from typing import Union
import streamlit as st
class DataProcessor:
    """Load, clean, validate and summarize tabular data files (CSV / Excel)."""

    def __init__(self):
        # File extensions this processor knows how to parse.
        self.supported_formats = ['csv', 'xlsx', 'xls']

    def load_file(self, uploaded_file) -> pd.DataFrame:
        """Load and process uploaded file into a pandas DataFrame.

        Args:
            uploaded_file: File-like object exposing ``name`` and ``seek``
                (e.g. a Streamlit UploadedFile — assumed; confirm with caller).

        Returns:
            A cleaned DataFrame (see ``_clean_data``).

        Raises:
            Exception: wrapping any underlying parse/validation error, with
                the original exception chained for debugging.
        """
        try:
            # Dispatch on the (lower-cased) extension after the last dot.
            file_extension = uploaded_file.name.rsplit('.', 1)[-1].lower()
            if file_extension not in self.supported_formats:
                raise ValueError(f"Unsupported file format: {file_extension}")
            if file_extension == 'csv':
                df = self._load_csv(uploaded_file)
            else:
                # Only 'xlsx'/'xls' remain after the membership check above,
                # so no further else branch is needed.
                df = self._load_excel(uploaded_file)
            return self._clean_data(df)
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise Exception(f"Error loading file: {str(e)}") from e

    def _load_csv(self, uploaded_file) -> pd.DataFrame:
        """Read a CSV file, trying a small set of common encodings in order."""
        try:
            for encoding in ('utf-8', 'latin-1', 'cp1252'):
                try:
                    uploaded_file.seek(0)  # Rewind before each parse attempt.
                    return pd.read_csv(uploaded_file, encoding=encoding)
                except UnicodeDecodeError:
                    continue  # Try the next candidate encoding.
            raise ValueError("Could not decode CSV file with any supported encoding")
        except Exception as e:
            raise Exception(f"Error reading CSV file: {str(e)}") from e

    def _load_excel(self, uploaded_file) -> pd.DataFrame:
        """Read an Excel file, falling back to xlrd for legacy .xls files."""
        try:
            return pd.read_excel(uploaded_file, engine='openpyxl')
        except Exception as e:
            try:
                uploaded_file.seek(0)  # Rewind before retrying with xlrd.
                return pd.read_excel(uploaded_file, engine='xlrd')
            except Exception:
                # Narrowed from a bare `except:` (which would also swallow
                # KeyboardInterrupt/SystemExit). Report the *first* failure,
                # which is usually the more informative one.
                raise Exception(f"Error reading Excel file: {str(e)}") from e

    def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean the data: drop empty rows/cols, trim strings, tidy headers."""
        df_clean = df.copy()  # Never mutate the caller's frame.
        df_clean = df_clean.dropna(how='all')          # drop fully-empty rows
        df_clean = df_clean.dropna(axis=1, how='all')  # drop fully-empty cols
        # Trim whitespace on string cells only, preserving real missing values.
        # (The previous astype(str) approach converted NaN/None into the
        # literal strings 'nan'/'None', and only 'nan' was ever mapped back.)
        for col in df_clean.select_dtypes(include=['object']).columns:
            df_clean[col] = df_clean[col].map(
                lambda v: v.strip() if isinstance(v, str) else v
            )
            # Normalize textual placeholders for missing data to real NA.
            df_clean[col] = df_clean[col].replace(['nan', 'None'], pd.NA)
        # Clean column names; astype(str) guards against non-string headers,
        # on which the .str accessor would otherwise raise.
        df_clean.columns = (
            df_clean.columns.astype(str)
            .str.strip()
            .str.replace('\n', ' ')
            .str.replace('\r', ' ')
        )
        # Reset index so row labels are contiguous after the drops above.
        return df_clean.reset_index(drop=True)

    def get_data_summary(self, df: pd.DataFrame) -> dict:
        """Return per-column info, missing-data percentages and dtypes.

        Returns:
            dict with keys 'total_rows', 'total_columns', 'column_info',
            'missing_data' and 'data_types'.
        """
        n_rows = len(df)
        summary = {
            'total_rows': n_rows,
            'total_columns': len(df.columns),
            'column_info': {},
            'missing_data': {},
            'data_types': {}
        }
        for col in df.columns:
            null_count = int(df[col].isnull().sum())
            summary['column_info'][col] = {
                'non_null_count': int(df[col].count()),
                'null_count': null_count,
                'unique_count': int(df[col].nunique()),
                'data_type': str(df[col].dtype)
            }
            # Guard against ZeroDivisionError on an empty frame.
            missing_pct = (null_count / n_rows) * 100 if n_rows else 0.0
            summary['missing_data'][col] = round(missing_pct, 2)
            summary['data_types'][col] = str(df[col].dtype)
        return summary

    def validate_data(self, df: pd.DataFrame) -> list:
        """Validate data and return a list of human-readable issue strings."""
        issues = []
        # An empty frame short-circuits: nothing else is meaningful to check.
        if df.empty:
            issues.append("Dataset is empty")
            return issues
        # Columns where every value is missing.
        all_missing_cols = df.columns[df.isnull().all()].tolist()
        if all_missing_cols:
            issues.append(f"Columns with all missing values: {', '.join(all_missing_cols)}")
        # Duplicate column names (second and later occurrences).
        duplicate_cols = df.columns[df.columns.duplicated()].tolist()
        if duplicate_cols:
            issues.append(f"Duplicate column names: {', '.join(duplicate_cols)}")
        # Columns that are almost entirely missing (>90%).
        n_rows = len(df)
        high_missing_cols = []
        for col in df.columns:
            missing_pct = (df[col].isnull().sum() / n_rows) * 100
            if missing_pct > 90:
                high_missing_cols.append(f"{col} ({missing_pct:.1f}%)")
        if high_missing_cols:
            issues.append(f"Columns with >90% missing data: {', '.join(high_missing_cols)}")
        return issues