| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- """FastAPI 应用入口。"""
- from __future__ import annotations
- import asyncio
- import logging
- from contextlib import asynccontextmanager
- from typing import AsyncIterator
- from fastapi import FastAPI
- from app.routers import search
- from app.schemas import CacheStatus
- from app.services.cache import faq_cache, scheduler_loop
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
- )
- logger = logging.getLogger(__name__)
- @asynccontextmanager
- async def lifespan(app: FastAPI) -> AsyncIterator[None]:
- """启动时初始化缓存 + 启动定时器;关闭时优雅取消。"""
- logger.info("启动:首次加载 FAQ 缓存…")
- await faq_cache.refresh()
- task = asyncio.create_task(scheduler_loop(), name="faq-scheduler")
- logger.info("启动:定时刷新任务已运行")
- try:
- yield
- finally:
- logger.info("关闭:取消定时任务…")
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- pass
- app = FastAPI(
- title="Zendesk FAQ Keyword Search",
- description="封装 Zendesk Help Center 搜索;FAQ sec_ids 每日 0 点刷新缓存。",
- version="1.0.0",
- lifespan=lifespan,
- )
- # 路由
- app.include_router(search.router)
- @app.get("/health", tags=["meta"])
- async def health() -> dict[str, str]:
- return {"status": "ok"}
- @app.get("/health/cache", response_model=CacheStatus, tags=["meta"])
- async def cache_status() -> CacheStatus:
- snap = faq_cache.snapshot()
- return CacheStatus(
- sec_ids_count=len(snap["sec_ids"]),
- last_updated_at=snap["last_updated_at"],
- next_refresh_at=snap["next_refresh_at"],
- )
- @app.post("/admin/cache/refresh", tags=["meta"])
- async def refresh_cache_now() -> dict[str, object]:
- """手动触发刷新(运维 / 测试用途)。"""
- await faq_cache.refresh()
- snap = faq_cache.snapshot()
- return {
- "success": True,
- "sec_ids_count": len(snap["sec_ids"]),
- "last_updated_at": snap["last_updated_at"],
- }
|