"""Sync vehicle activation status and model name from MySQL into Feishu Bitable.

For every market table listed in ``MARKET_TABLES`` the script reads the
Bitable records, compares them against the activation data queried from the
database, batch-updates the records that differ, and finally posts a summary
card to a Feishu bot webhook.
"""
import json
import logging
import os
import time
from datetime import datetime
from urllib.parse import quote_plus

import pandas as pd
import requests
from sqlalchemy import create_engine

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(name)s %(message)s'
)

# ====================== Global configuration ======================
# NOTE(security): credentials are embedded in source. Each secret below can be
# overridden through an environment variable; the literals are kept only as a
# backward-compatible fallback and should be rotated and removed.

# Database connection settings
DB_CONFIG = {
    'host': 'eutsp-prod.mysql.germany.rds.aliyuncs.com',
    'port': 3306,
    'user': 'international_tsp_eu_r',
    'password': os.environ.get('TSP_DB_PASSWORD', 'ZXhBgo1TB2XbF3kP'),
    'database': 'chery_international_tsp_eu'
}

# Feishu Bitable configuration (one table per market)
FEISHU_APP_TOKEN = "T3QLbmpqma014ussjy9cgitqnSh"  # Bitable app token
MARKET_TABLES = [
    {"name": "欧洲所有", "table_id": "tbloPJHLc05MHIAY", "view_id": "vewBvcYDAa"},
    {"name": "意大利", "table_id": "tbl4ZXTTgcPvdZUD", "view_id": "vewBvcYDAa"},
    {"name": "西班牙", "table_id": "tbl0KBWQT1ZqiT2g", "view_id": "vewBvcYDAa"},
    {"name": "英国", "table_id": "tblbfEHrsojQ4D5w", "view_id": "vewBvcYDAa"},
    {"name": "比利时", "table_id": "tblYN3eEEumBMlgB", "view_id": "vewBvcYDAa"}
]

# Field names shared by every market table
VIN_FIELD_NAME = "车架号"
ACTIVE_FIELD_NAME = "激活状态"
VEHICLE_NAME_FIELD = "车型"

# Feishu application credentials
APP_ID = "cli_a8b50ec0eed1500d"
APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "ccxkE7ZCFQZcwkkM1rLy0ccZRXYsT2xK")

# Feishu bot configuration
WEBHOOK_URL = os.environ.get(
    "FEISHU_WEBHOOK_URL",
    "https://open.feishu.cn/open-apis/bot/v2/hook/3bc45247-c469-47a0-bfe9-c62e46c5aca0",
)
MULTI_TABLE_URL = "https://my-ichery.feishu.cn/base/T3QLbmpqma014ussjy9cgitqnSh?from=from_copylink"

# Number of attempts for one batch-update request
_UPDATE_RETRIES = 3


# ====================== Shared helpers ======================
def get_access_token():
    """Fetch a Feishu tenant access token.

    Returns:
        The ``tenant_access_token`` string from the auth endpoint.

    Raises:
        Exception: on HTTP/network errors or when the API answers with a
            non-zero ``code``. The error is logged before being re-raised.
    """
    api_token_url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
    api_post_data = {"app_id": APP_ID, "app_secret": APP_SECRET}

    try:
        response = requests.post(api_token_url, json=api_post_data, timeout=10)
        response.raise_for_status()
        resp_data = response.json()

        if resp_data.get("code") == 0:
            return resp_data["tenant_access_token"]
        raise RuntimeError(f"获取Token失败: {resp_data.get('msg')}")
    except Exception as e:
        logging.error(f"获取飞书Token异常: {e}")
        raise


def send_feishu_message(message):
    """Post an interactive card containing ``message`` to the bot webhook.

    Args:
        message: Card body text, rendered as ``lark_md``.

    Returns:
        The decoded JSON response on success, or ``None`` when the request
        fails or the webhook answers with a non-zero ``code``. Sending is
        best-effort: failures are logged, never raised.
    """
    headers = {'Content-Type': 'application/json'}
    payload = {
        "msg_type": "interactive",
        "card": {
            "config": {"wide_screen_mode": True},
            "elements": [
                {
                    "tag": "div",
                    "text": {"content": message, "tag": "lark_md"}
                },
                {
                    "tag": "action",
                    "actions": [
                        {
                            "tag": "button",
                            "text": {"content": "查看多维表格", "tag": "plain_text"},
                            "type": "primary",
                            "url": MULTI_TABLE_URL
                        }
                    ]
                }
            ],
            "header": {
                "title": {"content": "车辆状态同步报告", "tag": "plain_text"},
                "template": "blue"
            }
        }
    }

    try:
        response = requests.post(WEBHOOK_URL, headers=headers, json=payload, timeout=10)
        response.raise_for_status()
        result = response.json()
        # A non-zero "code" in the body means the message was rejected even
        # though the HTTP request itself succeeded.
        if result.get("code", 0) != 0:
            logging.error(f"飞书消息发送失败: {result.get('msg')}")
            return None
        return result
    except Exception as e:
        logging.error(f"飞书消息发送失败: {str(e)}")
        return None


def read_feishu_records(app_token, table_id, view_id, retry=3):
    """Read every record of one Bitable table view, following pagination.

    Args:
        app_token: Bitable app token.
        table_id: Table to read.
        view_id: View to read the records through.
        retry: Number of attempts per page before giving up.

    Returns:
        A list of record dicts as returned by the records endpoint.

    Raises:
        Exception: when a page cannot be read after ``retry`` attempts, or
            when the access token cannot be obtained.
    """
    access_token = get_access_token()
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json; charset=utf-8",
    }

    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records"
    params = {
        "page_size": 500,
        "view_id": view_id
    }

    all_records = []
    page_token = None

    while True:
        if page_token:
            params["page_token"] = page_token

        for attempt in range(retry):
            try:
                response = requests.get(url, headers=headers, params=params, timeout=30)
                response.raise_for_status()
                resp_data = response.json()

                if resp_data.get("code") != 0:
                    raise RuntimeError(f"API错误: {resp_data.get('msg')}")

                data = resp_data.get("data") or {}
                # "items" may be null or absent (e.g. empty table); treat as no records.
                all_records.extend(data.get("items") or [])

                page_token = data.get("page_token") if data.get("has_more") else None
                break
            except Exception as e:
                # Deliberately broad: any failure of this page is retried.
                logging.warning(f"读取多维表格失败(尝试 {attempt + 1}/{retry}): {e}")
                if attempt < retry - 1:
                    time.sleep(2 ** attempt)  # exponential back-off between attempts
        else:
            # Every attempt for this page failed.
            raise RuntimeError("无法读取多维表格数据")

        if not page_token:
            break

    return all_records


def fetch_activation_data():
    """Query activation status and model name for every vehicle.

    Returns:
        A dict keyed by VIN; each value is a dict with ``'active_status'``
        (``'active'`` or ``'no_active'``) and the model name under
        ``VEHICLE_NAME_FIELD`` (``None`` when the database has no name).

    Raises:
        KeyError: when the query result lacks an expected column.
        Exception: any database error; logged before being re-raised.
    """
    try:
        safe_password = quote_plus(DB_CONFIG['password'])
        engine = create_engine(
            f"mysql+pymysql://{DB_CONFIG['user']}:{safe_password}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}",
            pool_recycle=3600
        )

        sql_query = """
            SELECT
                v.VIN AS car_vins,
                CASE s.STATE WHEN 2 THEN 'active' ELSE 'no_active' END AS active_status,
                iv.`NAME` AS `车型`
            FROM v_vehicle_info v
            JOIN v_tbox_info t USING (VIN)
            JOIN v_sim_info s USING (SN)
            LEFT JOIN v_material_info m ON m.MATERIAL_ID = v.MATERIAL_ID
            LEFT JOIN v_outside_vehicle_type ov ON ov.TYPE_CODE = m.OUTSIDE_TYPE_CODE
            LEFT JOIN v_inside_vehicle_type iv ON iv.TYPE_CODE = ov.INSIDE_TYPE_CODE
        """

        try:
            df = pd.read_sql(sql_query, engine)
        finally:
            # Release pooled connections; the engine is used for this one query only.
            engine.dispose()

        # Validate columns before any operation that references them.
        expected = ['car_vins', 'active_status', VEHICLE_NAME_FIELD]
        missing = [c for c in expected if c not in df.columns]
        if missing:
            logging.error(f"查询结果缺少列: {missing}")
            raise KeyError(f"Missing columns: {missing}")

        df = df.drop_duplicates(subset=['car_vins'], keep='first')
        df2 = df.set_index('car_vins')[['active_status', VEHICLE_NAME_FIELD]]

        # The LEFT JOINs can yield NULL names; map NULL/NaN to None so the
        # values stay comparable and JSON-serialisable downstream.
        return {
            vin: {col: (None if pd.isna(val) else val) for col, val in row.items()}
            for vin, row in df2.to_dict(orient='index').items()
        }

    except Exception as e:
        logging.error(f"数据库查询失败: {e}")
        raise


def _field_text(value):
    """Return a Bitable field value as a plain string ('' for None)."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, list):
        # NOTE(review): presumably text fields can arrive as a list of
        # {"text": ...} segments; confirm against the records API response.
        return "".join(
            str(seg.get("text", "")) if isinstance(seg, dict) else str(seg)
            for seg in value
        )
    return str(value)


def compare_and_update_records(app_token, table_id, view_id, activation_data):
    """Compare Bitable records with the database and push the differences.

    Args:
        app_token: Bitable app token.
        table_id: Table to synchronise.
        view_id: View the records are read through.
        activation_data: Mapping VIN -> {'active_status': ..., VEHICLE_NAME_FIELD: ...}
            as produced by ``fetch_activation_data``.

    Returns:
        A tuple ``(total_updated, activated_count, not_activated_count)``; the
        two counts break down the records that needed an update by their new
        status.

    Raises:
        RuntimeError: when at least one update batch still fails after all
            retries (remaining batches are attempted first).
        Exception: when the token or the records cannot be fetched.
    """
    access_token = get_access_token()
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json; charset=utf-8",
    }

    feishu_records = read_feishu_records(app_token, table_id, view_id)
    logging.info(f"成功读取{len(feishu_records)}条多维表格记录")

    records_to_update = []
    activated_count = not_activated_count = 0

    for record in feishu_records:
        record_id = record.get("record_id")
        fields = record.get("fields") or {}
        vin = _field_text(fields.get(VIN_FIELD_NAME))

        if not vin:
            logging.warning(f"记录 {record_id} 缺少VIN字段")
            continue

        db = activation_data.get(vin)
        if not db:
            logging.warning(f"VIN {vin} 在数据库中未找到")
            continue

        db_status = db['active_status']
        db_name = db[VEHICLE_NAME_FIELD]
        # Field values may be missing, None or non-string; normalise before comparing.
        current_status = _field_text(fields.get(ACTIVE_FIELD_NAME)).lower()
        current_name = _field_text(fields.get(VEHICLE_NAME_FIELD))

        # A missing DB name equals an empty Bitable cell; without this the
        # record would be re-sent on every run.
        if db_status != current_status or (db_name or "") != current_name:
            records_to_update.append({
                "record_id": record_id,
                "fields": {
                    ACTIVE_FIELD_NAME: db_status,
                    VEHICLE_NAME_FIELD: db_name
                }
            })
            logging.debug(f"VIN {vin} 需要更新: 状态 {current_status}→{db_status}, 车型 {current_name}→{db_name}")

            if db_status == "active":
                activated_count += 1
            else:
                not_activated_count += 1

    total_updated = len(records_to_update)
    logging.info(f"需要更新 {total_updated} 条记录(激活:{activated_count},未激活:{not_activated_count})")

    # Batch update
    batch_size = 50
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/batch_update"
    failed_batches = 0

    for i in range(0, total_updated, batch_size):
        batch = records_to_update[i:i + batch_size]
        payload = {"records": batch}

        for attempt in range(_UPDATE_RETRIES):
            try:
                r = requests.post(url, headers=headers, json=payload, timeout=30)
                data = r.json()
                if r.status_code == 200 and data.get("code") == 0:
                    logging.info(f"第{i // batch_size + 1}批更新成功,{len(batch)}条")
                    break
                logging.warning(f"第{i // batch_size + 1}批更新失败(尝试{attempt + 1}/3):{data.get('msg')}")
            except Exception as e:
                # Deliberately broad: any failure of this batch is retried.
                logging.warning(f"网络异常(尝试{attempt + 1}/3):{e}")
            if attempt < _UPDATE_RETRIES - 1:
                time.sleep(2 ** attempt)  # exponential back-off between attempts
        else:
            # All attempts failed: record it instead of silently reporting success.
            failed_batches += 1
            logging.error(f"第{i // batch_size + 1}批更新失败,已重试{_UPDATE_RETRIES}次,放弃该批")

    if failed_batches:
        raise RuntimeError(f"{failed_batches} 批记录更新失败(共需更新 {total_updated} 条)")

    return total_updated, activated_count, not_activated_count


# ====================== Multi-table workflow ======================
def sync_all_markets():
    """Synchronise activation status for every market in ``MARKET_TABLES``.

    The database is queried once and the result reused for all markets. A
    failure in one market is recorded and does not stop the others.

    Returns:
        On completion: ``{"success": True, "results": [...], "elapsed_time": ...,
        "start_time": ...}`` where each result carries ``"success"`` plus
        either the update counts or an ``"error"`` string.
        When the shared setup fails: ``{"success": False, "error": ...,
        "elapsed_time": ...}``.
    """
    start_time = time.time()
    all_results = []

    try:
        # Fetch activation data once to avoid querying the database per market.
        logging.info("开始获取数据库激活数据...")
        activation_data = fetch_activation_data()
        logging.info(f"获取到 {len(activation_data)} 条车辆激活数据")

        # Process the markets one after another.
        for market in MARKET_TABLES:
            market_name = market["name"]
            table_id = market["table_id"]
            view_id = market["view_id"]

            logging.info(f"开始同步市场: {market_name} (表格ID: {table_id})")

            try:
                updated, activated, not_activated = compare_and_update_records(
                    FEISHU_APP_TOKEN,
                    table_id,
                    view_id,
                    activation_data
                )

                result = {
                    "market": market_name,
                    "table_id": table_id,
                    "total_updated": updated,
                    "activated_count": activated,
                    "not_activated_count": not_activated,
                    "success": True
                }
                logging.info(f"{market_name}同步完成: 更新{updated}台 (✅激活{activated}台,❌未激活{not_activated}台)")
            except Exception as e:
                result = {
                    "market": market_name,
                    "table_id": table_id,
                    "error": str(e),
                    "success": False
                }
                logging.error(f"{market_name}同步失败: {str(e)}", exc_info=True)

            all_results.append(result)
            time.sleep(1)  # pause between markets

        total_elapsed = time.time() - start_time
        return {
            "success": True,
            "results": all_results,
            "elapsed_time": total_elapsed,
            "start_time": start_time
        }

    except Exception as e:
        logging.error(f"整体同步流程失败: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": str(e),
            "elapsed_time": time.time() - start_time
        }


def main():
    """Run the multi-market sync and post the outcome to the Feishu bot."""
    sync_result = sync_all_markets()
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M")

    if sync_result.get("success"):
        # Build the detailed summary message.
        message_header = (
            f"🚗 **车辆激活状态同步完成** ✅\n"
            f"⏰ 同步时间: {current_time}\n"
            f"⏱️ 总耗时: {sync_result['elapsed_time']:.2f}秒\n\n"
            f"📊 **各市场同步详情:**\n"
        )

        message_details = []
        success_count = 0

        for result in sync_result["results"]:
            if result["success"]:
                success_count += 1
                detail = (
                    f"• **{result['market']}**: "
                    f"更新 {result['total_updated']}台 | "
                    f"✅激活 {result['activated_count']}台 | "
                    f"❌未激活 {result['not_activated_count']}台"
                )
            else:
                detail = (
                    f"• **{result['market']}**: ❌同步失败!错误信息: `{result['error']}`"
                )
            message_details.append(detail)

        message_summary = (
            f"\n✅ 成功处理 {success_count}/{len(MARKET_TABLES)} 个市场\n"
            f"🔗 查看多维表格: [点击访问]({MULTI_TABLE_URL})"
        )

        full_message = message_header + "\n".join(message_details) + message_summary

        # Send the Feishu notification.
        send_result = send_feishu_message(full_message)
        if send_result:
            logging.info("飞书消息发送成功")
        else:
            logging.error("飞书消息发送失败")
    else:
        error_message = (
            f"❌ **车辆激活状态同步失败**\n"
            f"⏰ 时间: {current_time}\n"
            f"⏱️ 耗时: {sync_result['elapsed_time']:.2f}秒\n"
            f"错误信息: `{sync_result['error']}`\n\n"
            f"请检查系统日志获取详细信息"
        )
        send_feishu_message(error_message)
        logging.error(f"整体同步失败: {sync_result['error']}")


if __name__ == '__main__':
    main()