225 lines
8.8 KiB
Python
225 lines
8.8 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
数据质量检查模块 - 自动评估数据质量并提供改进建议
"""
|
|||
|
|
|
|||
|
|
import pandas as pd
|
|||
|
|
import numpy as np
|
|||
|
|
from typing import Dict, List, Tuple, Any
|
|||
|
|
from dataclasses import dataclass
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
class QualityIssue:
    """One data-quality finding produced by a quality check.

    Attributes:
        column: Name of the affected column (a table-wide finding uses a
            descriptive label instead of a real column name).
        issue_type: Category of the finding, e.g. ``"missing"``,
            ``"duplicate"``, ``"outlier"`` or ``"type_mismatch"``.
        severity: One of ``"high"``, ``"medium"`` or ``"low"``.
        description: Human-readable statement of what was found.
        suggestion: Human-readable advice on how to address the finding.
    """

    column: str
    issue_type: str  # missing, duplicate, outlier, type_mismatch, ...
    severity: str  # high, medium, low
    description: str
    suggestion: str
|
|||
|
|
|
|||
|
|
|
|||
|
|
class DataQualityChecker:
    """Run data-quality checks on a DataFrame and produce a scored report.

    Every ``check_*`` method appends ``QualityIssue`` records to
    ``self.issues`` and subtracts a penalty from ``self.quality_score``,
    which starts at 100.  ``check_all`` runs all checks from a clean state
    and returns the dictionary built by ``generate_report``.
    """

    def __init__(self, df: pd.DataFrame):
        """Store the frame to inspect and initialise an empty result state."""
        self.df = df
        self.issues: List[QualityIssue] = []
        self.quality_score: float = 100.0

    def check_all(self) -> Dict[str, Any]:
        """Run every check and return the report from ``generate_report``.

        The accumulated issues and score are reset first, so calling this
        method repeatedly on the same instance does not double-count.
        """
        # A fresh list (not .clear()) keeps previously returned data intact.
        self.issues = []
        self.quality_score = 100.0

        self.check_missing_values()
        self.check_duplicates()
        self.check_data_types()
        self.check_outliers()
        self.check_consistency()

        return self.generate_report()

    def check_missing_values(self) -> None:
        """Record an issue for each column that contains missing values.

        Severity and penalty depend on the share of missing rows:
        above 50% -> high (-10), above 20% -> medium (-5), otherwise low (-2).
        """
        total_rows = len(self.df)
        if total_rows == 0:
            # No rows means no ratio to compute (avoids a 0/0 division).
            return

        for col in self.df.columns:
            missing_count = int(self.df[col].isnull().sum())
            if missing_count == 0:
                continue

            missing_ratio = (missing_count / total_rows) * 100
            if missing_ratio > 50:
                severity, penalty = "high", 10
            elif missing_ratio > 20:
                severity, penalty = "medium", 5
            else:
                severity, penalty = "low", 2
            self.quality_score -= penalty

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="missing",
                    severity=severity,
                    description=f"列 '{col}' 存在 {missing_count} 个缺失值 ({missing_ratio:.1f}%)",
                    suggestion=self._suggest_missing_handling(col, missing_ratio),
                )
            )

    def check_duplicates(self) -> None:
        """Record a table-wide issue when fully duplicated rows exist.

        More than 10% duplicated rows is high severity (-5); any smaller
        non-zero share is medium severity (-3).
        """
        duplicate_count = int(self.df.duplicated().sum())
        if duplicate_count == 0:
            return

        duplicate_ratio = (duplicate_count / len(self.df)) * 100
        if duplicate_ratio > 10:
            severity, penalty = "high", 5
        else:
            severity, penalty = "medium", 3
        self.quality_score -= penalty

        self.issues.append(
            QualityIssue(
                column="全表",
                issue_type="duplicate",
                severity=severity,
                description=f"发现 {duplicate_count} 行重复数据 ({duplicate_ratio:.1f}%)",
                suggestion="建议使用 df.drop_duplicates() 删除重复行,或检查是否为合理的重复记录",
            )
        )

    def check_data_types(self) -> None:
        """Flag text-typed columns whose non-null values all parse as numbers.

        Each flagged column is a medium-severity issue with a penalty of 3.
        """
        for col in self.df.columns:
            series = self.df[col]
            is_text_like = (
                pd.api.types.is_object_dtype(series)
                or pd.api.types.is_string_dtype(series)
            )
            if not is_text_like:
                continue

            non_null = series.dropna()
            if non_null.empty:
                # Converting nothing always "succeeds"; that proves nothing
                # about the column, so do not report it.
                continue

            try:
                pd.to_numeric(non_null, errors="raise")
            except (ValueError, TypeError):
                # At least one value is not numeric: the text dtype is fine.
                continue

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="type_mismatch",
                    severity="medium",
                    description=f"列 '{col}' 当前为文本类型,但可以转换为数值类型",
                    suggestion=f"建议使用 df['{col}'] = pd.to_numeric(df['{col}']) 转换类型",
                )
            )
            self.quality_score -= 3

    def check_outliers(self) -> None:
        """Flag numeric columns with values outside ``[Q1 - 3*IQR, Q3 + 3*IQR]``.

        More than 5% outlying rows is medium severity (-3); any smaller
        non-zero share is low severity (-1).
        """
        total_rows = len(self.df)
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns

        for col in numeric_cols:
            series = self.df[col]
            q1 = series.quantile(0.25)
            q3 = series.quantile(0.75)
            iqr = q3 - q1

            lower_bound = q1 - 3 * iqr
            upper_bound = q3 + 3 * iqr

            # Count on the boolean mask instead of building a filtered frame.
            outside = (series < lower_bound) | (series > upper_bound)
            outlier_count = int(outside.sum())
            if outlier_count == 0:
                continue

            outlier_ratio = (outlier_count / total_rows) * 100
            if outlier_ratio > 5:
                severity, penalty = "medium", 3
            else:
                severity, penalty = "low", 1
            self.quality_score -= penalty

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="outlier",
                    severity=severity,
                    description=f"列 '{col}' 存在 {outlier_count} 个异常值 ({outlier_ratio:.1f}%)",
                    suggestion=f"建议检查 {lower_bound:.2f} 以下和 {upper_bound:.2f} 以上的值是否合理",
                )
            )

    def check_consistency(self) -> None:
        """Flag datetime columns whose non-missing values are not ascending.

        Both naive and timezone-aware datetime columns are inspected.
        Missing timestamps are ignored here (they are the concern of
        ``check_missing_values``), so a sorted column with gaps is not
        reported as out of order.  Each flagged column is medium severity
        with a penalty of 3.
        """
        datetime_cols = self.df.select_dtypes(
            include=["datetime64", "datetimetz"]
        ).columns

        for col in datetime_cols:
            if self.df[col].dropna().is_monotonic_increasing:
                continue

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="consistency",
                    severity="medium",
                    description=f"时间列 '{col}' 不是单调递增的,可能存在乱序",
                    suggestion=f"建议使用 df.sort_values('{col}') 进行排序",
                )
            )
            self.quality_score -= 3

    def _suggest_missing_handling(self, col: str, missing_ratio: float) -> str:
        """Return advice text for a column with ``missing_ratio`` percent gaps.

        Above 70% the advice is to drop the column; above 30% to fill or
        drop rows; otherwise a fill expression is suggested, chosen by
        whether the column is numeric.
        """
        if missing_ratio > 70:
            return f"缺失比例过高,建议删除列 '{col}'"
        if missing_ratio > 30:
            return "建议填充或删除缺失值:使用中位数/众数填充或删除含缺失值的行"
        if pd.api.types.is_numeric_dtype(self.df[col]):
            return f"建议使用均值/中位数填充:df['{col}'].fillna(df['{col}'].median())"
        return f"建议使用众数填充:df['{col}'].fillna(df['{col}'].mode()[0])"

    def _count_by_severity(self, severity: str) -> int:
        """Return how many recorded issues have the given severity."""
        return sum(1 for issue in self.issues if issue.severity == severity)

    def generate_report(self) -> Dict[str, Any]:
        """Build the report dictionary from the current issues and score.

        The score is clamped to the range 0-100 (and stored back on the
        instance) before being reported.

        Returns:
            A dict with keys ``quality_score`` (rounded to 2 decimals),
            ``total_issues``, ``high_severity``, ``medium_severity``,
            ``low_severity``, ``issues`` (a copy of the issue list) and
            ``summary`` (Markdown text).
        """
        # Float bounds keep the stored score a float even when clamped.
        self.quality_score = max(0.0, min(100.0, self.quality_score))

        return {
            "quality_score": round(self.quality_score, 2),
            "total_issues": len(self.issues),
            "high_severity": self._count_by_severity("high"),
            "medium_severity": self._count_by_severity("medium"),
            "low_severity": self._count_by_severity("low"),
            # Copy so later checks cannot mutate an already returned report.
            "issues": list(self.issues),
            "summary": self._generate_summary(),
        }

    def _generate_summary(self) -> str:
        """Render the current score and issues as a Markdown summary.

        Only high- and medium-severity issues are listed individually; the
        "main issues" heading is omitted when there are none of those.
        """
        parts = [
            "## 数据质量报告\n\n",
            f"**质量评分**: {self.quality_score:.1f}/100\n\n",
        ]

        if self.quality_score >= 90:
            parts.append("✅ **评级**: 优秀 - 数据质量很好\n\n")
        elif self.quality_score >= 75:
            parts.append("⚠️ **评级**: 良好 - 存在一些小问题\n\n")
        elif self.quality_score >= 60:
            parts.append("⚠️ **评级**: 一般 - 需要处理多个问题\n\n")
        else:
            parts.append("❌ **评级**: 差 - 数据质量问题严重\n\n")

        parts.append(f"**问题统计**: 共 {len(self.issues)} 个质量问题\n")
        parts.append(f"- 🔴 高严重性: {self._count_by_severity('high')} 个\n")
        parts.append(f"- 🟡 中严重性: {self._count_by_severity('medium')} 个\n")
        parts.append(f"- 🟢 低严重性: {self._count_by_severity('low')} 个\n\n")

        major_issues = [
            issue for issue in self.issues if issue.severity in ("high", "medium")
        ]
        if major_issues:
            parts.append("### 主要问题:\n\n")
            for issue in major_issues:
                emoji = "🔴" if issue.severity == "high" else "🟡"
                parts.append(f"{emoji} **{issue.column}** - {issue.description}\n")
                parts.append(f" 💡 {issue.suggestion}\n\n")

        return "".join(parts)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def quick_quality_check(df: pd.DataFrame) -> str:
    """Run all quality checks on ``df`` and return only the summary text.

    Args:
        df: The DataFrame to inspect.

    Returns:
        The ``"summary"`` entry of the report produced by
        ``DataQualityChecker(df).check_all()``.
    """
    checker = DataQualityChecker(df)
    report = checker.check_all()
    return report["summary"]
|