Files

225 lines
8.8 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
"""
数据质量检查模块 - 自动评估数据质量并提供改进建议
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Any
from dataclasses import dataclass
@dataclass
class QualityIssue:
    """One data-quality finding produced by a quality check.

    Fields are plain strings and are accepted positionally in the order
    declared below.
    """

    # Name of the column the finding refers to.
    column: str
    # Kind of problem, e.g. "missing", "duplicate", "outlier", "type_mismatch".
    issue_type: str
    # One of "high", "medium" or "low".
    severity: str
    # Human-readable statement of what was found.
    description: str
    # Human-readable advice on how to address the finding.
    suggestion: str
class DataQualityChecker:
    """Run data-quality checks on a DataFrame and score the result.

    Every ``check_*`` method appends ``QualityIssue`` records to
    ``self.issues`` and subtracts a penalty from ``self.quality_score``
    (which starts at 100).  ``check_all`` runs all checks and returns the
    report built by ``generate_report``.
    """

    def __init__(self, df: pd.DataFrame):
        """Store *df* and start with no issues and a score of 100."""
        self.df = df
        self.issues: List[QualityIssue] = []
        self.quality_score: float = 100.0

    def check_all(self) -> Dict[str, Any]:
        """Run every check and return the report from ``generate_report``."""
        # Start from a clean slate so a repeated call does not duplicate
        # issues or subtract the same penalties a second time.
        self.issues = []
        self.quality_score = 100.0

        self.check_missing_values()
        self.check_duplicates()
        self.check_data_types()
        self.check_outliers()
        self.check_consistency()
        return self.generate_report()

    def check_missing_values(self) -> None:
        """Record an issue for every column that contains null values.

        Severity and penalty depend on the share of null rows:
        more than 50% -> "high" (-10), more than 20% -> "medium" (-5),
        anything above 0% -> "low" (-2).
        """
        total_rows = len(self.df)
        if total_rows == 0:
            # Nothing can be missing in a frame without rows; returning here
            # also avoids a 0/0 division below.
            return

        for col in self.df.columns:
            missing_count = int(self.df[col].isnull().sum())
            if missing_count == 0:
                continue

            missing_ratio = (missing_count / total_rows) * 100
            if missing_ratio > 50:
                severity, penalty = "high", 10
            elif missing_ratio > 20:
                severity, penalty = "medium", 5
            else:
                severity, penalty = "low", 2
            self.quality_score -= penalty

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="missing",
                    severity=severity,
                    description=f"'{col}' 存在 {missing_count} 个缺失值 ({missing_ratio:.1f}%)",
                    suggestion=self._suggest_missing_handling(col, missing_ratio),
                )
            )

    def check_duplicates(self) -> None:
        """Record one table-wide issue if fully duplicated rows exist.

        More than 10% duplicated rows is "high" (-5), otherwise "medium" (-3).
        """
        duplicate_count = int(self.df.duplicated().sum())
        if duplicate_count == 0:
            return

        # duplicate_count > 0 implies the frame has rows, so this is safe.
        duplicate_ratio = (duplicate_count / len(self.df)) * 100
        if duplicate_ratio > 10:
            severity, penalty = "high", 5
        else:
            severity, penalty = "medium", 3
        self.quality_score -= penalty

        self.issues.append(
            QualityIssue(
                column="全表",
                issue_type="duplicate",
                severity=severity,
                description=f"发现 {duplicate_count} 行重复数据 ({duplicate_ratio:.1f}%)",
                suggestion="建议使用 df.drop_duplicates() 删除重复行,或检查是否为合理的重复记录",
            )
        )

    def check_data_types(self) -> None:
        """Flag object-dtype columns whose non-null values all parse as numbers.

        Each flagged column is a "medium" issue with a penalty of 3.
        """
        for col in self.df.columns:
            if self.df[col].dtype != 'object':
                continue

            non_null = self.df[col].dropna()
            if non_null.empty:
                # pd.to_numeric succeeds on an empty Series, which would
                # wrongly report an empty/all-null column as numeric.
                continue

            try:
                pd.to_numeric(non_null, errors='raise')
            except Exception:
                # The conversion is only a probe: any failure means the
                # column is not cleanly numeric.  Unlike a bare ``except``,
                # this lets KeyboardInterrupt/SystemExit propagate.
                continue

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="type_mismatch",
                    severity="medium",
                    description=f"'{col}' 当前为文本类型,但可以转换为数值类型",
                    suggestion=f"建议使用 df['{col}'] = pd.to_numeric(df['{col}']) 转换类型",
                )
            )
            self.quality_score -= 3

    def check_outliers(self) -> None:
        """Flag numeric columns with values beyond 3 * IQR of the quartiles.

        A value is counted when it is below ``Q1 - 3*IQR`` or above
        ``Q3 + 3*IQR``.  More than 5% such rows is "medium" (-3),
        otherwise "low" (-1).
        """
        total_rows = len(self.df)
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            values = self.df[col]
            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 3 * iqr
            upper_bound = q3 + 3 * iqr

            # Count directly on the boolean mask instead of materialising a
            # filtered copy of the whole frame.
            outlier_mask = (values < lower_bound) | (values > upper_bound)
            outlier_count = int(outlier_mask.sum())
            if outlier_count == 0:
                continue

            outlier_ratio = (outlier_count / total_rows) * 100
            if outlier_ratio > 5:
                severity, penalty = "medium", 3
            else:
                severity, penalty = "low", 1
            self.quality_score -= penalty

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="outlier",
                    severity=severity,
                    description=f"'{col}' 存在 {outlier_count} 个异常值 ({outlier_ratio:.1f}%)",
                    suggestion=f"建议检查 {lower_bound:.2f} 以下和 {upper_bound:.2f} 以上的值是否合理",
                )
            )

    def check_consistency(self) -> None:
        """Flag datetime64 columns that are not monotonically increasing.

        Each such column is a "medium" issue with a penalty of 3.
        """
        datetime_cols = self.df.select_dtypes(include=['datetime64']).columns
        for col in datetime_cols:
            if self.df[col].is_monotonic_increasing:
                continue

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="consistency",
                    severity="medium",
                    description=f"时间列 '{col}' 不是单调递增的,可能存在乱序",
                    suggestion=f"建议使用 df.sort_values('{col}') 进行排序",
                )
            )
            self.quality_score -= 3

    def _suggest_missing_handling(self, col: str, missing_ratio: float) -> str:
        """Return advice for handling nulls in *col*.

        Args:
            col: Column name; must exist in ``self.df`` when
                ``missing_ratio`` is 30 or less (its dtype is inspected).
            missing_ratio: Percentage (0-100) of null rows in the column.

        Returns:
            Drop-the-column advice above 70%, generic fill-or-drop advice
            above 30%, otherwise a median (numeric) or mode (non-numeric)
            fill suggestion.
        """
        if missing_ratio > 70:
            return f"缺失比例过高,建议删除列 '{col}'"
        if missing_ratio > 30:
            return "建议填充或删除缺失值:使用中位数/众数填充或删除含缺失值的行"
        if pd.api.types.is_numeric_dtype(self.df[col]):
            return f"建议使用均值/中位数填充df['{col}'].fillna(df['{col}'].median())"
        return f"建议使用众数填充df['{col}'].fillna(df['{col}'].mode()[0])"

    def _severity_counts(self) -> Dict[str, int]:
        """Return the number of recorded issues per severity level."""
        counts = {"high": 0, "medium": 0, "low": 0}
        for issue in self.issues:
            if issue.severity in counts:
                counts[issue.severity] += 1
        return counts

    def generate_report(self) -> Dict[str, Any]:
        """Clamp the score to 0-100 and return the report dictionary.

        Returns:
            A dict with the keys ``quality_score`` (rounded to 2 decimals),
            ``total_issues``, ``high_severity``, ``medium_severity``,
            ``low_severity``, ``issues`` (the list held by the checker) and
            ``summary`` (text from ``_generate_summary``).
        """
        # Penalties can push the score below zero; keep it within 0-100.
        self.quality_score = max(0.0, min(100.0, self.quality_score))

        counts = self._severity_counts()
        return {
            "quality_score": round(self.quality_score, 2),
            "total_issues": len(self.issues),
            "high_severity": counts["high"],
            "medium_severity": counts["medium"],
            "low_severity": counts["low"],
            "issues": self.issues,
            "summary": self._generate_summary(),
        }

    def _generate_summary(self) -> str:
        """Build the human-readable summary text for the current state.

        The text contains the score, a rating band (>=90, >=75, >=60, else),
        the per-severity counts and, when issues exist, the description and
        suggestion of every high and medium severity issue.
        """
        score = self.quality_score
        if score >= 90:
            rating = "[OK] **评级**: 优秀 - 数据质量很好\n\n"
        elif score >= 75:
            rating = "[WARN] **评级**: 良好 - 存在一些小问题\n\n"
        elif score >= 60:
            rating = "[WARN] **评级**: 一般 - 需要处理多个问题\n\n"
        else:
            rating = "[ERROR] **评级**: 差 - 数据质量问题严重\n\n"

        counts = self._severity_counts()
        # Collect the pieces and join once instead of repeated ``+=``.
        parts = [
            "## 数据质量报告\n\n",
            f"**质量评分**: {score:.1f}/100\n\n",
            rating,
            f"**问题统计**: 共 {len(self.issues)} 个质量问题\n",
            f"- [RED] 高严重性: {counts['high']}\n",
            f"- [YELLOW] 中严重性: {counts['medium']}\n",
            f"- [GREEN] 低严重性: {counts['low']}\n\n",
        ]

        if self.issues:
            parts.append("### 主要问题:\n\n")
            # Low severity issues are counted above but not listed in detail.
            for issue in self.issues:
                if issue.severity not in ("high", "medium"):
                    continue
                emoji = "[RED]" if issue.severity == "high" else "[YELLOW]"
                parts.append(f"{emoji} **{issue.column}** - {issue.description}\n")
                parts.append(f" [TIP] {issue.suggestion}\n\n")

        return "".join(parts)
def quick_quality_check(df: pd.DataFrame) -> str:
    """Run all quality checks on *df* and return only the summary text.

    Args:
        df: The DataFrame to inspect.

    Returns:
        The value stored under the ``'summary'`` key of the report
        returned by ``DataQualityChecker(df).check_all()``.
    """
    report = DataQualityChecker(df).check_all()
    return report['summary']