Add Reddit monitoring bot — backend, frontend, and Docker config

Python/FastAPI backend with PostgreSQL for collecting Reddit data via
public .json endpoints. React/Vite dashboard for analytics. Docker Compose
setup with API and worker services connecting to shared PostgreSQL.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-09 19:29:58 -05:00
parent aaa240dbf0
commit bc2203524f
76 changed files with 7570 additions and 0 deletions

View File

View File

@@ -0,0 +1,140 @@
import logging
from datetime import datetime, timezone, timedelta, date
from sqlalchemy import select, func, create_engine
from sqlalchemy.orm import sessionmaker
from backend.config import settings
from backend.models.subreddit import MonitoredSubreddit
from backend.models.post import Post
from backend.models.comment import Comment
from backend.models.author import Author
from backend.models.daily_digest import DailyDigest
logger = logging.getLogger(__name__)
_engine = create_engine(settings.database_url_sync, pool_size=2, pool_recycle=3600)
SyncSession = sessionmaker(_engine)
def generate_daily_digests():
"""Generate daily digest for each active subreddit."""
yesterday = date.today() - timedelta(days=1)
day_start = datetime(yesterday.year, yesterday.month, yesterday.day, tzinfo=timezone.utc)
day_end = day_start + timedelta(days=1)
with SyncSession() as db:
subs = db.execute(
select(MonitoredSubreddit).where(MonitoredSubreddit.is_active == True) # noqa: E712
).scalars().all()
for sub in subs:
# Check if digest already exists
existing = db.execute(
select(DailyDigest).where(
DailyDigest.subreddit_id == sub.id,
DailyDigest.digest_date == yesterday,
)
).scalar_one_or_none()
if existing:
continue
# Gather stats
post_count = db.execute(
select(func.count(Post.id)).where(
Post.subreddit_id == sub.id,
Post.created_utc >= day_start,
Post.created_utc < day_end,
)
).scalar() or 0
comment_count = db.execute(
select(func.count(Comment.id))
.join(Post)
.where(
Post.subreddit_id == sub.id,
Comment.created_utc >= day_start,
Comment.created_utc < day_end,
)
).scalar() or 0
# Top posts by score
top_posts = db.execute(
select(Post.title, Post.score, Post.num_comments, Post.permalink)
.where(
Post.subreddit_id == sub.id,
Post.created_utc >= day_start,
Post.created_utc < day_end,
)
.order_by(Post.score.desc())
.limit(5)
).all()
# Top authors
top_authors = db.execute(
select(Author.username, func.count(Comment.id).label("cnt"))
.join(Comment, Comment.author_id == Author.id)
.join(Post, Comment.post_id == Post.id)
.where(
Post.subreddit_id == sub.id,
Comment.created_utc >= day_start,
Comment.created_utc < day_end,
)
.group_by(Author.username)
.order_by(func.count(Comment.id).desc())
.limit(5)
).all()
avg_score = db.execute(
select(func.avg(Post.score)).where(
Post.subreddit_id == sub.id,
Post.created_utc >= day_start,
Post.created_utc < day_end,
)
).scalar()
# Build markdown digest
lines = [
f"# r/{sub.name} — Daily Digest for {yesterday}",
"",
f"**Posts:** {post_count} | **Comments:** {comment_count} | **Avg Score:** {avg_score:.1f}" if avg_score else f"**Posts:** {post_count} | **Comments:** {comment_count}",
"",
]
if top_posts:
lines.append("## Top Posts")
for i, (title, score, num_comments, permalink) in enumerate(top_posts, 1):
lines.append(f"{i}. **{title}** — {score} pts, {num_comments} comments")
lines.append("")
if top_authors:
lines.append("## Most Active Users")
for username, cnt in top_authors:
lines.append(f"- u/{username}: {cnt} comments")
lines.append("")
content = "\n".join(lines)
metadata = {
"post_count": post_count,
"comment_count": comment_count,
"avg_score": float(avg_score) if avg_score else 0,
"top_posts": [
{"title": t, "score": s, "num_comments": n}
for t, s, n, _ in top_posts
],
"top_authors": [
{"username": u, "comment_count": c}
for u, c in top_authors
],
}
digest = DailyDigest(
subreddit_id=sub.id,
digest_date=yesterday,
content=content,
metadata_=metadata,
)
db.add(digest)
db.commit()
logger.info(f"Generated daily digest for r/{sub.name} on {yesterday}")

90
backend/worker/main.py Normal file
View File

@@ -0,0 +1,90 @@
import logging
import signal
import sys
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from backend.config import settings
from backend.worker.monitor import poll_new_posts, poll_hot_posts, collect_comments, update_scores
from backend.worker.snapshot import take_metric_snapshots
from backend.worker.digest_job import generate_daily_digests
from backend.worker.summary_job import generate_summaries
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logger = logging.getLogger(__name__)
def seed_subreddits():
"""Add seed subreddits on first startup if configured."""
if not settings.seed_subreddits:
return
from sqlalchemy import select, create_engine
from sqlalchemy.orm import sessionmaker
from backend.models.subreddit import MonitoredSubreddit
engine = create_engine(settings.database_url_sync)
Session = sessionmaker(engine)
names = [s.strip().lower() for s in settings.seed_subreddits.split(",") if s.strip()]
with Session() as db:
for name in names:
existing = db.execute(
select(MonitoredSubreddit).where(MonitoredSubreddit.name == name)
).scalar_one_or_none()
if not existing:
db.add(MonitoredSubreddit(name=name))
logger.info(f"Seeded subreddit: r/{name}")
db.commit()
engine.dispose()
def main():
logger.info("Starting Reddit monitor worker")
seed_subreddits()
scheduler = BlockingScheduler()
# Reddit polling jobs
scheduler.add_job(poll_new_posts, IntervalTrigger(minutes=2), id="poll_new", max_instances=1)
scheduler.add_job(poll_hot_posts, IntervalTrigger(minutes=2), id="poll_hot", max_instances=1)
scheduler.add_job(collect_comments, IntervalTrigger(minutes=5), id="comments", max_instances=1)
scheduler.add_job(update_scores, IntervalTrigger(minutes=15), id="scores", max_instances=1)
# Metric snapshots
scheduler.add_job(take_metric_snapshots, IntervalTrigger(minutes=30), id="snapshots", max_instances=1)
# Daily digest
scheduler.add_job(
generate_daily_digests,
CronTrigger(hour=settings.digest_hour_utc, minute=0),
id="digest",
max_instances=1,
)
# AI summary stub
scheduler.add_job(
generate_summaries,
CronTrigger(hour=settings.digest_hour_utc, minute=30),
id="summary",
max_instances=1,
)
def shutdown(signum, frame):
logger.info("Shutting down worker...")
scheduler.shutdown(wait=False)
sys.exit(0)
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
logger.info("Worker started. Scheduled jobs are running.")
scheduler.start()
if __name__ == "__main__":
main()

295
backend/worker/monitor.py Normal file
View File

@@ -0,0 +1,295 @@
import logging
from datetime import datetime, timezone, timedelta
from sqlalchemy import select, create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.dialects.postgresql import insert
from backend.config import settings
from backend.models.subreddit import MonitoredSubreddit
from backend.models.author import Author
from backend.models.post import Post
from backend.models.comment import Comment
from backend.worker.reddit_client import create_client, fetch_json
logger = logging.getLogger(__name__)
# Sync engine for worker (PRAW-replacement uses async httpx, but DB writes are sync for simplicity with APScheduler)
_engine = create_engine(settings.database_url_sync, pool_size=3, max_overflow=5, pool_recycle=3600)
SyncSession = sessionmaker(_engine)
def _get_active_subreddits() -> list[dict]:
with SyncSession() as db:
stmt = select(MonitoredSubreddit).where(MonitoredSubreddit.is_active == True) # noqa: E712
result = db.execute(stmt)
return [{"id": s.id, "name": s.name} for s in result.scalars()]
def _upsert_author(db: Session, username: str) -> int | None:
if not username or username == "[deleted]":
return None
now = datetime.now(timezone.utc)
stmt = insert(Author).values(username=username, first_seen_at=now, last_seen_at=now)
stmt = stmt.on_conflict_do_update(
index_elements=[Author.username],
set_={"last_seen_at": now},
)
db.execute(stmt)
result = db.execute(select(Author.id).where(Author.username == username))
row = result.first()
return row[0] if row else None
def _parse_post(post_data: dict, subreddit_id: int, db: Session, hot_rank: int | None = None) -> dict:
data = post_data.get("data", post_data)
author_id = _upsert_author(db, data.get("author"))
created = datetime.fromtimestamp(data.get("created_utc", 0), tz=timezone.utc)
return {
"reddit_id": data.get("name", f"t3_{data.get('id', '')}"),
"subreddit_id": subreddit_id,
"author_id": author_id,
"title": data.get("title", ""),
"selftext": data.get("selftext"),
"url": data.get("url"),
"permalink": data.get("permalink"),
"flair": data.get("link_flair_text"),
"score": data.get("score", 0),
"upvote_ratio": data.get("upvote_ratio"),
"num_comments": data.get("num_comments", 0),
"is_self": data.get("is_self"),
"over_18": data.get("over_18", False),
"hot_rank": hot_rank,
"created_utc": created,
"collected_at": datetime.now(timezone.utc),
"updated_at": datetime.now(timezone.utc),
}
def _upsert_posts(db: Session, posts: list[dict], update_hot_rank: bool = False):
if not posts:
return
update_set = {
"score": insert(Post).excluded.score,
"upvote_ratio": insert(Post).excluded.upvote_ratio,
"num_comments": insert(Post).excluded.num_comments,
"updated_at": insert(Post).excluded.updated_at,
}
if update_hot_rank:
update_set["hot_rank"] = insert(Post).excluded.hot_rank
stmt = insert(Post).values(posts)
stmt = stmt.on_conflict_do_update(
index_elements=[Post.reddit_id],
set_=update_set,
)
db.execute(stmt)
def _parse_comment(comment_data: dict, post_id: int, db: Session, parent_map: dict) -> dict | None:
data = comment_data.get("data", comment_data)
if data.get("kind") == "more" or not data.get("body"):
return None
reddit_id = data.get("name", f"t1_{data.get('id', '')}")
author_id = _upsert_author(db, data.get("author"))
created = datetime.fromtimestamp(data.get("created_utc", 0), tz=timezone.utc)
parent_reddit_id = data.get("parent_id", "")
parent_comment_id = parent_map.get(parent_reddit_id)
return {
"reddit_id": reddit_id,
"post_id": post_id,
"parent_comment_id": parent_comment_id,
"author_id": author_id,
"body": data.get("body", ""),
"score": data.get("score", 0),
"created_utc": created,
"collected_at": datetime.now(timezone.utc),
"updated_at": datetime.now(timezone.utc),
}
import asyncio
def poll_new_posts():
"""Fetch /new for each active subreddit and upsert posts."""
asyncio.run(_poll_new_posts_async())
async def _poll_new_posts_async():
subreddits = _get_active_subreddits()
if not subreddits:
return
client = create_client()
async with client:
for sub in subreddits:
data = await fetch_json(client, f"/r/{sub['name']}/new", {"limit": "100"})
if not data:
continue
children = data.get("data", {}).get("children", [])
if not children:
continue
with SyncSession() as db:
posts = [_parse_post(child, sub["id"], db) for child in children]
_upsert_posts(db, posts)
db.commit()
logger.info(f"r/{sub['name']}: upserted {len(children)} new posts")
def poll_hot_posts():
"""Fetch /hot for each active subreddit and update hot_rank."""
asyncio.run(_poll_hot_posts_async())
async def _poll_hot_posts_async():
subreddits = _get_active_subreddits()
if not subreddits:
return
client = create_client()
async with client:
for sub in subreddits:
data = await fetch_json(client, f"/r/{sub['name']}/hot", {"limit": "100"})
if not data:
continue
children = data.get("data", {}).get("children", [])
if not children:
continue
with SyncSession() as db:
posts = [
_parse_post(child, sub["id"], db, hot_rank=i + 1)
for i, child in enumerate(children)
]
_upsert_posts(db, posts, update_hot_rank=True)
db.commit()
logger.info(f"r/{sub['name']}: updated hot ranks for {len(children)} posts")
def collect_comments():
"""Fetch comments for recent posts."""
asyncio.run(_collect_comments_async())
async def _collect_comments_async():
cutoff = datetime.now(timezone.utc) - timedelta(hours=48)
with SyncSession() as db:
stmt = (
select(Post.id, Post.reddit_id, Post.subreddit_id)
.join(MonitoredSubreddit)
.where(
MonitoredSubreddit.is_active == True, # noqa: E712
Post.created_utc >= cutoff,
)
.order_by(Post.created_utc.desc())
.limit(50)
)
result = db.execute(stmt)
recent_posts = [{"id": r[0], "reddit_id": r[1], "subreddit_id": r[2]} for r in result]
if not recent_posts:
return
client = create_client()
async with client:
for post in recent_posts:
short_id = post["reddit_id"].replace("t3_", "")
data = await fetch_json(client, f"/comments/{short_id}", {"limit": "500", "sort": "new"})
if not data or len(data) < 2:
continue
comment_listing = data[1].get("data", {}).get("children", [])
with SyncSession() as db:
# Build parent_map from existing comments
existing = db.execute(
select(Comment.id, Comment.reddit_id).where(Comment.post_id == post["id"])
)
parent_map = {r[1]: r[0] for r in existing}
comments_to_upsert = []
def process_comments(children):
for child in children:
if child.get("kind") == "more":
continue
c_data = child.get("data", {})
parsed = _parse_comment(c_data, post["id"], db, parent_map)
if parsed:
comments_to_upsert.append(parsed)
# Process replies recursively
replies = c_data.get("replies")
if isinstance(replies, dict):
reply_children = replies.get("data", {}).get("children", [])
process_comments(reply_children)
process_comments(comment_listing)
if comments_to_upsert:
# Upsert comments one at a time to handle parent references
for comment in comments_to_upsert:
stmt = insert(Comment).values(comment)
stmt = stmt.on_conflict_do_update(
index_elements=[Comment.reddit_id],
set_={
"score": stmt.excluded.score,
"body": stmt.excluded.body,
"updated_at": stmt.excluded.updated_at,
},
)
db.execute(stmt)
db.commit()
logger.info(f"Post {short_id}: upserted {len(comments_to_upsert)} comments")
def update_scores():
"""Re-fetch recent posts to update scores and comment counts."""
asyncio.run(_update_scores_async())
async def _update_scores_async():
cutoff = datetime.now(timezone.utc) - timedelta(days=7)
with SyncSession() as db:
stmt = (
select(Post.reddit_id, Post.subreddit_id, MonitoredSubreddit.name)
.join(MonitoredSubreddit)
.where(
MonitoredSubreddit.is_active == True, # noqa: E712
Post.created_utc >= cutoff,
)
)
result = db.execute(stmt)
posts_by_sub: dict[str, list[str]] = {}
for reddit_id, _, sub_name in result:
posts_by_sub.setdefault(sub_name, []).append(reddit_id)
if not posts_by_sub:
return
# Score updates piggyback on the new/hot polls — the upsert already updates scores.
# This job explicitly re-fetches to catch score changes on older posts.
client = create_client()
async with client:
for sub_name, reddit_ids in posts_by_sub.items():
data = await fetch_json(client, f"/r/{sub_name}/new", {"limit": "100"})
if not data:
continue
children = data.get("data", {}).get("children", [])
with SyncSession() as db:
sub = db.execute(
select(MonitoredSubreddit).where(MonitoredSubreddit.name == sub_name)
).scalar_one_or_none()
if not sub:
continue
posts = [_parse_post(child, sub.id, db) for child in children]
_upsert_posts(db, posts)
db.commit()
logger.info(f"Score update complete for {len(posts_by_sub)} subreddits")

View File

@@ -0,0 +1,58 @@
import asyncio
import logging
import time
import httpx
from backend.config import settings
logger = logging.getLogger(__name__)
BASE_URL = "https://www.reddit.com"
# Simple in-process rate limiter: track request timestamps
_request_times: list[float] = []
MAX_REQUESTS_PER_MINUTE = 9 # Stay under Reddit's ~10/min limit
async def _wait_for_rate_limit():
"""Block until we have budget for another request."""
now = time.monotonic()
# Remove timestamps older than 60 seconds
while _request_times and _request_times[0] < now - 60:
_request_times.pop(0)
if len(_request_times) >= MAX_REQUESTS_PER_MINUTE:
wait = 60 - (now - _request_times[0]) + 0.5
logger.info(f"Rate limit: waiting {wait:.1f}s")
await asyncio.sleep(wait)
_request_times.append(time.monotonic())
async def fetch_json(client: httpx.AsyncClient, path: str, params: dict | None = None) -> dict | None:
"""Fetch a Reddit .json endpoint with rate limiting and error handling."""
await _wait_for_rate_limit()
url = f"{BASE_URL}{path}.json"
try:
response = await client.get(url, params=params)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
logger.warning(f"Rate limited, waiting {retry_after}s")
await asyncio.sleep(retry_after)
return await fetch_json(client, path, params)
if response.status_code >= 500:
logger.warning(f"Reddit returned {response.status_code} for {path}")
return None
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
logger.error(f"HTTP error fetching {path}: {e}")
return None
def create_client() -> httpx.AsyncClient:
"""Create an httpx client configured for Reddit."""
return httpx.AsyncClient(
headers={"User-Agent": settings.reddit_user_agent},
timeout=30.0,
follow_redirects=True,
)

View File

@@ -0,0 +1,47 @@
import logging
from datetime import datetime, timezone, timedelta
from sqlalchemy import select, create_engine
from sqlalchemy.orm import sessionmaker
from backend.config import settings
from backend.models.post import Post
from backend.models.metric_snapshot import MetricSnapshot
from backend.models.subreddit import MonitoredSubreddit
logger = logging.getLogger(__name__)
_engine = create_engine(settings.database_url_sync, pool_size=2, pool_recycle=3600)
SyncSession = sessionmaker(_engine)
def take_metric_snapshots():
"""Snapshot current metrics for recent posts."""
now = datetime.now(timezone.utc)
with SyncSession() as db:
# Posts < 48h old: snapshot every run (every 30 min)
cutoff_recent = now - timedelta(hours=48)
stmt = (
select(Post.id, Post.score, Post.num_comments, Post.upvote_ratio)
.join(MonitoredSubreddit)
.where(
MonitoredSubreddit.is_active == True, # noqa: E712
Post.created_utc >= cutoff_recent,
)
)
result = db.execute(stmt)
snapshots = []
for post_id, score, num_comments, upvote_ratio in result:
snapshots.append(MetricSnapshot(
post_id=post_id,
score=score,
num_comments=num_comments,
upvote_ratio=upvote_ratio,
snapshot_at=now,
))
if snapshots:
db.add_all(snapshots)
db.commit()
logger.info(f"Took {len(snapshots)} metric snapshots")

View File

@@ -0,0 +1,12 @@
import logging
from backend.config import settings
logger = logging.getLogger(__name__)
def generate_summaries():
"""Stub: AI summary generation. Enable when a provider is configured."""
if not settings.ai_summary_enabled:
return
logger.info("AI summary generation not yet configured")