diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a8f2fe0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +__pycache__/ +*.pyc +*.egg-info/ +.env +.env_example +api_docs/ diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..2c07333 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.11 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..11651a6 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY pyproject.toml . +COPY main.py config.py wechat.py db.py sync.py ./ + +RUN pip install --no-cache-dir . + +CMD ["python", "main.py"] diff --git a/README.md b/README.md index 789bb64..092a727 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,176 @@ # UpdateDB -定时爬取公众号的视频与文章数据,更新到数据库 \ No newline at end of file +定时从微信公众号平台获取视频与文章数据,同步到远端 PostgreSQL 数据库。 + +## 技术栈 + +- Python 3.11 +- httpx (HTTP 客户端) +- psycopg2-binary (PostgreSQL 驱动) +- APScheduler (定时调度) +- Docker 部署 + +## 项目结构 + +``` +UpdateDB/ + main.py # 入口:启动调度器,支持 --once 手动触发 + config.py # 加载 .env 配置 + wechat.py # 微信 API 客户端(token 管理、素材获取) + db.py # 数据库连接、建表、UPSERT + sync.py # 同步编排逻辑 + Dockerfile + docker-compose.yml + .env_example # 环境变量模板 + api_docs/ # 微信 API 文档 +``` + +## 数据库变更总结 + +### 新建表:`videos` + +程序启动时自动创建(`CREATE TABLE IF NOT EXISTS`),**不会影响已有表**。 + +```sql +CREATE TABLE IF NOT EXISTS videos ( + id SERIAL PRIMARY KEY, -- 自增主键 + media_id VARCHAR(128) UNIQUE, -- 微信素材 ID(唯一约束) + name TEXT, -- 视频文件名(来自列表接口) + url TEXT, -- 视频播放 URL(来自列表接口) + title TEXT, -- 视频标题(来自详情接口) + description TEXT, -- 视频描述(来自详情接口) + down_url TEXT, -- 视频下载地址(来自详情接口,可能过期) + wechat_update_time TIMESTAMP, -- 微信端更新时间 + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW() +); +``` + +**后端读取时注意**: +- `url` 和 `down_url` 都可能过期失效,建议每次使用前检查可用性,或定期触发同步刷新 +- `title` 和 `description` 可能为 NULL(首次同步时详情接口调用失败) +- `url` 是播放地址,`down_url` 是下载地址,两者不同 + +### 未修改的表 + +#### `articles`(原有表,未改结构) + +程序往这些**已有字段**写入数据,未新增/修改任何列: + +| 数据库字段 | 写入内容 | 微信 API 来源 | 注意事项 | +|---|---|---|---| +| `title` | 文章标题 | `news_item.title` | 截断到 200 字符 | +| `content` | 文章 HTML 内容 | `news_item.content` | 原始 HTML,微信已去除 JS | +| `cover_url` | 封面图 URL | `news_item.thumb_url` | 截断到 500 字符,可能为 NULL | +| `author` | 作者 | `news_item.author` | 截断到 100 字符,可能为 NULL | +| `publish_date` | 发布日期 | `item.update_time` 转 date | 仅日期,不含时分秒 | +| `source_url` | 文章原文链接 | `news_item.url` | 截断到 1000 字符,可能为 NULL | +| `wechat_article_id` | 去重标识 | `{media_id}_{index}` | 格式见下方说明 | + +**未写入的字段**(保持原有值/NULL):`category`、`tags`(微信 API 无此数据) + +#### `sync_states`(原有表,未改结构) + +使用已有的 key-value 结构记录同步状态: + +| key | value 示例 | +|---|---| +| `wechat_news_sync` | `{"status":"idle","count":42,"last_sync":"2026-04-09T..."}` | +| `wechat_video_sync` | `{"status":"idle","count":5,"last_sync":"2026-04-09T..."}` | + +value 是 JSON 字符串,后端可用 `json.loads()` 解析。 + +### 同步删除机制 + +每次同步完成后,程序会对比微信端和数据库的 `media_id`: +- 数据库中有、微信端没有的记录 → **自动删除** +- 涉及 `articles` 和 `videos` 两张表 +- articles 的删除依据:`wechat_article_id` 中提取的 `media_id`(按最后一个 `_` 分割) + +### `wechat_article_id` 格式说明 + +``` +格式:{media_id}_{article_index} +示例:abc123def456_0 (单图文) + abc123def456_1 (多图文中的第 2 篇) +``` + +- `media_id`:微信素材 ID,一个 media_id 对应一条图文素材 +- `article_index`:图文内的文章序号(从 0 开始),多图文时会有 0、1、2... +- 后端查询某篇具体文章:`WHERE wechat_article_id = '{media_id}_{idx}'` +- 后端查询某条图文素材的所有文章:`WHERE wechat_article_id LIKE '{media_id}_%'` + +### 数据写入策略 + +- **articles**:`SELECT` 检查 `wechat_article_id` 是否存在 → 存在则 `UPDATE`,不存在则 `INSERT` +- **videos**:`INSERT ... ON CONFLICT (media_id) DO UPDATE`,仅在 `wechat_update_time` 更新时覆盖 +- **sync_states**:`INSERT ... ON CONFLICT (key) DO UPDATE`,每次同步更新状态 + +## 快速开始 + +### 本地开发 + +1. 安装依赖: + +```bash +pip install -e . +``` + +2. 复制配置文件并填写: + +```bash +cp .env_example .env +# 编辑 .env,填入 app_secret 和数据库信息 +``` + +3. 运行: + +```bash +# 启动定时同步(默认每 48 小时) +python main.py + +# 只运行一次 +python main.py --once +``` + +### Docker 部署 + +1. 在 VPS 上克隆仓库,创建 `.env` 文件: + +```bash +cp .env_example .env +# 编辑 .env +``` + +2. 构建并启动: + +```bash +docker compose up -d --build +``` + +3. 查看日志: + +```bash +docker compose logs -f +``` + +## 环境变量 + +| 变量 | 必填 | 说明 | +|------|------|------| +| `app_id` | 是 | 微信公众号 AppID | +| `app_secret` | 是 | 微信公众号 AppSecret | +| `db_host` | 是 | PostgreSQL 主机 | +| `db_port` | 是 | PostgreSQL 端口 | +| `db_name` | 是 | 数据库名 | +| `db_user` | 是 | 数据库用户 | +| `db_pw` | 是 | 数据库密码 | +| `sync_interval_hours` | 否 | 同步间隔(小时),默认 48 | + +## 同步逻辑 + +1. 调用微信稳定版 Token 接口获取 access_token(自动缓存和刷新) +2. 分页获取所有 news(图文)和 video(视频)素材 +3. 对视频素材,额外调用详情接口获取下载地址 +4. 使用 UPSERT 写入数据库(INSERT ON CONFLICT DO UPDATE) +5. news 和 video 独立同步,一个失败不影响另一个 diff --git a/config.py b/config.py new file mode 100644 index 0000000..2948233 --- /dev/null +++ b/config.py @@ -0,0 +1,24 @@ +import os +from dotenv import load_dotenv + +load_dotenv() + + +def _require(key: str) -> str: + val = os.environ.get(key) + if not val: + raise SystemExit(f"Missing required env var: {key}") + return val + + +APP_ID = _require("app_id") +APP_SECRET = _require("app_secret") +DB_HOST = _require("db_host") +DB_PORT = _require("db_port") +DB_NAME = _require("db_name") +DB_USER = _require("db_user") +DB_PW = _require("db_pw") + +DATABASE_URL = f"postgresql://{DB_USER}:{DB_PW}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + +SYNC_INTERVAL_HOURS = int(os.environ.get("sync_interval_hours", "48")) diff --git a/db.py b/db.py new file mode 100644 index 0000000..5b33568 --- /dev/null +++ b/db.py @@ -0,0 +1,256 @@ +import json +import logging +from datetime import datetime, timezone + +import psycopg2 +from psycopg2.pool import SimpleConnectionPool + +import config + +logger = logging.getLogger(__name__) + +_CREATE_VIDEOS = """ +CREATE TABLE IF NOT EXISTS videos ( + id SERIAL PRIMARY KEY, + media_id VARCHAR(128) UNIQUE NOT NULL, + name TEXT, + url TEXT, + title TEXT, + description TEXT, + down_url TEXT, + wechat_update_time TIMESTAMP, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW() +); +""" + +_CHECK_ARTICLE_EXISTS = """ +SELECT id FROM articles WHERE wechat_article_id = %s; +""" + +_INSERT_ARTICLE = """ +INSERT INTO articles (title, content, cover_url, author, publish_date, created_at, source_url, wechat_article_id) +VALUES (%(title)s, %(content)s, %(cover_url)s, %(author)s, %(publish_date)s, NOW(), %(source_url)s, %(wechat_article_id)s) +""" + +_UPDATE_ARTICLE = """ +UPDATE articles SET + title = %(title)s, + content = %(content)s, + cover_url = %(cover_url)s, + author = %(author)s, + publish_date = %(publish_date)s, + source_url = %(source_url)s +WHERE id = %(id)s +""" + +_UPSERT_VIDEO = """ +INSERT INTO videos (media_id, name, url, title, description, down_url, wechat_update_time, created_at) +VALUES (%(media_id)s, %(name)s, %(url)s, %(title)s, %(description)s, %(down_url)s, %(wechat_update_time)s, NOW()) +ON CONFLICT (media_id) +DO UPDATE SET + name = COALESCE(EXCLUDED.name, videos.name), + url = COALESCE(EXCLUDED.url, videos.url), + title = COALESCE(EXCLUDED.title, videos.title), + description = COALESCE(EXCLUDED.description, videos.description), + down_url = COALESCE(EXCLUDED.down_url, videos.down_url), + wechat_update_time = EXCLUDED.wechat_update_time, + updated_at = NOW() +WHERE videos.wechat_update_time < EXCLUDED.wechat_update_time + OR videos.wechat_update_time IS NULL; +""" + +_UPSERT_SYNC_STATE = """ +INSERT INTO sync_states (key, value, updated_at) +VALUES (%(key)s, %(value)s, NOW()) +ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW(); +""" + +_GET_SYNC_STATE = """ +SELECT value FROM sync_states WHERE key = %s; +""" + +_GET_VIDEO_UPDATE_TIME = """ +SELECT wechat_update_time FROM videos WHERE media_id = %s; +""" + + +class Database: + def __init__(self): + self._pool = SimpleConnectionPool(1, 3, config.DATABASE_URL) + + def _get_conn(self): + conn = self._pool.getconn() + try: + with conn.cursor() as cur: + cur.execute("SELECT 1") + except psycopg2.OperationalError: + logger.warning("DB connection stale, reconnecting") + self._pool.putconn(conn, close=True) + self._pool = SimpleConnectionPool(1, 3, config.DATABASE_URL) + conn = self._pool.getconn() + return conn + + def _put_conn(self, conn): + self._pool.putconn(conn) + + def ensure_tables(self): + conn = self._get_conn() + try: + with conn.cursor() as cur: + # articles and sync_states already exist; only create videos + cur.execute(_CREATE_VIDEOS) + conn.commit() + logger.info("Database tables verified") + finally: + self._put_conn(conn) + + def upsert_article(self, article: dict): + """Insert or update an article using wechat_article_id as the dedup key.""" + conn = self._get_conn() + try: + with conn.cursor() as cur: + cur.execute(_CHECK_ARTICLE_EXISTS, (article["wechat_article_id"],)) + row = cur.fetchone() + if row: + article["id"] = row[0] + cur.execute(_UPDATE_ARTICLE, article) + else: + cur.execute(_INSERT_ARTICLE, article) + conn.commit() + except Exception: + conn.rollback() + raise + finally: + self._put_conn(conn) + + def upsert_video(self, video: dict): + conn = self._get_conn() + try: + with conn.cursor() as cur: + cur.execute(_UPSERT_VIDEO, video) + conn.commit() + except Exception: + conn.rollback() + raise + finally: + self._put_conn(conn) + + def should_fetch_video_detail(self, media_id: str, update_time: int) -> bool: + conn = self._get_conn() + try: + with conn.cursor() as cur: + cur.execute(_GET_VIDEO_UPDATE_TIME, (media_id,)) + row = cur.fetchone() + if row is None or row[0] is None: + return True + new_time = datetime.fromtimestamp(update_time, tz=timezone.utc) + # Compare as naive UTC (DB stores timestamp without tz) + stored_naive = row[0].replace(tzinfo=None) if row[0].tzinfo else row[0] + new_naive = new_time.replace(tzinfo=None) + return new_naive > stored_naive + finally: + self._put_conn(conn) + + def update_sync_state(self, key: str, data: dict): + conn = self._get_conn() + try: + with conn.cursor() as cur: + cur.execute(_UPSERT_SYNC_STATE, { + "key": key, + "value": json.dumps(data, default=str), + }) + conn.commit() + except Exception: + conn.rollback() + raise + finally: + self._put_conn(conn) + + def get_sync_state(self, key: str) -> dict | None: + conn = self._get_conn() + try: + with conn.cursor() as cur: + cur.execute(_GET_SYNC_STATE, (key,)) + row = cur.fetchone() + if row: + return json.loads(row[0]) + return None + finally: + self._put_conn(conn) + + def delete_missing_videos(self, wechat_media_ids: set) -> int: + """Delete videos whose media_id is not in the WeChat response.""" + conn = self._get_conn() + try: + with conn.cursor() as cur: + cur.execute("SELECT media_id FROM videos") + db_ids = {row[0] for row in cur.fetchall()} + to_delete = db_ids - wechat_media_ids + if to_delete: + cur.execute("DELETE FROM videos WHERE media_id = ANY(%s)", (list(to_delete),)) + deleted = cur.rowcount + conn.commit() + logger.info("Deleted %d videos no longer on WeChat", deleted) + return deleted + return 0 + except Exception: + conn.rollback() + raise + finally: + self._put_conn(conn) + + def delete_missing_articles(self, wechat_media_ids: set) -> int: + """Delete articles whose wechat_article_id's media_id is not in the WeChat response.""" + conn = self._get_conn() + try: + with conn.cursor() as cur: + cur.execute("SELECT id, wechat_article_id FROM articles WHERE wechat_article_id IS NOT NULL") + rows = cur.fetchall() + to_delete = [] + for article_id, waid in rows: + # wechat_article_id format: {media_id}_{idx} + parts = waid.rsplit("_", 1) + if len(parts) == 2 and parts[0] not in wechat_media_ids: + to_delete.append(article_id) + if to_delete: + cur.execute("DELETE FROM articles WHERE id = ANY(%s)", (to_delete,)) + deleted = cur.rowcount + conn.commit() + logger.info("Deleted %d articles no longer on WeChat", deleted) + return deleted + return 0 + except Exception: + conn.rollback() + raise + finally: + self._put_conn(conn) + + def delete_missing_articles_by_prefix(self, wechat_article_ids: set) -> int: + """Delete articles whose article_id (prefix of wechat_article_id) is not in the published set.""" + conn = self._get_conn() + try: + with conn.cursor() as cur: + cur.execute("SELECT id, wechat_article_id FROM articles WHERE wechat_article_id IS NOT NULL") + rows = cur.fetchall() + to_delete = [] + for article_id, waid in rows: + # wechat_article_id format: {article_id}_{idx} + parts = waid.rsplit("_", 1) + if len(parts) == 2 and parts[0] not in wechat_article_ids: + to_delete.append(article_id) + if to_delete: + cur.execute("DELETE FROM articles WHERE id = ANY(%s)", (to_delete,)) + deleted = cur.rowcount + conn.commit() + logger.info("Deleted %d articles no longer published", deleted) + return deleted + return 0 + except Exception: + conn.rollback() + raise + finally: + self._put_conn(conn) + + def close(self): + self._pool.closeall() diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..ed4c26b --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,14 @@ +services: + updatedb: + build: . + container_name: updatedb + restart: unless-stopped + env_file: + - .env + environment: + - TZ=Asia/Shanghai + logging: + driver: json-file + options: + max-size: "10m" + max-file: "3" diff --git a/main.py b/main.py new file mode 100644 index 0000000..bbaa92e --- /dev/null +++ b/main.py @@ -0,0 +1,62 @@ +import argparse +import logging +import sys + +from apscheduler.schedulers.blocking import BlockingScheduler + +import config +from db import Database +from sync import SyncService +from wechat import WeChatClient + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + + +def main(): + parser = argparse.ArgumentParser(description="UpdateDB - sync WeChat materials to database") + parser.add_argument("--once", action="store_true", help="Run sync once and exit") + args = parser.parse_args() + + logger.info("UpdateDB starting (sync interval: %dh)", config.SYNC_INTERVAL_HOURS) + + db = Database() + wechat = WeChatClient() + sync = SyncService(wechat, db) + + try: + db.ensure_tables() + + # Run once at startup + sync.run_sync() + + if args.once: + logger.info("--once flag set, exiting after first sync") + return + + scheduler = BlockingScheduler() + scheduler.add_job( + sync.run_sync, + "interval", + hours=config.SYNC_INTERVAL_HOURS, + id="sync_job", + max_instances=1, + ) + logger.info("Scheduler started, next sync in %d hours", config.SYNC_INTERVAL_HOURS) + scheduler.start() + except KeyboardInterrupt: + logger.info("Shutting down") + except Exception as e: + logger.error("Fatal error: %s", e, exc_info=True) + sys.exit(1) + finally: + wechat.close() + db.close() + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..757c0fe --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,12 @@ +[project] +name = "updatedb" +version = "0.1.0" +description = "定时从微信公众号平台获取视频与文章数据,同步到远端数据库" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "httpx>=0.27", + "psycopg2-binary>=2.9", + "apscheduler>=3.10", + "python-dotenv>=1.0", +] diff --git a/sync.py b/sync.py new file mode 100644 index 0000000..ec5b758 --- /dev/null +++ b/sync.py @@ -0,0 +1,159 @@ +import logging +import time +from datetime import datetime, timezone + +from db import Database +from wechat import WeChatClient, WeChatError + +logger = logging.getLogger(__name__) + + +class SyncService: + def __init__(self, wechat: WeChatClient, db: Database): + self.wechat = wechat + self.db = db + + def run_sync(self): + logger.info("=== Sync started ===") + for material_type in ["news", "video"]: + sync_key = f"wechat_{material_type}_sync" + try: + self.db.update_sync_state(sync_key, {"status": "syncing", "count": 0}) + if material_type == "news": + count = self._sync_published_articles() + else: + count = self._sync_materials(material_type) + self.db.update_sync_state(sync_key, { + "status": "idle", + "count": count, + "last_sync": datetime.now(tz=timezone.utc).isoformat(), + }) + logger.info("Sync %s completed, %d items processed", material_type, count) + except Exception as e: + logger.error("Sync %s failed: %s", material_type, e, exc_info=True) + self.db.update_sync_state(sync_key, { + "status": "error", + "error": str(e), + "last_sync": datetime.now(tz=timezone.utc).isoformat(), + }) + logger.info("=== Sync finished ===") + + # --- Published articles (freepublish API) --- + + def _sync_published_articles(self) -> int: + """Sync articles using draft API (personal subscription account).""" + offset = 0 + processed = 0 + all_media_ids = set() + + while True: + batch = self.wechat.batch_get_drafts(offset=offset, count=20, no_content=0) + items = batch.get("item", []) + total = batch.get("total_count", 0) + item_count = batch.get("item_count", 0) + + if not items: + break + + for item in items: + try: + media_id = item.get("media_id", "") + all_media_ids.add(media_id) + self._sync_draft_item(item) + processed += 1 + except Exception as e: + logger.error("Error processing draft %s: %s", + item.get("media_id", "?"), e) + + offset += item_count + if offset >= total: + break + time.sleep(0.5) + + # Delete articles no longer in drafts + self.db.delete_missing_articles(all_media_ids) + return processed + + def _sync_draft_item(self, item: dict): + media_id = item["media_id"] + update_time = item.get("update_time", 0) + news_items = item.get("content", {}).get("news_item", []) + + for idx, news in enumerate(news_items): + wechat_article_id = f"{media_id}_{idx}" + + article = { + "wechat_article_id": wechat_article_id, + "title": news.get("title", "")[:200], + "content": news.get("content", ""), + "cover_url": news.get("thumb_url", "")[:500] if news.get("thumb_url") else None, + "author": news.get("author", "")[:100] if news.get("author") else None, + "publish_date": datetime.fromtimestamp(update_time, tz=timezone.utc).date() if update_time else None, + "source_url": news.get("url", "")[:1000] if news.get("url") else None, + } + self.db.upsert_article(article) + + # --- Materials (video) --- + + def _sync_materials(self, material_type: str) -> int: + counts = self.wechat.get_material_count() + total_key = f"{material_type}_count" + total = counts.get(total_key, 0) + logger.info("Total %s materials: %d", material_type, total) + + if total == 0: + return 0 + + offset = 0 + processed = 0 + all_media_ids = set() + while offset < total: + batch = self.wechat.batch_get_materials(material_type, offset, count=20) + items = batch.get("item", []) + item_count = batch.get("item_count", 0) + + for item in items: + try: + all_media_ids.add(item["media_id"]) + if material_type == "video": + self._sync_video_item(item) + processed += 1 + except Exception as e: + logger.error("Error processing %s item %s: %s", + material_type, item.get("media_id", "?"), e) + + offset += item_count + if offset < total: + time.sleep(0.5) + + # Delete items that exist in DB but no longer on WeChat + if material_type == "video": + self.db.delete_missing_videos(all_media_ids) + + return processed + + def _sync_video_item(self, item: dict): + media_id = item["media_id"] + update_time = item.get("update_time", 0) + wechat_time = datetime.fromtimestamp(update_time, tz=timezone.utc) if update_time else None + + video = { + "media_id": media_id, + "name": item.get("name", ""), + "url": item.get("url", ""), + "title": None, + "description": None, + "down_url": None, + "wechat_update_time": wechat_time, + } + + if self.db.should_fetch_video_detail(media_id, update_time): + try: + detail = self.wechat.get_material(media_id) + video["title"] = detail.get("title") + video["description"] = detail.get("description") + video["down_url"] = detail.get("down_url") + except WeChatError as e: + logger.warning("Failed to fetch video detail %s: %s", media_id, e) + + self.db.upsert_video(video) diff --git a/wechat.py b/wechat.py new file mode 100644 index 0000000..7fce080 --- /dev/null +++ b/wechat.py @@ -0,0 +1,115 @@ +import logging +import time + +import httpx + +import config + +logger = logging.getLogger(__name__) + +_BASE_URL = "https://api.weixin.qq.com/cgi-bin" + + +class WeChatError(Exception): + pass + + +class WeChatClient: + def __init__(self): + self._http = httpx.Client(timeout=httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=10.0)) + self._token = None + self._token_expires_at = 0 + + def _get_access_token(self, force_refresh: bool = False): + if not force_refresh and self._token and time.time() < self._token_expires_at - 300: + return self._token + + resp = self._http.post(f"{_BASE_URL}/stable_token", json={ + "grant_type": "client_credential", + "appid": config.APP_ID, + "secret": config.APP_SECRET, + "force_refresh": force_refresh, + }) + data = resp.json() + if "access_token" not in data: + raise WeChatError(f"Failed to get token: {data}") + + self._token = data["access_token"] + self._token_expires_at = time.time() + data.get("expires_in", 7200) + logger.info("Access token refreshed") + return self._token + + def _api_post(self, path: str, body: dict = None) -> dict: + token = self._get_access_token() + url = f"{_BASE_URL}/{path}?access_token={token}" + resp = self._http.post(url, json=body or {}) + data = resp.json() + + if data.get("errcode") == 40001: + logger.info("Token expired, force refreshing") + token = self._get_access_token(force_refresh=True) + url = f"{_BASE_URL}/{path}?access_token={token}" + resp = self._http.post(url, json=body or {}) + data = resp.json() + + if data.get("errcode", 0) != 0: + raise WeChatError(f"API error {data.get('errcode')}: {data.get('errmsg', '')}") + + return data + + def _api_get(self, path: str) -> dict: + token = self._get_access_token() + url = f"{_BASE_URL}/{path}?access_token={token}" + resp = self._http.get(url) + data = resp.json() + + if data.get("errcode") == 40001: + logger.info("Token expired, force refreshing") + token = self._get_access_token(force_refresh=True) + url = f"{_BASE_URL}/{path}?access_token={token}" + resp = self._http.get(url) + data = resp.json() + + if data.get("errcode", 0) != 0: + raise WeChatError(f"API error {data.get('errcode')}: {data.get('errmsg', '')}") + + return data + + def get_material_count(self) -> dict: + """Get total counts for each material type.""" + return self._api_get("material/get_materialcount") + + def batch_get_materials(self, material_type: str, offset: int = 0, count: int = 20) -> dict: + """Get a page of materials by type.""" + return self._api_post("material/batchget_material", { + "type": material_type, + "offset": offset, + "count": count, + }) + + def get_material(self, media_id: str) -> dict: + """Get detail of a single material by media_id.""" + return self._api_post("material/get_material", {"media_id": media_id}) + + def batch_get_published(self, offset: int = 0, count: int = 20, no_content: int = 0) -> dict: + """Get a page of published articles (freepublish, requires verified account).""" + return self._api_post("freepublish/batchget", { + "offset": offset, + "count": count, + "no_content": no_content, + }) + + def get_published_article(self, article_id: str) -> dict: + """Get detail of a single published article by article_id.""" + return self._api_post("freepublish/getarticle", {"article_id": article_id}) + + def batch_get_drafts(self, offset: int = 0, count: int = 20, no_content: int = 0) -> dict: + """Get a page of draft articles.""" + return self._api_post("draft/batchget", { + "offset": offset, + "count": count, + "no_content": no_content, + }) + + def close(self): + self._http.close()