Files
reddit-bot/backend/services/analytics_service.py
dat972 bc2203524f Add Reddit monitoring bot — backend, frontend, and Docker config
Python/FastAPI backend with PostgreSQL for collecting Reddit data via
public .json endpoints. React/Vite dashboard for analytics. Docker Compose
setup with API and worker services connecting to shared PostgreSQL.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 19:29:58 -05:00

232 lines
7.1 KiB
Python

from datetime import datetime, timedelta, timezone
from sqlalchemy import select, func, case, text
from sqlalchemy.ext.asyncio import AsyncSession
from backend.models.post import Post
from backend.models.comment import Comment
from backend.models.author import Author
from backend.models.subreddit import MonitoredSubreddit
async def get_engagement(
    db: AsyncSession,
    subreddit_id: int | None = None,
    granularity: str = "day",
    since: datetime | None = None,
    until: datetime | None = None,
) -> list[dict]:
    """Return post and comment activity bucketed by time period.

    Posts and comments are aggregated separately with
    ``date_trunc(granularity, created_utc)`` over the inclusive window
    ``[since, until]`` and then merged per period.

    Args:
        db: Async session used to run the two aggregate queries.
        subreddit_id: Restrict results to this subreddit; a falsy value
            means no subreddit filter is applied.
        granularity: Field name passed to ``date_trunc`` (default ``"day"``).
        since: Lower bound on ``created_utc``; defaults to 30 days before
            the current UTC time.
        until: Upper bound on ``created_utc``; defaults to the current UTC
            time.

    Returns:
        One dict per period, ordered by period, with the keys ``period``
        (the stringified bucket value), ``posts``, ``comments`` and
        ``avg_score`` (rounded to one decimal). A period that contains
        comments but no posts is reported with ``posts`` 0 and
        ``avg_score`` 0.0 instead of being omitted.
    """
    if since is None:
        since = datetime.now(timezone.utc) - timedelta(days=30)
    if until is None:
        until = datetime.now(timezone.utc)

    post_period = func.date_trunc(granularity, Post.created_utc)
    post_stmt = select(
        post_period.label("period"),
        func.count(Post.id).label("posts"),
        func.coalesce(func.avg(Post.score), 0).label("avg_score"),
    ).where(Post.created_utc >= since, Post.created_utc <= until)
    if subreddit_id:
        post_stmt = post_stmt.where(Post.subreddit_id == subreddit_id)
    post_stmt = post_stmt.group_by("period").order_by("period")
    post_result = await db.execute(post_stmt)

    # Comment counts per period; joined to Post so the subreddit filter can
    # be applied to comments as well.
    comment_period = func.date_trunc(granularity, Comment.created_utc)
    comment_stmt = (
        select(
            comment_period.label("period"),
            func.count(Comment.id).label("comments"),
        )
        .join(Post)
        .where(Comment.created_utc >= since, Comment.created_utc <= until)
    )
    if subreddit_id:
        comment_stmt = comment_stmt.where(Post.subreddit_id == subreddit_id)
    comment_stmt = comment_stmt.group_by("period").order_by("period")
    comment_result = await db.execute(comment_stmt)

    # Both result sets are keyed by the stringified period so they can be
    # matched against each other.
    posts_by_period = {str(row.period): row for row in post_result}
    comments_by_period = {str(row.period): row.comments for row in comment_result}

    # Iterate over the union of periods: building the series from post rows
    # alone would silently drop buckets that only contain comments.
    # str() of a timestamp is zero-padded ISO-like text, so sorting the keys
    # lexicographically keeps the buckets in chronological order.
    series = []
    for period in sorted(posts_by_period.keys() | comments_by_period.keys()):
        post_row = posts_by_period.get(period)
        if post_row is None:
            post_total = 0
            avg_score = 0.0
        else:
            post_total = post_row.posts
            avg_score = round(float(post_row.avg_score), 1)
        series.append(
            {
                "period": period,
                "posts": post_total,
                "comments": comments_by_period.get(period, 0),
                "avg_score": avg_score,
            }
        )
    return series
async def get_top_posts(
    db: AsyncSession,
    subreddit_id: int | None = None,
    metric: str = "score",
    since: datetime | None = None,
    until: datetime | None = None,
    limit: int = 10,
) -> list[dict]:
    """Return the highest-ranked posts in a time window.

    Args:
        db: Async session used to run the query.
        subreddit_id: Restrict results to this subreddit when truthy.
        metric: ``"score"`` ranks by ``Post.score``; any other value ranks
            by ``Post.num_comments``.
        since: Lower bound on ``Post.created_utc``; defaults to 7 days
            before the current UTC time.
        until: Optional upper bound on ``Post.created_utc``.
        limit: Maximum number of posts returned.

    Returns:
        A list of dicts with the keys ``id``, ``title``, ``score``,
        ``num_comments``, ``author_name``, ``subreddit_name``,
        ``created_utc`` and ``permalink``, in descending order of the
        chosen metric.
    """
    window_start = since or datetime.now(timezone.utc) - timedelta(days=7)

    # Collect the optional filters first, then apply them in one go.
    conditions = [Post.created_utc >= window_start]
    if until:
        conditions.append(Post.created_utc <= until)
    if subreddit_id:
        conditions.append(Post.subreddit_id == subreddit_id)

    if metric == "score":
        ranking = Post.score
    else:
        ranking = Post.num_comments

    query = (
        select(Post, MonitoredSubreddit.name, Author.username)
        .join(MonitoredSubreddit)
        # Outer join: the post row is kept even when no Author row matches.
        .outerjoin(Author)
        .where(*conditions)
        .order_by(ranking.desc())
        .limit(limit)
    )
    rows = (await db.execute(query)).all()

    top_posts = []
    for post, sub_name, author_name in rows:
        top_posts.append(
            {
                "id": post.id,
                "title": post.title,
                "score": post.score,
                "num_comments": post.num_comments,
                "author_name": author_name,
                "subreddit_name": sub_name,
                "created_utc": post.created_utc,
                "permalink": post.permalink,
            }
        )
    return top_posts
async def get_top_authors(
    db: AsyncSession,
    subreddit_id: int | None = None,
    since: datetime | None = None,
    until: datetime | None = None,
    limit: int = 10,
) -> list[dict]:
    """Return authors ranked by combined post and comment counts.

    Each author's post count and comment count are computed by scalar
    subqueries correlated to ``Author``, restricted to the time window and,
    when given, to one subreddit.

    Args:
        db: Async session used to run the query.
        subreddit_id: Restrict counted activity to this subreddit when
            truthy (comments are matched through their post).
        since: Lower bound on ``created_utc``; defaults to 7 days before
            the current UTC time.
        until: Optional upper bound on ``created_utc``.
        limit: Maximum number of authors returned.

    Returns:
        A list of dicts with the keys ``id``, ``username``, ``post_count``,
        ``comment_count`` and ``total_activity``, ordered by total activity
        descending.
    """
    window_start = since or datetime.now(timezone.utc) - timedelta(days=7)

    posts_per_author = select(func.count(Post.id)).where(
        Post.author_id == Author.id,
        Post.created_utc >= window_start,
    )
    comments_per_author = select(func.count(Comment.id)).where(
        Comment.author_id == Author.id,
        Comment.created_utc >= window_start,
    )

    if until:
        posts_per_author = posts_per_author.where(Post.created_utc <= until)
        comments_per_author = comments_per_author.where(Comment.created_utc <= until)

    if subreddit_id:
        posts_per_author = posts_per_author.where(Post.subreddit_id == subreddit_id)
        # The subreddit filter is applied to comments through their post.
        comments_per_author = comments_per_author.join(Post).where(
            Post.subreddit_id == subreddit_id
        )

    post_total = posts_per_author.correlate(Author).scalar_subquery().label("post_count")
    comment_total = (
        comments_per_author.correlate(Author).scalar_subquery().label("comment_count")
    )

    ranked = select(Author, post_total, comment_total)
    ranked = ranked.order_by((post_total + comment_total).desc())
    ranked = ranked.limit(limit)

    rows = (await db.execute(ranked)).all()

    leaderboard = []
    for author, n_posts, n_comments in rows:
        # Falsy counts (including None) are reported as 0.
        n_posts = n_posts or 0
        n_comments = n_comments or 0
        leaderboard.append(
            {
                "id": author.id,
                "username": author.username,
                "post_count": n_posts,
                "comment_count": n_comments,
                "total_activity": n_posts + n_comments,
            }
        )
    return leaderboard
async def get_subreddit_summary(
    db: AsyncSession,
    since: datetime | None = None,
    until: datetime | None = None,
) -> list[dict]:
    """Summarise activity for every active monitored subreddit.

    Args:
        db: Async session used to run the queries.
        since: Lower bound on ``created_utc``; defaults to 7 days before
            the current UTC time.
        until: Optional upper bound on ``created_utc``. When given it is
            applied to the post totals, the comment count and the top-flair
            lookup alike.

    Returns:
        One dict per active subreddit, ordered by name, with the keys
        ``subreddit_id``, ``subreddit_name``, ``total_posts``,
        ``total_comments``, ``avg_score`` (rounded to one decimal) and
        ``top_flair`` (``None`` when no post in the window has a flair).
        Subreddits without posts in the window are still listed, with zero
        totals.
    """
    if since is None:
        since = datetime.now(timezone.utc) - timedelta(days=7)

    # The whole time window belongs in the ON clause of the outer join.
    # Filtering Post.created_utc in WHERE would discard the NULL-extended
    # rows and drop every subreddit that has no posts in the window.
    post_in_window = (Post.subreddit_id == MonitoredSubreddit.id) & (
        Post.created_utc >= since
    )
    if until is not None:
        post_in_window = post_in_window & (Post.created_utc <= until)

    stmt = (
        select(
            MonitoredSubreddit.id,
            MonitoredSubreddit.name,
            func.count(Post.id).label("total_posts"),
            func.coalesce(func.avg(Post.score), 0).label("avg_score"),
        )
        .outerjoin(Post, post_in_window)
        .where(MonitoredSubreddit.is_active == True)  # noqa: E712
        .group_by(MonitoredSubreddit.id)
        .order_by(MonitoredSubreddit.name)
    )
    result = await db.execute(stmt)

    summaries = []
    for sub_id, sub_name, total_posts, avg_score in result.all():
        # Comments created in the window on posts of this subreddit.
        comment_stmt = (
            select(func.count(Comment.id))
            .join(Post)
            .where(Post.subreddit_id == sub_id, Comment.created_utc >= since)
        )
        if until is not None:
            comment_stmt = comment_stmt.where(Comment.created_utc <= until)
        comment_count = (await db.execute(comment_stmt)).scalar() or 0

        # Most frequent non-null flair among posts in the window.
        flair_stmt = select(Post.flair, func.count(Post.id).label("cnt")).where(
            Post.subreddit_id == sub_id,
            Post.created_utc >= since,
            Post.flair.isnot(None),
        )
        if until is not None:
            flair_stmt = flair_stmt.where(Post.created_utc <= until)
        flair_stmt = (
            flair_stmt.group_by(Post.flair)
            .order_by(func.count(Post.id).desc())
            .limit(1)
        )
        top_flair_row = (await db.execute(flair_stmt)).first()

        summaries.append(
            {
                "subreddit_id": sub_id,
                "subreddit_name": sub_name,
                "total_posts": total_posts,
                "total_comments": comment_count,
                "avg_score": round(float(avg_score), 1),
                "top_flair": top_flair_row[0] if top_flair_row else None,
            }
        )
    return summaries
async def get_flair_distribution(
    db: AsyncSession,
    subreddit_id: int,
    since: datetime | None = None,
    until: datetime | None = None,
) -> list[dict]:
    """Count posts per flair for one subreddit.

    Args:
        db: Async session used to run the query.
        subreddit_id: Subreddit whose posts are counted.
        since: Lower bound on ``Post.created_utc``; defaults to 30 days
            before the current UTC time.
        until: Optional upper bound on ``Post.created_utc``.

    Returns:
        A list of ``{"flair": ..., "count": ...}`` dicts, most frequent
        flair first. Posts are grouped by ``Post.flair`` without excluding
        null values.
    """
    window_start = since or datetime.now(timezone.utc) - timedelta(days=30)

    conditions = [
        Post.subreddit_id == subreddit_id,
        Post.created_utc >= window_start,
    ]
    if until:
        conditions.append(Post.created_utc <= until)

    query = (
        select(Post.flair, func.count(Post.id).label("count"))
        .where(*conditions)
        .group_by(Post.flair)
        .order_by(func.count(Post.id).desc())
    )
    rows = (await db.execute(query)).all()

    distribution = []
    for flair, count in rows:
        distribution.append({"flair": flair, "count": count})
    return distribution