241 lines
8.1 KiB
Python
241 lines
8.1 KiB
Python
import json
import logging
import os
import time
from datetime import datetime
from urllib.parse import quote_plus

import pandas as pd
import requests
from sqlalchemy import create_engine
|
|
|
|
# Configure root logging: INFO level, "timestamp - level - message" lines.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
|
|
|
|
def load_db_config(env=None):
    """Build the MySQL connection settings.

    Each setting can be overridden through an environment variable
    (``TSP_DB_HOST``, ``TSP_DB_PORT``, ``TSP_DB_USER``, ``TSP_DB_PASSWORD``,
    ``TSP_DB_NAME``); when a variable is absent the previous built-in value
    is used, so existing deployments keep working unchanged.

    Args:
        env: Mapping to read overrides from. Defaults to ``os.environ``.

    Returns:
        A dict with the keys ``host``, ``port`` (int), ``user``,
        ``password`` and ``database``.

    Raises:
        ValueError: If the port override is not a valid integer.
    """
    env = os.environ if env is None else env
    return {
        'host': env.get('TSP_DB_HOST', 'eutsp-prod.mysql.germany.rds.aliyuncs.com'),
        'port': int(env.get('TSP_DB_PORT', 3306)),
        'user': env.get('TSP_DB_USER', 'international_tsp_eu_r'),
        # NOTE(security): this fallback is a production credential committed
        # to source. Rotate it, supply TSP_DB_PASSWORD instead, and then
        # remove the fallback.
        'password': env.get('TSP_DB_PASSWORD', 'ZXhBgo1TB2XbF3kP'),
        'database': env.get('TSP_DB_NAME', 'chery_international_tsp_eu'),
    }


# Database connection settings
DB_CONFIG = load_db_config()
|
|
|
|
# Feishu Bitable (multi-dimensional table) settings.
# App token
FEISHU_APP_TOKEN = "T3QLbmpqma014ussjy9cgitqnSh"
# Table ID
FEISHU_TABLE_ID = "tbloPJHLc05MHIAY"
# Name of the VIN field
VIN_FIELD_NAME = "车架号"
# Name of the activation-status field
ACTIVE_FIELD_NAME = "激活状态"
|
|
|
|
|
|
def get_access_token():
    """Fetch a Feishu tenant access token for the internal app.

    The app credentials are read from the ``FEISHU_APP_ID`` and
    ``FEISHU_APP_SECRET`` environment variables; when those are absent the
    previous built-in values are used so existing deployments keep working.

    Returns:
        The ``tenant_access_token`` string from the API response.

    Raises:
        Exception: If the HTTP request fails, the API reports a non-zero
            ``code``, or the response carries no token. The error is logged
            before being re-raised.
    """
    api_token_url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
    api_post_data = {
        "app_id": os.environ.get("FEISHU_APP_ID", "cli_a8b50ec0eed1500d"),
        # NOTE(security): this fallback is an app secret committed to source.
        # Rotate it, supply FEISHU_APP_SECRET instead, and then remove the
        # fallback.
        "app_secret": os.environ.get("FEISHU_APP_SECRET", "ccxkE7ZCFQZcwkkM1rLy0ccZRXYsT2xK"),
    }
    try:
        response = requests.post(api_token_url, json=api_post_data, timeout=10)
        response.raise_for_status()
        resp_data = response.json()
        if resp_data.get("code") != 0:
            raise Exception(f"获取Token失败: {resp_data.get('msg')}")
        token = resp_data.get("tenant_access_token")
        if not token:
            # Report a readable error instead of a bare KeyError.
            raise Exception("获取Token失败: 响应中缺少tenant_access_token")
        return token
    except Exception as e:
        logging.error(f"获取飞书Token异常: {str(e)}")
        raise
|
|
|
|
|
|
def read_feishu_records(app_token, table_id, retry=3):
    """Read every record of a Feishu Bitable table, following pagination.

    Each page request is attempted up to ``retry`` times with exponential
    backoff (1s, 2s, 4s, ...) between attempts.

    Args:
        app_token: Token of the Bitable app containing the table.
        table_id: ID of the table to read.
        retry: Maximum number of attempts per page.

    Returns:
        A list with the record dicts of all pages, in the order returned by
        the API.

    Raises:
        Exception: If a page still cannot be read after ``retry`` attempts
            (chained to the last underlying error), or if the access token
            cannot be obtained.
    """
    access_token = get_access_token()
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json; charset=utf-8",
    }

    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records"
    params = {"page_size": 500}  # maximum number of records per request

    all_records = []
    page_token = None

    while True:
        if page_token:
            params["page_token"] = page_token

        page_data = None
        last_error = None
        for attempt in range(retry):
            try:
                response = requests.get(url, headers=headers, params=params, timeout=30)
                response.raise_for_status()
                resp_data = response.json()

                if resp_data.get("code") != 0:
                    raise Exception(f"API错误: {resp_data.get('msg')}")

                page_data = resp_data.get("data") or {}
                break
            except Exception as e:
                last_error = e
                logging.warning(f"读取多维表格失败(尝试 {attempt + 1}/{retry}): {str(e)}")
                if attempt < retry - 1:
                    time.sleep(2 ** attempt)

        if page_data is None:
            raise Exception("无法读取多维表格数据") from last_error

        # Records are appended only after the page was fetched and parsed
        # successfully, so a retry can never add the same page twice. An
        # empty page may carry a null or missing "items"; treat it as empty.
        all_records.extend(page_data.get("items") or [])

        # Continue only while the API reports more pages and supplies a token.
        page_token = page_data.get("page_token") if page_data.get("has_more") else None
        if not page_token:
            break

    return all_records
|
|
|
|
|
|
def fetch_activation_data():
    """Query the database for the activation status of every vehicle.

    Joins ``v_vehicle_info``, ``v_tbox_info`` and ``v_sim_info`` and maps a
    SIM ``STATE`` of 2 to ``'active'`` and anything else to ``'no_active'``.

    Returns:
        A dict mapping VIN to ``'active'`` or ``'no_active'``. If a VIN
        appears in several rows, the last row wins.

    Raises:
        Exception: Any connection or query error, logged and re-raised.
    """
    sql_query = """
        SELECT
            v_vehicle_info.VIN AS car_vins,
            CASE v_sim_info.STATE
                WHEN 2 THEN 'active'
                ELSE 'no_active'
            END AS active_status
        FROM v_vehicle_info
        JOIN v_tbox_info USING (VIN)
        JOIN v_sim_info USING (SN);
    """

    engine = None
    try:
        # URL-encode the password so special characters cannot break the DSN.
        safe_password = quote_plus(DB_CONFIG['password'])
        engine = create_engine(
            f"mysql+pymysql://{DB_CONFIG['user']}:{safe_password}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}",
            pool_recycle=3600
        )

        # Run the query and turn the result into a VIN -> status mapping.
        df = pd.read_sql(sql_query, engine)
        return df.set_index('car_vins')['active_status'].to_dict()

    except Exception as e:
        logging.error(f"数据库查询失败: {str(e)}")
        raise
    finally:
        # The engine is created per call; release its pooled connections
        # instead of leaving them open until interpreter exit.
        if engine is not None:
            engine.dispose()
|
|
|
|
|
|
def _collect_status_updates(feishu_records, activation_data):
    """Return batch_update entries for records whose status differs from the database."""
    records_to_update = []

    for record in feishu_records:
        try:
            record_id = record["record_id"]
            fields = record.get("fields") or {}

            vin = fields.get(VIN_FIELD_NAME)
            if not vin:
                logging.warning(f"记录 {record_id} 缺少VIN字段")
                continue

            # Treat an absent or null status cell as an empty status so the
            # record is still compared instead of being skipped with an error.
            current_status = (fields.get(ACTIVE_FIELD_NAME) or "").lower()

            db_status = activation_data.get(vin)
            if not db_status:
                logging.warning(f"VIN {vin} 在数据库中未找到")
                continue

            if db_status != current_status:
                records_to_update.append({
                    "record_id": record_id,
                    "fields": {
                        ACTIVE_FIELD_NAME: db_status
                    }
                })
                logging.info(f"VIN {vin} 状态需更新: {current_status} → {db_status}")

        except Exception as e:
            # One malformed record must not abort the whole run.
            logging.error(f"处理记录异常: {record} - {str(e)}")

    return records_to_update


def _post_batch_update(update_url, headers, batch, max_attempts=3):
    """POST one batch of updates with backoff; return True if the API accepted it."""
    batch_payload = {"records": batch}

    for attempt in range(max_attempts):
        try:
            response = requests.post(update_url, headers=headers, json=batch_payload, timeout=30)
            resp_data = response.json()

            if response.status_code == 200 and resp_data.get("code") == 0:
                return True

            error_msg = resp_data.get('msg', '未知错误')
            logging.warning(f"更新失败(尝试 {attempt + 1}/{max_attempts}): {error_msg}")
        except Exception as e:
            logging.warning(f"网络错误(尝试 {attempt + 1}/{max_attempts}): {str(e)}")

        # Back off only when another attempt will follow.
        if attempt < max_attempts - 1:
            time.sleep(2 ** attempt)

    return False


def compare_and_update_records(app_token, table_id, activation_data):
    """Sync activation statuses from the database into the Feishu table.

    Reads all table records, compares each record's activation status with
    ``activation_data`` and writes back the differing ones in batches of 50.
    Records without a VIN, or whose VIN is not in ``activation_data``, are
    skipped with a warning.

    Args:
        app_token: Token of the Bitable app containing the table.
        table_id: ID of the table to update.
        activation_data: Dict mapping VIN to the status held by the database.

    Returns:
        The number of records successfully updated (0 when nothing needed
        updating).

    Raises:
        Exception: If the access token or the table records cannot be read.
            Failed update batches are logged, not raised.
    """
    access_token = get_access_token()
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json; charset=utf-8",
    }

    # 1. Read every record of the table.
    feishu_records = read_feishu_records(app_token, table_id)
    logging.info(f"成功读取{len(feishu_records)}条多维表格记录")

    # 2. Work out which records need a new status.
    records_to_update = _collect_status_updates(feishu_records, activation_data)
    logging.info(f"需要更新{len(records_to_update)}条记录")

    if not records_to_update:
        logging.info("没有需要更新的记录")
        return 0

    # 3. Push the updates in batches of 50 records.
    update_url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/batch_update"
    batch_size = 50
    success_count = 0

    for i in range(0, len(records_to_update), batch_size):
        batch = records_to_update[i:i + batch_size]
        batch_no = i // batch_size + 1

        if _post_batch_update(update_url, headers, batch):
            success_count += len(batch)
            logging.info(f"成功更新批次 {batch_no}: {len(batch)}条记录")
        else:
            # Surface batches that were dropped after exhausting all retries.
            logging.error(f"批次 {batch_no} 更新失败: {len(batch)}条记录未更新")

    logging.info(f"总计更新成功{success_count}条记录")
    return success_count
|
|
|
|
|
|
def main():
    """Run the full sync: load statuses from the database, then update Feishu.

    Any exception is logged (message plus traceback) and swallowed, so the
    function itself never raises.
    """
    try:
        # A monotonic clock keeps the elapsed time correct even if the
        # system clock is adjusted during the run.
        start_time = time.monotonic()
        logging.info("开始车辆激活状态同步流程...")

        # 1. Load the activation statuses from the database.
        activation_data = fetch_activation_data()
        logging.info(f"从数据库获取{len(activation_data)}条激活状态记录")

        # 2. Compare with the Feishu table and push the differences. A None
        #    result (nothing to update) is reported as 0 rather than "None".
        update_count = compare_and_update_records(FEISHU_APP_TOKEN, FEISHU_TABLE_ID, activation_data) or 0

        # 3. Final summary.
        elapsed_time = time.monotonic() - start_time
        logging.info(f"流程完成! 耗时: {elapsed_time:.2f}秒, 更新记录数: {update_count}")

    except Exception as e:
        logging.error(f"流程执行异常: {str(e)}")
        logging.exception("详细错误信息")
|
|
|
|
|
|
# Run the sync only when executed as a script, not when imported.
if __name__ == "__main__":
    main()