# Source file: iov_data_analysis_agent/utils/data_quality.py
# Snapshot: 2026-01-31 18:00:05 +08:00 (225 lines, 8.8 KiB, Python)
#
# Note from the hosting page: this file contains Unicode characters that
# might be confused with other characters. If that is intentional, the
# warning can be safely ignored.
# -*- coding: utf-8 -*-
"""
数据质量检查模块 - 自动评估数据质量并提供改进建议
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Any
from dataclasses import dataclass
@dataclass
class QualityIssue:
    """One data quality finding for a column (or for the whole table)."""

    # Name of the affected column, or a table-level label.
    column: str
    # Category of the finding; this module emits "missing", "duplicate",
    # "type_mismatch", "outlier" and "consistency".
    issue_type: str
    # One of "high", "medium" or "low".
    severity: str
    # Human-readable explanation of what was found.
    description: str
    # Human-readable recommendation for fixing the problem.
    suggestion: str
class DataQualityChecker:
    """Run data quality checks against a DataFrame and score the result.

    Every ``check_*`` method appends ``QualityIssue`` records to
    ``self.issues`` and subtracts a penalty from ``self.quality_score``,
    which starts at 100. ``check_all`` runs all checks from a clean state and
    returns the dictionary produced by ``generate_report``.
    """

    def __init__(self, df: pd.DataFrame):
        """Store the DataFrame to inspect and initialise empty results."""
        self.df = df
        self.issues: List[QualityIssue] = []
        self.quality_score: float = 100.0

    def check_all(self) -> Dict[str, Any]:
        """Run every check and return the quality report.

        The issue list and score are reset first, so calling this method
        repeatedly yields the same report instead of accumulating duplicate
        issues and penalties. Individual ``check_*`` methods still accumulate
        when called directly.
        """
        # Rebind rather than clear(): a previously returned report holds a
        # reference to the old list and must not be emptied behind its back.
        self.issues = []
        self.quality_score = 100.0

        self.check_missing_values()
        self.check_duplicates()
        self.check_data_types()
        self.check_outliers()
        self.check_consistency()
        return self.generate_report()

    def check_missing_values(self) -> None:
        """Report columns containing null values.

        Severity and penalty depend on the share of missing rows:
        more than 50% is "high" (-10), more than 20% is "medium" (-5), and
        anything above 0% is "low" (-2).
        """
        total_rows = len(self.df)
        if total_rows == 0:
            # The missing ratio is 0/0 on an empty frame; nothing to report.
            return

        for col in self.df.columns:
            missing_count = self.df[col].isnull().sum()
            missing_ratio = (missing_count / total_rows) * 100
            if missing_ratio > 50:
                severity, penalty = "high", 10
            elif missing_ratio > 20:
                severity, penalty = "medium", 5
            elif missing_ratio > 0:
                severity, penalty = "low", 2
            else:
                continue

            self.quality_score -= penalty
            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="missing",
                    severity=severity,
                    description=f"'{col}' 存在 {missing_count} 个缺失值 ({missing_ratio:.1f}%)",
                    suggestion=self._suggest_missing_handling(col, missing_ratio),
                )
            )

    def check_duplicates(self) -> None:
        """Report fully duplicated rows as a single table-level issue.

        More than 10% duplicated rows is "high" (-5); otherwise "medium" (-3).
        """
        duplicate_count = self.df.duplicated().sum()
        if duplicate_count == 0:
            return

        duplicate_ratio = (duplicate_count / len(self.df)) * 100
        if duplicate_ratio > 10:
            severity, penalty = "high", 5
        else:
            severity, penalty = "medium", 3
        self.quality_score -= penalty
        self.issues.append(
            QualityIssue(
                column="全表",
                issue_type="duplicate",
                severity=severity,
                description=f"发现 {duplicate_count} 行重复数据 ({duplicate_ratio:.1f}%)",
                suggestion="建议使用 df.drop_duplicates() 删除重复行,或检查是否为合理的重复记录",
            )
        )

    def check_data_types(self) -> None:
        """Report object-dtype columns whose values all parse as numbers.

        Each such column is a "medium" issue (-3). Columns with no non-null
        values are skipped: converting an empty selection always succeeds and
        says nothing about the column's real type.
        """
        for col in self.df.columns:
            if self.df[col].dtype != 'object':
                continue

            non_null = self.df[col].dropna()
            if non_null.empty:
                continue

            try:
                pd.to_numeric(non_null, errors='raise')
            except (ValueError, TypeError, OverflowError):
                # Not convertible: the column really is non-numeric.
                continue

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="type_mismatch",
                    severity="medium",
                    description=f"'{col}' 当前为文本类型,但可以转换为数值类型",
                    suggestion=f"建议使用 df['{col}'] = pd.to_numeric(df['{col}']) 转换类型",
                )
            )
            self.quality_score -= 3

    def check_outliers(self) -> None:
        """Report numeric columns with values outside the 3 x IQR fences.

        A value is an outlier when it is below ``q1 - 3 * iqr`` or above
        ``q3 + 3 * iqr``. More than 5% outlying rows is "medium" (-3);
        otherwise "low" (-1).
        """
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            values = self.df[col]
            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 3 * iqr
            upper_bound = q3 + 3 * iqr

            outlier_mask = (values < lower_bound) | (values > upper_bound)
            outlier_count = int(outlier_mask.sum())
            if outlier_count == 0:
                continue

            outlier_ratio = (outlier_count / len(self.df)) * 100
            if outlier_ratio > 5:
                severity, penalty = "medium", 3
            else:
                severity, penalty = "low", 1
            self.quality_score -= penalty
            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="outlier",
                    severity=severity,
                    description=f"'{col}' 存在 {outlier_count} 个异常值 ({outlier_ratio:.1f}%)",
                    suggestion=f"建议检查 {lower_bound:.2f} 以下和 {upper_bound:.2f} 以上的值是否合理",
                )
            )

    def check_consistency(self) -> None:
        """Report datetime columns whose values are not in increasing order.

        Each such column is a "medium" issue (-3). Missing timestamps are
        dropped before the ordering test: they are covered by
        ``check_missing_values`` and would otherwise make a correctly sorted
        column look out of order.
        """
        datetime_cols = self.df.select_dtypes(include=['datetime64']).columns
        for col in datetime_cols:
            if self.df[col].dropna().is_monotonic_increasing:
                continue

            self.issues.append(
                QualityIssue(
                    column=col,
                    issue_type="consistency",
                    severity="medium",
                    description=f"时间列 '{col}' 不是单调递增的,可能存在乱序",
                    suggestion=f"建议使用 df.sort_values('{col}') 进行排序",
                )
            )
            self.quality_score -= 3

    def _suggest_missing_handling(self, col: str, missing_ratio: float) -> str:
        """Return a remediation hint for a column with missing values.

        Above 70% missing the hint is to drop the column; above 30% it is a
        generic fill-or-drop hint; otherwise it is a median fill for numeric
        columns or a mode fill for all other columns.
        """
        if missing_ratio > 70:
            return f"缺失比例过高,建议删除列 '{col}'"
        if missing_ratio > 30:
            return "建议填充或删除缺失值:使用中位数/众数填充或删除含缺失值的行"
        if pd.api.types.is_numeric_dtype(self.df[col]):
            return f"建议使用均值/中位数填充df['{col}'].fillna(df['{col}'].median())"
        return f"建议使用众数填充df['{col}'].fillna(df['{col}'].mode()[0])"

    def _severity_counts(self) -> Dict[str, int]:
        """Count recorded issues per severity level ("high"/"medium"/"low")."""
        counts = {"high": 0, "medium": 0, "low": 0}
        for issue in self.issues:
            if issue.severity in counts:
                counts[issue.severity] += 1
        return counts

    def generate_report(self) -> Dict[str, Any]:
        """Build the report dictionary from the issues recorded so far.

        The score is clamped to the 0-100 range (and stored back on the
        instance) before being reported.

        Returns:
            A dict with the keys ``quality_score`` (rounded to 2 decimals),
            ``total_issues``, ``high_severity``, ``medium_severity``,
            ``low_severity``, ``issues`` (the list of ``QualityIssue``
            objects) and ``summary`` (the text from ``_generate_summary``).
        """
        self.quality_score = max(0, min(100, self.quality_score))
        counts = self._severity_counts()
        return {
            "quality_score": round(self.quality_score, 2),
            "total_issues": len(self.issues),
            "high_severity": counts["high"],
            "medium_severity": counts["medium"],
            "low_severity": counts["low"],
            "issues": self.issues,
            "summary": self._generate_summary(),
        }

    def _generate_summary(self) -> str:
        """Render the score, rating, issue counts and main issues as text.

        Only "high" and "medium" issues are listed individually; "low"
        issues appear in the counts only.
        """
        score = self.quality_score
        if score >= 90:
            rating = "[OK] **评级**: 优秀 - 数据质量很好\n\n"
        elif score >= 75:
            rating = "[WARN] **评级**: 良好 - 存在一些小问题\n\n"
        elif score >= 60:
            rating = "[WARN] **评级**: 一般 - 需要处理多个问题\n\n"
        else:
            rating = "[ERROR] **评级**: 差 - 数据质量问题严重\n\n"

        counts = self._severity_counts()
        parts = [
            "## 数据质量报告\n\n",
            f"**质量评分**: {score:.1f}/100\n\n",
            rating,
            f"**问题统计**: 共 {len(self.issues)} 个质量问题\n",
            f"- [RED] 高严重性: {counts['high']}\n",
            f"- [YELLOW] 中严重性: {counts['medium']}\n",
            f"- [GREEN] 低严重性: {counts['low']}\n\n",
        ]

        if self.issues:
            parts.append("### 主要问题:\n\n")
            for issue in self.issues:
                if issue.severity == "high":
                    tag = "[RED]"
                elif issue.severity == "medium":
                    tag = "[YELLOW]"
                else:
                    continue
                parts.append(f"{tag} **{issue.column}** - {issue.description}\n")
                parts.append(f" [TIP] {issue.suggestion}\n\n")

        return "".join(parts)
def quick_quality_check(df: pd.DataFrame) -> str:
    """Run all quality checks on ``df`` and return only the text summary.

    Convenience wrapper: builds a ``DataQualityChecker``, runs ``check_all``
    and returns the ``"summary"`` entry of the resulting report.
    """
    return DataQualityChecker(df).check_all()["summary"]