| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- """FAQ sec_ids 内存缓存 + 每日 0 点定时刷新。"""
- from __future__ import annotations
- import asyncio
- import logging
- from dataclasses import dataclass, field
- from datetime import datetime, time, timedelta
- from typing import Any
- from app.config import settings
- from app.services.zendesk_client import list_all_faq_sections
- logger = logging.getLogger(__name__)
- @dataclass
- class FaqCache:
- """内存中的 FAQ section 缓存。
- 使用不可变赋值(每次刷新整体替换 sec_ids 引用),并发读取安全。
- 写入前用 lock 保护避免并发刷新。
- """
- sec_ids: list[int] = field(default_factory=list)
- faq_sections: list[dict[str, Any]] = field(default_factory=list)
- last_updated_at: datetime | None = None
- next_refresh_at: datetime | None = None
- _lock: asyncio.Lock = field(default_factory=asyncio.Lock)
- async def refresh(self) -> None:
- """从 Zendesk 拉取最新 FAQ sec_ids 替换缓存。"""
- async with self._lock:
- logger.info("开始刷新 FAQ 缓存…")
- try:
- sec_ids, faq_sections = await list_all_faq_sections()
- except Exception as exc:
- # 缓存刷新失败不应让定时任务终止,记录并保留旧值
- logger.exception("FAQ 缓存刷新失败:%s", exc)
- return
- # 不可变替换:直接覆盖整体引用,避免读半更新
- self.sec_ids = list(sec_ids)
- self.faq_sections = list(faq_sections)
- self.last_updated_at = datetime.now()
- self.next_refresh_at = _next_refresh_time(self.last_updated_at)
- logger.info(
- "FAQ 缓存刷新完成:sec_ids=%s 条,下一次刷新 %s",
- len(self.sec_ids),
- self.next_refresh_at,
- )
- def snapshot(self) -> dict[str, Any]:
- """读快照(不持锁)。"""
- return {
- "sec_ids": self.sec_ids,
- "faq_sections": self.faq_sections,
- "last_updated_at": (
- self.last_updated_at.isoformat()
- if self.last_updated_at
- else None
- ),
- "next_refresh_at": (
- self.next_refresh_at.isoformat()
- if self.next_refresh_at
- else None
- ),
- }
- # 全局单例(FastAPI 进程级)
- faq_cache = FaqCache()
- def _next_refresh_time(now: datetime) -> datetime:
- """计算下一次定时刷新时间(每天 hh:mm,本地时区)。"""
- target_time = time(
- hour=settings.cache_refresh_hour,
- minute=settings.cache_refresh_minute,
- )
- today_target = datetime.combine(now.date(), target_time)
- if now < today_target:
- return today_target
- return today_target + timedelta(days=1)
- async def scheduler_loop() -> None:
- """每天 0 点(可配置)刷新一次 FAQ 缓存。
- 采用 sleep(到下一次目标时间) 的简单循环;
- 被 cancel 时优雅退出,无需第三方调度库。
- """
- while True:
- now = datetime.now()
- next_run = _next_refresh_time(now)
- wait_seconds = (next_run - now).total_seconds()
- logger.info(
- "下一次 FAQ 缓存刷新 %s(%.0f 秒后)",
- next_run.isoformat(),
- wait_seconds,
- )
- try:
- await asyncio.sleep(wait_seconds)
- except asyncio.CancelledError:
- logger.info("scheduler_loop 收到取消信号,退出")
- raise
- await faq_cache.refresh()
|