diff --git a/backend/worker/main.py b/backend/worker/main.py index 5ff4245..a0de865 100644 --- a/backend/worker/main.py +++ b/backend/worker/main.py @@ -50,10 +50,10 @@ def main(): scheduler = BlockingScheduler() # Reddit polling jobs - scheduler.add_job(poll_new_posts, IntervalTrigger(minutes=2), id="poll_new", max_instances=1) - scheduler.add_job(poll_hot_posts, IntervalTrigger(minutes=2), id="poll_hot", max_instances=1) - scheduler.add_job(collect_comments, IntervalTrigger(minutes=5), id="comments", max_instances=1) - scheduler.add_job(update_scores, IntervalTrigger(minutes=15), id="scores", max_instances=1) + scheduler.add_job(poll_new_posts, IntervalTrigger(minutes=10), id="poll_new", max_instances=1) + scheduler.add_job(poll_hot_posts, IntervalTrigger(minutes=30), id="poll_hot", max_instances=1) + scheduler.add_job(collect_comments, IntervalTrigger(minutes=15), id="comments", max_instances=1) + scheduler.add_job(update_scores, IntervalTrigger(minutes=60), id="scores", max_instances=1) # Metric snapshots scheduler.add_job(take_metric_snapshots, IntervalTrigger(minutes=30), id="snapshots", max_instances=1) diff --git a/backend/worker/monitor.py b/backend/worker/monitor.py index d60c880..0892595 100644 --- a/backend/worker/monitor.py +++ b/backend/worker/monitor.py @@ -1,3 +1,4 @@ +import asyncio import logging from datetime import datetime, timezone, timedelta @@ -14,7 +15,6 @@ from backend.worker.reddit_client import create_client, fetch_json logger = logging.getLogger(__name__) -# Sync engine for worker (PRAW-replacement uses async httpx, but DB writes are sync for simplicity with APScheduler) _engine = create_engine(settings.database_url_sync, pool_size=3, max_overflow=5, pool_recycle=3600) SyncSession = sessionmaker(_engine) @@ -42,23 +42,24 @@ def _upsert_author(db: Session, username: str) -> int | None: def _parse_post(post_data: dict, subreddit_id: int, db: Session, hot_rank: int | None = None) -> dict: - data = post_data.get("data", post_data) - author_id = _upsert_author(db, data.get("author")) - created = datetime.fromtimestamp(data.get("created_utc", 0), tz=timezone.utc) + """Parse a Pullpush submission object into a dict for DB upsert.""" + author_id = _upsert_author(db, post_data.get("author")) + created = datetime.fromtimestamp(post_data.get("created_utc", 0), tz=timezone.utc) + reddit_id = post_data.get("name", f"t3_{post_data.get('id', '')}") return { - "reddit_id": data.get("name", f"t3_{data.get('id', '')}"), + "reddit_id": reddit_id, "subreddit_id": subreddit_id, "author_id": author_id, - "title": data.get("title", ""), - "selftext": data.get("selftext"), - "url": data.get("url"), - "permalink": data.get("permalink"), - "flair": data.get("link_flair_text"), - "score": data.get("score", 0), - "upvote_ratio": data.get("upvote_ratio"), - "num_comments": data.get("num_comments", 0), - "is_self": data.get("is_self"), - "over_18": data.get("over_18", False), + "title": post_data.get("title", ""), + "selftext": post_data.get("selftext"), + "url": post_data.get("url"), + "permalink": post_data.get("permalink"), + "flair": post_data.get("link_flair_text"), + "score": post_data.get("score", 0), + "upvote_ratio": post_data.get("upvote_ratio"), + "num_comments": post_data.get("num_comments", 0), + "is_self": post_data.get("is_self"), + "over_18": post_data.get("over_18", False), "hot_rank": hot_rank, "created_utc": created, "collected_at": datetime.now(timezone.utc), @@ -87,14 +88,14 @@ def _upsert_posts(db: Session, posts: list[dict], update_hot_rank: bool = False) def _parse_comment(comment_data: dict, post_id: int, db: Session, parent_map: dict) -> dict | None: - data = comment_data.get("data", comment_data) - if data.get("kind") == "more" or not data.get("body"): + """Parse a Pullpush comment object into a dict for DB upsert.""" + if not comment_data.get("body"): return None - reddit_id = data.get("name", f"t1_{data.get('id', '')}") - author_id = _upsert_author(db, data.get("author")) - created = datetime.fromtimestamp(data.get("created_utc", 0), tz=timezone.utc) + reddit_id = comment_data.get("name", f"t1_{comment_data.get('id', '')}") + author_id = _upsert_author(db, comment_data.get("author")) + created = datetime.fromtimestamp(comment_data.get("created_utc", 0), tz=timezone.utc) - parent_reddit_id = data.get("parent_id", "") + parent_reddit_id = comment_data.get("parent_id", "") parent_comment_id = parent_map.get(parent_reddit_id) return { @@ -102,19 +103,16 @@ def _parse_comment(comment_data: dict, post_id: int, db: Session, parent_map: di "post_id": post_id, "parent_comment_id": parent_comment_id, "author_id": author_id, - "body": data.get("body", ""), - "score": data.get("score", 0), + "body": comment_data.get("body", ""), + "score": comment_data.get("score", 0), "created_utc": created, "collected_at": datetime.now(timezone.utc), "updated_at": datetime.now(timezone.utc), } -import asyncio - - def poll_new_posts(): - """Fetch /new for each active subreddit and upsert posts.""" + """Fetch recent submissions from Pullpush for each active subreddit.""" asyncio.run(_poll_new_posts_async()) @@ -126,22 +124,27 @@ async def _poll_new_posts_async(): client = create_client() async with client: for sub in subreddits: - data = await fetch_json(client, f"/r/{sub['name']}/new", {"limit": "100"}) + data = await fetch_json(client, "/reddit/search/submission/", { + "subreddit": sub["name"], + "sort": "created_utc", + "sort_type": "desc", + "size": 100, + }) if not data: continue - children = data.get("data", {}).get("children", []) - if not children: + posts_data = data.get("data", []) + if not posts_data: continue with SyncSession() as db: - posts = [_parse_post(child, sub["id"], db) for child in children] + posts = [_parse_post(p, sub["id"], db) for p in posts_data] _upsert_posts(db, posts) db.commit() - logger.info(f"r/{sub['name']}: upserted {len(children)} new posts") + logger.info(f"r/{sub['name']}: upserted {len(posts_data)} new posts") def poll_hot_posts(): - """Fetch /hot for each active subreddit and update hot_rank.""" + """Approximate hot posts by fetching recent high-scoring submissions.""" asyncio.run(_poll_hot_posts_async()) @@ -150,88 +153,91 @@ async def _poll_hot_posts_async(): if not subreddits: return + after_epoch = int((datetime.now(timezone.utc) - timedelta(hours=24)).timestamp()) + client = create_client() async with client: for sub in subreddits: - data = await fetch_json(client, f"/r/{sub['name']}/hot", {"limit": "100"}) + data = await fetch_json(client, "/reddit/search/submission/", { + "subreddit": sub["name"], + "sort": "score", + "sort_type": "desc", + "size": 100, + "after": after_epoch, + }) if not data: continue - children = data.get("data", {}).get("children", []) - if not children: + posts_data = data.get("data", []) + if not posts_data: continue with SyncSession() as db: posts = [ - _parse_post(child, sub["id"], db, hot_rank=i + 1) - for i, child in enumerate(children) + _parse_post(p, sub["id"], db, hot_rank=i + 1) + for i, p in enumerate(posts_data) ] _upsert_posts(db, posts, update_hot_rank=True) db.commit() - logger.info(f"r/{sub['name']}: updated hot ranks for {len(children)} posts") + logger.info(f"r/{sub['name']}: updated hot ranks for {len(posts_data)} posts") def collect_comments(): - """Fetch comments for recent posts.""" + """Fetch recent comments from Pullpush for each active subreddit.""" asyncio.run(_collect_comments_async()) async def _collect_comments_async(): - cutoff = datetime.now(timezone.utc) - timedelta(hours=48) - - with SyncSession() as db: - stmt = ( - select(Post.id, Post.reddit_id, Post.subreddit_id) - .join(MonitoredSubreddit) - .where( - MonitoredSubreddit.is_active == True, # noqa: E712 - Post.created_utc >= cutoff, - ) - .order_by(Post.created_utc.desc()) - .limit(50) - ) - result = db.execute(stmt) - recent_posts = [{"id": r[0], "reddit_id": r[1], "subreddit_id": r[2]} for r in result] - - if not recent_posts: + subreddits = _get_active_subreddits() + if not subreddits: return + cutoff_epoch = int((datetime.now(timezone.utc) - timedelta(hours=48)).timestamp()) + client = create_client() async with client: - for post in recent_posts: - short_id = post["reddit_id"].replace("t3_", "") - data = await fetch_json(client, f"/comments/{short_id}", {"limit": "500", "sort": "new"}) - if not data or len(data) < 2: + for sub in subreddits: + data = await fetch_json(client, "/reddit/search/comment/", { + "subreddit": sub["name"], + "sort": "created_utc", + "sort_type": "desc", + "size": 100, + "after": cutoff_epoch, + }) + if not data: + continue + comments_data = data.get("data", []) + if not comments_data: continue - comment_listing = data[1].get("data", {}).get("children", []) - with SyncSession() as db: - # Build parent_map from existing comments - existing = db.execute( - select(Comment.id, Comment.reddit_id).where(Comment.post_id == post["id"]) + # Build lookup: reddit post fullname -> our DB post ID + link_ids = {c.get("link_id") for c in comments_data if c.get("link_id")} + if not link_ids: + continue + + result = db.execute( + select(Post.id, Post.reddit_id).where(Post.reddit_id.in_(link_ids)) ) - parent_map = {r[1]: r[0] for r in existing} + post_lookup = {reddit_id: post_id for post_id, reddit_id in result} + + # Build parent_map: comment reddit_id -> our DB comment ID + existing = db.execute( + select(Comment.id, Comment.reddit_id) + .join(Post) + .where(Post.subreddit_id == sub["id"]) + ) + parent_map = {reddit_id: cid for cid, reddit_id in existing} comments_to_upsert = [] - - def process_comments(children): - for child in children: - if child.get("kind") == "more": - continue - c_data = child.get("data", {}) - parsed = _parse_comment(c_data, post["id"], db, parent_map) - if parsed: - comments_to_upsert.append(parsed) - # Process replies recursively - replies = c_data.get("replies") - if isinstance(replies, dict): - reply_children = replies.get("data", {}).get("children", []) - process_comments(reply_children) - - process_comments(comment_listing) + for c in comments_data: + post_id = post_lookup.get(c.get("link_id")) + if not post_id: + continue # Post not in our DB yet + parsed = _parse_comment(c, post_id, db, parent_map) + if parsed: + comments_to_upsert.append(parsed) if comments_to_upsert: - # Upsert comments one at a time to handle parent references for comment in comments_to_upsert: stmt = insert(Comment).values(comment) stmt = stmt.on_conflict_do_update( @@ -244,52 +250,40 @@ async def _collect_comments_async(): ) db.execute(stmt) db.commit() - logger.info(f"Post {short_id}: upserted {len(comments_to_upsert)} comments") + logger.info(f"r/{sub['name']}: upserted {len(comments_to_upsert)} comments") def update_scores(): - """Re-fetch recent posts to update scores and comment counts.""" + """Re-fetch recent posts to capture any score updates in Pullpush.""" asyncio.run(_update_scores_async()) async def _update_scores_async(): - cutoff = datetime.now(timezone.utc) - timedelta(days=7) - - with SyncSession() as db: - stmt = ( - select(Post.reddit_id, Post.subreddit_id, MonitoredSubreddit.name) - .join(MonitoredSubreddit) - .where( - MonitoredSubreddit.is_active == True, # noqa: E712 - Post.created_utc >= cutoff, - ) - ) - result = db.execute(stmt) - posts_by_sub: dict[str, list[str]] = {} - for reddit_id, _, sub_name in result: - posts_by_sub.setdefault(sub_name, []).append(reddit_id) - - if not posts_by_sub: + subreddits = _get_active_subreddits() + if not subreddits: return - # Score updates piggyback on the new/hot polls — the upsert already updates scores. - # This job explicitly re-fetches to catch score changes on older posts. + after_epoch = int((datetime.now(timezone.utc) - timedelta(days=7)).timestamp()) + client = create_client() async with client: - for sub_name, reddit_ids in posts_by_sub.items(): - data = await fetch_json(client, f"/r/{sub_name}/new", {"limit": "100"}) + for sub in subreddits: + data = await fetch_json(client, "/reddit/search/submission/", { + "subreddit": sub["name"], + "sort": "created_utc", + "sort_type": "desc", + "size": 100, + "after": after_epoch, + }) if not data: continue - children = data.get("data", {}).get("children", []) + posts_data = data.get("data", []) + if not posts_data: + continue with SyncSession() as db: - sub = db.execute( - select(MonitoredSubreddit).where(MonitoredSubreddit.name == sub_name) - ).scalar_one_or_none() - if not sub: - continue - posts = [_parse_post(child, sub.id, db) for child in children] + posts = [_parse_post(p, sub["id"], db) for p in posts_data] _upsert_posts(db, posts) db.commit() - logger.info(f"Score update complete for {len(posts_by_sub)} subreddits") + logger.info(f"Score update complete for {len(subreddits)} subreddits") diff --git a/backend/worker/reddit_client.py b/backend/worker/reddit_client.py index d4018ba..955d166 100644 --- a/backend/worker/reddit_client.py +++ b/backend/worker/reddit_client.py @@ -8,11 +8,11 @@ from backend.config import settings logger = logging.getLogger(__name__) -BASE_URL = "https://old.reddit.com" +BASE_URL = "https://api.pullpush.io" # Simple in-process rate limiter: track request timestamps _request_times: list[float] = [] -MAX_REQUESTS_PER_MINUTE = 9 # Stay under Reddit's ~10/min limit +MAX_REQUESTS_PER_MINUTE = 9 async def _wait_for_rate_limit(): @@ -29,9 +29,9 @@ async def _wait_for_rate_limit(): async def fetch_json(client: httpx.AsyncClient, path: str, params: dict | None = None) -> dict | None: - """Fetch a Reddit .json endpoint with rate limiting and error handling.""" + """Fetch a Pullpush API endpoint with rate limiting and error handling.""" await _wait_for_rate_limit() - url = f"{BASE_URL}{path}.json" + url = f"{BASE_URL}{path}" try: response = await client.get(url, params=params) if response.status_code == 429: @@ -40,7 +40,7 @@ async def fetch_json(client: httpx.AsyncClient, path: str, params: dict | None = await asyncio.sleep(retry_after) return await fetch_json(client, path, params) if response.status_code >= 500: - logger.warning(f"Reddit returned {response.status_code} for {path}") + logger.warning(f"Pullpush returned {response.status_code} for {path}") return None response.raise_for_status() return response.json() @@ -50,13 +50,9 @@ async def fetch_json(client: httpx.AsyncClient, path: str, params: dict | None = def create_client() -> httpx.AsyncClient: - """Create an httpx client configured for Reddit.""" + """Create an httpx client configured for Pullpush API.""" return httpx.AsyncClient( - headers={ - "User-Agent": settings.reddit_user_agent, - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", - "Accept-Language": "en-US,en;q=0.5", - }, + headers={"User-Agent": settings.reddit_user_agent}, timeout=30.0, follow_redirects=True, )