import requests import logging import time from datetime import datetime, timedelta, date # ====================== 配置区域 ====================== # 飞书应用凭证 APP_ID = "cli_a8b50ec0eed1500d" APP_SECRET = "ccxkE7ZCFQZcwkkM1rLy0ccZRXYsT2xK" # 日统计表格配置 DAILY_SPREADSHEET_ID = "MIqssvuB3h3XmgtkI26cBX7Angc" # 日统计电子表格ID DAILY_SHEET_ID = "46ea20" # 日统计工作表ID DAILY_URL = "https://my-ichery.feishu.cn/sheets/MIqssvuB3h3XmgtkI26cBX7Angc?sheet=46ea20" # 日统计表格链接 # 周统计表格配置 WEEKLY_BASE_TOKEN = "T3QLbmpqma014ussjy9cgitqnSh" # 周统计多维表格ID WEEKLY_TABLE_ID = "tbloB11YuVOSdi9e" # 周统计表ID WEEKLY_URL = "https://my-ichery.feishu.cn/base/T3QLbmpqma014ussjy9cgitqnSh?table=tbloB11YuVOSdi9e" # 周统计表格链接 # 飞书机器人配置 WEBHOOK_URL = "https://open.feishu.cn/open-apis/bot/v2/hook/3bc45247-c469-47a0-bfe9-c62e46c5aca0" # ====================== 日志配置 ====================== logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) def get_access_token(): """获取飞书访问令牌[1,3](@ref)""" api_token_url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/" api_post_data = {"app_id": APP_ID, "app_secret": APP_SECRET} try: response = requests.post(api_token_url, json=api_post_data, timeout=10) response.raise_for_status() resp_data = response.json() if resp_data.get("code") == 0: return resp_data["tenant_access_token"] else: raise Exception(f"获取Token失败: {resp_data.get('msg')}") except Exception as e: logging.error(f"获取飞书Token异常: {str(e)}") raise def read_daily_data(retry=3): """从飞书电子表格读取日统计数据[1,3](@ref)""" access_token = get_access_token() url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{DAILY_SPREADSHEET_ID}/values/{DAILY_SHEET_ID}" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json; charset=utf-8", } for attempt in range(retry): try: response = requests.get(url, headers=headers, timeout=30) response.raise_for_status() resp_data = response.json() if resp_data.get("code") != 0: raise Exception(f"API错误: {resp_data.get('msg')}") # 解析数据:跳过标题行,只取日期和激活数量 values = resp_data['data']['valueRange']['values'][1:] daily_data = [] for row in values: try: date_str = row[0] count = int(row[1]) date_obj = datetime.strptime(date_str, "%Y-%m-%d").date() daily_data.append({"date": date_obj, "count": count}) except (IndexError, ValueError) as e: logging.warning(f"数据格式错误: {row} - {str(e)}") return daily_data except Exception as e: logging.warning(f"读取飞书表格失败(尝试 {attempt + 1}/{retry}): {str(e)}") if attempt < retry - 1: time.sleep(2 ** attempt) # 指数退避重试 raise Exception(f"读取飞书表格失败,重试{retry}次后仍不成功") def clear_weekly_table(retry=3): """清空周统计表(覆盖写入前的准备)[3,5](@ref)""" access_token = get_access_token() url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{WEEKLY_BASE_TOKEN}/tables/{WEEKLY_TABLE_ID}/records" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json; charset=utf-8", } # 获取所有记录ID record_ids = [] page_token = "" while True: params = {"page_size": 100} if page_token: params["page_token"] = page_token response = requests.get(url, headers=headers, params=params, timeout=30) resp_data = response.json() if resp_data.get("code") != 0: raise Exception(f"获取记录失败: {resp_data.get('msg')}") # 收集所有记录的ID for record in resp_data.get("data", {}).get("items", []): record_ids.append(record["record_id"]) # 检查是否有更多数据 if resp_data["data"].get("has_more"): page_token = resp_data["data"].get("page_token", "") else: break if not record_ids: logging.info("周统计表为空,无需清除") return True logging.info(f"获取到{len(record_ids)}条记录待删除") # 批量删除所有记录 delete_url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{WEEKLY_BASE_TOKEN}/tables/{WEEKLY_TABLE_ID}/records/batch_delete" # 分批删除(每次最多100条)[3](@ref) batch_size = 100 for i in range(0, len(record_ids), batch_size): batch_ids = record_ids[i:i + batch_size] payload = {"records": batch_ids} for attempt in range(retry): try: response = requests.post(delete_url, json=payload, headers=headers, timeout=30) resp_data = response.json() if response.status_code == 200 and resp_data.get("code") == 0: logging.info(f"成功删除批次 {i // batch_size + 1}: {len(batch_ids)}条记录") break else: error_msg = resp_data.get('msg', '未知错误') logging.warning(f"删除记录失败(尝试 {attempt + 1}/{retry}): {error_msg}") time.sleep(2 ** attempt) except Exception as e: logging.warning(f"删除记录网络错误(尝试 {attempt + 1}/{retry}): {str(e)}") time.sleep(2 ** attempt) return True def write_weekly_data(records, retry=3): """覆盖写入数据到飞书多维表格(周统计)[3,5](@ref)""" access_token = get_access_token() url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{WEEKLY_BASE_TOKEN}/tables/{WEEKLY_TABLE_ID}/records/batch_create" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json; charset=utf-8", } # 构建API请求体 payload = {"records": []} for row in records: try: # 使用毫秒时间戳格式(飞书官方要求)[3](@ref) start_dt = datetime.combine(row['week_start'], datetime.min.time()) end_dt = datetime.combine(row['week_end'], datetime.min.time()) start_timestamp = int(start_dt.timestamp() * 1000) end_timestamp = int(end_dt.timestamp() * 1000) payload["records"].append({ "fields": { "周数": str(row['week_number']), "周开始日期": start_timestamp, "周结束日期": end_timestamp, "激活数量": int(row['active_count']) } }) except Exception as e: logging.error(f"记录格式错误: {row} - {str(e)}") # 处理空记录情况 if not payload["records"]: logging.warning("没有有效记录可写入多维表格") return 0 # 分批写入(每次最多100条)[3](@ref) batch_size = 100 success_count = 0 for i in range(0, len(payload["records"]), batch_size): batch = payload["records"][i:i + batch_size] batch_payload = {"records": batch} batch_success = False for attempt in range(retry): try: response = requests.post(url, json=batch_payload, headers=headers, timeout=30) resp_data = response.json() if response.status_code == 200 and resp_data.get("code") == 0: if not batch_success: success_count += len(batch) batch_success = True logging.info(f"成功写入批次 {i // batch_size + 1}: {len(batch)}条记录") break else: error_msg = resp_data.get('msg', '未知错误') error_code = resp_data.get("code", "") # 错误诊断[3](@ref) if error_code == "DatetimeFieldConvFail": logging.error("日期字段转换失败: 请检查多维表格中日期字段类型配置") elif error_code == "TextFieldConvFail": logging.error("文本字段转换失败: 请检查数字字段是否包含非数字字符") logging.warning(f"飞书API错误(尝试 {attempt + 1}/{retry}): {error_msg}") time.sleep(2 ** attempt) except Exception as e: logging.warning(f"网络错误(尝试 {attempt + 1}/{retry}): {str(e)}") time.sleep(2 ** attempt) if not batch_success: logging.error(f"批次 {i // batch_size + 1} 写入失败") return success_count def calculate_iso_week(date): """计算ISO周数(处理跨年)""" iso_year, iso_week, iso_day = date.isocalendar() return f"{iso_year}-W{iso_week:02d}" def calculate_week_range(date): """计算所在周的周一和周日日期(ISO标准)""" start_date = date - timedelta(days=date.weekday()) end_date = start_date + timedelta(days=6) return start_date, end_date def send_feishu_message(message, retry=2): """发送飞书通知[6,7,8](@ref)""" headers = {'Content-Type': 'application/json'} # 构建交互式卡片 payload = { "msg_type": "interactive", "card": { "config": {"wide_screen_mode": True}, "elements": [ { "tag": "div", "text": {"content": message, "tag": "lark_md"} }, { "tag": "action", "actions": [ { "tag": "button", "text": {"content": "查看日统计", "tag": "plain_text"}, "type": "default", "url": DAILY_URL }, { "tag": "button", "text": {"content": "查看周统计", "tag": "plain_text"}, "type": "primary", "url": WEEKLY_URL } ] } ], "header": { "title": {"content": "数据同步完成通知", "tag": "plain_text"}, "template": "green" } } } for attempt in range(retry): try: response = requests.post(WEBHOOK_URL, headers=headers, json=payload, timeout=10) response.raise_for_status() return response.json() except Exception as e: logging.warning(f"飞书消息发送失败(尝试 {attempt + 1}/{retry}): {str(e)}") if attempt < retry - 1: time.sleep(2) # 等待后重试 raise Exception(f"飞书消息发送失败,重试{retry}次后仍不成功") if __name__ == '__main__': try: logging.info("开始周统计计算流程...") current_time = datetime.now().strftime("%Y-%m-%d %H:%M") # 1. 从飞书日统计表读取数据 daily_data = read_daily_data() logging.info(f"成功读取{len(daily_data)}条日统计记录") # 2. 计算周统计数据(处理跨年) weekly_stats = {} for entry in daily_data: date = entry['date'] count = entry['count'] week_number = calculate_iso_week(date) week_start, week_end = calculate_week_range(date) if week_number not in weekly_stats: weekly_stats[week_number] = { 'week_number': week_number, 'week_start': week_start, 'week_end': week_end, 'active_count': 0 } weekly_stats[week_number]['active_count'] += count # 转换为列表并排序 weekly_list = sorted(weekly_stats.values(), key=lambda x: x['week_start']) logging.info(f"生成{len(weekly_list)}条周统计记录") # 3. 覆盖写入周统计表 # 先清空表 if not clear_weekly_table(): raise Exception("清空周统计表失败") logging.info("成功清空周统计表") # 再写入新数据 success_count = write_weekly_data(weekly_list) if success_count != len(weekly_list): raise Exception(f"周统计写入失败: 成功写入{success_count}条,总记录{len(weekly_list)}条") logging.info(f"成功覆盖写入{success_count}条周统计记录") # 4. 构建通知消息 today = datetime.now().date() current_week_number = calculate_iso_week(today) stats_summary = f"• 本周({current_week_number})激活车辆: {weekly_stats.get(current_week_number, {}).get('active_count', 0)}\n" stats_summary += f"• 累计激活车辆: {sum(week['active_count'] for week in weekly_list)}\n" if weekly_list: latest_week = weekly_list[-1] stats_summary += f"• 最新一周({latest_week['week_number']}): {latest_week['week_start'].strftime('%Y-%m-%d')}至{latest_week['week_end'].strftime('%Y-%m-%d')}, 激活{latest_week['active_count']}辆" message = f"📅 周统计计算完成 ✅\n" message += f"⏰ 时间: {current_time}\n" message += f"📊 统计摘要:\n{stats_summary}\n" message += f"📝 处理结果: 成功处理{len(daily_data)}条日记录 → 生成{len(weekly_list)}条周记录" # 5. 构建通知消息 current_time = datetime.now().strftime("%Y-%m-%d %H:%M") # 新增统计维度计算 today = datetime.now().date() yesterday = today - timedelta(days=1) current_week_start = today - timedelta(days=today.weekday()) current_week_end = current_week_start + timedelta(days=6) last_week_start = current_week_start - timedelta(days=7) last_week_end = last_week_start + timedelta(days=6) three_months_ago = today - timedelta(days=90) # 计算昨日数据 yesterday_count = next((d['count'] for d in daily_data if d['date'] == yesterday), 0) # 计算今日数据 today_count = next((d['count'] for d in daily_data if d['date'] == today), 0) # 计算本周数据(当前周) current_week_number = calculate_iso_week(today) current_week_total = weekly_stats.get(current_week_number, {}).get('active_count', 0) # 计算上周数据 last_week_number = calculate_iso_week(last_week_start) last_week_total = weekly_stats.get(last_week_number, {}).get('active_count', 0) # 计算最近三个月数据 recent_three_months = sum( d['count'] for d in daily_data if three_months_ago <= d['date'] <= today ) # 计算累计数据 total_count = sum(d['count'] for d in daily_data) # 构建统计摘要 stats_summary = f"• 今日激活: {today_count}\n" stats_summary += f"• 昨日激活: {yesterday_count}\n" stats_summary += f"• 本周激活: {current_week_total}\n" stats_summary += f"• 上周激活: {last_week_total}\n" stats_summary += f"• 近三月激活: {recent_three_months}\n" stats_summary += f"• 累计激活: {total_count}\n" # # 添加最新周统计摘要 # if weekly_list: # latest_week = weekly_list[-1] # stats_summary += f"• 最新一周({latest_week['week_number']}): {latest_week['active_count']}辆" message = f"📅欧盟O&J统计计算完成 ✅\n" message += f"⏰ 时间: {current_time}\n" message += f"📊 统计摘要:\n{stats_summary}\n" message += f"📝 处理结果: 成功处理{len(daily_data)}条日记录 → 生成{len(weekly_list)}条周记录" # 5. 发送通知 result = send_feishu_message(message) logging.info(f"飞书消息发送成功: {result}") except Exception as e: error_message = f"❌ 周统计计算异常: {str(e)}" logging.exception("程序执行异常") send_feishu_message(error_message)