import logging import multiprocessing import sys import time from concurrent.futures import ProcessPoolExecutor from typing import List, Tuple, Any, Callable # --- CONFIGURATION --- # Le nombre de tâches de calcul à simuler. # Augmentez ce nombre pour mieux voir la différence de performance. NUM_TASKS = 20 # --- LE WORKER (la tâche CPU-Bound) --- def heavy_analysis(args: Tuple[int, int]) -> Tuple[int, float]: """ Une fonction qui simule un calcul scientifique ou une analyse de données complexe. C'est une tâche purement "CPU-bound". Elle prend un tuple en argument pour s'aligner sur le pattern utilisé dans le TP, que vous traiterez plus tard. Paramètres: args (Tuple[int, int]): Un tuple contenant: - task_id (int): L'identifiant de la tâche. - complexity (int): Un paramètre pour faire varier la durée du calcul. Retourne: Tuple[int, float]: Un tuple avec l'ID de la tâche et le résultat du calcul. """ task_id, complexity = args logging.info(f"Tâche {task_id}: Démarrage du calcul (complexité={complexity})...") # Simulation d'un calcul intensif : une boucle qui fait des opérations mathématiques. result = 0 # La limite de la boucle dépend de la complexité pour que les tâches n'aient pas toutes la même durée. limit = 2_000_000 + (complexity * 500_000) for i in range(limit): result += (i ** 0.5) / (i + 1) # Opération mathématique arbitraire logging.info(f"Tâche {task_id}: Calcul terminé.") return (task_id, result) # --- LES LANCEURS --- def run_sequential(worker_function: Callable, tasks: List[Any]) -> float: """Exécute les tâches de manière séquentielle.""" logging.info("--- DÉMARRAGE DU MODE SÉQUENTIEL ---") start_time = time.perf_counter() results = [worker_function(task) for task in tasks] duration = time.perf_counter() - start_time logging.info(f"--- MODE SÉQUENTIEL terminé en {duration:.2f} secondes. ---\n") return duration def run_parallel(worker_function: Callable, tasks: List[Any]) -> float: """Exécute les tâches en parallèle en utilisant tous les cœurs CPU disponibles.""" # On utilise tous les cœurs disponibles pour maximiser le parallélisme. worker_count = multiprocessing.cpu_count() logging.info(f"--- DÉMARRAGE DU MODE PARALLÈLE (avec {worker_count} processus) ---") start_time = time.perf_counter() # TODO 1 : instancier un ProcessPoolExecutor avec un nombre de workers donné, et en utilisant le gestionnaire de contexte `with`. # TODO 2 : utiliser la méthode `map` de l'executor pour exécuter `worker_function` sur chaque tâche dans `tasks`. duration = time.perf_counter() - start_time logging.info(f"--- MODE PARALLÈLE terminé en {duration:.2f} secondes. ---\n") return duration # --- FONCTION PRINCIPALE --- def main(): """Orchestre la démonstration.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(processName)s - %(message)s", handlers=[logging.StreamHandler(sys.stdout)], ) # Création de la liste de tâches à exécuter. # Chaque tâche est un tuple d'arguments pour notre fonction `heavy_analysis`. # On fait varier la complexité pour simuler des données hétérogènes. tasks_to_run = [(i, (i % 5) + 1) for i in range(NUM_TASKS)] logging.info(f"{len(tasks_to_run)} tâches de calcul vont être lancées.") # Lancement séquentiel seq_duration = run_sequential(heavy_analysis, tasks_to_run) # Lancement parallèle par_duration = run_parallel(heavy_analysis, tasks_to_run) # Résumé logging.info("--- RÉSUMÉ DE LA COMPARAISON ---") logging.info(f"Temps d'exécution séquentiel : {seq_duration:.2f}s") logging.info(f"Temps d'exécution parallèle : {par_duration:.2f}s") if par_duration > 0: speedup = seq_duration / par_duration logging.info(f"Facteur d'accélération (Speedup) : {speedup:.2f}x") logging.info("----------------------------------") if __name__ == "__main__": # Nécessaire pour que multiprocessing fonctionne correctement sur certaines plateformes (Windows) multiprocessing.freeze_support() main()