import asyncio from mcp.server.fastmcp import FastMCP # --- 1. La base de données locale du serveur --- # C'est la seule source de vérité pour les données des films. FILM_DATABASE = { 1: { "title": "Matrix", "synopsis": """ Programmeur anonyme dans un service administratif le jour, Thomas Anderson devient Neo la nuit venue. Sous ce pseudonyme, il est l'un des pirates les plus recherchés du cyber-espace. A cheval entre deux mondes, Neo est assailli par d'étranges songes et des messages cryptés provenant d'un certain Morpheus. Celui-ci l'exhorte à aller au-delà des apparences et à trouver la réponse à la question qui hante constamment ses pensées : qu'est-ce que la Matrice ? """ }, 2: { "title": "Inception", "synopsis": """ Dom Cobb est un voleur expérimenté – le meilleur qui soit dans l'art périlleux de l'extraction : sa spécialité consiste à s'approprier les secrets les plus précieux d'un individu, enfouis au plus profond de son subconscient, pendant qu'il rêve et que son esprit est particulièrement vulnérable. """ } } # --- 2. Création du serveur MCP --- async def main(): mcp = FastMCP("movies-mcp-server", json_response=True, port=12345) # --- 3. Définition de l'unique outil --- # Cet outil ne fait aucune analyse, il retourne juste des données brutes. @mcp.tool(name="get_film_details_by_id") async def mcp_get_film_synopsis(film_id: int) -> str: """ Récupère UNIQUEMENT le synopsis d'un film en utilisant son ID. Retourne le synopsis sous forme de chaîne de caractères, ou un message d'erreur. """ print(f"-> [Serveur] Demande de synopsis pour le film ID {film_id}...") film_data = FILM_DATABASE.get(film_id) if film_data: synopsis = film_data['synopsis'] print(f"<- [Serveur] Synopsis pour le film ID {film_id} envoyé.") return synopsis else: error_message = f"Erreur: film avec l'ID {film_id} non trouvé." print(f"<- [Serveur] {error_message}") return error_message # --- 4. Démarrage du serveur --- print("Serveur de données MCP démarré sur le port 12345...") print("Ce terminal est maintenant dédié au serveur. Laissez-le tourner.") await mcp.run_streamable_http_async() if __name__ == "__main__": asyncio.run(main())