91 lines
2.9 KiB
Python
91 lines
2.9 KiB
Python
import logging
|
|
import signal
|
|
import sys
|
|
|
|
from apscheduler.schedulers.blocking import BlockingScheduler
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
|
|
from backend.config import settings
|
|
from backend.worker.monitor import poll_new_posts, poll_hot_posts, collect_comments, update_scores
|
|
from backend.worker.snapshot import take_metric_snapshots
|
|
from backend.worker.digest_job import generate_daily_digests
|
|
from backend.worker.summary_job import generate_summaries
|
|
|
|
# Configure the root logger once at import time: INFO threshold, and records
# rendered as "<timestamp> [<logger name>] <LEVEL>: <message>".
logging.basicConfig(
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    level=logging.INFO,
)

# Module-level logger shared by the functions in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def seed_subreddits():
    """Add the configured seed subreddits that are not stored yet.

    ``settings.seed_subreddits`` is read as a comma-separated string. Each
    entry is stripped and lower-cased, blank entries are dropped, and a
    ``MonitoredSubreddit`` row is added for every name that has no existing
    row. All additions are committed together in one transaction.

    Returns immediately when ``settings.seed_subreddits`` is empty/falsy.

    The engine created from ``settings.database_url_sync`` is private to this
    call and is disposed before returning, even if a query or the commit
    raises (the exception still propagates to the caller).
    """
    if not settings.seed_subreddits:
        return

    # Function-local imports: only executed when seeding is actually configured.
    from sqlalchemy import select, create_engine
    from sqlalchemy.orm import sessionmaker
    from backend.models.subreddit import MonitoredSubreddit

    # Normalise the names and collapse repeated entries (order preserved) so
    # each subreddit is looked up, and possibly inserted, exactly once.
    names = list(
        dict.fromkeys(
            s.strip().lower()
            for s in settings.seed_subreddits.split(",")
            if s.strip()
        )
    )

    engine = create_engine(settings.database_url_sync)
    try:
        Session = sessionmaker(engine)
        with Session() as db:
            for name in names:
                existing = db.execute(
                    select(MonitoredSubreddit).where(MonitoredSubreddit.name == name)
                ).scalar_one_or_none()
                if not existing:
                    db.add(MonitoredSubreddit(name=name))
                    logger.info("Seeded subreddit: r/%s", name)
            db.commit()
    finally:
        # Release the connection pool on success and on failure alike.
        engine.dispose()
|
|
|
|
|
|
def main():
    """Worker entry point.

    Seeds the configured subreddits, registers every periodic job on a
    ``BlockingScheduler``, installs SIGTERM/SIGINT handlers that shut the
    scheduler down and exit with status 0, and finally calls
    ``scheduler.start()``.
    """
    logger.info("Starting Reddit monitor worker")
    seed_subreddits()

    scheduler = BlockingScheduler()

    # Interval-driven jobs as (callable, period in minutes, job id):
    # the four Reddit polling jobs followed by the metric snapshot job.
    interval_jobs = (
        (poll_new_posts, 10, "poll_new"),
        (poll_hot_posts, 30, "poll_hot"),
        (collect_comments, 15, "comments"),
        (update_scores, 60, "scores"),
        (take_metric_snapshots, 30, "snapshots"),
    )
    for job_func, period_minutes, job_id in interval_jobs:
        scheduler.add_job(
            job_func,
            IntervalTrigger(minutes=period_minutes),
            id=job_id,
            max_instances=1,
        )

    # Once-a-day jobs as (callable, minute past settings.digest_hour_utc, job id):
    # the daily digest, then the AI summary job (noted as a stub).
    daily_jobs = (
        (generate_daily_digests, 0, "digest"),
        (generate_summaries, 30, "summary"),
    )
    for job_func, minute_past_hour, job_id in daily_jobs:
        scheduler.add_job(
            job_func,
            CronTrigger(hour=settings.digest_hour_utc, minute=minute_past_hour),
            id=job_id,
            max_instances=1,
        )

    def _stop_worker(signum, frame):
        # Signal handler: stop the scheduler without waiting for running
        # jobs, then terminate the process with exit status 0.
        logger.info("Shutting down worker...")
        scheduler.shutdown(wait=False)
        sys.exit(0)

    for stop_signal in (signal.SIGTERM, signal.SIGINT):
        signal.signal(stop_signal, _stop_worker)

    logger.info("Worker started. Scheduled jobs are running.")
    scheduler.start()
|
|
|
|
|
|
# Start the worker only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|