liujintao пре 4 часа
комит
2b258506bc

+ 14 - 0
.env

@@ -0,0 +1,14 @@
+# Zendesk Help Center 凭据
+ZENDESK_SUBDOMAIN=zositechhelp
+ZENDESK_EMAIL=zhzxb027@vip.163.com
+ZENDESK_API_TOKEN=CpmJL8D4aP20BPn9vi8t9ww5VUOv0bhhu69PFkyc
+
+# 默认语言
+DEFAULT_LOCALE=en-us
+
+# FAQ 缓存刷新时间(24 小时制,本地时区)
+CACHE_REFRESH_HOUR=23
+CACHE_REFRESH_MINUTE=59
+
+# HTTP 请求超时(秒)
+HTTP_TIMEOUT=30

+ 14 - 0
.env.example

@@ -0,0 +1,14 @@
+# Zendesk Help Center 凭据
+ZENDESK_SUBDOMAIN=zositechhelp
+ZENDESK_EMAIL=your_email@example.com
+ZENDESK_API_TOKEN=your_api_token_here
+
+# 默认语言
+DEFAULT_LOCALE=en-us
+
+# FAQ 缓存刷新时间(24 小时制,本地时区)
+CACHE_REFRESH_HOUR=0
+CACHE_REFRESH_MINUTE=0
+
+# HTTP 请求超时(秒)
+HTTP_TIMEOUT=30

+ 17 - 0
.gitignore

@@ -0,0 +1,17 @@
+.idea/
+.vscode
+.git/
+.DS_Store
+npm-debug.log*
+yarn-debug.log*
+yarn-error.log*
+pnpm-debug.log*
+node_modules
+dist
+.next
+.out
+.serverless
+.webpack
+.webpack-cache
+.serverless_plugins
+.venv

+ 114 - 0
README.md

@@ -0,0 +1,114 @@
+# Zendesk FAQ Keyword Search
+
+把 `test1.py` (search_articles) 与 `test2.py` (list_all_sections_full) 重写为 FastAPI 服务:
+
+- 启动时加载所有 **FAQ Section IDs**(名字含 `faq` 且文章数 > 0)保存到内存
+- 每天 **0:00** 自动刷新一次(可在 `.env` 改时间)
+- 暴露 `GET/POST /search` 接口,请求字段为 `query`,返回 Zendesk 接口的搜索结果
+
+## 目录结构
+
+```
+app/
+├── main.py               # FastAPI 入口、lifespan、健康检查、手动刷新
+├── config.py             # 从 .env 读取 Zendesk 凭据
+├── schemas.py            # 请求/响应 Pydantic 模型
+├── services/
+│   ├── zendesk_client.py # 异步 list_all_faq_sections / search_articles
+│   └── cache.py          # 内存缓存 + 每日 0 点定时刷新
+└── routers/
+    └── search.py         # /search 接口
+```
+
+## 安装
+
+```bash
+pip install -r requirements.txt
+cp .env.example .env
+# 编辑 .env 填入 ZENDESK_SUBDOMAIN / ZENDESK_EMAIL / ZENDESK_API_TOKEN
+```
+
+## 运行
+
+```bash
+uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
+```
+
+启动日志中会看到:
+
+```
+启动:首次加载 FAQ 缓存…
+✅ FAQ Section id=... count=... name=... cat=...
+FAQ 缓存刷新完成:sec_ids=N 条,下一次刷新 2026-06-16T00:00:00
+启动:定时刷新任务已运行
+```
+
+## 接口
+
+### 1. 搜索(GET)
+
+```bash
+curl "http://localhost:8000/search?query=Can%20I%20Add%20a%20PoE%20Switch"
+```
+
+### 2. 搜索(POST)
+
+```bash
+curl -X POST http://localhost:8000/search \
+  -H "Content-Type: application/json" \
+  -d '{"query": "Can I Add a PoE Switch", "page": 1, "per_page": 25}'
+```
+
+响应:
+
+```json
+{
+  "success": true,
+  "query": "...",
+  "count": 3,
+  "page": 1,
+  "per_page": 25,
+  "next_page": null,
+  "sec_ids_used": [48252627506585, 47684283137817, "..."],
+  "results": [{"title": "...", "html_url": "...", "snippet": "..."}]
+}
+```
+
+### 3. 缓存状态
+
+```bash
+curl http://localhost:8000/health/cache
+# {"sec_ids_count": 30, "last_updated_at": "2026-06-15T09:12:33", "next_refresh_at": "2026-06-16T00:00:00"}
+```
+
+### 4. 手动刷新缓存
+
+```bash
+curl -X POST http://localhost:8000/admin/cache/refresh
+```
+
+### 5. Swagger UI
+
+打开 <http://localhost:8000/docs>。
+
+## 配置项(.env)
+
+| 变量 | 默认 | 说明 |
+| --- | --- | --- |
+| `ZENDESK_SUBDOMAIN` | — | Zendesk 子域名,例:`zositechhelp` |
+| `ZENDESK_EMAIL` | — | Zendesk 账号邮箱 |
+| `ZENDESK_API_TOKEN` | — | API Token |
+| `DEFAULT_LOCALE` | `en-us` | 默认语言 |
+| `CACHE_REFRESH_HOUR` | `0` | 每日刷新小时(本地时区) |
+| `CACHE_REFRESH_MINUTE` | `0` | 每日刷新分钟 |
+| `HTTP_TIMEOUT` | `30` | httpx 超时秒数 |
+
+## 与原脚本对照
+
+| 原脚本 | 新位置 |
+| --- | --- |
+| `test2.list_all_sections_full` | `app/services/zendesk_client.list_all_faq_sections`(异步) |
+| `test2.get_article_count` | `app/services/zendesk_client._get_article_count`(私有) |
+| `test1.search_articles` | `app/services/zendesk_client.search_articles`(异步) |
+| `test1.faq_section_ids` 硬编码 | 内存缓存 `app/services/cache.faq_cache.sec_ids`(启动加载 + 每日 0 点刷新) |
+| `test1.QUERY` 硬编码 | HTTP 请求字段 `query` |

+ 0 - 0
app/__init__.py


BIN
app/__pycache__/__init__.cpython-312.pyc


BIN
app/__pycache__/config.cpython-312.pyc


BIN
app/__pycache__/main.cpython-312.pyc


BIN
app/__pycache__/schemas.cpython-312.pyc


+ 40 - 0
app/config.py

@@ -0,0 +1,40 @@
+"""应用配置:从 .env / 环境变量加载。"""
+from pydantic_settings import BaseSettings, SettingsConfigDict
+
+
+class Settings(BaseSettings):
+    """所有可配置项;不允许在源码中硬编码凭据。"""
+
+    model_config = SettingsConfigDict(
+        env_file=".env",
+        env_file_encoding="utf-8",
+        case_sensitive=False,
+        extra="ignore",
+    )
+
+    # Zendesk 凭据
+    zendesk_subdomain: str
+    zendesk_email: str
+    zendesk_api_token: str
+
+    # 业务默认值
+    default_locale: str = "en-us"
+
+    # 缓存刷新时间(本地时区,24 小时制)
+    cache_refresh_hour: int = 0
+    cache_refresh_minute: int = 0
+
+    # HTTP
+    http_timeout: int = 30
+
+    @property
+    def zendesk_base_url(self) -> str:
+        return f"https://{self.zendesk_subdomain}.zendesk.com/api/v2/help_center"
+
+    @property
+    def zendesk_auth(self) -> tuple[str, str]:
+        # Zendesk API Token 鉴权格式:("{email}/token", api_token)
+        return (f"{self.zendesk_email}/token", self.zendesk_api_token)
+
+
+settings = Settings()

+ 77 - 0
app/main.py

@@ -0,0 +1,77 @@
+"""FastAPI 应用入口。"""
+from __future__ import annotations
+
+import asyncio
+import logging
+from contextlib import asynccontextmanager
+from typing import AsyncIterator
+
+from fastapi import FastAPI
+
+from app.routers import search
+from app.schemas import CacheStatus
+from app.services.cache import faq_cache, scheduler_loop
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
+)
+logger = logging.getLogger(__name__)
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI) -> AsyncIterator[None]:
+    """启动时初始化缓存 + 启动定时器;关闭时优雅取消。"""
+    logger.info("启动:首次加载 FAQ 缓存…")
+    await faq_cache.refresh()
+
+    task = asyncio.create_task(scheduler_loop(), name="faq-scheduler")
+    logger.info("启动:定时刷新任务已运行")
+
+    try:
+        yield
+    finally:
+        logger.info("关闭:取消定时任务…")
+        task.cancel()
+        try:
+            await task
+        except asyncio.CancelledError:
+            pass
+
+
+app = FastAPI(
+    title="Zendesk FAQ Keyword Search",
+    description="封装 Zendesk Help Center 搜索;FAQ sec_ids 每日 0 点刷新缓存。",
+    version="1.0.0",
+    lifespan=lifespan,
+)
+
+# 路由
+app.include_router(search.router)
+
+
+@app.get("/health", tags=["meta"])
+async def health() -> dict[str, str]:
+    return {"status": "ok"}
+
+
+@app.get("/health/cache", response_model=CacheStatus, tags=["meta"])
+async def cache_status() -> CacheStatus:
+    snap = faq_cache.snapshot()
+    return CacheStatus(
+        sec_ids_count=len(snap["sec_ids"]),
+        last_updated_at=snap["last_updated_at"],
+        next_refresh_at=snap["next_refresh_at"],
+    )
+
+
+@app.post("/admin/cache/refresh", tags=["meta"])
+async def refresh_cache_now() -> dict[str, object]:
+    """手动触发刷新(运维 / 测试用途)。"""
+    await faq_cache.refresh()
+    snap = faq_cache.snapshot()
+    return {
+        "success": True,
+        "sec_ids_count": len(snap["sec_ids"]),
+        "last_updated_at": snap["last_updated_at"],
+    }

+ 0 - 0
app/routers/__init__.py


BIN
app/routers/__pycache__/__init__.cpython-312.pyc


BIN
app/routers/__pycache__/search.cpython-312.pyc


+ 66 - 0
app/routers/search.py

@@ -0,0 +1,66 @@
+"""搜索接口路由。"""
+from __future__ import annotations
+
+import logging
+
+from fastapi import APIRouter, HTTPException, Query
+
+from app.schemas import SearchRequest, SearchResponse
+from app.services.cache import faq_cache
+from app.services.zendesk_client import ZendeskError, search_articles
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(tags=["search"])
+
+
+async def _do_search(req: SearchRequest) -> SearchResponse:
+    """共享的搜索逻辑:使用内存中的 FAQ sec_ids 调用 Zendesk。"""
+    snapshot = faq_cache.snapshot()
+    sec_ids: list[int] = snapshot["sec_ids"]
+
+    if not sec_ids:
+        logger.warning("FAQ sec_ids 缓存为空,本次搜索不带 section 过滤")
+
+    try:
+        data = await search_articles(
+            query=req.query,
+            section_ids=sec_ids,
+            locale=req.locale,
+            page=req.page,
+            per_page=req.per_page,
+        )
+    except ZendeskError as exc:
+        raise HTTPException(status_code=502, detail=str(exc)) from exc
+
+    return SearchResponse(
+        success=True,
+        query=req.query,
+        count=int(data.get("count", 0)),
+        page=req.page,
+        per_page=req.per_page,
+        next_page=data.get("next_page"),
+        sec_ids_used=sec_ids,
+        results=data.get("results", []),
+    )
+
+
+@router.get("/search", response_model=SearchResponse, summary="按 QUERY 搜索 FAQ 文章")
+async def search_get(
+    query: str = Query(..., min_length=1, description="搜索关键词"),
+    locale: str | None = Query(default=None),
+    page: int = Query(default=1, ge=1),
+    per_page: int = Query(default=25, ge=1, le=100),
+) -> SearchResponse:
+    """GET 版本,便于浏览器直接测试。"""
+    return await _do_search(
+        SearchRequest(
+            query=query, locale=locale, page=page, per_page=per_page
+        )
+    )
+
+
+@router.post("/search", response_model=SearchResponse, summary="按 QUERY 搜索 FAQ 文章 (POST)")
+async def search_post(req: SearchRequest) -> SearchResponse:
+    """POST 版本,请求体:{"query": "...", "locale": "...", ...}。"""
+    return await _do_search(req)

+ 35 - 0
app/schemas.py

@@ -0,0 +1,35 @@
+"""请求 / 响应 Pydantic 模型。"""
+from typing import Any
+
+from pydantic import BaseModel, Field
+
+
+class SearchRequest(BaseModel):
+    """搜索请求体(POST 用)。"""
+
+    query: str = Field(..., min_length=1, description="搜索关键词")
+    locale: str | None = Field(default=None, description="语言代码,留空使用默认")
+    page: int = Field(default=1, ge=1)
+    per_page: int = Field(default=25, ge=1, le=100)
+
+
+class SearchResponse(BaseModel):
+    """统一响应信封。"""
+
+    success: bool
+    query: str
+    count: int
+    page: int
+    per_page: int
+    next_page: str | None = None
+    sec_ids_used: list[int]
+    results: list[dict[str, Any]]
+    error: str | None = None
+
+
+class CacheStatus(BaseModel):
+    """缓存状态。"""
+
+    sec_ids_count: int
+    last_updated_at: str | None
+    next_refresh_at: str | None

+ 0 - 0
app/services/__init__.py


BIN
app/services/__pycache__/__init__.cpython-312.pyc


BIN
app/services/__pycache__/cache.cpython-312.pyc


BIN
app/services/__pycache__/zendesk_client.cpython-312.pyc


+ 106 - 0
app/services/cache.py

@@ -0,0 +1,106 @@
+"""FAQ sec_ids 内存缓存 + 每日 0 点定时刷新。"""
+from __future__ import annotations
+
+import asyncio
+import logging
+from dataclasses import dataclass, field
+from datetime import datetime, time, timedelta
+from typing import Any
+
+from app.config import settings
+from app.services.zendesk_client import list_all_faq_sections
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class FaqCache:
+    """内存中的 FAQ section 缓存。
+
+    使用不可变赋值(每次刷新整体替换 sec_ids 引用),并发读取安全。
+    写入前用 lock 保护避免并发刷新。
+    """
+
+    sec_ids: list[int] = field(default_factory=list)
+    faq_sections: list[dict[str, Any]] = field(default_factory=list)
+    last_updated_at: datetime | None = None
+    next_refresh_at: datetime | None = None
+    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)
+
+    async def refresh(self) -> None:
+        """从 Zendesk 拉取最新 FAQ sec_ids 替换缓存。"""
+        async with self._lock:
+            logger.info("开始刷新 FAQ 缓存…")
+            try:
+                sec_ids, faq_sections = await list_all_faq_sections()
+            except Exception as exc:
+                # 缓存刷新失败不应让定时任务终止,记录并保留旧值
+                logger.exception("FAQ 缓存刷新失败:%s", exc)
+                return
+
+            # 不可变替换:直接覆盖整体引用,避免读半更新
+            self.sec_ids = list(sec_ids)
+            self.faq_sections = list(faq_sections)
+            self.last_updated_at = datetime.now()
+            self.next_refresh_at = _next_refresh_time(self.last_updated_at)
+            logger.info(
+                "FAQ 缓存刷新完成:sec_ids=%s 条,下一次刷新 %s",
+                len(self.sec_ids),
+                self.next_refresh_at,
+            )
+
+    def snapshot(self) -> dict[str, Any]:
+        """读快照(不持锁)。"""
+        return {
+            "sec_ids": self.sec_ids,
+            "faq_sections": self.faq_sections,
+            "last_updated_at": (
+                self.last_updated_at.isoformat()
+                if self.last_updated_at
+                else None
+            ),
+            "next_refresh_at": (
+                self.next_refresh_at.isoformat()
+                if self.next_refresh_at
+                else None
+            ),
+        }
+
+
+# 全局单例(FastAPI 进程级)
+faq_cache = FaqCache()
+
+
+def _next_refresh_time(now: datetime) -> datetime:
+    """计算下一次定时刷新时间(每天 hh:mm,本地时区)。"""
+    target_time = time(
+        hour=settings.cache_refresh_hour,
+        minute=settings.cache_refresh_minute,
+    )
+    today_target = datetime.combine(now.date(), target_time)
+    if now < today_target:
+        return today_target
+    return today_target + timedelta(days=1)
+
+
+async def scheduler_loop() -> None:
+    """每天 0 点(可配置)刷新一次 FAQ 缓存。
+
+    采用 sleep(到下一次目标时间) 的简单循环;
+    被 cancel 时优雅退出,无需第三方调度库。
+    """
+    while True:
+        now = datetime.now()
+        next_run = _next_refresh_time(now)
+        wait_seconds = (next_run - now).total_seconds()
+        logger.info(
+            "下一次 FAQ 缓存刷新 %s(%.0f 秒后)",
+            next_run.isoformat(),
+            wait_seconds,
+        )
+        try:
+            await asyncio.sleep(wait_seconds)
+        except asyncio.CancelledError:
+            logger.info("scheduler_loop 收到取消信号,退出")
+            raise
+        await faq_cache.refresh()

+ 166 - 0
app/services/zendesk_client.py

@@ -0,0 +1,166 @@
+"""Zendesk Help Center 异步客户端。
+
+封装两类调用:
+- list_all_faq_sections:扫描 sections 找出名字含 "faq" 且文章数 > 0 的 sec_id
+- search_articles:按 section 列表搜索文章
+"""
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+import httpx
+
+from app.config import settings
+
+logger = logging.getLogger(__name__)
+
+
+class ZendeskError(Exception):
+    """Zendesk API 调用失败统一异常。"""
+
+
+async def _get_article_count(
+    client: httpx.AsyncClient, section_id: int, locale: str
+) -> int:
+    """获取某 section 的文章数(与 test2.py 中 get_article_count 等价)。"""
+    url = (
+        f"{settings.zendesk_base_url}/{locale}/sections/"
+        f"{section_id}/articles.json"
+    )
+    try:
+        resp = await client.get(url, params={"per_page": 1})
+        if resp.status_code == 200:
+            return int(resp.json().get("count", 0))
+    except httpx.HTTPError as exc:
+        logger.warning("get_article_count 失败 sec_id=%s: %s", section_id, exc)
+    return 0
+
+
+async def list_all_faq_sections(
+    locales: list[str] | None = None,
+) -> tuple[list[int], list[dict[str, Any]]]:
+    """完整列出所有 FAQ 相关 Section(异步版本,等价于 test2.list_all_sections_full)。
+
+    返回:
+        (sec_ids, faq_sections):
+        sec_ids        — 含有文章 (count>0) 的 FAQ Section ID 列表
+        faq_sections   — 详细元数据列表
+    """
+    locales = locales or [settings.default_locale]
+    faq_sections: list[dict[str, Any]] = []
+    sec_ids: list[int] = []
+
+    timeout = httpx.Timeout(settings.http_timeout)
+    async with httpx.AsyncClient(
+        auth=settings.zendesk_auth, timeout=timeout
+    ) as client:
+        for locale in locales:
+            url: str | None = (
+                f"{settings.zendesk_base_url}/{locale}/sections.json"
+            )
+            page = 1
+            logger.info("扫描 Locale: %s", locale)
+
+            while url:
+                params: dict[str, Any] = {"per_page": 100}
+                # next_page 已包含 page 参数;首次请求才需要显式传
+                if "page=" not in url:
+                    params["page"] = page
+
+                try:
+                    resp = await client.get(url, params=params)
+                except httpx.HTTPError as exc:
+                    logger.error("sections 请求异常 locale=%s: %s", locale, exc)
+                    break
+
+                if resp.status_code != 200:
+                    logger.error(
+                        "sections 请求失败 locale=%s status=%s",
+                        locale,
+                        resp.status_code,
+                    )
+                    break
+
+                data = resp.json()
+                sections = data.get("sections", [])
+                # logger.info("第 %s 页,共 %s 个 Section", page, len(sections))
+
+                for sec in sections:
+                    name = (sec.get("name") or "").strip()
+                    sec_id = sec.get("id")
+                    cat_id = sec.get("category_id")
+
+                    if "faq" in name.lower():
+                        count = await _get_article_count(client, sec_id, locale)
+                        logger.info(
+                            "✅ FAQ Section id=%s count=%s name=%s cat=%s",
+                            sec_id,
+                            count,
+                            name,
+                            cat_id,
+                        )
+                        if count > 0:
+                            sec_ids.append(sec_id)
+                            faq_sections.append(
+                                {
+                                    "id": sec_id,
+                                    "name": name,
+                                    "category_id": cat_id,
+                                    "locale": locale,
+                                    "article_count": count,
+                                }
+                            )
+
+                url = data.get("next_page")
+                page += 1
+
+    logger.info("总共发现 %s 个 FAQ 相关 Section", len(faq_sections))
+    return sec_ids, faq_sections
+
+
+async def search_articles(
+    query: str,
+    section_ids: list[int],
+    locale: str | None = None,
+    page: int = 1,
+    per_page: int = 25,
+    **extra: Any,
+) -> dict[str, Any]:
+    """搜索 Help Center 文章(异步版本,等价于 test1.search_articles)。
+
+    section_ids 为空列表时,不带 section 过滤直接搜索全部。
+    """
+    locale = locale or settings.default_locale
+    url = f"{settings.zendesk_base_url}/articles/search"
+
+    params: dict[str, Any] = {
+        "query": query,
+        "locale": locale,
+        "page": page,
+        "per_page": per_page,
+    }
+    if section_ids:
+        params["section"] = ",".join(map(str, section_ids))
+    params.update({k: v for k, v in extra.items() if v is not None})
+
+    timeout = httpx.Timeout(settings.http_timeout)
+    async with httpx.AsyncClient(
+        auth=settings.zendesk_auth, timeout=timeout
+    ) as client:
+        try:
+            resp = await client.get(url, params=params)
+            resp.raise_for_status()
+            return resp.json()
+        except httpx.HTTPStatusError as exc:
+            logger.error(
+                "search_articles HTTP %s: %s",
+                exc.response.status_code,
+                exc.response.text,
+            )
+            raise ZendeskError(
+                f"Zendesk 返回 {exc.response.status_code}"
+            ) from exc
+        except httpx.HTTPError as exc:
+            logger.error("search_articles 网络错误: %s", exc)
+            raise ZendeskError(str(exc)) from exc

+ 6 - 0
pyproject.toml

@@ -0,0 +1,6 @@
+[project]
+name = "keywordsearch"
+version = "0.1.0"
+description = "Add your description here"
+requires-python = ">=3.12"
+dependencies = []

+ 6 - 0
requirements.txt

@@ -0,0 +1,6 @@
+fastapi>=0.110.0
+uvicorn[standard]>=0.27.0
+httpx>=0.27.0
+pydantic>=2.6.0
+pydantic-settings>=2.2.0
+python-dotenv>=1.0.0

+ 8 - 0
uv.lock

@@ -0,0 +1,8 @@
+version = 1
+revision = 2
+requires-python = ">=3.12"
+
+[[package]]
+name = "keywordsearch"
+version = "0.1.0"
+source = { virtual = "." }