This commit is contained in:
Johan
2025-12-16 16:54:12 +01:00
commit 26016e93ba
59 changed files with 3632 additions and 0 deletions

0
tests/__init__.py Normal file
View File

View File

@@ -0,0 +1,69 @@
import pytest
from httpx import AsyncClient
# Marqueur pour indiquer à pytest que ce sont des tests asynchrones
pytestmark = pytest.mark.asyncio
# Données de test réutilisables
@pytest.fixture
async def test_data(db_session):
"""Fixture pour insérer des données de test initiales."""
from app.models import Genre, Participant
# 1. Créer les objets
genre = Genre(label="Science-Fiction")
director = Participant(first_name="Denis", last_name="Villeneuve")
db_session.add_all([genre, director])
await db_session.commit()
# 2. Rafraîchir les objets pour obtenir les ID générés par la BDD
await db_session.refresh(genre)
await db_session.refresh(director)
# 3. Renvoyer uniquement les ID, pas les objets entiers
return {"genre_id": genre.id, "director_id": director.id}
async def test_create_movie_success(test_client: AsyncClient, test_data):
"""Vérifie la création réussie d'un film via l'API."""
response = await test_client.post(
"/api/v1/movies/",
json={
"title": "Dune",
"year": 2021,
"duration": 155,
"synopsis": "A mythic and emotionally charged hero's journey.",
"genre_id": test_data["genre_id"],
"director_id": test_data["director_id"],
"actors_ids": []
},
)
assert response.status_code == 201
data = response.json()
assert data["title"] == "Dune"
assert "id" in data
assert data["genre"]["label"] == "Science-Fiction"
async def test_create_movie_validation_error(test_client: AsyncClient, test_data):
"""Vérifie que l'API retourne une erreur 400 pour des données invalides."""
response = await test_client.post(
"/api/v1/movies/",
json={
"title": "Future Movie",
"year": 1800, # Année invalide
"genre_id": test_data["genre_id"],
"director_id": test_data["director_id"],
},
)
assert response.status_code == 400
assert "L'année du film doit être comprise entre" in response.json()["detail"]
async def test_read_movie_not_found(test_client: AsyncClient):
"""Vérifie que l'API retourne une erreur 404 pour un film inexistant."""
response = await test_client.get("/api/v1/movies/999")
assert response.status_code == 404
assert "Film avec l'ID '999' non trouvé." in response.json()["detail"]

109
tests/conftest.py Normal file
View File

@@ -0,0 +1,109 @@
import pytest
import pytest_asyncio
from typing import AsyncGenerator
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from httpx import AsyncClient, ASGITransport
from app.main import app
from app.api.deps import get_db
from app.models import Base, Genre, Participant, Member
# URL pour une base de données SQLite en mémoire
TEST_DATABASE_URL = "sqlite+aiosqlite:///file:memdb_tp_filmotheque?mode=memory&cache=shared"
# Créer un moteur de BDD de test
engine = create_async_engine(TEST_DATABASE_URL, connect_args={"check_same_thread": False})
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)
@pytest.fixture(scope="session")
def event_loop():
"""Crée une instance de la boucle d'événements pour toute la session de test."""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest_asyncio.fixture(scope="function")
async def db_session() -> AsyncGenerator[AsyncSession, None]:
"""
Fixture pour fournir une session de BDD de test isolée pour chaque test.
Recrée les tables à chaque fois pour garantir un état propre.
"""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
async with TestingSessionLocal() as session:
try:
yield session
finally:
await session.rollback()
await session.close()
@pytest.fixture(scope="function")
def override_get_db(db_session: AsyncSession):
"""Fixture pour surcharger la dépendance get_db de l'application."""
async def _override_get_db():
yield db_session
return _override_get_db
@pytest_asyncio.fixture(scope="function")
async def test_client(override_get_db) -> AsyncGenerator[AsyncClient, None]:
"""Fixture pour le client HTTP de FastAPI."""
app.dependency_overrides[get_db] = override_get_db
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
app.dependency_overrides.clear()
@pytest_asyncio.fixture(scope="function")
async def test_data(db_session: AsyncSession):
"""
Fixture pour insérer des données de test initiales.
"""
# 1. Créer les objets
genre_sf = Genre(label="Science-Fiction")
genre_action = Genre(label="Action")
director_nolan = Participant(first_name="Christopher", last_name="Nolan")
actor_leo = Participant(first_name="Leonardo", last_name="DiCaprio")
member_user = Member(
first_name="Test",
last_name="User",
login="testuser",
password="password",
is_admin=False
)
db_session.add_all([genre_sf, genre_action, director_nolan, actor_leo, member_user])
await db_session.commit()
# Rafraîchir les objets après le commit pour charger leurs ID
await db_session.refresh(genre_sf)
await db_session.refresh(genre_action)
await db_session.refresh(director_nolan)
await db_session.refresh(actor_leo)
await db_session.refresh(member_user)
# --- CORRECTION FINALE ---
# Ne pas retourner les objets SQLAlchemy eux-mêmes, mais seulement leurs ID
# et les valeurs simples.
return {
"genre_sf_id": genre_sf.id,
"genre_action_id": genre_action.id,
"director_nolan_id": director_nolan.id,
"director_nolan_lastname": director_nolan.last_name,
"actor_leo_id": actor_leo.id,
"actor_leo_lastname": actor_leo.last_name,
"member_user_id": member_user.id,
"member_user_login": member_user.login
}

View File

@@ -0,0 +1,60 @@
import pytest
from pydantic import ValidationError
try:
from app.schemas.person import PersonBase, PersonRead
from app.schemas.participant import ParticipantCreate, ParticipantRead, ParticipantUpdate
from app.schemas.opinion import OpinionBase, OpinionCreate, OpinionRead
from app.schemas.member import MemberRead
SCHEMAS_LOADED = True
except ImportError as e:
print(f"Échec de l'import des schémas : {e}")
SCHEMAS_LOADED = False
@pytest.mark.skipif(not SCHEMAS_LOADED, reason="Schémas (Person, Participant, Opinion) non trouvés ou import échoué")
def test_person_schemas():
"""Teste les schémas Person (Base et Read) - TODO Étape 2."""
person_data = {"first_name": "John", "last_name": "Doe"}
base = PersonBase(**person_data)
assert base.last_name == "Doe"
read = PersonRead(id=1, **person_data)
assert read.id == 1
@pytest.mark.skipif(not SCHEMAS_LOADED, reason="Schémas (Person, Participant, Opinion) non trouvés ou import échoué")
def test_participant_schemas():
"""Teste les schémas Participant (Create, Update, Read) - TODO Étape 2."""
participant_data = {"first_name": "Jane", "last_name": "Smith"}
create = ParticipantCreate(**participant_data)
assert create.last_name == "Smith"
update_data = {"first_name": "Janet"}
update = ParticipantUpdate(**update_data)
assert update.first_name == "Janet"
assert update.last_name is None
# Teste que des champs inconnus lèvent une erreur (extra="forbid")
with pytest.raises(ValidationError):
ParticipantUpdate(first_name="Test", unknown_field="error")
@pytest.mark.skipif(not SCHEMAS_LOADED, reason="Schémas (Person, Participant, Opinion) non trouvés ou import échoué")
def test_opinion_schemas():
"""Teste les schémas Opinion (Base, Create, Read) - TODO Étape 2."""
opinion_data = {"note": 5, "comment": "Excellent!"}
base = OpinionBase(**opinion_data)
assert base.note == 5
create_data = {"member_id": 1, **opinion_data}
create = OpinionCreate(**create_data)
assert create.member_id == 1
# Mock d'un membre pour le schéma de lecture
mock_member = MemberRead(id=1, login="testuser")
read_data = {"id": 10, "movie_id": 20, "member": mock_member, **opinion_data}
read = OpinionRead(**read_data)
assert read.id == 10
assert read.member.login == "testuser"

View File

@@ -0,0 +1,123 @@
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.repositories import movie as movie_repository
from app.repositories import opinion as opinion_repository
from app.repositories import genre as genre_repository
from app.schemas.movie import MovieCreate
from app.schemas.opinion import OpinionCreate
from app.models import Genre, Participant, Member, Movie, Opinion
# Marqueur pour indiquer à pytest que ce sont des tests asynchrones
pytestmark = pytest.mark.asyncio
@pytest.fixture
async def repo_test_data(db_session: AsyncSession):
"""Fixture pour insérer des données de test pour les tests de repository."""
# 1. Créer tous les objets ORM manuellement
genre = Genre(label="Science-Fiction")
director = Participant(first_name="Denis", last_name="Villeneuve")
member = Member(
first_name="Repo",
last_name="Tester",
login="repo_user",
password="pwd"
)
db_session.add_all([genre, director, member])
await db_session.flush()
# 2. Créer les objets dépendants (Movie, Opinion) manuellement
# Ajouter des valeurs pour les champs NOT NULL (duration et synopsis)
db_movie = Movie(
title="Dune",
year=2021,
duration=155,
synopsis="Un film sur le sable et les vers.",
genre_id=genre.id,
director_id=director.id
)
db_session.add(db_movie)
await db_session.flush() # Flusher pour obtenir l'ID du film
db_opinion = Opinion(
note=5,
comment="Génial",
member_id=member.id,
movie_id=db_movie.id
)
db_session.add(db_opinion)
# 3. Faire un SEUL commit à la fin pour tout sauvegarder
await db_session.commit()
# 4. Rafraîchir les objets pour être sûr qu'ils sont chargés pour les tests
await db_session.refresh(genre)
await db_session.refresh(director)
await db_session.refresh(member)
await db_session.refresh(db_movie)
await db_session.refresh(db_opinion)
# Rafraîchir aussi les relations du film
await db_session.refresh(db_movie, attribute_names=["genre", "director", "opinions"])
return {
"movie": db_movie,
"opinion": db_opinion,
"genre": genre,
"director": director,
"member": member
}
async def test_get_movies_repository(db_session: AsyncSession, repo_test_data):
"""Teste le TODO 'get_movies' dans movie_repository - Étape 3."""
# 1. Appeler la fonction à tester
movies = await movie_repository.get_movies(db_session, skip=0, limit=10)
# 2. Vérifier les résultats
assert isinstance(movies, list)
assert len(movies) == 1
assert movies[0].title == "Dune"
# Vérifier que les relations sont chargées (problème N+1)
assert movies[0].genre is not None
assert movies[0].genre.label == "Science-Fiction"
assert movies[0].director is not None
assert movies[0].director.last_name == "Villeneuve"
assert movies[0].opinions is not None
assert len(movies[0].opinions) == 1
assert movies[0].opinions[0].comment == "Génial"
async def test_get_delete_opinion_repository(db_session: AsyncSession, repo_test_data):
"""Teste les TODO 'get_opinion' et 'delete_opinion_by_id' - Étape 3."""
opinion_id = repo_test_data["opinion"].id
# 1. Tester get_opinion (TODO)
fetched_opinion = await opinion_repository.get_opinion(db_session, opinion_id)
assert fetched_opinion is not None
assert fetched_opinion.id == opinion_id
assert fetched_opinion.comment == "Génial"
# 2. Tester delete_opinion_by_id (TODO)
deleted_opinion = await opinion_repository.delete_opinion_by_id(db_session, opinion_id)
assert deleted_opinion is not None
assert deleted_opinion.id == opinion_id
# 3. Vérifier que l'avis a bien été supprimé
fetched_again = await opinion_repository.get_opinion(db_session, opinion_id)
assert fetched_again is None
async def test_get_genres_repository(db_session: AsyncSession, repo_test_data):
"""Teste 'get_genres' (déjà implémenté, mais bon à avoir)."""
genres = await genre_repository.get_genres(db_session)
assert isinstance(genres, list)
assert len(genres) >= 1
assert genres[0].label == "Science-Fiction"

View File

@@ -0,0 +1,98 @@
import pytest
from unittest.mock import AsyncMock, patch
from app.services import genre as genre_service
from app.services import participant as participant_service
from app.services import opinion as opinion_service
from app.schemas.participant import ParticipantUpdate
from app.schemas.opinion import OpinionCreate
from app.core.exceptions import NotFoundBLLException, ValidationBLLException
# Marqueur pour indiquer à pytest que ce sont des tests asynchrones
pytestmark = pytest.mark.asyncio
async def test_get_genres_service(mocker):
"""Teste le TODO 'get_genres' (service) - Étape 4."""
# 1. Arrange
# Simuler le repository pour qu'il retourne une liste
mock_repo = mocker.patch("app.repositories.genre.get_genres", new_callable=AsyncMock)
mock_repo.return_value = [{"id": 1, "label": "Action"}]
# 2. Act
result = await genre_service.get_genres(db=AsyncMock())
# 3. Assert
mock_repo.assert_called_once()
assert len(result) == 1
assert result[0]["label"] == "Action"
async def test_update_participant_not_found(mocker):
"""Teste que update_participant (service) lève NotFoundBLLException - Étape 4."""
# 1. Arrange
# Simuler le repository pour qu'il retourne None
mock_repo = mocker.patch("app.repositories.participant.update_participant", new_callable=AsyncMock)
mock_repo.return_value = None
update_data = ParticipantUpdate(first_name="Test")
# 2. Act & 3. Assert
with pytest.raises(NotFoundBLLException, match="Participant avec l'ID '999' non trouvé"):
await participant_service.update_participant(
db=AsyncMock(),
participant_id=999,
participant_data=update_data
)
async def test_create_opinion_service_validation(mocker):
"""Teste la validation (note) dans create_opinion (service) - Étape 4."""
# 1. Arrange
# Simuler le service de film (nécessaire pour la validation)
mocker.patch("app.services.movie.get_movie_by_id", new_callable=AsyncMock)
# Simuler le service de membre (maintenant aussi nécessaire)
mocker.patch("app.services.member.get_member_by_id", new_callable=AsyncMock)
# Données d'opinion avec une note invalide
opinion_data = OpinionCreate(note=10, comment="Trop haut!", member_id=1)
# 2. Act & 3. Assert
with pytest.raises(ValidationBLLException, match="La note doit être comprise entre 0 et 5"):
await opinion_service.create_opinion(
db=AsyncMock(),
movie_id=1,
opinion=opinion_data
)
async def test_create_opinion_service_movie_not_found(mocker):
"""Teste que create_opinion lève NotFound si le film n'existe pas - Étape 4."""
# 1. Arrange
# Simuler le service de film pour qu'il lève l'exception
mocker.patch(
"app.services.movie.get_movie_by_id",
new_callable=AsyncMock,
side_effect=NotFoundBLLException(resource_name="Film", resource_id=999)
)
opinion_data = OpinionCreate(note=5, comment="Valide", member_id=1)
# 2. Act & 3. Assert
with pytest.raises(NotFoundBLLException, match="Film avec l'ID '999' non trouvé"):
await opinion_service.create_opinion(
db=AsyncMock(),
movie_id=999,
opinion=opinion_data
)
async def test_delete_opinion_service_not_found(mocker):
"""Teste que delete_opinion (service) lève NotFound - Étape 4."""
# 1. Arrange
# Simuler le repository d'opinion pour qu'il retourne None
mock_repo = mocker.patch("app.repositories.opinion.get_opinion", new_callable=AsyncMock)
mock_repo.return_value = None
# 2. Act & 3. Assert
with pytest.raises(NotFoundBLLException, match="Avis avec l'ID '999' non trouvé"):
await opinion_service.delete_opinion(db=AsyncMock(), opinion_id=999)

View File

@@ -0,0 +1,112 @@
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
# CORRECTION : Importer le modèle Movie
from app.models import Movie
# Marqueur pour indiquer à pytest que ce sont des tests asynchrones
pytestmark = pytest.mark.asyncio
async def test_read_genres_api(test_client: AsyncClient, test_data):
"""Teste le TODO 'GET /genres/' (API) - Étape 5."""
response = await test_client.get("/api/v1/genres/")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) >= 2 # "Science-Fiction" et "Action" de la fixture
assert data[0]["label"] == "Science-Fiction"
assert data[1]["label"] == "Action"
async def test_participants_api_workflow(test_client: AsyncClient, test_data):
"""
Teste le workflow complet pour les participants :
- POST /participants/ (TODO)
- GET /participants/ (TODO)
- PATCH /participants/{id} (TODO)
"""
# 1. Tester GET /participants/ (TODO) avec les données de la fixture
response_get_all = await test_client.get("/api/v1/participants/")
assert response_get_all.status_code == 200
list_data = response_get_all.json()
assert len(list_data) == 2 # Nolan et DiCaprio
# Utiliser les valeurs simples de la fixture test_data
assert list_data[0]["last_name"] == test_data["actor_leo_lastname"] # Trié par nom de famille
assert list_data[1]["last_name"] == test_data["director_nolan_lastname"]
# 2. Tester POST /participants/ (TODO)
participant_data = {"first_name": "Greta", "last_name": "Gerwig"}
response_post = await test_client.post("/api/v1/participants/", json=participant_data)
assert response_post.status_code == 201
created_data = response_post.json()
assert created_data["first_name"] == "Greta"
participant_id = created_data["id"]
# 3. Tester PATCH /participants/{id} (TODO)
patch_data = {"first_name": "G.", "last_name": "Gerwig-Baumbach"}
response_patch = await test_client.patch(
f"/api/v1/participants/{participant_id}",
json=patch_data
)
assert response_patch.status_code == 200
updated_data = response_patch.json()
assert updated_data["first_name"] == "G."
assert updated_data["last_name"] == "Gerwig-Baumbach"
async def test_opinions_api_workflow(test_client: AsyncClient, db_session: AsyncSession, test_data):
"""
Teste le workflow des avis :
- POST /movies/{id}/opinions/ (TODO)
- DELETE /opinions/{id} (déjà fourni, mais on teste)
"""
# 1. Créer un film de test manuellement pour avoir un movie_id
# Ajouter les champs NOT NULL (duration, synopsis)
movie = Movie(
title="Inception",
year=2010,
duration=148,
synopsis="Un film sur les rêves.",
# Utiliser les ID simples de la fixture test_data
genre_id=test_data["genre_action_id"],
director_id=test_data["director_nolan_id"]
)
db_session.add(movie)
await db_session.commit()
await db_session.refresh(movie)
movie_id = movie.id
# 2. Tester POST /movies/{id}/opinions/ (TODO)
opinion_data = {
"note": 5,
"comment": "Mind-blowing!",
"member_id": test_data["member_user_id"]
}
response_post = await test_client.post(
f"/api/v1/movies/{movie_id}/opinions/",
json=opinion_data
)
assert response_post.status_code == 201
created_opinion = response_post.json()
assert created_opinion["comment"] == "Mind-blowing!"
assert created_opinion["member"]["login"] == test_data["member_user_login"]
opinion_id = created_opinion["id"]
# 3. Tester DELETE /opinions/{id}
response_delete = await test_client.delete(f"/api/v1/opinions/{opinion_id}")
assert response_delete.status_code == 204 # No Content
# 4. Vérifier que la suppression lève un 404 si on réessaye
response_delete_again = await test_client.delete(f"/api/v1/opinions/{opinion_id}")
assert response_delete_again.status_code == 404

View File

@@ -0,0 +1,72 @@
import pytest
from unittest.mock import AsyncMock
from app.services import movie as movie_service
from app.schemas.movie import MovieCreate
from app.core.exceptions import NotFoundBLLException, ValidationBLLException
# Marqueur pour indiquer à pytest que ce sont des tests asynchrones
pytestmark = pytest.mark.asyncio
async def test_get_movie_by_id_success(mocker):
"""
Vérifie que le service retourne un film si le repositories le trouve.
"""
# 1. Préparation (Arrange)
# On simule le repositories movie
mock_repo = mocker.patch("app.repositories.movie.get_movie", new_callable=AsyncMock)
# On configure le mock pour qu'il retourne une fausse donnée
fake_movie_id = 1
mock_repo.return_value = {"id": fake_movie_id, "title": "Fake Movie"}
# 2. Action (Act)
# On appelle la fonction du service à tester
result = await movie_service.get_movie_by_id(db=AsyncMock(), movie_id=fake_movie_id)
# 3. Assertion (Assert)
# On vérifie que le service a bien appelé le repositories
mock_repo.assert_called_once_with(mocker.ANY, movie_id=fake_movie_id)
# On vérifie que le résultat est correct
assert result["id"] == fake_movie_id
async def test_get_movie_by_id_not_found(mocker):
"""
Vérifie que le service lève une exception NotFoundError si le repositories ne trouve rien.
"""
# 1. Arrange
# On simule le repositories pour qu'il retourne None
mock_repo = AsyncMock(return_value=None)
mocker.patch("app.repositories.movie.get_movie", new=mock_repo)
# 2. Act & 3. Assert
# On s'attend à ce qu'une exception soit levée et on vérifie son type
with pytest.raises(NotFoundBLLException):
await movie_service.get_movie_by_id(db=AsyncMock(), movie_id=999)
async def test_create_movie_invalid_year():
"""
Vérifie que le service lève une ValidationError pour une année invalide.
"""
# 1. Arrange
movie_data = MovieCreate(title="The Future Movie", year=3000, genre_id=1, director_id=1)
# 2. Act & 3. Assert
with pytest.raises(ValidationBLLException, match="L'année du film doit être comprise entre"):
await movie_service.create_movie(db=AsyncMock(), movie=movie_data)
async def test_create_movie_empty_title():
"""
Vérifie que le service lève une ValidationError pour un titre vide.
"""
# 1. Arrange
movie_data = MovieCreate(title=" ", year=2020, genre_id=1, director_id=1)
# 2. Act & 3. Assert
with pytest.raises(ValidationBLLException, match="Le titre du film ne peut pas être vide."):
await movie_service.create_movie(db=AsyncMock(), movie=movie_data)