Files
life/backend/app/routers/finance.py
2026-03-09 16:05:38 -05:00

264 lines
7.2 KiB
Python

from datetime import date
from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.auth import get_current_user
from app.database import get_db
from app.models.finance import FinanceAccount, FinanceSnapshot, FinanceTransaction
from app.services.chase_csv import parse_chase_csv
# Router for the finance API. The router-level dependency runs
# ``get_current_user`` for every route registered below.
router = APIRouter(
    prefix="/api/finances",
    tags=["finances"],
    dependencies=[Depends(get_current_user)],
)
# --- Schemas ---
class AccountCreate(BaseModel):
    """Request payload for creating a finance account."""

    # Account name; the account listing is sorted by this value.
    name: str
    # Free-form account kind; the CSV upload endpoint hands it to the parser.
    account_type: str
class AccountRead(AccountCreate):
    """Account as returned by the API: the creation fields plus ``id``."""

    # Allow the model to be populated from object attributes (ORM rows).
    model_config = {"from_attributes": True}

    # Primary key of the stored account.
    id: int
class TransactionCreate(BaseModel):
    """Request payload for creating or fully replacing a transaction."""

    # Calendar date of the transaction.
    date: date
    # Signed amount; the summary endpoints count only negative amounts.
    amount: float
    # Category label used for filtering and for the per-category summary.
    category: str
    # Optional free-text description.
    description: str | None = None
    # Optional owning account; None leaves the transaction unattached.
    account_id: int | None = None
class TransactionRead(TransactionCreate):
    """Transaction as returned by the API: the creation fields plus ``id``."""

    # Allow the model to be populated from object attributes (ORM rows).
    model_config = {"from_attributes": True}

    # Primary key of the stored transaction.
    id: int
class SnapshotCreate(BaseModel):
    """Request payload for recording a net-worth snapshot."""

    # Date the snapshot refers to.
    date: date
    # Net worth value recorded for that date.
    net_worth: float
    # Optional free-text notes.
    notes: str | None = None
class SnapshotRead(SnapshotCreate):
    """Snapshot as returned by the API: the creation fields plus ``id``."""

    # Allow the model to be populated from object attributes (ORM rows).
    model_config = {"from_attributes": True}

    # Primary key of the stored snapshot.
    id: int
class CategorySummary(BaseModel):
    """One row of the per-category spending summary."""

    # Category the total belongs to.
    category: str
    # Sum of the matching transaction amounts. The summary query keeps only
    # negative amounts, so this value is negative.
    total: float
class AccountSpendingSummary(BaseModel):
    """One row of the per-account spending summary."""

    # Account the total belongs to; None for transactions without an account.
    account_id: int | None
    # Name of that account; the endpoint substitutes "Untagged" when no
    # account name is available.
    account_name: str | None
    # Sum of the matching (negative) transaction amounts.
    total: float
class CSVUploadResult(BaseModel):
    """Outcome of a CSV import."""

    # Number of transactions inserted.
    imported: int
    # Number of rows skipped; the upload endpoint currently always reports 0.
    skipped: int
    # Error messages returned by the CSV parser.
    errors: list[str]
# --- Account endpoints ---
@router.get("/accounts", response_model=list[AccountRead])
def list_accounts(db: Session = Depends(get_db)):
    """Return all finance accounts ordered by name."""
    stmt = select(FinanceAccount).order_by(FinanceAccount.name)
    return db.scalars(stmt).all()
@router.post("/accounts", response_model=AccountRead, status_code=201)
def create_account(body: AccountCreate, db: Session = Depends(get_db)):
    """Create a finance account from the request body and return it."""
    account = FinanceAccount(**body.model_dump())
    db.add(account)
    db.commit()
    # Reload the row so attributes set during the insert are available
    # to the response model.
    db.refresh(account)
    return account
@router.delete("/accounts/{account_id}", status_code=204)
def delete_account(account_id: int, db: Session = Depends(get_db)):
    """Delete the account with the given id.

    Raises:
        HTTPException: 404 when no account has that id.
    """
    account = db.get(FinanceAccount, account_id)
    if account is None:
        raise HTTPException(status_code=404, detail="Account not found")
    db.delete(account)
    db.commit()
@router.post("/accounts/{account_id}/upload-csv", response_model=CSVUploadResult)
async def upload_csv(
    account_id: int,
    file: UploadFile,
    db: Session = Depends(get_db),
):
    """Import transactions for an account from an uploaded CSV file.

    The file is decoded as UTF-8 text and parsed by ``parse_chase_csv``
    together with the account's ``account_type``. Every parsed row is
    inserted as a transaction attached to ``account_id``.

    Returns:
        CSVUploadResult with the number of inserted rows and the parser's
        error messages. ``skipped`` is always 0; no duplicate detection is
        performed here.

    Raises:
        HTTPException: 404 when the account does not exist; 400 when the
            upload is not valid UTF-8 text.
    """
    acct = db.get(FinanceAccount, account_id)
    if not acct:
        raise HTTPException(status_code=404, detail="Account not found")

    raw = await file.read()
    try:
        # "utf-8-sig" reads plain UTF-8 and additionally drops a leading
        # byte-order mark, which would otherwise be glued to the first
        # header cell of the CSV.
        content = raw.decode("utf-8-sig")
    except UnicodeDecodeError as exc:
        # A bad upload is a client error, not a server failure.
        raise HTTPException(
            status_code=400, detail="CSV file must be UTF-8 encoded"
        ) from exc

    rows, errors = parse_chase_csv(content, acct.account_type)
    transactions = [
        FinanceTransaction(
            date=r["date"],
            amount=r["amount"],
            category=r["category"],
            description=r["description"],
            account_id=account_id,
        )
        for r in rows
    ]
    db.add_all(transactions)
    db.commit()
    return CSVUploadResult(imported=len(transactions), skipped=0, errors=errors)
# --- Transaction endpoints ---
@router.get("/transactions", response_model=list[TransactionRead])
def list_transactions(
    from_date: date | None = Query(None, alias="from"),
    to_date: date | None = Query(None, alias="to"),
    category: str | None = None,
    account_id: int | None = Query(None),
    db: Session = Depends(get_db),
):
    """Return transactions, newest date first, with optional filters.

    Query parameters ``from`` and ``to`` bound the date range inclusively.
    ``category`` and ``account_id`` match exactly. Filters that are not
    supplied are not applied.
    """
    # Collect only the filters the caller actually supplied.
    filters = []
    if from_date:
        filters.append(FinanceTransaction.date >= from_date)
    if to_date:
        filters.append(FinanceTransaction.date <= to_date)
    if category:
        filters.append(FinanceTransaction.category == category)
    if account_id is not None:
        filters.append(FinanceTransaction.account_id == account_id)

    stmt = select(FinanceTransaction).order_by(FinanceTransaction.date.desc())
    for condition in filters:
        stmt = stmt.where(condition)
    return db.scalars(stmt).all()
@router.post("/transactions", response_model=TransactionRead, status_code=201)
def create_transaction(body: TransactionCreate, db: Session = Depends(get_db)):
    """Store a new transaction built from the request body and return it."""
    fields = body.model_dump()
    record = FinanceTransaction(**fields)
    db.add(record)
    db.commit()
    # Reload the row so attributes set during the insert are available
    # to the response model.
    db.refresh(record)
    return record
@router.put("/transactions/{txn_id}", response_model=TransactionRead)
def update_transaction(
    txn_id: int, body: TransactionCreate, db: Session = Depends(get_db)
):
    """Replace every field of an existing transaction with the request body.

    Raises:
        HTTPException: 404 when no transaction has that id.
    """
    existing = db.get(FinanceTransaction, txn_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Transaction not found")
    # Full replacement: every payload field is written, including optional
    # ones left at their None default.
    for field_name, new_value in body.model_dump().items():
        setattr(existing, field_name, new_value)
    db.commit()
    db.refresh(existing)
    return existing
@router.delete("/transactions/{txn_id}", status_code=204)
def delete_transaction(txn_id: int, db: Session = Depends(get_db)):
    """Delete the transaction with the given id.

    Raises:
        HTTPException: 404 when no transaction has that id.
    """
    existing = db.get(FinanceTransaction, txn_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Transaction not found")
    db.delete(existing)
    db.commit()
# --- Snapshot endpoints ---
@router.get("/snapshots", response_model=list[SnapshotRead])
def list_snapshots(db: Session = Depends(get_db)):
    """Return all net-worth snapshots, most recent date first."""
    stmt = select(FinanceSnapshot).order_by(FinanceSnapshot.date.desc())
    return db.scalars(stmt).all()
@router.post("/snapshots", response_model=SnapshotRead, status_code=201)
def create_snapshot(body: SnapshotCreate, db: Session = Depends(get_db)):
    """Store a net-worth snapshot built from the request body and return it."""
    snapshot = FinanceSnapshot(**body.model_dump())
    db.add(snapshot)
    db.commit()
    # Reload the row so attributes set during the insert are available
    # to the response model.
    db.refresh(snapshot)
    return snapshot
# --- Summary endpoints ---
def _period_bounds(period: str, today: date | None = None) -> tuple[date, date]:
    """Return the half-open ``[start, end)`` range of the current period.

    Args:
        period: ``"month"`` for the calendar month containing ``today``;
            any other value selects the calendar year.
        today: Reference date; defaults to ``date.today()``.

    Returns:
        ``(start, end)`` where ``start`` is the first day of the period and
        ``end`` is the first day of the following period.
    """
    if today is None:
        today = date.today()
    if period == "month":
        start = today.replace(day=1)
        if start.month == 12:
            # December rolls over into January of the next year.
            end = start.replace(year=start.year + 1, month=1)
        else:
            end = start.replace(month=start.month + 1)
    else:
        start = today.replace(month=1, day=1)
        end = start.replace(year=start.year + 1)
    return start, end


@router.get("/summary", response_model=list[CategorySummary])
def spending_summary(
    period: str = Query("month", pattern="^(month|year)$"),
    account_id: int | None = Query(None),
    db: Session = Depends(get_db),
):
    """Return total spending per category for the current month or year.

    Only transactions with a negative amount are counted, so each total is
    negative. Rows are ordered by ascending total, which puts the largest
    spending first. ``account_id`` optionally restricts the summary to one
    account.
    """
    start, end = _period_bounds(period)
    q = (
        select(
            FinanceTransaction.category,
            func.sum(FinanceTransaction.amount).label("total"),
        )
        .where(FinanceTransaction.date >= start)
        # Upper bound keeps transactions dated after the current period
        # out of this period's totals.
        .where(FinanceTransaction.date < end)
        .where(FinanceTransaction.amount < 0)
        .group_by(FinanceTransaction.category)
        .order_by(func.sum(FinanceTransaction.amount))
    )
    if account_id is not None:
        q = q.where(FinanceTransaction.account_id == account_id)
    rows = db.execute(q).all()
    return [CategorySummary(category=r.category, total=float(r.total)) for r in rows]


@router.get("/summary/by-account", response_model=list[AccountSpendingSummary])
def spending_by_account(
    period: str = Query("month", pattern="^(month|year)$"),
    db: Session = Depends(get_db),
):
    """Return total spending per account for the current month or year.

    Only transactions with a negative amount are counted. Transactions are
    outer-joined to their account, so those without a matching account are
    still included and reported under the name "Untagged". Rows are ordered
    by ascending total, which puts the largest spending first.
    """
    start, end = _period_bounds(period)
    rows = db.execute(
        select(
            FinanceTransaction.account_id,
            FinanceAccount.name.label("account_name"),
            func.sum(FinanceTransaction.amount).label("total"),
        )
        .outerjoin(FinanceAccount, FinanceTransaction.account_id == FinanceAccount.id)
        .where(FinanceTransaction.date >= start)
        # Upper bound keeps transactions dated after the current period
        # out of this period's totals.
        .where(FinanceTransaction.date < end)
        .where(FinanceTransaction.amount < 0)
        .group_by(FinanceTransaction.account_id, FinanceAccount.name)
        .order_by(func.sum(FinanceTransaction.amount))
    ).all()
    return [
        AccountSpendingSummary(
            account_id=r.account_id,
            account_name=r.account_name or "Untagged",
            total=float(r.total),
        )
        for r in rows
    ]