387 lines
14 KiB
Python
387 lines
14 KiB
Python
import pandas as pd
|
||
import requests
|
||
import logging
|
||
import time
|
||
import json
|
||
from sqlalchemy import create_engine
|
||
from urllib.parse import quote_plus
|
||
from datetime import datetime
|
||
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s %(levelname)s %(name)s %(message)s'
|
||
)
|
||
|
||
# ====================== 全局配置 ======================
|
||
# 数据库连接配置
|
||
DB_CONFIG = {
|
||
'host': 'eutsp-prod.mysql.germany.rds.aliyuncs.com',
|
||
'port': 3306,
|
||
'user': 'international_tsp_eu_r',
|
||
'password': 'ZXhBgo1TB2XbF3kP',
|
||
'database': 'chery_international_tsp_eu'
|
||
}
|
||
|
||
# 飞书多维表格配置 (多市场表格配置)
|
||
FEISHU_APP_TOKEN = "T3QLbmpqma014ussjy9cgitqnSh" # 应用Token (恢复此关键变量)
|
||
MARKET_TABLES = [
|
||
{"name": "欧洲所有", "table_id": "tbloPJHLc05MHIAY", "view_id": "vewBvcYDAa"},
|
||
{"name": "意大利", "table_id": "tbl4ZXTTgcPvdZUD", "view_id": "vewBvcYDAa"},
|
||
{"name": "西班牙", "table_id": "tbl0KBWQT1ZqiT2g", "view_id": "vewBvcYDAa"},
|
||
{"name": "英国", "table_id": "tblbfEHrsojQ4D5w", "view_id": "vewBvcYDAa"},
|
||
{"name": "比利时", "table_id": "tblYN3eEEumBMlgB", "view_id": "vewBvcYDAa"}
|
||
]
|
||
|
||
# 通用字段映射
|
||
VIN_FIELD_NAME = "车架号"
|
||
ACTIVE_FIELD_NAME = "激活状态"
|
||
VEHICLE_NAME_FIELD = "车型"
|
||
|
||
# 飞书应用凭证
|
||
APP_ID = "cli_a8b50ec0eed1500d"
|
||
APP_SECRET = "ccxkE7ZCFQZcwkkM1rLy0ccZRXYsT2xK"
|
||
|
||
# 飞书机器人配置
|
||
WEBHOOK_URL = "https://open.feishu.cn/open-apis/bot/v2/hook/3bc45247-c469-47a0-bfe9-c62e46c5aca0"
|
||
MULTI_TABLE_URL = "https://my-ichery.feishu.cn/base/T3QLbmpqma014ussjy9cgitqnSh?from=from_copylink"
|
||
|
||
|
||
# ====================== 通用函数 ======================
|
||
def get_access_token():
|
||
"""获取飞书访问令牌"""
|
||
api_token_url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
|
||
api_post_data = {"app_id": APP_ID, "app_secret": APP_SECRET}
|
||
try:
|
||
response = requests.post(api_token_url, json=api_post_data, timeout=10)
|
||
response.raise_for_status()
|
||
resp_data = response.json()
|
||
if resp_data.get("code") == 0:
|
||
return resp_data["tenant_access_token"]
|
||
else:
|
||
raise Exception(f"获取Token失败: {resp_data.get('msg')}")
|
||
except Exception as e:
|
||
logging.error(f"获取飞书Token异常: {e}")
|
||
raise
|
||
|
||
|
||
def send_feishu_message(message):
|
||
"""发送飞书通知"""
|
||
headers = {'Content-Type': 'application/json'}
|
||
payload = {
|
||
"msg_type": "interactive",
|
||
"card": {
|
||
"config": {"wide_screen_mode": True},
|
||
"elements": [
|
||
{
|
||
"tag": "div",
|
||
"text": {"content": message, "tag": "lark_md"}
|
||
},
|
||
{
|
||
"tag": "action",
|
||
"actions": [
|
||
{
|
||
"tag": "button",
|
||
"text": {"content": "查看多维表格", "tag": "plain_text"},
|
||
"type": "primary",
|
||
"url": MULTI_TABLE_URL
|
||
}
|
||
]
|
||
}
|
||
],
|
||
"header": {
|
||
"title": {"content": "车辆状态同步报告", "tag": "plain_text"},
|
||
"template": "blue"
|
||
}
|
||
}
|
||
}
|
||
|
||
try:
|
||
response = requests.post(WEBHOOK_URL, headers=headers, json=payload, timeout=10)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
except Exception as e:
|
||
logging.error(f"飞书消息发送失败: {str(e)}")
|
||
return None
|
||
|
||
|
||
def read_feishu_records(app_token, table_id, view_id, retry=3):
|
||
"""从飞书多维表格读取车辆记录"""
|
||
access_token = get_access_token()
|
||
headers = {
|
||
"Authorization": f"Bearer {access_token}",
|
||
"Content-Type": "application/json; charset=utf-8",
|
||
}
|
||
|
||
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records"
|
||
params = {
|
||
"page_size": 500,
|
||
"view_id": view_id # 添加视图ID参数
|
||
}
|
||
|
||
all_records = []
|
||
page_token = None
|
||
|
||
while True:
|
||
if page_token:
|
||
params["page_token"] = page_token
|
||
|
||
for attempt in range(retry):
|
||
try:
|
||
response = requests.get(url, headers=headers, params=params, timeout=30)
|
||
response.raise_for_status()
|
||
resp_data = response.json()
|
||
if resp_data.get("code") != 0:
|
||
raise Exception(f"API错误: {resp_data.get('msg')}")
|
||
records = resp_data["data"]["items"]
|
||
all_records.extend(records)
|
||
has_more = resp_data["data"].get("has_more")
|
||
page_token = resp_data["data"].get("page_token") if has_more else None
|
||
break
|
||
except Exception as e:
|
||
logging.warning(f"读取多维表格失败(尝试 {attempt + 1}/{retry}): {e}")
|
||
time.sleep(2 ** attempt)
|
||
else:
|
||
raise Exception("无法读取多维表格数据")
|
||
|
||
if not page_token:
|
||
break
|
||
|
||
return all_records
|
||
|
||
|
||
def fetch_activation_data():
|
||
"""从数据库查询车辆激活状态和车型"""
|
||
try:
|
||
safe_password = quote_plus(DB_CONFIG['password'])
|
||
engine = create_engine(
|
||
f"mysql+pymysql://{DB_CONFIG['user']}:{safe_password}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}",
|
||
pool_recycle=3600
|
||
)
|
||
|
||
sql_query = """
|
||
SELECT
|
||
v.VIN AS car_vins,
|
||
CASE s.STATE
|
||
WHEN 2 THEN 'active'
|
||
ELSE 'no_active'
|
||
END AS active_status,
|
||
iv.`NAME` AS `车型`
|
||
FROM v_vehicle_info v
|
||
JOIN v_tbox_info t USING (VIN)
|
||
JOIN v_sim_info s USING (SN)
|
||
LEFT JOIN v_material_info m ON m.MATERIAL_ID = v.MATERIAL_ID
|
||
LEFT JOIN v_outside_vehicle_type ov ON ov.TYPE_CODE = m.OUTSIDE_TYPE_CODE
|
||
LEFT JOIN v_inside_vehicle_type iv ON iv.TYPE_CODE = ov.INSIDE_TYPE_CODE
|
||
"""
|
||
|
||
df = pd.read_sql(sql_query, engine)
|
||
df = df.drop_duplicates(subset=['car_vins'], keep='first')
|
||
|
||
expected = ['car_vins', 'active_status', VEHICLE_NAME_FIELD]
|
||
missing = [c for c in expected if c not in df.columns]
|
||
if missing:
|
||
logging.error(f"查询结果缺少列: {missing}")
|
||
raise KeyError(f"Missing columns: {missing}")
|
||
|
||
df2 = df.set_index('car_vins')[['active_status', VEHICLE_NAME_FIELD]]
|
||
return df2.to_dict(orient='index')
|
||
|
||
except Exception as e:
|
||
logging.error(f"数据库查询失败: {e}")
|
||
raise
|
||
|
||
|
||
def compare_and_update_records(app_token, table_id, view_id, activation_data):
|
||
"""比较并更新飞书多维表格记录"""
|
||
access_token = get_access_token()
|
||
headers = {
|
||
"Authorization": f"Bearer {access_token}",
|
||
"Content-Type": "application/json; charset=utf-8",
|
||
}
|
||
|
||
feishu_records = read_feishu_records(app_token, table_id, view_id)
|
||
logging.info(f"成功读取{len(feishu_records)}条多维表格记录")
|
||
|
||
records_to_update = []
|
||
activated_count = not_activated_count = 0
|
||
|
||
for record in feishu_records:
|
||
record_id = record.get("record_id")
|
||
fields = record.get("fields", {})
|
||
vin = fields.get(VIN_FIELD_NAME)
|
||
if not vin:
|
||
logging.warning(f"记录 {record_id} 缺少VIN字段")
|
||
continue
|
||
|
||
db = activation_data.get(vin)
|
||
if not db:
|
||
logging.warning(f"VIN {vin} 在数据库中未找到")
|
||
continue
|
||
|
||
db_status = db['active_status']
|
||
db_name = db[VEHICLE_NAME_FIELD]
|
||
|
||
current_status = fields.get(ACTIVE_FIELD_NAME, "").lower()
|
||
current_name = fields.get(VEHICLE_NAME_FIELD, "")
|
||
|
||
if db_status != current_status or db_name != current_name:
|
||
update_fields = {
|
||
ACTIVE_FIELD_NAME: db_status,
|
||
VEHICLE_NAME_FIELD: db_name
|
||
}
|
||
records_to_update.append({
|
||
"record_id": record_id,
|
||
"fields": update_fields
|
||
})
|
||
logging.debug(f"VIN {vin} 需要更新: 状态 {current_status}→{db_status}, 车型 {current_name}→{db_name}")
|
||
|
||
if db_status == "active":
|
||
activated_count += 1
|
||
else:
|
||
not_activated_count += 1
|
||
|
||
total_updated = len(records_to_update)
|
||
logging.info(f"需要更新 {total_updated} 条记录(激活:{activated_count},未激活:{not_activated_count})")
|
||
|
||
# 批量更新
|
||
batch_size = 50
|
||
for i in range(0, total_updated, batch_size):
|
||
batch = records_to_update[i:i + batch_size]
|
||
payload = {"records": batch}
|
||
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/batch_update"
|
||
for attempt in range(3):
|
||
try:
|
||
r = requests.post(url, headers=headers, json=payload, timeout=30)
|
||
data = r.json()
|
||
if r.status_code == 200 and data.get("code") == 0:
|
||
logging.info(f"第{i // batch_size + 1}批更新成功,{len(batch)}条")
|
||
break
|
||
else:
|
||
logging.warning(f"第{i // batch_size + 1}批更新失败(尝试{attempt + 1}/3):{data.get('msg')}")
|
||
except Exception as e:
|
||
logging.warning(f"网络异常(尝试{attempt + 1}/3):{e}")
|
||
time.sleep(2 ** attempt)
|
||
|
||
return total_updated, activated_count, not_activated_count
|
||
|
||
|
||
# ====================== 多表格处理流程 ======================
|
||
def sync_all_markets():
|
||
"""执行所有市场的车辆激活状态同步"""
|
||
start_time = time.time()
|
||
all_results = []
|
||
|
||
try:
|
||
# 一次性获取所有激活数据(避免多次查询数据库)
|
||
logging.info("开始获取数据库激活数据...")
|
||
activation_data = fetch_activation_data()
|
||
logging.info(f"获取到 {len(activation_data)} 条车辆激活数据")
|
||
|
||
# 按市场依次处理
|
||
for market in MARKET_TABLES:
|
||
market_name = market["name"]
|
||
table_id = market["table_id"]
|
||
view_id = market["view_id"]
|
||
|
||
logging.info(f"开始同步市场: {market_name} (表格ID: {table_id})")
|
||
|
||
try:
|
||
# 使用FEISHU_APP_TOKEN修复NameError问题 ⭐⭐关键修复
|
||
updated, activated, not_activated = compare_and_update_records(
|
||
FEISHU_APP_TOKEN,
|
||
table_id,
|
||
view_id,
|
||
activation_data
|
||
)
|
||
|
||
result = {
|
||
"market": market_name,
|
||
"table_id": table_id,
|
||
"total_updated": updated,
|
||
"activated_count": activated,
|
||
"not_activated_count": not_activated,
|
||
"success": True
|
||
}
|
||
logging.info(f"{market_name}同步完成: 更新{updated}台 (✅激活{activated}台,❌未激活{not_activated}台)")
|
||
|
||
except Exception as e:
|
||
result = {
|
||
"market": market_name,
|
||
"table_id": table_id,
|
||
"error": str(e),
|
||
"success": False
|
||
}
|
||
logging.error(f"{market_name}同步失败: {str(e)}", exc_info=True)
|
||
|
||
all_results.append(result)
|
||
time.sleep(1) # 每个市场处理间隔
|
||
|
||
# 统计总体数据
|
||
total_elapsed = time.time() - start_time
|
||
return {
|
||
"success": True,
|
||
"results": all_results,
|
||
"elapsed_time": total_elapsed,
|
||
"start_time": start_time
|
||
}
|
||
|
||
except Exception as e:
|
||
logging.error(f"整体同步流程失败: {str(e)}", exc_info=True)
|
||
return {
|
||
"success": False,
|
||
"error": str(e),
|
||
"elapsed_time": time.time() - start_time
|
||
}
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# 执行多市场同步
|
||
sync_result = sync_all_markets()
|
||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
|
||
|
||
if sync_result.get("success"):
|
||
# 构建详细的飞书消息
|
||
message_header = f"🚗 **车辆激活状态同步完成** ✅\n" \
|
||
f"⏰ 同步时间: {current_time}\n" \
|
||
f"⏱️ 总耗时: {sync_result['elapsed_time']:.2f}秒\n\n" \
|
||
f"📊 **各市场同步详情:**\n"
|
||
|
||
message_details = []
|
||
success_count = 0
|
||
for result in sync_result["results"]:
|
||
if result["success"]:
|
||
success_count += 1
|
||
detail = (
|
||
f"• **{result['market']}**: "
|
||
f"更新 {result['total_updated']}台 | "
|
||
f"✅激活 {result['activated_count']}台 | "
|
||
f"❌未激活 {result['not_activated_count']}台"
|
||
)
|
||
else:
|
||
detail = (
|
||
f"• **{result['market']}**: ❌同步失败!错误信息: `{result['error']}`"
|
||
)
|
||
message_details.append(detail)
|
||
|
||
message_summary = f"\n✅ 成功处理 {success_count}/{len(MARKET_TABLES)} 个市场\n" \
|
||
f"🔗 查看多维表格: [点击访问]({MULTI_TABLE_URL})"
|
||
|
||
full_message = message_header + "\n".join(message_details) + message_summary
|
||
|
||
# 发送飞书通知
|
||
send_result = send_feishu_message(full_message)
|
||
if send_result:
|
||
logging.info("飞书消息发送成功")
|
||
else:
|
||
logging.error("飞书消息发送失败")
|
||
|
||
else:
|
||
error_message = f"❌ **车辆激活状态同步失败**\n" \
|
||
f"⏰ 时间: {current_time}\n" \
|
||
f"⏱️ 耗时: {sync_result['elapsed_time']:.2f}秒\n" \
|
||
f"错误信息: `{sync_result['error']}`\n\n" \
|
||
f"请检查系统日志获取详细信息"
|
||
|
||
send_feishu_message(error_message)
|
||
logging.error(f"整体同步失败: {sync_result['error']}") |