Files
damai/scripts/scheduler.py

134 lines
3.8 KiB
Python
Raw Permalink Normal View History

# coding: utf-8
"""
Scheduling module.
Supports waiting for a scheduled strike (sale-opening) time, with NTP time calibration.
"""
import logging
import time
from datetime import datetime, timezone, timedelta
from typing import Optional
# Module-level logger registered under the "damai.scheduler" name.
logger = logging.getLogger("damai.scheduler")
# Fixed UTC+8 offset; the timezone-aware datetimes below are all built with it.
CST = timezone(timedelta(hours=8))
def parse_strike_time(time_str: str) -> datetime:
    """Parse a strike-time string into a timezone-aware datetime.

    Two layouts are accepted: ``2026-01-25T12:00:00`` and
    ``2026-01-25 12:00:00``. The parsed value is tagged with the module's
    CST (UTC+8) timezone; no conversion is performed.

    Args:
        time_str: The strike time in one of the accepted layouts.

    Returns:
        The parsed datetime with ``tzinfo`` set to CST.

    Raises:
        ValueError: If the string matches neither layout.
    """
    accepted_layouts = ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S")
    for layout in accepted_layouts:
        try:
            naive = datetime.strptime(time_str, layout)
        except ValueError:
            # This layout did not match; fall through to the next one.
            pass
        else:
            return naive.replace(tzinfo=CST)
    raise ValueError(f"无法解析时间: {time_str}")
def get_ntp_time() -> Optional[datetime]:
    """Return the current time as estimated from an NTP server, or None.

    The servers are tried in order and the first successful answer wins.
    The result is the local clock corrected by the clock offset that ntplib
    computes for the exchange, rather than the server's raw transmit
    timestamp, which is already stale by the one-way network delay when the
    reply arrives.

    This is best-effort: no exception escapes. Failures are logged at debug
    level so a failed calibration can be diagnosed.

    Returns:
        A timezone-aware datetime in CST (UTC+8), or None when ntplib is not
        installed or no server could be queried.
    """
    try:
        import ntplib
    except ImportError:
        logger.debug("ntplib 未安装,使用本地时间")
        return None

    try:
        client = ntplib.NTPClient()
        servers = ["time.google.com", "ntp.aliyun.com", "pool.ntp.org"]
        for server in servers:
            try:
                response = client.request(server, timeout=3)
                # local clock + offset = delay-compensated estimate of "now".
                corrected_ts = time.time() + response.offset
                return datetime.fromtimestamp(corrected_ts, tz=CST)
            except Exception as exc:
                # One unreachable server must not abort the calibration.
                logger.debug("NTP 服务器 %s 请求失败: %s", server, exc)
    except Exception as exc:
        # Keep the original best-effort contract, but leave a trace.
        logger.debug("NTP 校时异常: %s", exc)
    return None
def wait_until_strike(strike_time_str: str, preheat_stages: Optional[list] = None) -> dict:
    """Block until the strike time is reached on the NTP-corrected clock.

    The local clock is compared against an NTP server once at the start; the
    measured offset is then applied to every subsequent "time remaining"
    measurement, so the wait targets the server's notion of the strike time.
    If NTP is unavailable, the offset is 0 and the local clock is used as-is.

    Args:
        strike_time_str: Strike time in a layout accepted by
            ``parse_strike_time``.
        preheat_stages: Lead times in seconds, e.g. ``[5.0, 2.0, 0.5]``
            (the default when None). The function wakes up at each lead time
            before the strike and re-measures the remaining time. No callback
            is invoked by this function. Negative values are ignored because
            they would mean sleeping past the strike time.

    Returns:
        ``{"strike_time": datetime, "ntp_offset": float}``, where
        ``ntp_offset`` is the NTP time minus the local time, in seconds.

    Raises:
        ValueError: Propagated from ``parse_strike_time`` for a bad string.
    """
    strike_time = parse_strike_time(strike_time_str)
    if preheat_stages is None:
        preheat_stages = [5.0, 2.0, 0.5]

    # Attempt NTP calibration.
    ntp_time = get_ntp_time()
    if ntp_time is not None:
        local_time = datetime.now(CST)
        offset = (ntp_time - local_time).total_seconds()
        logger.info(f"NTP 校时成功 | 偏差: {offset:+.3f}s")
    else:
        offset = 0.0
        logger.warning("NTP 校时失败,使用本地时间")

    correction = timedelta(seconds=offset)

    def seconds_left() -> float:
        """Seconds until the strike time, measured on the corrected clock."""
        return (strike_time - (datetime.now(CST) + correction)).total_seconds()

    remaining = seconds_left()
    if remaining < 0:
        logger.warning(f"开抢时间已过 {-remaining:.1f}s立即执行")
        return {"strike_time": strike_time, "ntp_offset": offset}

    logger.info(
        f"距离开抢: {remaining:.1f}s | "
        f"开抢时间: {strike_time.strftime('%H:%M:%S')} | "
        f"预热阶段: {preheat_stages}"
    )

    # Largest lead time first; a negative lead time would overshoot the strike.
    stages = sorted((s for s in preheat_stages if s >= 0), reverse=True)
    for stage_sec in stages:
        sleep_until = remaining - stage_sec
        if sleep_until <= 0:
            # This stage's wake-up point is already in the past.
            continue
        logger.info(f"等待 {sleep_until:.1f}s 到预热阶段 (提前 {stage_sec}s)...")
        time.sleep(sleep_until)
        remaining = seconds_left()

    # Final precise wait: halve the remaining time while it is above 100 ms,
    # then busy-wait until within 5 ms of the strike time.
    while True:
        remaining = seconds_left()
        if remaining <= 0.005:  # 5 ms precision
            break
        if remaining > 0.1:
            time.sleep(remaining * 0.5)
        # Otherwise spin without sleeping for the last stretch.

    logger.info("⏰ 到达开抢时间!")
    return {"strike_time": strike_time, "ntp_offset": offset}
def format_countdown(strike_time_str: str) -> str:
    """Render the time left until the strike time as a short countdown.

    Args:
        strike_time_str: Strike time in a layout accepted by
            ``parse_strike_time``.

    Returns:
        ``"<h>h <m>m <s>s"``, ``"<m>m <s>s"`` or ``"<s>s"`` depending on the
        largest non-zero unit, or an "already started" message with the
        elapsed seconds when the strike time is in the past.
    """
    delta = (parse_strike_time(strike_time_str) - datetime.now(CST)).total_seconds()
    if delta < 0:
        return f"已开抢 {-delta:.0f}s"

    # Drop the fractional second, then split into h / m / s.
    whole_seconds = int(delta)
    total_minutes, seconds = divmod(whole_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)

    if hours > 0:
        return f"{hours}h {minutes}m {seconds}s"
    if minutes > 0:
        return f"{minutes}m {seconds}s"
    return f"{seconds}s"