cache.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. """FAQ sec_ids 内存缓存 + 每日 0 点定时刷新。"""
  2. from __future__ import annotations
  3. import asyncio
  4. import logging
  5. from dataclasses import dataclass, field
  6. from datetime import datetime, time, timedelta
  7. from typing import Any
  8. from app.config import settings
  9. from app.services.zendesk_client import list_all_faq_sections
  10. logger = logging.getLogger(__name__)
  11. @dataclass
  12. class FaqCache:
  13. """内存中的 FAQ section 缓存。
  14. 使用不可变赋值(每次刷新整体替换 sec_ids 引用),并发读取安全。
  15. 写入前用 lock 保护避免并发刷新。
  16. """
  17. sec_ids: list[int] = field(default_factory=list)
  18. faq_sections: list[dict[str, Any]] = field(default_factory=list)
  19. last_updated_at: datetime | None = None
  20. next_refresh_at: datetime | None = None
  21. _lock: asyncio.Lock = field(default_factory=asyncio.Lock)
  22. async def refresh(self) -> None:
  23. """从 Zendesk 拉取最新 FAQ sec_ids 替换缓存。"""
  24. async with self._lock:
  25. logger.info("开始刷新 FAQ 缓存…")
  26. try:
  27. sec_ids, faq_sections = await list_all_faq_sections()
  28. except Exception as exc:
  29. # 缓存刷新失败不应让定时任务终止,记录并保留旧值
  30. logger.exception("FAQ 缓存刷新失败:%s", exc)
  31. return
  32. # 不可变替换:直接覆盖整体引用,避免读半更新
  33. self.sec_ids = list(sec_ids)
  34. self.faq_sections = list(faq_sections)
  35. self.last_updated_at = datetime.now()
  36. self.next_refresh_at = _next_refresh_time(self.last_updated_at)
  37. logger.info(
  38. "FAQ 缓存刷新完成:sec_ids=%s 条,下一次刷新 %s",
  39. len(self.sec_ids),
  40. self.next_refresh_at,
  41. )
  42. def snapshot(self) -> dict[str, Any]:
  43. """读快照(不持锁)。"""
  44. return {
  45. "sec_ids": self.sec_ids,
  46. "faq_sections": self.faq_sections,
  47. "last_updated_at": (
  48. self.last_updated_at.isoformat()
  49. if self.last_updated_at
  50. else None
  51. ),
  52. "next_refresh_at": (
  53. self.next_refresh_at.isoformat()
  54. if self.next_refresh_at
  55. else None
  56. ),
  57. }
  58. # 全局单例(FastAPI 进程级)
  59. faq_cache = FaqCache()
  60. def _next_refresh_time(now: datetime) -> datetime:
  61. """计算下一次定时刷新时间(每天 hh:mm,本地时区)。"""
  62. target_time = time(
  63. hour=settings.cache_refresh_hour,
  64. minute=settings.cache_refresh_minute,
  65. )
  66. today_target = datetime.combine(now.date(), target_time)
  67. if now < today_target:
  68. return today_target
  69. return today_target + timedelta(days=1)
  70. async def scheduler_loop() -> None:
  71. """每天 0 点(可配置)刷新一次 FAQ 缓存。
  72. 采用 sleep(到下一次目标时间) 的简单循环;
  73. 被 cancel 时优雅退出,无需第三方调度库。
  74. """
  75. while True:
  76. now = datetime.now()
  77. next_run = _next_refresh_time(now)
  78. wait_seconds = (next_run - now).total_seconds()
  79. logger.info(
  80. "下一次 FAQ 缓存刷新 %s(%.0f 秒后)",
  81. next_run.isoformat(),
  82. wait_seconds,
  83. )
  84. try:
  85. await asyncio.sleep(wait_seconds)
  86. except asyncio.CancelledError:
  87. logger.info("scheduler_loop 收到取消信号,退出")
  88. raise
  89. await faq_cache.refresh()