Files

70 lines
2.4 KiB
Python
Raw Permalink Normal View History

2026-02-02 09:27:49 +08:00
import time
import asyncio
import ntplib
from datetime import datetime
2026-02-02 09:27:49 +08:00
class PrecisionTimer:
def __init__(self):
self.offset = 0 # 服务器时间 - 本地时间
2026-02-02 09:27:49 +08:00
def sync_time(self):
"""多次 NTP 同步取中位数,提高精度"""
offsets = []
servers = ['ntp.aliyun.com', 'ntp.tencent.com', 'pool.ntp.org']
for server in servers:
try:
client = ntplib.NTPClient()
resp = client.request(server, version=3)
offsets.append(resp.tx_time - time.time())
except Exception:
continue
if offsets:
offsets.sort()
self.offset = offsets[len(offsets) // 2]
print(f"时间同步完成,偏移量: {self.offset:.3f}s (采样{len(offsets)}个)")
else:
print("NTP同步失败将使用系统时间")
2026-02-02 09:27:49 +08:00
def get_server_time(self):
return time.time() + self.offset
async def wait_until(self, target_time_str):
"""等待直到目标时间,最后阶段忙等保证精度"""
target_dt = datetime.strptime(target_time_str, "%Y-%m-%d %H:%M:%S")
target_ts = target_dt.timestamp()
print(f"等待目标时间: {target_time_str}")
while True:
now = self.get_server_time()
remaining = target_ts - now
if remaining <= 0:
print("目标时间已到!")
break
elif remaining > 10:
await asyncio.sleep(remaining - 10)
elif remaining > 2:
await asyncio.sleep(0.5)
elif remaining > 0.1:
await asyncio.sleep(0.01)
# remaining <= 0.1: 忙等,不 sleep
async def wait_until_early(self, target_time_str, early_ms=500):
"""提前 early_ms 毫秒触发,用于需要预操作的场景"""
2026-02-02 09:27:49 +08:00
target_dt = datetime.strptime(target_time_str, "%Y-%m-%d %H:%M:%S")
target_ts = target_dt.timestamp() - (early_ms / 1000.0)
2026-02-02 09:27:49 +08:00
while True:
now = self.get_server_time()
remaining = target_ts - now
2026-02-02 09:27:49 +08:00
if remaining <= 0:
break
elif remaining > 10:
await asyncio.sleep(remaining - 10)
elif remaining > 2:
await asyncio.sleep(0.5)
2026-02-02 09:27:49 +08:00
elif remaining > 0.1:
await asyncio.sleep(0.01)