import logging
import time
from typing import Callable, Optional

import httpx

import config

logger = logging.getLogger(__name__)

_BASE_URL = "https://api.weixin.qq.com/cgi-bin"

# A cached token is treated as stale this many seconds before its reported
# expiry, so a request never goes out with a token that is about to lapse.
_TOKEN_REFRESH_MARGIN = 300

# WeChat errcodes that mean "the access token was rejected" (invalid
# credential, invalid token, token expired -- see the WeChat global return
# code table). Any of them triggers one forced refresh and a single retry.
_TOKEN_REJECTED_ERRCODES = frozenset({40001, 40014, 42001})


class WeChatError(Exception):
    """Raised when the WeChat API reports a failure in its JSON response."""


class WeChatClient:
    """Thin synchronous client for the WeChat ``cgi-bin`` HTTP API.

    The client caches the access token in memory and refreshes it when it
    is close to expiry or when the API rejects it. It owns an
    ``httpx.Client``; release it with :meth:`close` or by using the
    instance as a context manager.
    """

    def __init__(self):
        self._http = httpx.Client(
            timeout=httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=10.0)
        )
        self._token = None
        # Wall-clock time (``time.time()``) at which the cached token expires.
        self._token_expires_at = 0

    def __enter__(self) -> "WeChatClient":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()

    def _get_access_token(self, force_refresh: bool = False) -> str:
        """Return an access token, requesting a new one when needed.

        Args:
            force_refresh: Skip the local cache and forward
                ``force_refresh`` to the ``stable_token`` endpoint.

        Returns:
            The access token string.

        Raises:
            WeChatError: If the response contains no ``access_token``.
        """
        cache_is_fresh = (
            self._token
            and time.time() < self._token_expires_at - _TOKEN_REFRESH_MARGIN
        )
        if not force_refresh and cache_is_fresh:
            return self._token

        resp = self._http.post(
            f"{_BASE_URL}/stable_token",
            json={
                "grant_type": "client_credential",
                "appid": config.APP_ID,
                "secret": config.APP_SECRET,
                "force_refresh": force_refresh,
            },
        )
        data = resp.json()
        if "access_token" not in data:
            raise WeChatError(f"Failed to get token: {data}")

        self._token = data["access_token"]
        # Fall back to 7200 seconds when the response omits ``expires_in``.
        self._token_expires_at = time.time() + data.get("expires_in", 7200)
        logger.info("Access token refreshed")
        return self._token

    def _call(self, path: str, send: Callable[[str], httpx.Response]) -> dict:
        """Send one authenticated request, retrying once on a rejected token.

        Args:
            path: Endpoint path relative to the API base URL.
            send: Callable that performs the HTTP request for a fully
                built URL and returns the response.

        Returns:
            The decoded JSON response.

        Raises:
            WeChatError: If the final response carries a non-zero ``errcode``.
        """
        token = self._get_access_token()
        data = send(f"{_BASE_URL}/{path}?access_token={token}").json()

        if data.get("errcode") in _TOKEN_REJECTED_ERRCODES:
            # The cached token can be invalidated server-side before its
            # local expiry, so refresh it and retry exactly once.
            logger.info("Token expired, force refreshing")
            token = self._get_access_token(force_refresh=True)
            data = send(f"{_BASE_URL}/{path}?access_token={token}").json()

        if data.get("errcode", 0) != 0:
            raise WeChatError(
                f"API error {data.get('errcode')}: {data.get('errmsg', '')}"
            )
        return data

    def _api_post(self, path: str, body: Optional[dict] = None) -> dict:
        """POST ``body`` as JSON to ``path`` and return the decoded response.

        An empty JSON object is sent when ``body`` is ``None`` or empty.

        Raises:
            WeChatError: If the API reports a non-zero ``errcode``.
        """
        payload = body or {}
        return self._call(path, lambda url: self._http.post(url, json=payload))

    def _api_get(self, path: str) -> dict:
        """GET ``path`` and return the decoded response.

        Raises:
            WeChatError: If the API reports a non-zero ``errcode``.
        """
        return self._call(path, self._http.get)

    def get_material_count(self) -> dict:
        """Get total counts for each material type."""
        return self._api_get("material/get_materialcount")

    def batch_get_materials(
        self, material_type: str, offset: int = 0, count: int = 20
    ) -> dict:
        """Get a page of materials by type."""
        return self._api_post(
            "material/batchget_material",
            {
                "type": material_type,
                "offset": offset,
                "count": count,
            },
        )

    def get_material(self, media_id: str) -> dict:
        """Get detail of a single material by media_id."""
        return self._api_post("material/get_material", {"media_id": media_id})

    def batch_get_published(
        self, offset: int = 0, count: int = 20, no_content: int = 0
    ) -> dict:
        """Get a page of published articles (freepublish, requires verified account)."""
        return self._api_post(
            "freepublish/batchget",
            {
                "offset": offset,
                "count": count,
                "no_content": no_content,
            },
        )

    def get_published_article(self, article_id: str) -> dict:
        """Get detail of a single published article by article_id."""
        return self._api_post("freepublish/getarticle", {"article_id": article_id})

    def batch_get_drafts(
        self, offset: int = 0, count: int = 20, no_content: int = 0
    ) -> dict:
        """Get a page of draft articles."""
        return self._api_post(
            "draft/batchget",
            {
                "offset": offset,
                "count": count,
                "no_content": no_content,
            },
        )

    def close(self):
        """Close the underlying HTTP client and release its connections."""
        self._http.close()