Files
reddit-bot/backend/services/post_service.py
dat972 bc2203524f Add Reddit monitoring bot — backend, frontend, and Docker config
Python/FastAPI backend with PostgreSQL for collecting Reddit data via
public .json endpoints. React/Vite dashboard for analytics. Docker Compose
setup with API and worker services connecting to shared PostgreSQL.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 19:29:58 -05:00

103 lines
2.9 KiB
Python

from datetime import datetime
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from backend.models.post import Post
from backend.models.subreddit import MonitoredSubreddit
from backend.models.author import Author
from backend.models.comment import Comment
async def list_posts(
db: AsyncSession,
subreddit_id: int | None = None,
author: str | None = None,
flair: str | None = None,
sort_by: str = "created_utc",
sort_order: str = "desc",
since: datetime | None = None,
until: datetime | None = None,
page: int = 1,
per_page: int = 25,
) -> tuple[list[dict], int]:
base = select(Post, MonitoredSubreddit.name, Author.username).join(
MonitoredSubreddit
).outerjoin(Author)
filters = []
if subreddit_id:
filters.append(Post.subreddit_id == subreddit_id)
if flair:
filters.append(Post.flair == flair)
if since:
filters.append(Post.created_utc >= since)
if until:
filters.append(Post.created_utc <= until)
if author:
filters.append(Author.username == author)
if filters:
base = base.where(*filters)
# Count
count_stmt = select(func.count()).select_from(base.subquery())
total = (await db.execute(count_stmt)).scalar() or 0
# Sort
sort_col = getattr(Post, sort_by, Post.created_utc)
if sort_order == "asc":
base = base.order_by(sort_col.asc())
else:
base = base.order_by(sort_col.desc())
# Paginate
base = base.offset((page - 1) * per_page).limit(per_page)
result = await db.execute(base)
rows = result.all()
posts = []
for post, sub_name, author_name in rows:
data = {c.name: getattr(post, c.name) for c in post.__table__.columns}
data["subreddit_name"] = sub_name
data["author_name"] = author_name
posts.append(data)
return posts, total
async def get_post(db: AsyncSession, post_id: int) -> dict | None:
stmt = (
select(Post, MonitoredSubreddit.name, Author.username)
.join(MonitoredSubreddit)
.outerjoin(Author)
.where(Post.id == post_id)
)
result = await db.execute(stmt)
row = result.first()
if not row:
return None
post, sub_name, author_name = row
data = {c.name: getattr(post, c.name) for c in post.__table__.columns}
data["subreddit_name"] = sub_name
data["author_name"] = author_name
# Get comments
comment_stmt = (
select(Comment, Author.username)
.outerjoin(Author)
.where(Comment.post_id == post_id)
.order_by(Comment.created_utc.asc())
)
comment_result = await db.execute(comment_stmt)
comments = []
for comment, c_author in comment_result.all():
c_data = {c.name: getattr(comment, c.name) for c in comment.__table__.columns}
c_data["author_name"] = c_author
comments.append(c_data)
data["comments"] = comments
return data