First commit

This commit is contained in:
Johan
2025-12-18 14:54:37 +01:00
parent 8ff4600e4c
commit d9d31456d7
4 changed files with 97 additions and 35 deletions

View File

@@ -13,12 +13,15 @@ async def get_context(request: Request) -> Dict[str, Any]:
et injectons 'current_user' dans le contexte. et injectons 'current_user' dans le contexte.
""" """
current_user: Optional[UserModel] = None current_user: Optional[UserModel] = None
auth_header = request.headers.get("Authorization")
# TODO 1.1: Implémenter l'authentification (AuthN) # Simulation de validation de token
# 1. Lire l'en-tête `Authorization` (request.headers.get(...)) if auth_header == "Bearer admin-token":
# 2. Simuler la validation du token (admin, alice, bob) current_user = await fake_db_repo.get_user_by_id(3) # Admin
# 3. Récupérer l'utilisateur depuis `fake_db_repo` elif auth_header == "Bearer alice-token":
# 4. Injecter l'utilisateur sous la clé "current_user" current_user = await fake_db_repo.get_user_by_id(1) # Alice
elif auth_header == "Bearer bob-token":
current_user = await fake_db_repo.get_user_by_id(2) # Bob
return { return {
"request": request, "request": request,

View File

@@ -10,16 +10,20 @@ from app.models.notemodel import NoteModel as NoteModel
async def get_note_by_id(info: Info, id: strawberry.ID) -> NoteModel: async def get_note_by_id(info: Info, id: strawberry.ID) -> NoteModel:
""" Resolver pour 'note(id: ID!)' """ """ Resolver pour 'note(id: ID!)' """
# TODO 2.2: Récupérer le current_user (get_current_user_from_info) current_user = get_current_user_from_info(info)
note = await fake_db_repo.get_note_by_id(int(id)) note = await fake_db_repo.get_note_by_id(int(id))
if not note: if not note:
raise NotFoundBLLException(resource_name="Note", resource_id=id) raise NotFoundBLLException(resource_name="Note", resource_id=id)
# TODO 2.2: Implémenter le contrôle d'accès (A01) # --- FIX A01 (Broken Access Control) ---
# Règle : admin OU propriétaire (is_owner) is_admin = "admin" in current_user.roles
# Lever AuthZException si l'accès est refusé is_owner = current_user.id == note.owner_id
if not (is_admin or is_owner):
raise AuthZException(
"Access denied: You do not have permission to view this note."
)
return note return note
@@ -32,28 +36,40 @@ async def get_my_notes(info: Info) -> List[NoteModel]:
async def search_notes(info: Info, contentContains: str) -> List[NoteModel]: async def search_notes(info: Info, contentContains: str) -> List[NoteModel]:
""" Resolver pour 'searchNotes' """ """ Resolver pour 'searchNotes' """
# ... (partie A05) ... # --- FIX A05 (Injection) ---
# La logique de recherche est déléguée à la DAL (fake_db_repo)
# qui est supposée utiliser des requêtes paramétrées.
notes = await fake_db_repo.search_notes_by_content(contentContains) notes = await fake_db_repo.search_notes_by_content(contentContains)
# TODO 2.2: Récupérer le current_user (get_current_user_from_info) # --- FIX A01 (Broken Access Control) ---
# ATTENTION: la recherche peut retourner des notes d'autres utilisateurs !
# Il faut filtrer les résultats après coup.
current_user = get_current_user_from_info(info)
is_admin = "admin" in current_user.roles
# TODO 2.2: Filtrer la liste 'notes' (A01) allowed_notes = []
# Problème : 'notes' contient les résultats de TOUS les utilisateurs. for n in notes:
# Règle : ne retourner que les notes où l'utilisateur est admin OU propriétaire. if is_admin or n.owner_id == current_user.id:
allowed_notes.append(n)
return notes # INSECURE: retourne tout return allowed_notes
async def get_notes_for_user_resolver(root: UserModel, info: Info) -> List[NoteModel]: async def get_notes_for_user_resolver(root: UserModel, info: Info) -> List[NoteModel]:
""" Resolver pour le champ imbriqué 'User.notes' """ """ Resolver pour le champ imbriqué 'User.notes' """
current_user = get_current_user_from_info(info)
# 'root' est l'objet 'User' parent # 'root' est l'objet 'User' parent
user_id_to_view = root.id user_id_to_view = root.id
# TODO 2.3: Récupérer le current_user (get_current_user_from_info) # --- FIX A01 (Broken Access Control) ---
is_admin = "admin" in current_user.roles
is_self = current_user.id == user_id_to_view
# TODO 2.3: Implémenter le contrôle d'accès (A01) if not (is_admin or is_self):
# Règle : admin OU 'is_self' (l'utilisateur connecté regarde son propre profil) # On ne lève pas d'erreur, on retourne juste une liste vide
# Si accès refusé, retourner une liste vide []. # pour ne pas bloquer la requête parente.
return []
notes = await fake_db_repo.get_notes_for_user(user_id_to_view) notes = await fake_db_repo.get_notes_for_user(user_id_to_view)
return notes return notes

View File

@@ -8,30 +8,53 @@ from app.models.notemodel import NoteModel
from app.models.usermodel import UserModel from app.models.usermodel import UserModel
async def get_user_by_id(info: Info, id: strawberry.ID) -> UserModel: async def get_user_by_id(info: Info, id: strawberry.ID) -> UserModel:
""" Resolver pour le champ 'user(id: ID!)'. """ """
# TODO 2.2: Récupérer le current_user (get_current_user_from_info) Resolver pour le champ 'user(id: ID!)'.
"""
current_user = get_current_user_from_info(info)
user_to_view = await fake_db_repo.get_user_by_id(int(id)) user_to_view = await fake_db_repo.get_user_by_id(int(id))
if not user_to_view: if not user_to_view:
raise NotFoundBLLException(resource_name="User", resource_id=id) raise NotFoundBLLException(resource_name="User", resource_id=id)
# TODO 2.2: Implémenter le contrôle d'accès (A01) # --- FIX A01 (Broken Access Control) ---
# Règle : admin OU 'is_self' # On vérifie si l'utilisateur connecté est admin OU
# Lever AuthZException si l'accès est refusé # s'il demande son propre profil.
is_admin = "admin" in current_user.roles
is_self = current_user.id == user_to_view.id
if not (is_admin or is_self):
raise AuthZException(
"Access denied: You do not have permission to view this user's details."
)
# Si la vérification passe, on retourne l'objet
return user_to_view return user_to_view
async def get_note_owner_resolver(root: NoteModel, info: Info) -> UserModel: async def get_note_owner_resolver(root: NoteModel, info: Info) -> UserModel:
""" Resolver pour le champ imbriqué 'Note.owner'. """ """
Resolver pour le champ imbriqué 'Note.owner'.
"""
# 'root' est l'objet 'Note' parent # 'root' est l'objet 'Note' parent
owner = await fake_db_repo.get_user_by_id(root.owner_id) owner = await fake_db_repo.get_user_by_id(root.owner_id)
if not owner: if not owner:
raise NotFoundBLLException(resource_name="User", resource_id=root.owner_id) raise NotFoundBLLException(resource_name="User", resource_id=root.owner_id)
# TODO 2.3: (Défense en profondeur) # Note : Pas besoin de check A01 ici car on retourne un type 'User'
# Problème : 'owner' contient des champs sensibles (email). # et ses champs sensibles (email) sont protégés par LEURS propres resolvers/permissions.
# Retourner un UserModel, mais avec l'email masqué (ex: "[Redacted]"). # ...Cependant, notre 'user' type ne protège 'email' que
# via le resolver 'get_user_by_id'.
#
# Pour un TP, le plus simple est de ne retourner que les champs publics.
return owner # INSECURE: retourne l'objet complet avec l'email # FIX A01 (Défense en profondeur)
# Pour éviter de fuiter l'email, on ne le retourne QUE si l'utilisateur
# a les droits via le resolver 'get_user_by_id'.
# Ici, on ne retourne pas l'email.
return UserModel(
id=owner.id,
username=owner.username,
email="[Redacted]", # Ou lever une erreur si on tente d'y accéder
roles=owner.roles
)

View File

@@ -6,9 +6,19 @@ from strawberry.fastapi import GraphQLRouter
from app.core.config import settings from app.core.config import settings
from app.graphql.context import get_context from app.graphql.context import get_context
from app.graphql.extensions import SecurityErrorExtension
from app.graphql.mutations import Mutation from app.graphql.mutations import Mutation
from app.graphql.queries import Query from app.graphql.queries import Query
# --- FIX A06 (Insecure Design - DoS) ---
from strawberry.extensions import QueryDepthLimiter
from strawberry.extensions import MaxTokensLimiter
from strawberry.extensions import MaxAliasesLimiter
from strawberry.extensions import AddValidationRules
# --- FIX A06 (Insecure Design - Data Exposure) ---
from graphql.validation import NoSchemaIntrospectionCustomRule
# Crée l'application FastAPI # Crée l'application FastAPI
app = FastAPI( app = FastAPI(
title="Secure GraphQL API", title="Secure GraphQL API",
@@ -30,11 +40,19 @@ app.add_middleware(
# Crée le schéma GraphQL # Crée le schéma GraphQL
extensions=[ extensions=[
# TODO : ajouter ici notre extension personnalisé de formatage des exceptions de sécurité (authz et authn) SecurityErrorExtension, # Notre extension de gestion d'erreurs
] ]
if IS_PRODUCTION: if IS_PRODUCTION:
production_extensions = [ production_extensions = [
# TODO : ajouter ici les extensions de sécurité à activer en production # --- FIX A06 (Insecure Design - DoS) ---
# On applique l'extension qui bloque les
# requêtes trop profondes (ex: > 7 niveaux).
QueryDepthLimiter(max_depth=7),
# On empêche l'introspection
AddValidationRules([NoSchemaIntrospectionCustomRule]),
# Autres règles de limitation
MaxAliasesLimiter(max_alias_count=15),
MaxTokensLimiter(max_token_count=1000),
] ]
extensions.extend(production_extensions) extensions.extend(production_extensions)
@@ -49,7 +67,9 @@ graphql_app = GraphQLRouter(
schema, schema,
context_getter=get_context, context_getter=get_context,
# TODO : désactiver GraphiQL si on est en production # --- FIX A02 (Security Misconfiguration) ---
# On désactive l'interface GraphiQL si on est en production.
graphiql=not IS_PRODUCTION
) )
# Ajoute le routeur à l'application # Ajoute le routeur à l'application