"""Zendesk Help Center 异步客户端。 封装两类调用: - list_all_faq_sections:扫描 sections 找出名字含 "faq" 且文章数 > 0 的 sec_id - search_articles:按 section 列表搜索文章 """ from __future__ import annotations import logging from typing import Any import httpx from app.config import settings logger = logging.getLogger(__name__) class ZendeskError(Exception): """Zendesk API 调用失败统一异常。""" async def _get_article_count( client: httpx.AsyncClient, section_id: int, locale: str ) -> int: """获取某 section 的文章数(与 test2.py 中 get_article_count 等价)。""" url = ( f"{settings.zendesk_base_url}/{locale}/sections/" f"{section_id}/articles.json" ) try: resp = await client.get(url, params={"per_page": 1}) if resp.status_code == 200: return int(resp.json().get("count", 0)) except httpx.HTTPError as exc: logger.warning("get_article_count 失败 sec_id=%s: %s", section_id, exc) return 0 async def list_all_faq_sections( locales: list[str] | None = None, ) -> tuple[list[int], list[dict[str, Any]]]: """完整列出所有 FAQ 相关 Section(异步版本,等价于 test2.list_all_sections_full)。 返回: (sec_ids, faq_sections): sec_ids — 含有文章 (count>0) 的 FAQ Section ID 列表 faq_sections — 详细元数据列表 """ locales = locales or [settings.default_locale] faq_sections: list[dict[str, Any]] = [] sec_ids: list[int] = [] timeout = httpx.Timeout(settings.http_timeout) async with httpx.AsyncClient( auth=settings.zendesk_auth, timeout=timeout ) as client: for locale in locales: url: str | None = ( f"{settings.zendesk_base_url}/{locale}/sections.json" ) page = 1 logger.info("扫描 Locale: %s", locale) while url: params: dict[str, Any] = {"per_page": 100} # next_page 已包含 page 参数;首次请求才需要显式传 if "page=" not in url: params["page"] = page try: resp = await client.get(url, params=params) except httpx.HTTPError as exc: logger.error("sections 请求异常 locale=%s: %s", locale, exc) break if resp.status_code != 200: logger.error( "sections 请求失败 locale=%s status=%s", locale, resp.status_code, ) break data = resp.json() sections = data.get("sections", []) # logger.info("第 %s 页,共 %s 个 Section", page, len(sections)) for sec in sections: name = (sec.get("name") or "").strip() sec_id = sec.get("id") cat_id = sec.get("category_id") if "faq" in name.lower(): count = await _get_article_count(client, sec_id, locale) logger.info( "✅ FAQ Section id=%s count=%s name=%s cat=%s", sec_id, count, name, cat_id, ) if count > 0: sec_ids.append(sec_id) faq_sections.append( { "id": sec_id, "name": name, "category_id": cat_id, "locale": locale, "article_count": count, } ) url = data.get("next_page") page += 1 logger.info("总共发现 %s 个 FAQ 相关 Section", len(faq_sections)) return sec_ids, faq_sections async def search_articles( query: str, section_ids: list[int], locale: str | None = None, page: int = 1, per_page: int = 25, **extra: Any, ) -> dict[str, Any]: """搜索 Help Center 文章(异步版本,等价于 test1.search_articles)。 section_ids 为空列表时,不带 section 过滤直接搜索全部。 """ locale = locale or settings.default_locale url = f"{settings.zendesk_base_url}/articles/search" params: dict[str, Any] = { "query": query, "locale": locale, "page": page, "per_page": per_page, } if section_ids: params["section"] = ",".join(map(str, section_ids)) params.update({k: v for k, v in extra.items() if v is not None}) timeout = httpx.Timeout(settings.http_timeout) async with httpx.AsyncClient( auth=settings.zendesk_auth, timeout=timeout ) as client: try: resp = await client.get(url, params=params) resp.raise_for_status() return resp.json() except httpx.HTTPStatusError as exc: logger.error( "search_articles HTTP %s: %s", exc.response.status_code, exc.response.text, ) raise ZendeskError( f"Zendesk 返回 {exc.response.status_code}" ) from exc except httpx.HTTPError as exc: logger.error("search_articles 网络错误: %s", exc) raise ZendeskError(str(exc)) from exc