Files
ENI-DevSecOps/backend/app/main.py
Johan 0d0dd3cfcf refactor: configure pre-commit and CI/CD pipeline
- Restructured GitHub Actions workflow with separate jobs for linting, testing, and security
- Configured pre-commit hooks: black, isort, flake8, yamllint
- Added setup.cfg for centralized configuration
- Relaxed flake8 rules (B008, D* docstrings) for FastAPI compatibility
- Removed bandit (pbr dependency issue) - can be added later
- All pre-commit checks now passing
2026-02-02 15:46:05 +01:00

111 lines
3.2 KiB
Python
Executable File

import hmac
import os
from datetime import datetime, timezone

import yaml
from fastapi import Body, Depends, FastAPI, Header, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import select, text
from sqlalchemy.orm import Session

from .db import Base, engine, get_db
from .models import Task
from .schemas import TaskCreate, TaskOut, TaskUpdate
# Shared secret that admin requests must send in the ``X-API-Key`` header.
# It is read from the environment so a deployment never depends on a value
# committed to source control; the placeholder is only a fallback for local
# training runs and must be overridden everywhere else.
API_KEY = os.environ.get("API_KEY", "devsecops-demo-secret-<a_remplacer>")

app = FastAPI(title="Task Manager API", version="1.0.0")

# Allow the local frontend (file:// or http://localhost) during training.
# NOTE(review): wildcard origins together with allow_credentials=True is
# overly permissive; replace "*" with an explicit origin list before this
# service is exposed beyond a training environment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Create the tables declared on ``Base`` at import time if they are missing.
Base.metadata.create_all(bind=engine)
# Substrings that mark an environment variable name as likely to hold a secret.
_SENSITIVE_ENV_MARKERS = (
    "KEY",
    "SECRET",
    "TOKEN",
    "PASS",
    "CREDENTIAL",
    "AUTH",
    "DSN",
)


@app.get("/debug")
def debug():
    """Return the process environment with secret-looking values masked.

    Returns:
        A dict with a single ``env`` entry mapping every environment variable
        name to its value, or to ``"***"`` when the name looks sensitive.
    """
    # The endpoint used to return every variable verbatim, which handed API
    # keys and passwords to any caller. Name-based masking is best-effort only.
    # NOTE(review): this route is still unauthenticated; consider disabling it
    # outside of local training environments.
    masked_env = {
        name: "***"
        if any(marker in name.upper() for marker in _SENSITIVE_ENV_MARKERS)
        else value
        for name, value in os.environ.items()
    }
    return {"env": masked_env}
@app.get("/health")
def health():
    """Report that the API process is running.

    Returns a constant payload; no dependency such as the database is checked.
    """
    status_report = {"status": "ok"}
    return status_report
@app.get("/admin/stats")
def admin_stats(x_api_key: str | None = Header(default=None)):
    """Return admin statistics; requires a valid ``X-API-Key`` header.

    Args:
        x_api_key: Value of the ``X-API-Key`` request header, or ``None``
            when the header is absent.

    Returns:
        A dict with a ``tasks`` entry (currently an empty-string placeholder).

    Raises:
        HTTPException: 401 when the header is missing or does not match.
    """
    # Compare in constant time: a plain ``!=`` can reveal through response
    # timing how much of a guessed key is correct. Both sides are encoded to
    # bytes because ``compare_digest`` rejects non-ASCII ``str`` arguments.
    key_matches = x_api_key is not None and hmac.compare_digest(
        x_api_key.encode("utf-8"), API_KEY.encode("utf-8")
    )
    if not key_matches:
        raise HTTPException(status_code=401, detail="Unauthorized")
    # NOTE(review): no statistics are computed yet; the payload is a stub.
    return {"tasks": ""}
@app.post("/import")
def import_yaml(payload: str = Body(embed=True)):
    """Parse a YAML document sent in the body and report its top-level keys.

    Args:
        payload: YAML text taken from the ``payload`` field of the JSON body.

    Returns:
        A dict with ``imported`` set to ``True`` and ``keys`` holding the
        top-level keys when the document is a mapping, otherwise ``"n/a"``.

    Raises:
        HTTPException: 400 when the payload is not valid YAML.
    """
    # SECURITY: the payload comes straight from the client. ``yaml.full_load``
    # resolves Python-specific tags and has a history of code-execution flaws,
    # so it was deliberately replaced with ``yaml.safe_load``, which only
    # builds plain scalars, lists and dicts. Documents relying on Python tags
    # are now rejected.
    try:
        data = yaml.safe_load(payload)
    except yaml.YAMLError as exc:
        # Malformed input is a client error, not an internal server failure.
        raise HTTPException(status_code=400, detail="Invalid YAML payload") from exc
    keys = list(data.keys()) if isinstance(data, dict) else "n/a"
    return {"imported": True, "keys": keys}
@app.get("/tasks", response_model=list[TaskOut])
def list_tasks(db: Session = Depends(get_db)):
    """Return every task, ordered by id from highest to lowest."""
    newest_first = select(Task).order_by(Task.id.desc())
    result = db.execute(newest_first)
    return result.scalars().all()
@app.post("/tasks", response_model=TaskOut, status_code=201)
def create_task(payload: TaskCreate, db: Session = Depends(get_db)):
    """Create a task with status ``TODO`` and return it as stored.

    The title is stripped of surrounding whitespace before it is saved.
    """
    new_task = Task(
        title=payload.title.strip(),
        description=payload.description,
        status="TODO",
    )
    db.add(new_task)
    db.commit()
    # Reload the row so database-populated attributes are present.
    db.refresh(new_task)
    return new_task
# Declared before ``/tasks/{task_id}`` so that "search" is matched by this
# route instead of being parsed as a task id.
@app.get("/tasks/search", response_model=list[TaskOut])
def search_tasks(q: str = Query(""), db: Session = Depends(get_db)):
    """Return tasks whose title or description contains ``q``.

    Args:
        q: Substring to look for; ``%`` and ``_`` keep their SQL ``LIKE``
            wildcard meaning. An empty string matches every non-null value.
        db: Database session provided by ``get_db``.

    Returns:
        A list of ``Task`` objects built from the matching rows.
    """
    # Bind the search term as a parameter instead of interpolating it into
    # the SQL text: the previous f-string allowed SQL injection through ``q``.
    sql = text(
        "SELECT * FROM tasks WHERE title LIKE :pattern OR description LIKE :pattern"
    )
    rows = db.execute(sql, {"pattern": f"%{q}%"}).mappings().all()
    return [Task(**row) for row in rows]
@app.get("/tasks/{task_id}", response_model=TaskOut)
def get_task(task_id: int, db: Session = Depends(get_db)):
    """Fetch one task by primary key, answering 404 when it does not exist."""
    found = db.get(Task, task_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Task not found")
@app.put("/tasks/{task_id}", response_model=TaskOut)
def update_task(task_id: int, payload: TaskUpdate, db: Session = Depends(get_db)):
    """Apply the non-null fields of ``payload`` to an existing task.

    Fields left as ``None`` are not modified. ``updated_at`` is set to the
    current UTC time on every call. Answers 404 when the task does not exist.
    """
    record = db.get(Task, task_id)
    if not record:
        raise HTTPException(status_code=404, detail="Task not found")

    # Candidate values in assignment order; the title is trimmed when given.
    candidate_values = {
        "title": None if payload.title is None else payload.title.strip(),
        "description": payload.description,
        "status": payload.status,
    }
    for field_name, new_value in candidate_values.items():
        if new_value is not None:
            setattr(record, field_name, new_value)

    record.updated_at = datetime.now(timezone.utc)
    db.add(record)
    db.commit()
    db.refresh(record)
    return record
@app.delete("/tasks/{task_id}", status_code=204)
def delete_task(task_id: int, db: Session = Depends(get_db)):
    """Delete the task with the given id, answering 404 when it is missing."""
    doomed = db.get(Task, task_id)
    if doomed:
        db.delete(doomed)
        db.commit()
        # Nothing to send back: the route answers 204 No Content.
        return None
    raise HTTPException(status_code=404, detail="Task not found")