import json
import logging
from contextlib import contextmanager
from datetime import datetime, timezone

import psycopg2
from psycopg2.pool import SimpleConnectionPool

import config

logger = logging.getLogger(__name__)

# How many pooled connections _get_conn tries before giving up when the
# liveness ping keeps failing.
_CONNECT_ATTEMPTS = 2

_CREATE_VIDEOS = """
CREATE TABLE IF NOT EXISTS videos (
    id SERIAL PRIMARY KEY,
    media_id VARCHAR(128) UNIQUE NOT NULL,
    name TEXT,
    url TEXT,
    title TEXT,
    description TEXT,
    down_url TEXT,
    wechat_update_time TIMESTAMP,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);
"""

_CHECK_ARTICLE_EXISTS = """
SELECT id FROM articles WHERE wechat_article_id = %s;
"""

_INSERT_ARTICLE = """
INSERT INTO articles (title, content, cover_url, author, publish_date, created_at, source_url, wechat_article_id)
VALUES (%(title)s, %(content)s, %(cover_url)s, %(author)s, %(publish_date)s, NOW(), %(source_url)s, %(wechat_article_id)s)
"""

_UPDATE_ARTICLE = """
UPDATE articles SET
    title = %(title)s,
    content = %(content)s,
    cover_url = %(cover_url)s,
    author = %(author)s,
    publish_date = %(publish_date)s,
    source_url = %(source_url)s
WHERE id = %(id)s
"""

# Only overwrite an existing row when the incoming wechat_update_time is
# newer (or nothing was stored yet); NULL incoming fields keep old values.
_UPSERT_VIDEO = """
INSERT INTO videos (media_id, name, url, title, description, down_url, wechat_update_time, created_at)
VALUES (%(media_id)s, %(name)s, %(url)s, %(title)s, %(description)s, %(down_url)s, %(wechat_update_time)s, NOW())
ON CONFLICT (media_id) DO UPDATE SET
    name = COALESCE(EXCLUDED.name, videos.name),
    url = COALESCE(EXCLUDED.url, videos.url),
    title = COALESCE(EXCLUDED.title, videos.title),
    description = COALESCE(EXCLUDED.description, videos.description),
    down_url = COALESCE(EXCLUDED.down_url, videos.down_url),
    wechat_update_time = EXCLUDED.wechat_update_time,
    updated_at = NOW()
WHERE videos.wechat_update_time < EXCLUDED.wechat_update_time
   OR videos.wechat_update_time IS NULL;
"""

_UPSERT_SYNC_STATE = """
INSERT INTO sync_states (key, value, updated_at)
VALUES (%(key)s, %(value)s, NOW())
ON CONFLICT (key) DO UPDATE SET
    value = EXCLUDED.value,
    updated_at = NOW();
"""

_GET_SYNC_STATE = """
SELECT value FROM sync_states WHERE key = %s;
"""

_GET_VIDEO_UPDATE_TIME = """
SELECT wechat_update_time FROM videos WHERE media_id = %s;
"""

_SELECT_VIDEO_MEDIA_IDS = "SELECT media_id FROM videos"

_DELETE_VIDEOS_BY_MEDIA_ID = "DELETE FROM videos WHERE media_id = ANY(%s)"

_SELECT_LINKED_ARTICLES = (
    "SELECT id, wechat_article_id FROM articles WHERE wechat_article_id IS NOT NULL"
)

_DELETE_ARTICLES_BY_ID = "DELETE FROM articles WHERE id = ANY(%s)"


def _article_id_prefix(wechat_article_id: str) -> str | None:
    """Return the text before the last ``_`` of *wechat_article_id*.

    Article IDs are stored as ``{prefix}_{idx}``. Returns None when the ID
    contains no ``_``; callers treat such rows as "not ours to delete".
    """
    prefix, sep, _ = wechat_article_id.rpartition("_")
    return prefix if sep else None


class Database:
    """PostgreSQL access for synced WeChat articles, videos and sync state.

    All work goes through a ``SimpleConnectionPool`` of 1-3 connections.
    psycopg2 documents ``SimpleConnectionPool`` as not shareable across
    threads, so use one instance per thread or add external locking.
    """

    def __init__(self):
        self._pool = SimpleConnectionPool(1, 3, config.DATABASE_URL)

    def _get_conn(self):
        """Check out a pooled connection that answers a ``SELECT 1`` ping.

        A connection that fails the ping is closed and discarded, and the
        same pool is asked for another one. Only the dead connection is
        dropped; replacing ``self._pool`` would leak every other connection
        the old pool still owned. The error from the last failed ping is
        re-raised.
        """
        for attempt in range(1, _CONNECT_ATTEMPTS + 1):
            conn = self._pool.getconn()
            try:
                with conn.cursor() as cur:
                    cur.execute("SELECT 1")
            except (psycopg2.OperationalError, psycopg2.InterfaceError):
                # OperationalError: the server dropped the session.
                # InterfaceError: psycopg2 already marked the connection closed.
                self._pool.putconn(conn, close=True)
                if attempt == _CONNECT_ATTEMPTS:
                    raise
                logger.warning("DB connection stale, reconnecting")
            else:
                return conn

    def _put_conn(self, conn):
        """Return *conn* to the pool (the pool resets any open transaction)."""
        self._pool.putconn(conn)

    @contextmanager
    def _transaction(self):
        """Yield a cursor whose work is committed when the ``with`` body ends.

        If the body (or the commit) raises, the transaction is rolled back
        and the original exception propagates. The connection is always
        returned to the pool.
        """
        conn = self._get_conn()
        try:
            with conn.cursor() as cur:
                yield cur
            conn.commit()
        except Exception:
            try:
                conn.rollback()
            except psycopg2.Error:
                # A dead connection cannot roll back; surface the original
                # error instead of this secondary one.
                logger.warning("Rollback failed", exc_info=True)
            raise
        finally:
            self._put_conn(conn)

    def ensure_tables(self) -> None:
        """Create the ``videos`` table if it does not exist."""
        # articles and sync_states already exist; only create videos
        with self._transaction() as cur:
            cur.execute(_CREATE_VIDEOS)
        logger.info("Database tables verified")

    def upsert_article(self, article: dict) -> None:
        """Insert or update an article using wechat_article_id as the dedup key.

        *article* must provide every placeholder of the insert/update
        statements (title, content, cover_url, author, publish_date,
        source_url, wechat_article_id). The caller's dict is not modified.
        """
        with self._transaction() as cur:
            cur.execute(_CHECK_ARTICLE_EXISTS, (article["wechat_article_id"],))
            row = cur.fetchone()
            if row:
                # Build a new params dict rather than writing "id" into the
                # caller's object.
                cur.execute(_UPDATE_ARTICLE, {**article, "id": row[0]})
            else:
                cur.execute(_INSERT_ARTICLE, article)

    def upsert_video(self, video: dict) -> None:
        """Insert a video, or refresh it when *video* carries a newer update time.

        *video* must provide every placeholder of ``_UPSERT_VIDEO``.
        NOTE(review): wechat_update_time goes into a TIMESTAMP (no tz) column;
        presumably callers pass naive UTC datetimes - confirm.
        """
        with self._transaction() as cur:
            cur.execute(_UPSERT_VIDEO, video)

    def should_fetch_video_detail(self, media_id: str, update_time: int) -> bool:
        """Return True if the video's details need to be (re)fetched.

        That is the case when no row / no stored wechat_update_time exists, or
        when *update_time* (a POSIX timestamp in seconds) is newer than the
        stored value.
        """
        with self._transaction() as cur:
            cur.execute(_GET_VIDEO_UPDATE_TIME, (media_id,))
            row = cur.fetchone()
        if row is None or row[0] is None:
            return True

        # Compare as naive UTC (DB stores timestamp without tz).
        # NOTE(review): assumes the naive values written there are UTC.
        stored = row[0]
        if stored.tzinfo is not None:
            # Convert before dropping tzinfo so non-UTC offsets compare correctly.
            stored = stored.astimezone(timezone.utc).replace(tzinfo=None)
        new_time = datetime.fromtimestamp(update_time, tz=timezone.utc).replace(tzinfo=None)
        return new_time > stored

    def update_sync_state(self, key: str, data: dict) -> None:
        """Store *data* as JSON under *key*, replacing any previous value.

        Values json cannot encode natively (e.g. datetimes) are stringified.
        """
        with self._transaction() as cur:
            cur.execute(_UPSERT_SYNC_STATE, {
                "key": key,
                "value": json.dumps(data, default=str),
            })

    def get_sync_state(self, key: str) -> dict | None:
        """Return the decoded state stored under *key*, or None if absent/NULL."""
        with self._transaction() as cur:
            cur.execute(_GET_SYNC_STATE, (key,))
            row = cur.fetchone()
        if row is None or row[0] is None:
            return None
        value = row[0]
        # A TEXT column comes back as a string; a json/jsonb column is already
        # decoded by psycopg2. The column type isn't visible here, so accept both.
        if isinstance(value, (str, bytes, bytearray)):
            return json.loads(value)
        return value

    def delete_missing_videos(self, wechat_media_ids: set) -> int:
        """Delete videos whose media_id is not in the WeChat response.

        Returns the number of rows deleted.

        NOTE(review): an empty *wechat_media_ids* deletes every video; callers
        should be sure an empty set means "no videos", not a failed fetch.
        """
        with self._transaction() as cur:
            cur.execute(_SELECT_VIDEO_MEDIA_IDS)
            to_delete = {row[0] for row in cur.fetchall()}.difference(wechat_media_ids)
            if not to_delete:
                return 0
            cur.execute(_DELETE_VIDEOS_BY_MEDIA_ID, (list(to_delete),))
            deleted = cur.rowcount
        logger.info("Deleted %d videos no longer on WeChat", deleted)
        return deleted

    def delete_missing_articles(self, wechat_media_ids: set) -> int:
        """Delete articles whose wechat_article_id's media_id is not in the WeChat response.

        wechat_article_id format: ``{media_id}_{idx}``. Rows whose ID has no
        ``_`` are kept. Returns the number of rows deleted.
        """
        return self._delete_articles_not_in(
            wechat_media_ids, "Deleted %d articles no longer on WeChat"
        )

    def delete_missing_articles_by_prefix(self, wechat_article_ids: set) -> int:
        """Delete articles whose article_id (prefix of wechat_article_id) is not in the published set.

        wechat_article_id format: ``{article_id}_{idx}``. Rows whose ID has no
        ``_`` are kept. Returns the number of rows deleted.
        """
        return self._delete_articles_not_in(
            wechat_article_ids, "Deleted %d articles no longer published"
        )

    def _delete_articles_not_in(self, keep_prefixes, log_message: str) -> int:
        """Delete articles whose wechat_article_id prefix is not in *keep_prefixes*.

        *log_message* is a %-format string receiving the deleted row count.
        Returns the number of rows deleted.
        """
        with self._transaction() as cur:
            cur.execute(_SELECT_LINKED_ARTICLES)
            to_delete = []
            for article_id, waid in cur.fetchall():
                prefix = _article_id_prefix(waid)
                if prefix is not None and prefix not in keep_prefixes:
                    to_delete.append(article_id)
            if not to_delete:
                return 0
            cur.execute(_DELETE_ARTICLES_BY_ID, (to_delete,))
            deleted = cur.rowcount
        logger.info(log_message, deleted)
        return deleted

    def close(self) -> None:
        """Close every connection held by the pool."""
        self._pool.closeall()