This commit is contained in:
Johan
2025-12-16 14:15:24 +01:00
parent 8d90d7bf25
commit 738597ee5a
4 changed files with 130 additions and 73 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
/resources/hash/
/resources/output_images/
/resources/input_images/
/.idea/

32
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. # This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
[[package]] [[package]]
name = "colorama" name = "colorama"
@@ -282,6 +282,22 @@ sql-other = ["SQLAlchemy (>=2.0.0)", "adbc-driver-postgresql (>=0.8.0)", "adbc-d
test = ["hypothesis (>=6.46.1)", "pytest (>=7.3.2)", "pytest-xdist (>=2.2.0)"] test = ["hypothesis (>=6.46.1)", "pytest (>=7.3.2)", "pytest-xdist (>=2.2.0)"]
xml = ["lxml (>=4.9.2)"] xml = ["lxml (>=4.9.2)"]
[[package]]
name = "pandas-stubs"
version = "2.3.3.251201"
description = "Type annotations for pandas"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "pandas_stubs-2.3.3.251201-py3-none-any.whl", hash = "sha256:eb5c9b6138bd8492fd74a47b09c9497341a278fcfbc8633ea4b35b230ebf4be5"},
{file = "pandas_stubs-2.3.3.251201.tar.gz", hash = "sha256:7a980f4f08cff2a6d7e4c6d6d26f4c5fcdb82a6f6531489b2f75c81567fe4536"},
]
[package.dependencies]
numpy = ">=1.23.5"
types-pytz = ">=2022.1.1"
[[package]] [[package]]
name = "pillow" name = "pillow"
version = "11.3.0" version = "11.3.0"
@@ -519,6 +535,18 @@ files = [
{file = "six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81"}, {file = "six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81"},
] ]
[[package]]
name = "types-pytz"
version = "2025.2.0.20251108"
description = "Typing stubs for pytz"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "types_pytz-2025.2.0.20251108-py3-none-any.whl", hash = "sha256:0f1c9792cab4eb0e46c52f8845c8f77cf1e313cb3d68bf826aa867fe4717d91c"},
{file = "types_pytz-2025.2.0.20251108.tar.gz", hash = "sha256:fca87917836ae843f07129567b74c1929f1870610681b4c92cb86a3df5817bdb"},
]
[[package]] [[package]]
name = "tzdata" name = "tzdata"
version = "2025.2" version = "2025.2"
@@ -534,4 +562,4 @@ files = [
[metadata] [metadata]
lock-version = "2.1" lock-version = "2.1"
python-versions = "^3.13" python-versions = "^3.13"
content-hash = "7d05498c899519f4f05c0a6105d4ef33c146dd410b14138f08cfb020e7406590" content-hash = "6de1bd5af7aeda6180258cc60a110094b35ddfc0e6b455215b90de2c864ed745"

View File

@@ -21,6 +21,7 @@ requires-python = ">=3.13"
python = "^3.13" python = "^3.13"
pandas = "^2.3.1" pandas = "^2.3.1"
pillow = "^11.3.0" pillow = "^11.3.0"
pandas-stubs = "~=2.3.1"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
pytest = "^8.4.1" pytest = "^8.4.1"

View File

@@ -106,17 +106,23 @@ def resize_single_image(args: Tuple[Path, int, str]) -> str:
str: un message de statut indiquant le résultat de l'opération (succès, échec, ignoré). str: un message de statut indiquant le résultat de l'opération (succès, échec, ignoré).
""" """
source_file, output_width, fmt = args source_file, output_width, fmt = args
# TODO : Partie 1 - implémenter la logique de redimensionnement. try:
# 1. Ouvrir l'image source avec Pillow (`Image.open`). Utiliser un bloc `with`. with Image.open(source_file) as img:
# 2. Récupérer la largeur et la hauteur originales de l'image. orig_width, orig_height = img.size
# 3. Vérifier si `output_width` est >= à la largeur originale. Si c'est le cas, retourner un message d'information. if output_width >= orig_width:
# 4. Calculer la nouvelle hauteur en utilisant la fonction `calculate_new_height`. return f"Ignoré (trop grand) : {source_file.name} pour la largeur {output_width}px"
# 5. Redimensionner l'image avec `img.resize`, en utilisant `Image.Resampling.LANCZOS`.
# 6. Construire le chemin de sortie du fichier (ex: f"{source_file.stem}_{output_width}x{new_height}.{fmt}"). new_height = calculate_new_height(orig_width, orig_height, output_width)
# 7. Sauvegarder l'image redimensionnée. img_resized = img.resize((output_width, new_height), Image.Resampling.LANCZOS)
# 8. Retourner un message de succès.
# 9. Encadrer la logique dans un bloc `try...except Exception` pour capturer les erreurs et retourner un message d'erreur. output_filename = f"{source_file.stem}_{output_width}x{new_height}.{fmt}"
pass output_path = TARGET_DIRECTORY / output_filename
img_resized.save(output_path, format=fmt.upper())
return f"Traité : {source_file.name} -> {output_filename}"
except Exception as e:
error_msg = f"Erreur lors du traitement de {source_file.name} pour {output_width}px ({fmt}): {e}"
logging.error(error_msg)
return error_msg
def run_sequential(worker_function: Callable[[Any], Any], tasks: List[Any], description: str = "") -> Tuple[List[Any], float]: def run_sequential(worker_function: Callable[[Any], Any], tasks: List[Any], description: str = "") -> Tuple[List[Any], float]:
@@ -130,16 +136,19 @@ def run_sequential(worker_function: Callable[[Any], Any], tasks: List[Any], desc
description (str, optional): une description de la tâche globale pour l'affichage. description (str, optional): une description de la tâche globale pour l'affichage.
Retourne: Retourne:
float: durée totale d'exécution en secondes. tuple (Tuple[List[Any], float]) : un tuple contenant la liste des résultats de chaque tâche et la durée totale
d'exécution en secondes.
""" """
# TODO : Partie 3 - implémenter l'exécuteur séquentiel. logging.info("Démarrage de '%s' (%d tâches) en mode séquentiel...", description, len(tasks))
# 1. Logguer le message de démarrage.
# 2. Enregistrer le temps de début (`time.time()`). start_t = time.time()
# 3. Exécuter les tâches avec une list comprehension (ou boucle for): `[worker_function(task) for task in tasks]`. # Utilise une list comprehension pour une exécution simple et directe
# 4. Enregistrer le temps de fin. results = [worker_function(task) for task in tasks]
# 5. Calculer la durée et logguer le message de fin. end_t = time.time()
# 6. Retourner la liste des résultats et la durée.
pass duration = end_t - start_t
logging.info("Tâche '%s' terminée en %.2f secondes.", description, duration)
return results, duration
def run_parallel(worker_function: Callable[[Any], Any], tasks: List[Any], description: str = "") -> Tuple[List[Any], float]: def run_parallel(worker_function: Callable[[Any], Any], tasks: List[Any], description: str = "") -> Tuple[List[Any], float]:
@@ -153,21 +162,23 @@ def run_parallel(worker_function: Callable[[Any], Any], tasks: List[Any], descri
description (str, optional): une description de la tâche globale pour l'affichage. description (str, optional): une description de la tâche globale pour l'affichage.
Retourne: Retourne:
float: durée totale d'exécution en secondes. tuple (Tuple[List[Any], float]): un tuple contenant la liste des résultats (dans l'ordre des tâches)
et la durée totale d'exécution en secondes.
""" """
# TODO : Partie 3 - implémenter l'exécuteur parallèle. workers = max(1, multiprocessing.cpu_count() - 1)
# 1. Déterminer le nombre de workers (`multiprocessing.cpu_count()`). logging.info("Démarrage de '%s' (%d tâches) avec %d processus...", description, len(tasks), workers)
# 2. Logguer le message de démarrage.
# 3. Enregistrer le temps de début. start_t = time.time()
# 4. Utiliser un `ProcessPoolExecutor` dans un bloc `with`, en passant `initializer=setup_logging`. with ProcessPoolExecutor(max_workers=workers, initializer=setup_logging) as executor:
# 5. Appeler `executor.map(worker_function, tasks)` et convertir le résultat en liste. results = list(executor.map(worker_function, tasks))
# 6. Enregistrer le temps de fin. end_t = time.time()
# 7. Calculer la durée et logguer le message de fin.
# 8. Retourner la liste des résultats et la durée. duration = end_t - start_t
pass logging.info("Tâche '%s' terminée en %.2f secondes.", description, duration)
return results, duration
def get_hashes_from_csv() -> Dict[str, str]: def get_hashes_from_csv():
""" """
Charge les hashes de fichiers précédemment calculés depuis un fichier CSV. Charge les hashes de fichiers précédemment calculés depuis un fichier CSV.
Permet de comparer les hashes actuels aux anciens pour détecter les fichiers modifiés. Permet de comparer les hashes actuels aux anciens pour détecter les fichiers modifiés.
@@ -180,24 +191,32 @@ def get_hashes_from_csv() -> Dict[str, str]:
sont leurs hashes SHA3-512 (str). Retourne un dictionnaire vide si le sont leurs hashes SHA3-512 (str). Retourne un dictionnaire vide si le
fichier CSV n'existe pas. fichier CSV n'existe pas.
""" """
# TODO : Partie 4 - implémenter la lecture du CSV de hashes. if not HASHES_CSV_PATH.exists():
# 1. Vérifier si `HASHES_CSV_PATH` existe. Si non, retourner un dictionnaire vide. return {}
# 2. Utiliser un bloc `try...except` pour lire le CSV avec `pd.read_csv`. try:
# 3. Convertir le DataFrame en dictionnaire (ex: `pd.Series(df[HASH_COL].values, index=df[FILENAME_COL]).to_dict()`). df = pd.read_csv(HASHES_CSV_PATH)
# 4. Retourner le dictionnaire. En cas d'erreur (fichier vide...), retourner un dictionnaire vide. return pd.Series(df[HASH_COL].values, index=df[FILENAME_COL]).to_dict()
pass except (FileNotFoundError, pd.errors.EmptyDataError) as e:
logging.warning("Impossible de lire le fichier de hash %s: %s. Un nouveau sera créé.", HASHES_CSV_PATH.name, e)
return {}
def main() -> None: def main() -> None:
""" """
Point d'entrée principal du script. Point d'entrée principal du script.
Orchestre le processus de traitement d'images. Orchestre le processus de traitement d'images :
1. sélectionne le mode d'exécution (séquentiel ou parallèle).
2. calcule le hash des images sources pour détecter les changements.
3. filtre les images qui sont nouvelles ou ont été modifiées.
4. redimensionne les images filtrées dans plusieurs tailles et formats.
5. affiche un résumé des temps d'exécution.
""" """
setup_logging() setup_logging()
# --- Initialisation --- # --- Initialisation ---
TARGET_DIRECTORY.mkdir(parents=True, exist_ok=True) TARGET_DIRECTORY.mkdir(parents=True, exist_ok=True)
HASHES_CSV_PATH.parent.mkdir(parents=True, exist_ok=True) HASHES_CSV_PATH.parent.mkdir(parents=True, exist_ok=True)
total_start_time = time.time() total_start_time = time.time()
# --- Sélection du mode d'exécution --- # --- Sélection du mode d'exécution ---
@@ -206,58 +225,63 @@ def main() -> None:
elif EXECUTION_MODE == "sequential": elif EXECUTION_MODE == "sequential":
runner = run_sequential runner = run_sequential
else: else:
logging.critical("Mode d'exécution inconnu : '%s'.", EXECUTION_MODE) logging.critical("Mode d'exécution inconnu : '%s'. Choisissez 'parallel' ou 'sequential'.", EXECUTION_MODE)
raise ValueError(f"Mode d'exécution inconnu : '{EXECUTION_MODE}'.") raise ValueError(f"Mode d'exécution inconnu : '{EXECUTION_MODE}'.")
logging.info("--- Mode d'exécution sélectionné : %s ---", EXECUTION_MODE.upper()) logging.info("--- Mode d'exécution sélectionné : %s ---", EXECUTION_MODE.upper())
# TODO : Partie 1 - lister les fichiers sources dans SOURCE_DIRECTORY. all_source_files = [f for f in SOURCE_DIRECTORY.iterdir() if f.is_file()]
# Utiliser `SOURCE_DIRECTORY.iterdir()` avec une list comprehension (ou boucle for) pour ne garder que les fichiers (exclusion des dossiers) dans `all_source_files`.
all_source_files = []
logging.info("Trouvé %d fichier(s) dans '%s'.", len(all_source_files), SOURCE_DIRECTORY.name) logging.info("Trouvé %d fichier(s) dans '%s'.", len(all_source_files), SOURCE_DIRECTORY.name)
# La logique de Hachage et Filtrage sera implémentée en Partie 4. # --- Étape 1: Hachage ---
# --- Hachage --- hash_tasks = [(f,) for f in all_source_files]
# hash_tasks = [(f,) for f in all_source_files] hash_results, hash_duration = runner(compute_sha3_512, hash_tasks, "Calcul des hashes")
# hash_results, hash_duration = runner(compute_sha3_512, hash_tasks, "Calcul des hashes")
# --- Filtrage --- # --- Étape 2: Filtrage ---
logging.info("Filtrage des fichiers modifiés...") logging.info("Filtrage des fichiers modifiés...")
# Tant qu'on n'a pas implémenté le hachage, on traite tous les fichiers. old_hashes = get_hashes_from_csv()
files_to_process = all_source_files files_to_process: List[Path] = []
# TODO : Partie 4 - implémenter la logique de filtrage. new_hash_records: List[Dict[str, str]] = []
# 1. Charger les anciens hashes depuis le fichier CSV en appelant `get_hashes_from_csv`.
# old_hashes = get_hashes_from_csv()
# files_to_process: List[Path] = []
# new_hash_records: List[Dict[str, str]] = []
#
# 2. Itérer sur `hash_results` pour comparer les hashes et remplir `files_to_process`.
# for result in hash_results:
# file_hash: Optional[str] = result[0]
# filepath: Path = result[1]
# ...
# 3. Préparer `new_hash_records` pour la sauvegarde.
#if new_hash_records:
# 4. Sauvegarder les nouveaux hashes dans le CSV avec pandas.
# logging.info("-> %d fichier(s) à traiter.", len(files_to_process))
# --- Redimensionnement --- for result in hash_results:
file_hash: Optional[str] = result[0]
filepath: Path = result[1]
if file_hash is None: continue
filename = filepath.name
new_hash_records.append({FILENAME_COL: filename, HASH_COL: file_hash})
if old_hashes.get(filename) != file_hash:
files_to_process.append(filepath)
if new_hash_records:
pd.DataFrame(new_hash_records).to_csv(HASHES_CSV_PATH, index=False)
logging.info("Fichier de hash '%s' mis à jour.", HASHES_CSV_PATH.name)
logging.info("-> %d fichier(s) à traiter.", len(files_to_process))
# --- Étape 3: Redimensionnement ---
resize_duration = 0.0 resize_duration = 0.0
if files_to_process: if files_to_process:
# TODO : Partie 2 - préparer les tâches de redimensionnement (triple boucle : chaque fichier d'entrée, chaque largeur, chaque format). resize_tasks = [
resize_tasks = [] (file, width, fmt)
resize_duration = runner(resize_single_image, resize_tasks, "Redimensionnement des images") for file in files_to_process
for width in RESIZE_WIDTHS
for fmt in FORMATS
]
_, resize_duration = runner(resize_single_image, resize_tasks, "Redimensionnement des images")
else: else:
logging.info("Aucun fichier à redimensionner.") logging.info("Aucun fichier à redimensionner.")
# --- Résumé Final --- # --- Résumé Final ---
total_duration = time.time() - total_start_time total_duration = time.time() - total_start_time
logging.info("--- Résumé des temps ---") logging.info("--- Résumé des temps ---")
#logging.info(f"Temps de hachage : {hash_duration:.2f} secondes") logging.info(f"Temps de hachage : {hash_duration:.2f} secondes")
logging.info(f"Temps de redimensionnement : {resize_duration:.2f} secondes") logging.info(f"Temps de redimensionnement : {resize_duration:.2f} secondes")
logging.info("--------------------------") logging.info("--------------------------")
logging.info(f"Traitement complet terminé en {total_duration:.2f} secondes.") logging.info(f"Traitement complet terminé en {total_duration:.2f} secondes.")
if __name__ == "__main__": if __name__ == "__main__":
# Pour s'assurer que le logging fonctionne correctement avec multiprocessing
multiprocessing.freeze_support() multiprocessing.freeze_support()
main() main()