# -*- coding: utf-8 -*-
"""
Data quality checking module.

Automatically assesses the quality of a pandas DataFrame and produces
suggestions for improving it.
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Any
from dataclasses import dataclass


@dataclass
class QualityIssue:
    """A single data quality finding produced by ``DataQualityChecker``."""

    column: str
    issue_type: str  # missing, duplicate, outlier, type_mismatch, etc.
    severity: str  # high, medium, low
    description: str
    suggestion: str


class DataQualityChecker:
    """Run a set of quality checks against a DataFrame and score the result.

    Every check appends ``QualityIssue`` objects to ``self.issues`` and
    deducts points from ``self.quality_score`` (which starts at 100).
    """

    # Values further than this many IQRs outside [Q1, Q3] count as outliers.
    OUTLIER_IQR_MULTIPLIER = 3

    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.issues: List[QualityIssue] = []
        self.quality_score: float = 100.0

    def check_all(self) -> Dict[str, Any]:
        """Run every quality check and return the generated report.

        Returns:
            The dictionary built by ``generate_report``.
        """
        # Start from a clean slate so that calling check_all() repeatedly
        # does not duplicate issues or keep lowering the score. A new list is
        # assigned (rather than cleared) so earlier reports keep their issues.
        self.issues = []
        self.quality_score = 100.0

        self.check_missing_values()
        self.check_duplicates()
        self.check_data_types()
        self.check_outliers()
        self.check_consistency()
        return self.generate_report()

    def check_missing_values(self) -> None:
        """Record an issue for every column that contains missing values.

        Severity and score penalty depend on the missing percentage:
        >50% high (-10), >20% medium (-5), >0% low (-2).
        """
        total_rows = len(self.df)
        if total_rows == 0:
            # No rows means no missing values; also avoids a 0/0 division.
            return

        for col in self.df.columns:
            missing_count = self.df[col].isnull().sum()
            missing_ratio = (missing_count / total_rows) * 100

            if missing_ratio > 50:
                severity = "high"
                self.quality_score -= 10
            elif missing_ratio > 20:
                severity = "medium"
                self.quality_score -= 5
            elif missing_ratio > 0:
                severity = "low"
                self.quality_score -= 2
            else:
                continue

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="missing",
                    severity=severity,
                    description=f"列 '{col}' 存在 {missing_count} 个缺失值 ({missing_ratio:.1f}%)",
                    suggestion=self._suggest_missing_handling(col, missing_ratio),
                )
            )

    def check_duplicates(self) -> None:
        """Record a table-wide issue when fully duplicated rows exist.

        More than 10% duplicated rows is high severity (-5), otherwise
        medium (-3).
        """
        total_rows = len(self.df)
        if total_rows == 0:
            return

        duplicate_count = self.df.duplicated().sum()
        if duplicate_count <= 0:
            return

        duplicate_ratio = (duplicate_count / total_rows) * 100
        severity = "high" if duplicate_ratio > 10 else "medium"
        self.quality_score -= 5 if severity == "high" else 3

        self.issues.append(
            QualityIssue(
                column="全表",
                issue_type="duplicate",
                severity=severity,
                description=f"发现 {duplicate_count} 行重复数据 ({duplicate_ratio:.1f}%)",
                suggestion="建议使用 df.drop_duplicates() 删除重复行,或检查是否为合理的重复记录",
            )
        )

    def check_data_types(self) -> None:
        """Flag object-dtype columns whose non-null values are all numeric."""
        for col in self.df.columns:
            if self.df[col].dtype != 'object':
                continue

            non_null = self.df[col].dropna()
            if non_null.empty:
                # pd.to_numeric succeeds on an empty Series, which would
                # wrongly report an all-null column as numeric.
                continue

            try:
                pd.to_numeric(non_null, errors='raise')
            except (ValueError, TypeError, OverflowError):
                # Not convertible: the column really is non-numeric.
                continue

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="type_mismatch",
                    severity="medium",
                    description=f"列 '{col}' 当前为文本类型,但可以转换为数值类型",
                    suggestion=f"建议使用 df['{col}'] = pd.to_numeric(df['{col}']) 转换类型",
                )
            )
            self.quality_score -= 3

    def check_outliers(self) -> None:
        """Flag numeric columns with values far outside the interquartile range.

        A value is an outlier when it lies more than
        ``OUTLIER_IQR_MULTIPLIER`` IQRs below Q1 or above Q3. More than 5%
        outliers is medium severity (-3), otherwise low (-1).
        """
        total_rows = len(self.df)
        if total_rows == 0:
            return

        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            values = self.df[col]
            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - self.OUTLIER_IQR_MULTIPLIER * iqr
            upper_bound = q3 + self.OUTLIER_IQR_MULTIPLIER * iqr

            # Count through a boolean mask instead of materialising the rows.
            outlier_mask = (values < lower_bound) | (values > upper_bound)
            outlier_count = int(outlier_mask.sum())
            if outlier_count == 0:
                continue

            outlier_ratio = (outlier_count / total_rows) * 100
            if outlier_ratio > 5:
                severity = "medium"
                self.quality_score -= 3
            else:
                severity = "low"
                self.quality_score -= 1

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="outlier",
                    severity=severity,
                    description=f"列 '{col}' 存在 {outlier_count} 个异常值 ({outlier_ratio:.1f}%)",
                    suggestion=f"建议检查 {lower_bound:.2f} 以下和 {upper_bound:.2f} 以上的值是否合理",
                )
            )

    def check_consistency(self) -> None:
        """Flag datetime columns whose values are not monotonically increasing."""
        datetime_cols = self.df.select_dtypes(include=['datetime64']).columns
        for col in datetime_cols:
            if self.df[col].is_monotonic_increasing:
                continue

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="consistency",
                    severity="medium",
                    description=f"时间列 '{col}' 不是单调递增的,可能存在乱序",
                    suggestion=f"建议使用 df.sort_values('{col}') 进行排序",
                )
            )
            self.quality_score -= 3

    def _suggest_missing_handling(self, col: str, missing_ratio: float) -> str:
        """Return a handling suggestion for a column with missing values.

        Args:
            col: Name of the column in ``self.df``.
            missing_ratio: Percentage (0-100) of missing values in the column.
        """
        if missing_ratio > 70:
            return f"缺失比例过高,建议删除列 '{col}'"
        if missing_ratio > 30:
            return "建议填充或删除缺失值:使用中位数/众数填充或删除含缺失值的行"
        if pd.api.types.is_numeric_dtype(self.df[col]):
            return f"建议使用均值/中位数填充:df['{col}'].fillna(df['{col}'].median())"
        return f"建议使用众数填充:df['{col}'].fillna(df['{col}'].mode()[0])"

    def generate_report(self) -> Dict[str, Any]:
        """Build the quality report from the issues collected so far.

        Clamps ``self.quality_score`` into [0, 100] as a side effect.

        Returns:
            A dict with the rounded score, issue counts (total and per
            severity), the list of issues and a readable summary string.
        """
        self.quality_score = max(0, min(100, self.quality_score))

        severity_counts = {"high": 0, "medium": 0, "low": 0}
        for issue in self.issues:
            if issue.severity in severity_counts:
                severity_counts[issue.severity] += 1

        return {
            "quality_score": round(self.quality_score, 2),
            "total_issues": len(self.issues),
            "high_severity": severity_counts["high"],
            "medium_severity": severity_counts["medium"],
            "low_severity": severity_counts["low"],
            "issues": self.issues,
            "summary": self._generate_summary(),
        }

    def _generate_summary(self) -> str:
        """Render a readable Markdown-style summary of score and issues.

        Only high and medium severity issues are listed individually.
        """
        score = self.quality_score
        if score >= 90:
            rating = "[OK] **评级**: 优秀 - 数据质量很好\n\n"
        elif score >= 75:
            rating = "[WARN] **评级**: 良好 - 存在一些小问题\n\n"
        elif score >= 60:
            rating = "[WARN] **评级**: 一般 - 需要处理多个问题\n\n"
        else:
            rating = "[ERROR] **评级**: 差 - 数据质量问题严重\n\n"

        high_count = sum(1 for i in self.issues if i.severity == 'high')
        medium_count = sum(1 for i in self.issues if i.severity == 'medium')
        low_count = sum(1 for i in self.issues if i.severity == 'low')

        parts = [
            "## 数据质量报告\n\n",
            f"**质量评分**: {score:.1f}/100\n\n",
            rating,
            f"**问题统计**: 共 {len(self.issues)} 个质量问题\n",
            f"- [RED] 高严重性: {high_count} 个\n",
            f"- [YELLOW] 中严重性: {medium_count} 个\n",
            f"- [GREEN] 低严重性: {low_count} 个\n\n",
        ]

        if self.issues:
            parts.append("### 主要问题:\n\n")
            for issue in self.issues:
                if issue.severity not in ("high", "medium"):
                    continue
                marker = "[RED]" if issue.severity == "high" else "[YELLOW]"
                parts.append(f"{marker} **{issue.column}** - {issue.description}\n")
                parts.append(f" [TIP] {issue.suggestion}\n\n")

        return "".join(parts)


def quick_quality_check(df: pd.DataFrame) -> str:
    """Run all quality checks on *df* and return only the summary text."""
    checker = DataQualityChecker(df)
    report = checker.check_all()
    return report['summary']