import pandas as pd import requests import json from datetime import datetime from sqlalchemy import create_engine from urllib.parse import quote_plus import logging import time # 新增用于消息重试延迟 # ================== 飞书配置 ================== # 飞书应用凭证 APP_ID = "cli_a8b50ec0eed1500d" APP_SECRET = "ccxkE7ZCFQZcwkkM1rLy0ccZRXYsT2xK" # 飞书多维表格配置 SPREADSHEET_ID = "MIqssvuB3h3XmgtkI26cBX7Angc" # 表格ID SHEET_NAME = "46ea20" # 工作表名称 # 飞书机器人Webhook WEBHOOK_URL = "https://open.feishu.cn/open-apis/bot/v2/hook/3bc45247-c469-47a0-bfe9-c62e46c5aca0" # ================== 数据库配置 ================== DB_CONFIG = { 'host': 'eutsp-prod.mysql.germany.rds.aliyuncs.com', 'port': 3306, 'user': 'international_tsp_eu_r', 'password': 'ZXhBgo1TB2XbF3kP', 'database': 'chery_international_tsp_eu' } # ================== 其他配置 ================== # 创建SQLAlchemy引擎(解决pandas警告问题) safe_password = quote_plus(DB_CONFIG['password']) engine = create_engine( f"mysql+pymysql://{DB_CONFIG['user']}:{safe_password}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}", pool_recycle=3600, pool_pre_ping=True, pool_size=5, max_overflow=10 ) # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # ================== 功能函数 ================== def get_access_token(): """获取飞书访问令牌""" api_token_url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/" api_post_data = {"app_id": APP_ID, "app_secret": APP_SECRET} try: response = requests.post(api_token_url, json=api_post_data, timeout=10) response.raise_for_status() if response.json().get("code") == 0: return response.json()["tenant_access_token"] else: raise Exception(f"获取Token失败: {response.text}") except Exception as e: logging.error(f"获取飞书Token异常: {str(e)}") raise def overwrite_data(spreadsheet_token, values, access_token=None): """覆盖写入数据到飞书多维表格""" if not access_token: access_token = get_access_token() url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json; charset=utf-8", } row_count = len(values) + 1 range_str = f"{SHEET_NAME}!A1:B{row_count}" # 使用配置的工作表名称 post_data = { "valueRange": { "range": range_str, "values": values } } try: response = requests.put(url, json=post_data, headers=headers, timeout=15) response.raise_for_status() return response except Exception as e: logging.error(f"写入飞书表格失败: {str(e)}") raise def fetch_data_from_db(): """从数据库执行SQL查询获取数据""" try: sql_query = """ SELECT v_vehicle_info.VIN AS car_vins, CASE v_sim_info.STATE WHEN 2 THEN 'active' ELSE 'no_active' END AS active_status, v_vehicle_info.MATERIAL_ID as MATERIAL, v_vehicle_info.SALES_TERRITORY as Area, v_sim_info.ACTIVATION_DATE as active_date FROM v_vehicle_info JOIN v_tbox_info USING (VIN) JOIN v_sim_info USING (SN); """ return pd.read_sql(sql_query, engine) except Exception as e: logging.error(f"数据库查询失败: {str(e)}") raise def send_feishu_message(webhook_url, message, spreadsheet_token=None, retry=2): """ 通过Webhook发送飞书消息,支持添加多维表格链接 :param webhook_url: 机器人Webhook地址 :param message: 要发送的文本内容 :param spreadsheet_token: 多维表格ID(可选) :param retry: 重试次数 """ headers = {'Content-Type': 'application/json'} # 如果有提供多维表格ID,添加表格链接按钮 if spreadsheet_token: spreadsheet_url = f"https://my-ichery.feishu.cn/sheets/{spreadsheet_token}" payload = { "msg_type": "interactive", "card": { "config": {"wide_screen_mode": True}, "elements": [ { "tag": "div", "text": {"content": message, "tag": "lark_md"} }, { "actions": [{ "tag": "button", "text": {"content": "查看多维表格", "tag": "plain_text"}, "type": "primary", "url": spreadsheet_url }], "tag": "action" } ], "header": { "title": {"content": "数据同步完成通知", "tag": "plain_text"}, "template": "green" } } } else: payload = {"msg_type": "text", "content": {"text": message}} for attempt in range(retry): try: response = requests.post(webhook_url, headers=headers, data=json.dumps(payload), timeout=10) response.raise_for_status() return response.json() except Exception as e: logging.warning(f"飞书消息发送失败(尝试 {attempt + 1}/{retry}): {str(e)}") if attempt < retry - 1: time.sleep(2) # 等待后重试 raise Exception(f"飞书消息发送失败,重试{retry}次后仍不成功") # ================== 主程序 ================== if __name__ == '__main__': try: logging.info("开始数据同步流程...") # 1. 从数据库获取数据 df = fetch_data_from_db() logging.info(f"成功获取{len(df)}条车辆数据") # 2. 处理数据 df['active_date'] = pd.to_datetime(df['active_date'], errors='coerce') active_df = df[df['active_status'] == 'active'] logging.info(f"有效激活数据: {len(active_df)}条") # 3. 按天统计激活数 daily_count = active_df.groupby(active_df['active_date'].dt.date).size().reset_index(name='active_count') values = [[str(d), str(c)] for d, c in daily_count.values.tolist()] # 确保值为字符串类型 logging.info(f"生成{len(values)}条日期统计记录") # 4. 覆盖写入多维表格 response = overwrite_data(SPREADSHEET_ID, values) if response.status_code == 200: logging.info("数据成功写入飞书多维表格") # 5. 构建通知消息 current_time = datetime.now().strftime("%Y-%m-%d %H:%M") today_count = daily_count[daily_count['active_date'] == datetime.now().date()] weekly_count = daily_count[daily_count['active_date'] >= (datetime.now() - pd.Timedelta(days=7)).date()] stats_summary = f"• 今日激活车辆: {today_count['active_count'].values[0] if not today_count.empty else 0}\n" stats_summary += f"• 本周激活车辆: {weekly_count['active_count'].sum()}\n" stats_summary += f"• 累计激活车辆: {len(active_df)}" message = f"🚗 车辆数据同步完成 ✅\n" message += f"⏰ 时间: {current_time}\n" message += f"📊 统计摘要:\n{stats_summary}\n" # 6. 发送包含多维表格链接的通知 result = send_feishu_message(WEBHOOK_URL, message, SPREADSHEET_ID) logging.info(f"飞书消息发送成功: {result}") else: error_msg = f"表格写入失败: {response.status_code} - {response.text}" logging.error(error_msg) error_message = f"❌ 数据同步失败\n{error_msg}" send_feishu_message(WEBHOOK_URL, error_message) except Exception as e: error_message = f"❌ 数据同步异常: {str(e)}" logging.exception("程序执行异常") send_feishu_message(WEBHOOK_URL, error_message)