"""FAQ sec_ids 内存缓存 + 每日 0 点定时刷新。""" from __future__ import annotations import asyncio import logging from dataclasses import dataclass, field from datetime import datetime, time, timedelta from typing import Any from app.config import settings from app.services.zendesk_client import list_all_faq_sections logger = logging.getLogger(__name__) @dataclass class FaqCache: """内存中的 FAQ section 缓存。 使用不可变赋值(每次刷新整体替换 sec_ids 引用),并发读取安全。 写入前用 lock 保护避免并发刷新。 """ sec_ids: list[int] = field(default_factory=list) faq_sections: list[dict[str, Any]] = field(default_factory=list) last_updated_at: datetime | None = None next_refresh_at: datetime | None = None _lock: asyncio.Lock = field(default_factory=asyncio.Lock) async def refresh(self) -> None: """从 Zendesk 拉取最新 FAQ sec_ids 替换缓存。""" async with self._lock: logger.info("开始刷新 FAQ 缓存…") try: sec_ids, faq_sections = await list_all_faq_sections() except Exception as exc: # 缓存刷新失败不应让定时任务终止,记录并保留旧值 logger.exception("FAQ 缓存刷新失败:%s", exc) return # 不可变替换:直接覆盖整体引用,避免读半更新 self.sec_ids = list(sec_ids) self.faq_sections = list(faq_sections) self.last_updated_at = datetime.now() self.next_refresh_at = _next_refresh_time(self.last_updated_at) logger.info( "FAQ 缓存刷新完成:sec_ids=%s 条,下一次刷新 %s", len(self.sec_ids), self.next_refresh_at, ) def snapshot(self) -> dict[str, Any]: """读快照(不持锁)。""" return { "sec_ids": self.sec_ids, "faq_sections": self.faq_sections, "last_updated_at": ( self.last_updated_at.isoformat() if self.last_updated_at else None ), "next_refresh_at": ( self.next_refresh_at.isoformat() if self.next_refresh_at else None ), } # 全局单例(FastAPI 进程级) faq_cache = FaqCache() def _next_refresh_time(now: datetime) -> datetime: """计算下一次定时刷新时间(每天 hh:mm,本地时区)。""" target_time = time( hour=settings.cache_refresh_hour, minute=settings.cache_refresh_minute, ) today_target = datetime.combine(now.date(), target_time) if now < today_target: return today_target return today_target + timedelta(days=1) async def scheduler_loop() -> None: """每天 0 点(可配置)刷新一次 FAQ 缓存。 采用 sleep(到下一次目标时间) 的简单循环; 被 cancel 时优雅退出,无需第三方调度库。 """ while True: now = datetime.now() next_run = _next_refresh_time(now) wait_seconds = (next_run - now).total_seconds() logger.info( "下一次 FAQ 缓存刷新 %s(%.0f 秒后)", next_run.isoformat(), wait_seconds, ) try: await asyncio.sleep(wait_seconds) except asyncio.CancelledError: logger.info("scheduler_loop 收到取消信号,退出") raise await faq_cache.refresh()