- 使用 draft API 同步文章(适配个人订阅号)
- 使用 material API 同步视频(含详情获取)
- 自动建表(videos)、UPSERT 已有 articles 表
- 同步删除:微信端删除的素材自动从数据库移除
- APScheduler 定时调度,支持 --once 手动触发
- Docker + docker-compose 部署配置

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
257 lines
8.9 KiB
Python
257 lines
8.9 KiB
Python
import json
import logging
from contextlib import contextmanager
from datetime import datetime, timezone

import psycopg2
from psycopg2.pool import SimpleConnectionPool

import config
|
|
|
|
logger = logging.getLogger(__name__)

# DDL for the videos table. media_id carries the UNIQUE constraint that
# _UPSERT_VIDEO uses as its conflict target.
_CREATE_VIDEOS = """
CREATE TABLE IF NOT EXISTS videos (
    id SERIAL PRIMARY KEY,
    media_id VARCHAR(128) UNIQUE NOT NULL,
    name TEXT,
    url TEXT,
    title TEXT,
    description TEXT,
    down_url TEXT,
    wechat_update_time TIMESTAMP,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);
"""

# Looks up the primary key of an article by its wechat_article_id
# (one positional parameter).
_CHECK_ARTICLE_EXISTS = """
SELECT id FROM articles WHERE wechat_article_id = %s;
"""

# Inserts a new article; parameters are named and created_at is set by the
# database.
_INSERT_ARTICLE = """
INSERT INTO articles (title, content, cover_url, author, publish_date, created_at, source_url, wechat_article_id)
VALUES (%(title)s, %(content)s, %(cover_url)s, %(author)s, %(publish_date)s, NOW(), %(source_url)s, %(wechat_article_id)s)
"""

# Overwrites the content fields of an existing article selected by primary
# key; created_at and wechat_article_id are left untouched.
_UPDATE_ARTICLE = """
UPDATE articles SET
    title = %(title)s,
    content = %(content)s,
    cover_url = %(cover_url)s,
    author = %(author)s,
    publish_date = %(publish_date)s,
    source_url = %(source_url)s
WHERE id = %(id)s
"""

# Inserts a video or, on a media_id conflict, updates the stored row.
# COALESCE keeps the stored value whenever the incoming one is NULL, and the
# trailing WHERE skips the update unless the incoming wechat_update_time is
# strictly newer than the stored one (or the stored one is NULL).
_UPSERT_VIDEO = """
INSERT INTO videos (media_id, name, url, title, description, down_url, wechat_update_time, created_at)
VALUES (%(media_id)s, %(name)s, %(url)s, %(title)s, %(description)s, %(down_url)s, %(wechat_update_time)s, NOW())
ON CONFLICT (media_id)
DO UPDATE SET
    name = COALESCE(EXCLUDED.name, videos.name),
    url = COALESCE(EXCLUDED.url, videos.url),
    title = COALESCE(EXCLUDED.title, videos.title),
    description = COALESCE(EXCLUDED.description, videos.description),
    down_url = COALESCE(EXCLUDED.down_url, videos.down_url),
    wechat_update_time = EXCLUDED.wechat_update_time,
    updated_at = NOW()
WHERE videos.wechat_update_time < EXCLUDED.wechat_update_time
   OR videos.wechat_update_time IS NULL;
"""

# Stores a value under a key in sync_states, replacing any previous value.
_UPSERT_SYNC_STATE = """
INSERT INTO sync_states (key, value, updated_at)
VALUES (%(key)s, %(value)s, NOW())
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW();
"""

# Reads the value stored under a sync_states key (one positional parameter).
_GET_SYNC_STATE = """
SELECT value FROM sync_states WHERE key = %s;
"""

# Reads the stored wechat_update_time of one video by media_id.
_GET_VIDEO_UPDATE_TIME = """
SELECT wechat_update_time FROM videos WHERE media_id = %s;
"""
|
|
|
|
|
|
class Database:
|
|
def __init__(self):
|
|
self._pool = SimpleConnectionPool(1, 3, config.DATABASE_URL)
|
|
|
|
def _get_conn(self):
|
|
conn = self._pool.getconn()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT 1")
|
|
except psycopg2.OperationalError:
|
|
logger.warning("DB connection stale, reconnecting")
|
|
self._pool.putconn(conn, close=True)
|
|
self._pool = SimpleConnectionPool(1, 3, config.DATABASE_URL)
|
|
conn = self._pool.getconn()
|
|
return conn
|
|
|
|
def _put_conn(self, conn):
|
|
self._pool.putconn(conn)
|
|
|
|
def ensure_tables(self):
|
|
conn = self._get_conn()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
# articles and sync_states already exist; only create videos
|
|
cur.execute(_CREATE_VIDEOS)
|
|
conn.commit()
|
|
logger.info("Database tables verified")
|
|
finally:
|
|
self._put_conn(conn)
|
|
|
|
def upsert_article(self, article: dict):
|
|
"""Insert or update an article using wechat_article_id as the dedup key."""
|
|
conn = self._get_conn()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(_CHECK_ARTICLE_EXISTS, (article["wechat_article_id"],))
|
|
row = cur.fetchone()
|
|
if row:
|
|
article["id"] = row[0]
|
|
cur.execute(_UPDATE_ARTICLE, article)
|
|
else:
|
|
cur.execute(_INSERT_ARTICLE, article)
|
|
conn.commit()
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
self._put_conn(conn)
|
|
|
|
def upsert_video(self, video: dict):
|
|
conn = self._get_conn()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(_UPSERT_VIDEO, video)
|
|
conn.commit()
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
self._put_conn(conn)
|
|
|
|
def should_fetch_video_detail(self, media_id: str, update_time: int) -> bool:
|
|
conn = self._get_conn()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(_GET_VIDEO_UPDATE_TIME, (media_id,))
|
|
row = cur.fetchone()
|
|
if row is None or row[0] is None:
|
|
return True
|
|
new_time = datetime.fromtimestamp(update_time, tz=timezone.utc)
|
|
# Compare as naive UTC (DB stores timestamp without tz)
|
|
stored_naive = row[0].replace(tzinfo=None) if row[0].tzinfo else row[0]
|
|
new_naive = new_time.replace(tzinfo=None)
|
|
return new_naive > stored_naive
|
|
finally:
|
|
self._put_conn(conn)
|
|
|
|
def update_sync_state(self, key: str, data: dict):
|
|
conn = self._get_conn()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(_UPSERT_SYNC_STATE, {
|
|
"key": key,
|
|
"value": json.dumps(data, default=str),
|
|
})
|
|
conn.commit()
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
self._put_conn(conn)
|
|
|
|
def get_sync_state(self, key: str) -> dict | None:
|
|
conn = self._get_conn()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(_GET_SYNC_STATE, (key,))
|
|
row = cur.fetchone()
|
|
if row:
|
|
return json.loads(row[0])
|
|
return None
|
|
finally:
|
|
self._put_conn(conn)
|
|
|
|
def delete_missing_videos(self, wechat_media_ids: set) -> int:
|
|
"""Delete videos whose media_id is not in the WeChat response."""
|
|
conn = self._get_conn()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT media_id FROM videos")
|
|
db_ids = {row[0] for row in cur.fetchall()}
|
|
to_delete = db_ids - wechat_media_ids
|
|
if to_delete:
|
|
cur.execute("DELETE FROM videos WHERE media_id = ANY(%s)", (list(to_delete),))
|
|
deleted = cur.rowcount
|
|
conn.commit()
|
|
logger.info("Deleted %d videos no longer on WeChat", deleted)
|
|
return deleted
|
|
return 0
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
self._put_conn(conn)
|
|
|
|
def delete_missing_articles(self, wechat_media_ids: set) -> int:
|
|
"""Delete articles whose wechat_article_id's media_id is not in the WeChat response."""
|
|
conn = self._get_conn()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT id, wechat_article_id FROM articles WHERE wechat_article_id IS NOT NULL")
|
|
rows = cur.fetchall()
|
|
to_delete = []
|
|
for article_id, waid in rows:
|
|
# wechat_article_id format: {media_id}_{idx}
|
|
parts = waid.rsplit("_", 1)
|
|
if len(parts) == 2 and parts[0] not in wechat_media_ids:
|
|
to_delete.append(article_id)
|
|
if to_delete:
|
|
cur.execute("DELETE FROM articles WHERE id = ANY(%s)", (to_delete,))
|
|
deleted = cur.rowcount
|
|
conn.commit()
|
|
logger.info("Deleted %d articles no longer on WeChat", deleted)
|
|
return deleted
|
|
return 0
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
self._put_conn(conn)
|
|
|
|
def delete_missing_articles_by_prefix(self, wechat_article_ids: set) -> int:
|
|
"""Delete articles whose article_id (prefix of wechat_article_id) is not in the published set."""
|
|
conn = self._get_conn()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT id, wechat_article_id FROM articles WHERE wechat_article_id IS NOT NULL")
|
|
rows = cur.fetchall()
|
|
to_delete = []
|
|
for article_id, waid in rows:
|
|
# wechat_article_id format: {article_id}_{idx}
|
|
parts = waid.rsplit("_", 1)
|
|
if len(parts) == 2 and parts[0] not in wechat_article_ids:
|
|
to_delete.append(article_id)
|
|
if to_delete:
|
|
cur.execute("DELETE FROM articles WHERE id = ANY(%s)", (to_delete,))
|
|
deleted = cur.rowcount
|
|
conn.commit()
|
|
logger.info("Deleted %d articles no longer published", deleted)
|
|
return deleted
|
|
return 0
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
self._put_conn(conn)
|
|
|
|
def close(self):
|
|
self._pool.closeall()
|