Files

241 lines
8.1 KiB
Python
Raw Permalink Normal View History

import pandas as pd
import requests
import json
import logging
import time
from datetime import datetime
from sqlalchemy import create_engine
from urllib.parse import quote_plus
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 数据库连接配置
DB_CONFIG = {
'host': 'eutsp-prod.mysql.germany.rds.aliyuncs.com',
'port': 3306,
'user': 'international_tsp_eu_r',
'password': 'ZXhBgo1TB2XbF3kP',
'database': 'chery_international_tsp_eu'
}
# 飞书多维表格配置
FEISHU_APP_TOKEN = "T3QLbmpqma014ussjy9cgitqnSh" # 应用Token
FEISHU_TABLE_ID = "tbloPJHLc05MHIAY" # 表格ID
VIN_FIELD_NAME = "车架号" # VIN字段名称
ACTIVE_FIELD_NAME = "激活状态" # 激活状态字段名称
def get_access_token():
"""获取飞书访问令牌"""
api_token_url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
api_post_data = {"app_id": "cli_a8b50ec0eed1500d", "app_secret": "ccxkE7ZCFQZcwkkM1rLy0ccZRXYsT2xK"}
try:
response = requests.post(api_token_url, json=api_post_data, timeout=10)
response.raise_for_status()
resp_data = response.json()
if resp_data.get("code") == 0:
return resp_data["tenant_access_token"]
else:
raise Exception(f"获取Token失败: {resp_data.get('msg')}")
except Exception as e:
logging.error(f"获取飞书Token异常: {str(e)}")
raise
def read_feishu_records(app_token, table_id, retry=3):
"""从飞书多维表格读取车辆记录"""
access_token = get_access_token()
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json; charset=utf-8",
}
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records"
params = {"page_size": 500} # 单次请求最大记录数
all_records = []
page_token = None
while True:
if page_token:
params["page_token"] = page_token
records_fetched = False
for attempt in range(retry):
try:
response = requests.get(url, headers=headers, params=params, timeout=30)
response.raise_for_status()
resp_data = response.json()
if resp_data.get("code") != 0:
raise Exception(f"API错误: {resp_data.get('msg')}")
records = resp_data["data"]["items"]
all_records.extend(records)
# 检查是否有下一页
if "page_token" in resp_data["data"] and resp_data["data"]["has_more"]:
page_token = resp_data["data"]["page_token"]
else:
page_token = None
records_fetched = True
break
except Exception as e:
logging.warning(f"读取多维表格失败(尝试 {attempt + 1}/{retry}): {str(e)}")
if attempt < retry - 1:
time.sleep(2 ** attempt)
if not records_fetched:
raise Exception("无法读取多维表格数据")
if not page_token:
break
return all_records
def fetch_activation_data():
"""从数据库查询车辆激活状态"""
try:
# 创建SQLAlchemy引擎
safe_password = quote_plus(DB_CONFIG['password'])
engine = create_engine(
f"mysql+pymysql://{DB_CONFIG['user']}:{safe_password}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}",
pool_recycle=3600
)
sql_query = """
SELECT
v_vehicle_info.VIN AS car_vins,
CASE v_sim_info.STATE
WHEN 2 THEN 'active'
ELSE 'no_active'
END AS active_status
FROM v_vehicle_info
JOIN v_tbox_info USING (VIN)
JOIN v_sim_info USING (SN);
"""
# 执行查询并转换为DataFrame
df = pd.read_sql(sql_query, engine)
return df.set_index('car_vins')['active_status'].to_dict()
except Exception as e:
logging.error(f"数据库查询失败: {str(e)}")
raise
def compare_and_update_records(app_token, table_id, activation_data):
"""比较并更新飞书多维表格记录"""
access_token = get_access_token()
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json; charset=utf-8",
}
# 1. 读取多维表格所有记录
feishu_records = read_feishu_records(app_token, table_id)
logging.info(f"成功读取{len(feishu_records)}条多维表格记录")
# 2. 准备需要更新的记录
records_to_update = []
for record in feishu_records:
try:
record_id = record["record_id"]
fields = record.get("fields", {})
# 获取车架号(VIN)
vin = fields.get(VIN_FIELD_NAME)
if not vin:
logging.warning(f"记录 {record_id} 缺少VIN字段")
continue
# 获取多维表格中的激活状态
current_status = fields.get(ACTIVE_FIELD_NAME, "").lower()
# 获取数据库中的激活状态
db_status = activation_data.get(vin)
if not db_status:
logging.warning(f"VIN {vin} 在数据库中未找到")
continue
# 比较状态是否需要更新
if db_status != current_status:
records_to_update.append({
"record_id": record_id,
"fields": {
ACTIVE_FIELD_NAME: db_status
}
})
logging.info(f"VIN {vin} 状态需更新: {current_status}{db_status}")
except Exception as e:
logging.error(f"处理记录异常: {record} - {str(e)}")
logging.info(f"需要更新{len(records_to_update)}条记录")
# 3. 批量更新记录 (每批次50条)
if not records_to_update:
logging.info("没有需要更新的记录")
return
batch_size = 50
success_count = 0
for i in range(0, len(records_to_update), batch_size):
batch = records_to_update[i:i + batch_size]
batch_payload = {"records": batch}
update_url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/batch_update"
for attempt in range(3): # 最多重试3次
try:
response = requests.post(update_url, headers=headers, json=batch_payload, timeout=30)
resp_data = response.json()
if response.status_code == 200 and resp_data.get("code") == 0:
success_count += len(batch)
logging.info(f"成功更新批次 {i // batch_size + 1}: {len(batch)}条记录")
break
else:
error_msg = resp_data.get('msg', '未知错误')
logging.warning(f"更新失败(尝试 {attempt + 1}/3): {error_msg}")
time.sleep(2 ** attempt)
except Exception as e:
logging.warning(f"网络错误(尝试 {attempt + 1}/3): {str(e)}")
time.sleep(2 ** attempt)
logging.info(f"总计更新成功{success_count}条记录")
return success_count
def main():
try:
start_time = time.time()
logging.info("开始车辆激活状态同步流程...")
# 1. 从数据库获取激活状态数据
activation_data = fetch_activation_data()
logging.info(f"从数据库获取{len(activation_data)}条激活状态记录")
# 2. 与飞书多维表格数据比对并更新
update_count = compare_and_update_records(FEISHU_APP_TOKEN, FEISHU_TABLE_ID, activation_data)
# 3. 完成日志
elapsed_time = time.time() - start_time
logging.info(f"流程完成! 耗时: {elapsed_time:.2f}秒, 更新记录数: {update_count}")
except Exception as e:
logging.error(f"流程执行异常: {str(e)}")
logging.exception("详细错误信息")
if __name__ == '__main__':
main()