First commit

This commit is contained in:
Johan
2025-12-18 14:52:33 +01:00
commit 8ff4600e4c
27 changed files with 3300 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/.idea

495
README.md Normal file
View File

@@ -0,0 +1,495 @@
# TP : sécurisation d'une API GraphQL (OWASP)
Vous avez développé une API GraphQL pour une application de prise de notes en utilisant **FastAPI** et **Strawberry**.
L'application est fonctionnelle, mais n'a aucune notion de sécurité.
Actuellement, n'importe quel utilisateur (même non authentifié) peut lire, rechercher et voir les profils et notes privées de tous les autres utilisateurs.
L'API est également vulnérable à des attaques par Déni de Service (DoS) et expose sa configuration en production.
## Objectifs du TP
* Implémenter l'**Authentification (AuthN)** au niveau de la couche Contexte.
* Implémenter l'**Autorisation (AuthZ)** au niveau des Resolvers (OWASP A01).
* Configurer les défenses de production (OWASP A02 & A06).
* Utiliser les outils fournis (Exceptions, Helpers) pour sécuriser l'API.
## Votre Mission
**Ce qui est fourni et 100% fonctionnel (ne pas modifier) :**
* Tout le package `app/models/` (schémas Pydantic)
* Tout le package `app/repositories/` (simulation de la base de données)
* Tout le package `app/core/` (contient `config.py` et `exceptions.py` qui définit `AuthNException` et `AuthZException`)
* `app/graphql/resolvers/auth_resolver.py` (contient le helper `get_current_user_from_info`)
* `app/graphql/resolvers/noop.py`
* `app/graphql/types/` (les types `Note` et `User` sont définis)
* `app/graphql/queries.py` et `mutations.py` (les points d'entrée sont définis)
* `app/graphql/extensions.py` (contient la `SecurityErrorExtension` prête à l'emploi pour formater les erreurs)
**Ce sur quoi vous allez travailler (fichiers à modifier) :**
1. `app/graphql/context.py` : pour implémenter l'**Authentification**.
2. `app/graphql/resolvers/note_resolver.py` et `user_resolver.py` : pour implémenter l'**Autorisation (A01)**.
3. `app/main.py` : pour **activer les extensions** et la configuration de production (**A02, A06**).
-----
## Installation et Lancement
* Après avoir récupéré le projet, installez les dépendances via Poetry :
```bash
poetry install
```
* A ce stade, si vos imports ne sont pas reconnus dans l'IDE PyCharm à l'ouverture d'un fichier (par exemple `app/main.py`), marquer le répertoire `src` comme `Sources Root` (Clic droit sur répertoire `src`, puis `Mark Directory as ... > Sources Root`) ou redémarrer l'IDE (File \> Invalidate Caches... \> Just restart à gauche).
* Pour démarrer le serveur GraphQL, utilisez la commande suivante à la racine du projet :
```bash
# En étant à la racine du projet
cd src
uvicorn app.main:app --host 0.0.0.0 --port 8002
```
* Ouvrez un client GraphQL comme Apollo Studio Sandbox :
[https://studio.apollographql.com/sandbox/explorer/](https://studio.apollographql.com/sandbox/explorer/)
* Renseignez dans l'outil l'URL de votre serveur local : `http://127.0.0.1:8002/graphql`
À noter que vous pouvez aussi utiliser l'interface incluse GraphiQL en vous rendant à l'adresse : `http://127.0.0.1:8002/graphql`
-----
## Partie 1 : Authentification (AuthN)
L'API est actuellement totalement publique. Nous allons simuler une authentification par "Bearer Token".
### Tâche 1.1 : Injecter l'utilisateur dans le contexte
Dans `app/graphql/context.py`, modifiez la fonction `get_context` (actuellement basique) :
1. Lisez l'en-tête `Authorization` de la requête (`request.headers.get("Authorization")`).
2. Simulez une validation de token (utilisez `fake_db_repo` pour récupérer les utilisateurs) :
* Si le token est `Bearer admin-token`, récupérez l'utilisateur `admin` (ID 3).
* Si le token est `Bearer alice-token`, récupérez l'utilisateur `alice` (ID 1).
* Si le token est `Bearer bob-token`, récupérez l'utilisateur `bob` (ID 2).
* Sinon, l'utilisateur est `None`.
3. Injectez l'utilisateur trouvé dans le contexte sous la clé `current_user`.
Voici le flux logique que vous devez implémenter dans `get_context` :
```mermaid
graph TD
A[début get_context] --> B{lire header 'authorization'};
B --> C{header présent et correct?};
C -- non --> D[current_user = none];
C -- oui --> E{token?};
E -- "bearer admin-token" --> F["user = admin (id 3)"];
E -- "bearer alice-token" --> G["user = alice (id 1)"];
E -- "bearer bob-token" --> H["user = bob (id 2)"];
E -- autre --> D;
F --> I[current_user = user];
G --> I;
H --> I;
I --> J[retourner contexte];
D --> J;
```
-----
## Partie 2 : A01 - Broken Access Control (Autorisation)
Maintenant que nous savons *qui* est l'utilisateur (grâce au contexte), nous devons vérifier *ce qu'il a le droit de faire*.
### Tâche 2.1 : activer l'extension de gestion d'erreurs
Pour que les exceptions `AuthNException` et `AuthZException` que vous allez lever soient correctement formatées pour le client, vous devez activer l'extension fournie.
1. Dans `app/main.py`, importez `SecurityErrorExtension` depuis `app.graphql.extensions`.
2. Lors de la création de `strawberry.Schema`, passez-la dans la liste `extensions`.
<!-- end list -->
```python
# app/main.py
# ... imports
from app.graphql.extensions import SecurityErrorExtension
# ...
# Crée le schéma GraphQL
extensions=[
SecurityErrorExtension, # Notre extension de gestion d'erreurs
]
if IS_PRODUCTION:
# ... (sera fait en Partie 3)
pass
schema = strawberry.Schema(
query=Query,
mutation=Mutation,
extensions=extensions,
)
# ...
```
### Tâche 2.2 : sécuriser les Resolvers "Root"
Voici le flux de décision typique pour un resolver "root" sécurisé (comme `get_note_by_id`) :
```mermaid
graph TD
A["début resolver (ex: get_note_by_id)"] --> B["appel get_current_user_from_info(info)"];
B --> C{utilisateur authentifié?};
C -- non --> D[lever AuthNException];
C -- "oui (utilisateur = 'user')" --> E["récupérer la ressource (ex: note)"];
E --> F{"contrôle d'accès (user.is_admin ou user.is_owner)?"};
F -- non --> G[lever AuthZException];
F -- oui --> H["retourner la ressource (note)"];
D --> I["fin (erreur)"];
G --> I;
H --> J["fin (succès)"];
```
Modifiez les resolvers dans `app/graphql/resolvers/note_resolver.py` et `user_resolver.py`. Ils sont actuellement **totalement insécurisés**.
Pour chaque resolver sensible :
1. Récupérez l'utilisateur connecté en utilisant le helper `get_current_user_from_info(info)` (fourni dans `auth_resolver.py`). S'il n'est pas connecté, ce helper lèvera automatiquement une `AuthNException`.
2. Implémentez la logique de contrôle d'accès. Si l'accès est refusé, levez une `AuthZException` (fournie dans `core.exceptions`).
**À sécuriser :**
* **`get_user_by_id`** (`user_resolver.py`) :
* *Règle :* un utilisateur ne peut voir qu'un profil s'il est **Admin** OU s'il demande **son propre profil** (`is_self`).
* **`get_note_by_id`** (`note_resolver.py`) :
* *Règle :* un utilisateur ne peut voir une note que s'il est **Admin** OU s'il est le **propriétaire** de la note (`is_owner`).
* **`search_notes`** (`note_resolver.py`) :
* *Problème :* la recherche retourne toutes les notes (ex: `contentContains: "secret"`), y compris celles des autres.
* *Correctif :* après avoir récupéré la liste de *toutes* les notes correspondantes, **filtrez cette liste en Python** pour ne retourner que les notes que l'utilisateur actuel a le droit de voir (Admin ou propriétaire).
### Tâche 2.3 : sécuriser les resolvers imbriqués
Les failles se cachent souvent dans les champs imbriqués.
Voici le flux de décision pour un resolver imbriqué (comme `User.notes`) qui **ne doit pas lever d'erreur** pour ne pas casser la requête parente :
```mermaid
graph TD
A["resolver parent (ex: user(id: &quot;1&quot;)) retourne un 'user'"] --> B[graphql engine demande le champ 'notes'];
B --> C["appel resolver 'get_notes_for_user_resolver(root, info)'"];
C --> D["récupérer 'current_user' (de info.context)"];
D --> E["récupérer 'parent_user' (de root)"];
E --> F{"check droits (current_user.is_admin ou current_user == parent_user)?"};
F -- non --> G["retourner liste vide []"];
F -- oui --> H[récupérer et retourner les notes du parent_user];
G --> I[fin];
H --> I[fin];
```
* **`User.notes`** (Resolver `get_notes_for_user_resolver` dans `note_resolver.py`) :
* *Problème :* ce resolver est lié au champ `notes` du type `User`. Il doit être sécurisé.
* *Règle :* l'utilisateur *connecté* (`info.context`) ne peut voir les notes de l'utilisateur *parent* (`root`) que s'il est **Admin** ou **`is_self`**.
* *Important :* s'il n'a pas le droit, **retournez une liste vide `[]`** (ne levez pas d'exception pour ne pas faire échouer la requête parente).
* **`Note.owner`** (Resolver `get_note_owner_resolver` dans `user_resolver.py`) :
* *Problème :* ce resolver retourne un type `User`, qui contient un champ sensible (`email`).
* *Correctif (Défense en profondeur) :* modifiez ce resolver pour qu'il retourne un `UserModel`, mais **en rédigeant (masquant) les champs sensibles** que l'utilisateur n'est pas censé voir dans ce contexte (ex: `email="[Redacted]"`).
-----
## Partie 3 : A02 & A06 - Configuration et design
Protéger l'API contre les abus et la découverte en modifiant `app/main.py`.
Voici le flux logique de configuration que vous allez implémenter dans `main.py` :
```mermaid
graph TD
A[démarrage app/main.py] --> B["extensions_base = [securityerrorextension]"];
B --> C{is_production est true?};
C -- non --> D[graphiql = true];
D --> H["créer schema(extensions=extensions_base)"];
C -- oui --> E[graphiql = false];
E --> F["extensions_prod = [noschemainrospection, querydepthlimiter, ...]"];
F --> G[extensions_totales = extensions_base + extensions_prod];
G --> I["créer schema(extensions=extensions_totales)"];
```
### Tâche 3.1 : A02 - Désactiver GraphiQL en production
* *Problème :* l'interface interactive GraphiQL ne doit jamais être exposée en production.
* *Correctif :* dans `main.py`, utilisez la variable `settings.ENVIRONMENT`. Passez l'argument `graphiql=not IS_PRODUCTION` au `GraphQLRouter`.
### Tâche 3.2 : A06 - Bloquer l'introspection en production
* *Problème :* l'introspection permet à un attaquant de télécharger votre schéma complet.
* *Correctif :* dans `main.py` :
1. Importez `NoSchemaIntrospectionCustomRule` depuis `graphql.validation`.
2. Importez `AddValidationRules` de `strawberry.extensions`.
3. Dans le bloc `if IS_PRODUCTION:`, créez une liste `production_extensions` et ajoutez-y `AddValidationRules([NoSchemaIntrospectionCustomRule])`.
4. Ajoutez cette liste aux extensions principales (`extensions.extend(production_extensions)`).
### Tâche 3.3 : A06 - Prévention DoS (limitation de requêtes)
* *Problème :* des requêtes GraphQL trop complexes (trop profondes, trop d'alias) peuvent saturer votre serveur (DoS).
* *Correctif :* dans `main.py`, ajoutez les extensions suivantes à votre `production_extensions` (Tâche 3.2) :
* `QueryDepthLimiter(max_depth=7)`
* `MaxAliasesLimiter(max_alias_count=15)`
* `MaxTokensLimiter(max_token_count=1000)`
*(N'oubliez pas de les importer depuis `strawberry.extensions`)*
-----
## Partie 4 : A05 - Injection (discussion)
* *Contexte :* le resolver `search_notes` prend une entrée utilisateur (`contentContains`).
* *Question :* notre `fake_db_repo.py` est sécurisé car il effectue une recherche en Python (`term in note.content.lower()`).
* *Discussion :* si `fake_db_repo.py` utilisait une base de données SQL, quelle serait la faille de sécurité ici ? Comment la fonction `search_notes_by_content` devrait-elle être écrite pour être sécurisée ? (Mots-clés : ORM, requêtes paramétrées).
-----
## Partie 5 : vérifier votre travail
Une fois que vous avez implémenté toutes les corrections, utilisez les requêtes ci-dessous pour valider que vos mesures de sécurité sont efficaces.
### Préparation des Tests
Avant tout, dans votre `config.py`, assurez-vous de mettre :
```python
ENVIRONMENT="production"
```
Puis de redémarrer votre serveur.
Par ailleurs, pour tester l'authentification, vous devez passer le "token" dans les en-têtes (Headers) de votre client GraphQL (Apollo Sandbox, GraphiQL, etc.).
**En-tête (Header) à ajouter :**
* **Clé :** `Authorization`
* **Valeur :** `Bearer <votre-token>` (ex: `Bearer alice-token` ou `Bearer admin-token`)
-----
### Test 1 : Authentification (AuthN)
**Objectif :** vérifier qu'une ressource protégée nécessite une authentification.
* **Token :** Aucun (N'ajoutez pas l'en-tête `Authorization`).
* **Requête :**
```graphql
query GetMyNotes {
myNotes {
id
content
}
}
```
* **Résultat attendu :** une erreur. Le message doit indiquer que l'authentification est requise (`AuthNException`).
-----
### Test 2 : Accès autorisé (Cas nominal)
**Objectif :** vérifier qu'un utilisateur authentifié peut accéder à ses propres ressources.
* **Token :** `Bearer alice-token`
* **Requête :**
```graphql
query GetMyData {
myNotes {
id
content
}
user(id: "1") { # ID d'Alice
id
username
email # Doit être visible car c'est son propre profil
}
}
```
* **Résultat attendu :** succès. La requête retourne les notes d'Alice et son profil (y compris l'email).
-----
### Test 3 : A01 - Contrôle d'accès (Utilisateur)
**Objectif :** vérifier qu'un utilisateur ne peut pas voir le profil détaillé d'un autre utilisateur.
* **Token :** `Bearer alice-token`
* **Requête :**
```graphql
query GetBobProfile {
user(id: "2") { # ID de Bob
id
username
email
}
}
```
* **Résultat attendu :** une erreur. Le message doit indiquer un refus d'accès (`AuthZException`, "Access denied").
-----
### Test 4 : A01 - Contrôle d'accès (Ressource)
**Objectif :** vérifier qu'un utilisateur ne peut pas accéder à la note privée d'un autre.
* **Token :** `Bearer alice-token`
* **Requête :**
```graphql
query GetBobNote {
note(id: "102") { # ID de la note de Bob
id
content
}
}
```
* **Résultat attendu :** une erreur. Le message doit indiquer un refus d'accès (`AuthZException`, "Access denied").
-----
### Test 5 : A01 - Accès Administrateur
**Objectif :** vérifier que l'administrateur *peut* outrepasser les règles précédentes.
* **Token :** `Bearer admin-token`
* **Requête :**
```graphql
query AdminViewAll {
user(id: "1") { # Profil d'Alice
id
username
email
}
note(id: "102") { # Note de Bob
id
content
}
}
```
* **Résultat attendu :** succès. L'admin voit toutes les données.
-----
### Test 6 : A01 - Filtrage de Recherche
**Objectif :** vérifier que le resolver `searchNotes` filtre les résultats. La note 102 "Secret de Bob" ne doit être visible que par Bob ou un admin.
* **Token :** `Bearer alice-token`
* **Requête :**
```graphql
query SearchForSecret {
searchNotes(contentContains: "Secret") {
id
content
}
}
```
* **Résultat attendu :** succès. `searchNotes` doit retourner une **liste vide `[]`**. Alice ne doit pas voir la note de Bob.
*Relancez ce test avec le token `Bearer admin-token` (doit retourner la note 102) et `Bearer bob-token` (doit retourner la note 102).*
-----
### Test 7 : A01 - Sécurité des champs imbriqués (`Note.owner`)
**Objectif :** vérifier que le champ `email` est masqué (`[Redacted]`) lorsqu'on accède à `Note.owner`, comme implémenté dans `get_note_owner_resolver`.
* **Token :** `Bearer alice-token`
* **Requête :**
```graphql
query NoteOwnerRedacted {
myNotes {
id
content
owner {
id
username
email # Doit être masqué
}
}
}
```
* **Résultat attendu :** succès. Le champ `email` dans `owner` doit avoir la valeur `"[Redacted]"`.
-----
### Test 8 : A01 - Sécurité des champs imbriqués (`User.notes`)
**Objectif :** vérifier que le champ `User.notes` est filtré (retourne `[]`) si un utilisateur non-admin tente d'y accéder (ce test ne fonctionnera que si vous avez "oublié" de sécuriser Tâche 2.2 `get_user_by_id`, mais que vous avez sécurisé Tâche 2.3 `get_notes_for_user_resolver`).
Un test plus pertinent est de vérifier que l'admin *voit* les notes imbriquées.
* **Token :** `Bearer admin-token`
* **Requête :**
```graphql
query AdminViewUserNotes {
user(id: "1") { # Profil d'Alice
id
username
notes { # Doit montrer les notes d'Alice
id
content
}
}
}
```
* **Résultat attendu :** succès. L'admin voit le profil d'Alice et ses notes associées.
-----
### Test 9 : A02 & A06 - Configuration Production
**Objectif :** vérifier que l'introspection et GraphiQL sont désactivés en production.
1. **Test A (GraphiQL) :** essayez d'accéder à `http://127.0.0.1:8002/graphql` dans votre navigateur.
* **Résultat attendu :** vous devriez voir `{"detail":"Not Found"}`. L'interface GraphiQL ne doit pas se charger.
2. **Test B (Introspection) :** allez dans Apollo Sandbox (qui fonctionne toujours) et exécutez la requête d'introspection.
* **Requête :**
```graphql
query IntrospectionQuery {
__schema {
types {
name
}
}
}
```
* **Résultat attendu :** une erreur claire indiquant que l'introspection est désactivée (ex: "GraphQL introspection is not allowed...").
-----
### Test 10 : A06 - Limite de profondeur (DoS)
**Objectif :** vérifier que le `QueryDepthLimiter` (configuré à `max_depth=7` en production) bloque les requêtes excessivement imbriquées.
* **Token :** `Bearer admin-token` (pour éviter toute erreur d'autorisation qui masquerait le test de profondeur).
* **Requête :** (Cette requête a 10 niveaux d'imbrication)
```graphql
query DeepQueryTest {
user(id: "1") { # Niveau 1
notes { # Niveau 2
owner { # Niveau 3
notes { # Niveau 4
owner { # Niveau 5
notes { # Niveau 6
owner { # Niveau 7
notes { # Niveau 8
owner { # Niveau 9
username # Niveau 10
}
}
}
}
}
}
}
}
}
}
```
* **Résultat attendu :** Une erreur. Le message doit indiquer que la requête est trop profonde ("Query is too deep" ou similaire), prouvant que l'extension `QueryDepthLimiter` fonctionne.

2315
poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

65
pyproject.toml Normal file
View File

@@ -0,0 +1,65 @@
[project]
name = "secure-graphql"
version = "0.1.0"
description = "TP Secure GraphQL"
authors = [
{name = "Your Name",email = "you@example.com"}
]
readme = "README.md"
classifiers = [
"Programming Language :: Python :: 3.13",
]
keywords = ["fastapi", "graphql", "web", "security", "owasp", "authn", "authz"]
exclude = [
{ path = "tests", format = "wheel" }
]
requires-python = ">=3.13"
[tool.poetry]
package-mode = false
[tool.poetry.dependencies]
python = "^3.13"
fastapi = "^0.116.1"
uvicorn = { version = "^0.35.0", extras = [ "standard" ] }
gunicorn = "^23.0.0"
pydantic = "^2.11.7"
python-dotenv = "^1.0.1"
pydantic-settings = "^2.10.1"
httpx = "^0.28.1"
langchain = "^0.3.27"
langchain-openai = "^0.3.35"
strawberry-graphql = "^0.281.0"
[tool.poetry.group.dev.dependencies]
pytest = "^8.4.1"
pytest-cov = "^6.2.1"
pytest-asyncio = "^1.1.0"
pytest-mock = "^3.14.1"
aiosqlite = "^0.21.0"
coverage = { version="*", extras=["toml"]}
[tool.pytest.ini_options]
asyncio_mode = "auto"
pythonpath = "src"
testpaths = "tests"
addopts = "-v -s"
[tool.black]
line-length = 120
[tool.pycln]
all = true
[tool.isort]
line_length = 120
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
use_parentheses = true
ensure_newline_before_comments = true
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"

0
src/app/__init__.py Normal file
View File

0
src/app/core/__init__.py Normal file
View File

18
src/app/core/config.py Normal file
View File

@@ -0,0 +1,18 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import AnyHttpUrl
from typing import List
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=True
)
# Paramètre pour différencier dev/prod
ENVIRONMENT: str = "development" # ou "production"
# ... (autres settings comme CORS si besoin)
BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = ["https://studio.apollographql.com"]
settings = Settings()

View File

@@ -0,0 +1,30 @@
class BaseAppException(Exception):
"""Exception de base pour l'application."""
pass
class BLLException(BaseAppException):
"""Exception de base pour les erreurs de la couche métier (BLL)."""
pass
class NotFoundBLLException(BLLException):
"""Levée lorsqu'une ressource n'est pas trouvée."""
def __init__(self, resource_name: str, resource_id: int | str):
message = f"{resource_name} avec l'ID '{resource_id}' non trouvé."
super().__init__(message)
class AuthNException(BLLException):
"""
Authentication Exception.
Levée lorsqu'un utilisateur doit être connecté mais ne l'est pas.
"""
def __init__(self, message: str = "Authentication required."):
super().__init__(message)
class AuthZException(BLLException):
"""
Authorization Exception (OWASP A01).
Levée lorsqu'un utilisateur est connecté mais n'a pas les
permissions pour effectuer une action.
"""
def __init__(self, message: str = "Access denied."):
super().__init__(message)

View File

View File

@@ -0,0 +1,26 @@
from fastapi import Request
from typing import Any, Dict, Optional
from app.repositories import fake_db_repo
from app.models.usermodel import UserModel
async def get_context(request: Request) -> Dict[str, Any]:
"""
Crée le contexte pour chaque requête GraphQL.
C'est ici que nous gérons l'authentification (AuthN).
Nous lisons l'en-tête HTTP Authorization, validons le "token" (simulé ici),
et injectons 'current_user' dans le contexte.
"""
current_user: Optional[UserModel] = None
# TODO 1.1: Implémenter l'authentification (AuthN)
# 1. Lire l'en-tête `Authorization` (request.headers.get(...))
# 2. Simuler la validation du token (admin, alice, bob)
# 3. Récupérer l'utilisateur depuis `fake_db_repo`
# 4. Injecter l'utilisateur sous la clé "current_user"
return {
"request": request,
"current_user": current_user # Injecté pour tous les resolvers
}

View File

@@ -0,0 +1,27 @@
from strawberry.extensions import Extension
from app.core.exceptions import BLLException, AuthNException, AuthZException
class SecurityErrorExtension(Extension):
"""
FIX: Nous adaptons l'extension pour formater joliment
nos nouvelles erreurs de sécurité.
"""
def on_request_end(self):
for error in self.execution_context.errors:
original_error = error.original_error
# Gestion des erreurs de sécurité (AuthN/AuthZ)
if isinstance(original_error, (AuthNException, AuthZException)):
error.message = f"[Security Error] {original_error}"
if not error.extensions:
error.extensions = {}
error.extensions['code'] = original_error.__class__.__name__
# Gestion des erreurs métier (ex: NotFound)
elif isinstance(original_error, BLLException):
error.message = f"[Business Error] {original_error}"
if not error.extensions:
error.extensions = {}
error.extensions['code'] = original_error.__class__.__name__

View File

@@ -0,0 +1,9 @@
import strawberry
from app.graphql.resolvers.noop_helper import resolve_noop
@strawberry.type
class Mutation:
_noop: bool = strawberry.field(
resolver=resolve_noop,
description="Mutation factice."
)

View File

@@ -0,0 +1,36 @@
import strawberry
from app.graphql.types.user import User
from app.graphql.types.note import Note
from app.graphql.resolvers.user_resolver import get_user_by_id
from app.graphql.resolvers.note_resolver import (
get_note_by_id,
get_my_notes,
search_notes
)
@strawberry.type
class Query:
"""
Point d'entrée pour toutes les requêtes GraphQL.
"""
user: User = strawberry.field(
resolver=get_user_by_id,
description="Récupère un utilisateur par ID (protégé contre A01)."
)
note: Note = strawberry.field(
resolver=get_note_by_id,
description="Récupère une note par ID (protégé contre A01)."
)
myNotes: list[Note] = strawberry.field(
resolver=get_my_notes,
description="Récupère les notes de l'utilisateur connecté."
)
searchNotes: list[Note] = strawberry.field(
resolver=search_notes,
description="Recherche dans les notes (protégé contre A01 & A05)."
)

View File

View File

@@ -0,0 +1,13 @@
from strawberry import Info
from app.core.exceptions import AuthNException
from app.models.usermodel import UserModel
def get_current_user_from_info(info: Info) -> UserModel:
"""
Helper pour récupérer l'utilisateur du contexte.
Lève une erreur si l'utilisateur n'est pas authentifié.
"""
current_user = info.context.get("current_user")
if not current_user:
raise AuthNException("Authentication required to access this resource.")
return current_user

View File

@@ -0,0 +1,2 @@
def resolve_noop() -> bool:
return True

View File

@@ -0,0 +1,59 @@
import strawberry
from strawberry import Info
from typing import List
from app.core.exceptions import AuthZException, NotFoundBLLException
# Par simplification, nos resolvers consomment directement la couche DAL, mais dans une vraie application, il faudrait passer par une couche BLL (Services).
from app.repositories import fake_db_repo
from app.graphql.resolvers.auth_helper import get_current_user_from_info
from app.models.usermodel import UserModel as UserModel
from app.models.notemodel import NoteModel as NoteModel
async def get_note_by_id(info: Info, id: strawberry.ID) -> NoteModel:
""" Resolver pour 'note(id: ID!)' """
# TODO 2.2: Récupérer le current_user (get_current_user_from_info)
note = await fake_db_repo.get_note_by_id(int(id))
if not note:
raise NotFoundBLLException(resource_name="Note", resource_id=id)
# TODO 2.2: Implémenter le contrôle d'accès (A01)
# Règle : admin OU propriétaire (is_owner)
# Lever AuthZException si l'accès est refusé
return note
async def get_my_notes(info: Info) -> List[NoteModel]:
""" Resolver pour 'myNotes' """
current_user = get_current_user_from_info(info)
notes = await fake_db_repo.get_notes_for_user(current_user.id)
return notes
async def search_notes(info: Info, contentContains: str) -> List[NoteModel]:
""" Resolver pour 'searchNotes' """
# ... (partie A05) ...
notes = await fake_db_repo.search_notes_by_content(contentContains)
# TODO 2.2: Récupérer le current_user (get_current_user_from_info)
# TODO 2.2: Filtrer la liste 'notes' (A01)
# Problème : 'notes' contient les résultats de TOUS les utilisateurs.
# Règle : ne retourner que les notes où l'utilisateur est admin OU propriétaire.
return notes # INSECURE: retourne tout
async def get_notes_for_user_resolver(root: UserModel, info: Info) -> List[NoteModel]:
""" Resolver pour le champ imbriqué 'User.notes' """
# 'root' est l'objet 'User' parent
user_id_to_view = root.id
# TODO 2.3: Récupérer le current_user (get_current_user_from_info)
# TODO 2.3: Implémenter le contrôle d'accès (A01)
# Règle : admin OU 'is_self' (l'utilisateur connecté regarde son propre profil)
# Si accès refusé, retourner une liste vide [].
notes = await fake_db_repo.get_notes_for_user(user_id_to_view)
return notes

View File

@@ -0,0 +1,37 @@
import strawberry
from strawberry import Info
from app.core.exceptions import AuthZException, NotFoundBLLException
# Par simplification, nos resolvers consomment directement la couche DAL, mais dans une vraie application, il faudrait passer par une couche BLL (Services).
from app.repositories import fake_db_repo
from app.graphql.resolvers.auth_helper import get_current_user_from_info
from app.models.notemodel import NoteModel
from app.models.usermodel import UserModel
async def get_user_by_id(info: Info, id: strawberry.ID) -> UserModel:
""" Resolver pour le champ 'user(id: ID!)'. """
# TODO 2.2: Récupérer le current_user (get_current_user_from_info)
user_to_view = await fake_db_repo.get_user_by_id(int(id))
if not user_to_view:
raise NotFoundBLLException(resource_name="User", resource_id=id)
# TODO 2.2: Implémenter le contrôle d'accès (A01)
# Règle : admin OU 'is_self'
# Lever AuthZException si l'accès est refusé
return user_to_view
async def get_note_owner_resolver(root: NoteModel, info: Info) -> UserModel:
""" Resolver pour le champ imbriqué 'Note.owner'. """
# 'root' est l'objet 'Note' parent
owner = await fake_db_repo.get_user_by_id(root.owner_id)
if not owner:
raise NotFoundBLLException(resource_name="User", resource_id=root.owner_id)
# TODO 2.3: (Défense en profondeur)
# Problème : 'owner' contient des champs sensibles (email).
# Retourner un UserModel, mais avec l'email masqué (ex: "[Redacted]").
return owner # INSECURE: retourne l'objet complet avec l'email

View File

View File

@@ -0,0 +1,18 @@
from typing import TYPE_CHECKING, Annotated
import strawberry
from app.graphql.resolvers.user_resolver import get_note_owner_resolver
if TYPE_CHECKING: # Pour éviter les importations circulaires
from app.graphql.types.user import User
@strawberry.type
class Note:
id: strawberry.ID
content: str
"""
Ce champ est "safe" car il pointe vers un type 'User'
dont les champs sensibles ('email') sont déjà protégés.
"""
owner: Annotated["User", strawberry.lazy(".user")] = strawberry.field(resolver=get_note_owner_resolver)

View File

@@ -0,0 +1,24 @@
import strawberry
from typing import TYPE_CHECKING, Annotated, List
from app.graphql.resolvers.note_resolver import get_notes_for_user_resolver
if TYPE_CHECKING: # Pour éviter les importations circulaires
from app.graphql.types.note import Note
@strawberry.type
class User:
id: strawberry.ID
username: str
"""
FIX A01: Ce champ est sensible. L'accès est protégé
par le resolver 'get_user_by_id' dans queries.py.
"""
email: str
roles: List[str]
"""
FIX A01: Ce champ imbriqué a aussi besoin d'une protection.
Nous lui lions un resolver spécifique.
"""
notes: List[Annotated["Note", strawberry.lazy(".note")]] = strawberry.field(resolver=get_notes_for_user_resolver)

70
src/app/main.py Normal file
View File

@@ -0,0 +1,70 @@
import strawberry
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from strawberry.fastapi import GraphQLRouter
from app.core.config import settings
from app.graphql.context import get_context
from app.graphql.mutations import Mutation
from app.graphql.queries import Query
# Crée l'application FastAPI
app = FastAPI(
title="Secure GraphQL API",
description="TP - Sécurisation GraphQL avec FastAPI",
version="1.0.0"
)
# Détermine si on est en production
IS_PRODUCTION = settings.ENVIRONMENT == "production"
# Configuration CORS (copiée de l'exemple)
app.add_middleware(
CORSMiddleware,
allow_origins=[str(origin).rstrip('/') for origin in settings.BACKEND_CORS_ORIGINS] if settings.BACKEND_CORS_ORIGINS else ["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Crée le schéma GraphQL
extensions=[
# TODO : ajouter ici notre extension personnalisé de formatage des exceptions de sécurité (authz et authn)
]
if IS_PRODUCTION:
production_extensions = [
# TODO : ajouter ici les extensions de sécurité à activer en production
]
extensions.extend(production_extensions)
schema = strawberry.Schema(
query=Query,
mutation=Mutation,
extensions=extensions,
)
# Crée le routeur GraphQL
graphql_app = GraphQLRouter(
schema,
context_getter=get_context,
# TODO : désactiver GraphiQL si on est en production
)
# Ajoute le routeur à l'application
app.include_router(graphql_app, prefix="/graphql")
@app.get("/", tags=["Root"])
def read_root():
msg = "API Sécurisée. "
if IS_PRODUCTION:
msg += "Mode Production (Introspection désactivée)."
else:
msg += "Mode Développement (Rendez-vous sur /graphql)."
return {"message": msg}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8002)

View File

View File

@@ -0,0 +1,6 @@
from pydantic import BaseModel
class NoteModel(BaseModel):
id: int
content: str
owner_id: int # Clé étrangère vers User.id

View File

@@ -0,0 +1,8 @@
from pydantic import BaseModel
from typing import List
class UserModel(BaseModel):
id: int
username: str
email: str # Champ sensible
roles: List[str]

View File

View File

@@ -0,0 +1,41 @@
from typing import List, Optional
from app.models.usermodel import UserModel
from app.models.notemodel import NoteModel
# --- Base de données "in-memory" ---
USERS_DATA = {
1: UserModel(id=1, username="alice", email="alice@example.com", roles=["user"]),
2: UserModel(id=2, username="bob", email="bob@example.com", roles=["user"]),
3: UserModel(id=3, username="admin", email="admin@example.com", roles=["user", "admin"]),
}
NOTES_DATA = {
101: NoteModel(id=101, content="Note privée d'Alice", owner_id=1),
102: NoteModel(id=102, content="Secret de Bob", owner_id=2),
103: NoteModel(id=103, content="Note d'admin", owner_id=3),
}
# --- Fonctions de la couche d'accès aux données (DAL) ---
async def get_user_by_id(user_id: int) -> Optional[UserModel]:
return USERS_DATA.get(user_id)
async def get_note_by_id(note_id: int) -> Optional[NoteModel]:
return NOTES_DATA.get(note_id)
async def get_notes_for_user(user_id: int) -> List[NoteModel]:
return [note for note in NOTES_DATA.values() if note.owner_id == user_id]
async def search_notes_by_content(search_term: str) -> List[NoteModel]:
"""
FIX A05 (Injection):
La logique ici est sécurisée car elle n'utilise pas de concaténation SQL.
Dans une vraie application, cela utiliserait un ORM ou des
requêtes paramétrées, qui sont la défense principale.
"""
term = search_term.lower()
return [
note for note in NOTES_DATA.values()
if term in note.content.lower()
]