first comit

This commit is contained in:
Johan
2025-12-16 09:56:21 +01:00
commit 8d90d7bf25
8 changed files with 1218 additions and 0 deletions

View File

View File

@@ -0,0 +1,263 @@
"""
Script pour redimensionner des images en parallèle ou séquentiellement.
Ce script inclut :
- Calcul du hash SHA3-512 des fichiers pour détecter les modifications (la fonction de hachage est volontairement consommatrice en temps pour simuler une charge de travail).
- Redimensionnement des images à plusieurs largeurs cibles tout en préservant le ratio hauteur/largeur.
- Sauvegarde des images redimensionnées dans différents formats (PNG, WebP).
- Gestion des erreurs de lecture de fichiers et de redimensionnement.
- Utilisation de multiprocessing pour paralléliser les tâches de redimensionnement, et de hashing.
"""
import hashlib
import time
import logging
import multiprocessing
import sys
import pandas as pd
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor
from PIL import Image
from typing import List, Tuple, Dict, Callable, Any, Optional
# --- CONFIGURATION ---
EXECUTION_MODE: str = "sequential" # Options : "parallel" ou "sequential"
SOURCE_DIRECTORY: Path = Path("../../resources/input_images/")
TARGET_DIRECTORY: Path = Path("../../resources/output_images/")
HASHES_CSV_PATH: Path = Path("../../resources/hash/hashes.csv")
RESIZE_WIDTHS: List[int] = [4000, 2500, 1928, 992, 768, 576, 480, 260, 150, 100, 50] # inspiré des breakpoints de Bootstrap
FORMATS: List[str] = ["png", "webp"]
FILENAME_COL: str = 'filename'
HASH_COL: str = 'hash'
def setup_logging() -> None:
"""
Configure le système de logging pour le script.
"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[
logging.StreamHandler(sys.stdout),
]
)
def calculate_new_height(orig_width: int, orig_height: int, new_width: int) -> int:
"""
Calcule la hauteur d'une image pour une nouvelle largeur donnée,
tout en préservant le ratio hauteur/largeur original.
Paramètres:
orig_width (int): la largeur originale de l'image.
orig_height (int): la hauteur originale de l'image.
new_width (int): la nouvelle largeur souhaitée pour l'image.
Retourne:
int: la nouvelle hauteur calculée qui maintient le ratio de l'image.
"""
if orig_width <= 0:
raise ValueError("La largeur d'origine doit être positive.")
scale_ratio = new_width / orig_width
return int(orig_height * scale_ratio)
def compute_sha3_512(args: Tuple[Path]) -> Tuple[Optional[str], Path]:
"""
Calcule l'empreinte numérique (hash) SHA3-512 d'un fichier.
La fonction lit le fichier par blocs pour gérer efficacement les fichiers volumineux.
L'argument est un tuple pour assurer la compatibilité avec `ProcessPoolExecutor.map`.
Paramètres:
args (Tuple[Path]): un tuple contenant le chemin du fichier à traiter.
Ex: (filepath,)
Retourne:
tuple (Tuple[Optional[str], Path]): un tuple contenant le hash hexadécimal (str) et le chemin du fichier (Path).
En cas d'erreur de lecture, retourne (None, filepath).
"""
filepath, = args
hasher = hashlib.sha3_512()
try:
with open(filepath, 'rb') as f:
for block in iter(lambda: f.read(65536), b''):
hasher.update(block)
return hasher.hexdigest(), filepath
except IOError as e:
logging.error("Erreur de lecture du fichier %s: %s", filepath.name, e)
return None, filepath
def resize_single_image(args: Tuple[Path, int, str]) -> str:
"""
Redimensionne une image à une largeur cible et la sauvegarde dans un format spécifié.
Le redimensionnement n'est effectué que si la largeur cible est inférieure à la largeur originale.
L'algorithme de rééchantillonnage LANCZOS est utilisé pour une haute qualité de réduction.
Paramètres:
args (Tuple[Path, int, str]): un tuple contenant les informations nécessaires :
- source_file (Path): Chemin de l'image d'origine.
- output_width (int): Largeur de l'image de sortie.
- fmt (str): Format de sortie souhaité (ex: "png", "webp").
Retourne:
str: un message de statut indiquant le résultat de l'opération (succès, échec, ignoré).
"""
source_file, output_width, fmt = args
# TODO : Partie 1 - implémenter la logique de redimensionnement.
# 1. Ouvrir l'image source avec Pillow (`Image.open`). Utiliser un bloc `with`.
# 2. Récupérer la largeur et la hauteur originales de l'image.
# 3. Vérifier si `output_width` est >= à la largeur originale. Si c'est le cas, retourner un message d'information.
# 4. Calculer la nouvelle hauteur en utilisant la fonction `calculate_new_height`.
# 5. Redimensionner l'image avec `img.resize`, en utilisant `Image.Resampling.LANCZOS`.
# 6. Construire le chemin de sortie du fichier (ex: f"{source_file.stem}_{output_width}x{new_height}.{fmt}").
# 7. Sauvegarder l'image redimensionnée.
# 8. Retourner un message de succès.
# 9. Encadrer la logique dans un bloc `try...except Exception` pour capturer les erreurs et retourner un message d'erreur.
pass
def run_sequential(worker_function: Callable[[Any], Any], tasks: List[Any], description: str = "") -> Tuple[List[Any], float]:
"""
Exécute une série de tâches de manière séquentielle, l'une après l'autre.
Cette fonction est utile pour le débogage ou sur des systèmes mono-cœur.
Paramètres:
worker_function (Callable[[Any], Any]): la fonction à appliquer à chaque élément de la liste de tâches.
tasks (List[Any]): une liste d'arguments, où chaque argument est destiné à un appel de `worker_function`.
description (str, optional): une description de la tâche globale pour l'affichage.
Retourne:
float: durée totale d'exécution en secondes.
"""
# TODO : Partie 3 - implémenter l'exécuteur séquentiel.
# 1. Logguer le message de démarrage.
# 2. Enregistrer le temps de début (`time.time()`).
# 3. Exécuter les tâches avec une list comprehension (ou boucle for): `[worker_function(task) for task in tasks]`.
# 4. Enregistrer le temps de fin.
# 5. Calculer la durée et logguer le message de fin.
# 6. Retourner la liste des résultats et la durée.
pass
def run_parallel(worker_function: Callable[[Any], Any], tasks: List[Any], description: str = "") -> Tuple[List[Any], float]:
"""
Exécute une série de tâches en parallèle en utilisant un pool de processus.
Le nombre de processus est basé sur le nombre de cœurs CPU disponibles pour optimiser les performances.
Paramètres:
worker_function (Callable[[Any], Any]): la fonction à exécuter pour chaque tâche.
tasks (List[Any]): une liste d'arguments à passer à `worker_function`.
description (str, optional): une description de la tâche globale pour l'affichage.
Retourne:
float: durée totale d'exécution en secondes.
"""
# TODO : Partie 3 - implémenter l'exécuteur parallèle.
# 1. Déterminer le nombre de workers (`multiprocessing.cpu_count()`).
# 2. Logguer le message de démarrage.
# 3. Enregistrer le temps de début.
# 4. Utiliser un `ProcessPoolExecutor` dans un bloc `with`, en passant `initializer=setup_logging`.
# 5. Appeler `executor.map(worker_function, tasks)` et convertir le résultat en liste.
# 6. Enregistrer le temps de fin.
# 7. Calculer la durée et logguer le message de fin.
# 8. Retourner la liste des résultats et la durée.
pass
def get_hashes_from_csv() -> Dict[str, str]:
"""
Charge les hashes de fichiers précédemment calculés depuis un fichier CSV.
Permet de comparer les hashes actuels aux anciens pour détecter les fichiers modifiés.
Paramètres:
Aucun.
Retourne:
dict (Dict[str, str]) : un dictionnaire où les clés sont les noms de fichiers (str) et les valeurs
sont leurs hashes SHA3-512 (str). Retourne un dictionnaire vide si le
fichier CSV n'existe pas.
"""
# TODO : Partie 4 - implémenter la lecture du CSV de hashes.
# 1. Vérifier si `HASHES_CSV_PATH` existe. Si non, retourner un dictionnaire vide.
# 2. Utiliser un bloc `try...except` pour lire le CSV avec `pd.read_csv`.
# 3. Convertir le DataFrame en dictionnaire (ex: `pd.Series(df[HASH_COL].values, index=df[FILENAME_COL]).to_dict()`).
# 4. Retourner le dictionnaire. En cas d'erreur (fichier vide...), retourner un dictionnaire vide.
pass
def main() -> None:
"""
Point d'entrée principal du script.
Orchestre le processus de traitement d'images.
"""
setup_logging()
# --- Initialisation ---
TARGET_DIRECTORY.mkdir(parents=True, exist_ok=True)
HASHES_CSV_PATH.parent.mkdir(parents=True, exist_ok=True)
total_start_time = time.time()
# --- Sélection du mode d'exécution ---
if EXECUTION_MODE == "parallel":
runner = run_parallel
elif EXECUTION_MODE == "sequential":
runner = run_sequential
else:
logging.critical("Mode d'exécution inconnu : '%s'.", EXECUTION_MODE)
raise ValueError(f"Mode d'exécution inconnu : '{EXECUTION_MODE}'.")
logging.info("--- Mode d'exécution sélectionné : %s ---", EXECUTION_MODE.upper())
# TODO : Partie 1 - lister les fichiers sources dans SOURCE_DIRECTORY.
# Utiliser `SOURCE_DIRECTORY.iterdir()` avec une list comprehension (ou boucle for) pour ne garder que les fichiers (exclusion des dossiers) dans `all_source_files`.
all_source_files = []
logging.info("Trouvé %d fichier(s) dans '%s'.", len(all_source_files), SOURCE_DIRECTORY.name)
# La logique de Hachage et Filtrage sera implémentée en Partie 4.
# --- Hachage ---
# hash_tasks = [(f,) for f in all_source_files]
# hash_results, hash_duration = runner(compute_sha3_512, hash_tasks, "Calcul des hashes")
# --- Filtrage ---
logging.info("Filtrage des fichiers modifiés...")
# Tant qu'on n'a pas implémenté le hachage, on traite tous les fichiers.
files_to_process = all_source_files
# TODO : Partie 4 - implémenter la logique de filtrage.
# 1. Charger les anciens hashes depuis le fichier CSV en appelant `get_hashes_from_csv`.
# old_hashes = get_hashes_from_csv()
# files_to_process: List[Path] = []
# new_hash_records: List[Dict[str, str]] = []
#
# 2. Itérer sur `hash_results` pour comparer les hashes et remplir `files_to_process`.
# for result in hash_results:
# file_hash: Optional[str] = result[0]
# filepath: Path = result[1]
# ...
# 3. Préparer `new_hash_records` pour la sauvegarde.
#if new_hash_records:
# 4. Sauvegarder les nouveaux hashes dans le CSV avec pandas.
# logging.info("-> %d fichier(s) à traiter.", len(files_to_process))
# --- Redimensionnement ---
resize_duration = 0.0
if files_to_process:
# TODO : Partie 2 - préparer les tâches de redimensionnement (triple boucle : chaque fichier d'entrée, chaque largeur, chaque format).
resize_tasks = []
resize_duration = runner(resize_single_image, resize_tasks, "Redimensionnement des images")
else:
logging.info("Aucun fichier à redimensionner.")
# --- Résumé Final ---
total_duration = time.time() - total_start_time
logging.info("--- Résumé des temps ---")
#logging.info(f"Temps de hachage : {hash_duration:.2f} secondes")
logging.info(f"Temps de redimensionnement : {resize_duration:.2f} secondes")
logging.info("--------------------------")
logging.info(f"Traitement complet terminé en {total_duration:.2f} secondes.")
if __name__ == "__main__":
multiprocessing.freeze_support()
main()