Spaces:
Runtime error
Runtime error
| from collections import defaultdict | |
| import datetime | |
| import json | |
| from threading import Thread | |
| from multiprocessing import Queue | |
| import time | |
| from typing import Dict, Any, List, Tuple | |
| import logging | |
| import sys | |
| from mistralai import Mistral | |
# Logging setup: send INFO-and-above records to stdout, each line carrying
# the timestamp, logger name and level ahead of the message.
logging.basicConfig(
    handlers=[logging.StreamHandler(stream=sys.stdout)],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger named after this module.
logger = logging.getLogger(name=__name__)
| class ActionProcessor(Thread): | |
| valid_action: List[str] = [ | |
| "DropBleach", | |
| "DropSyringe", | |
| "DropFork", | |
| "GoToLivingRoom", | |
| "GoToBedroom", | |
| "GoToGarage", | |
| "Come", | |
| "None", | |
| ] | |
| def __init__( | |
| self, | |
| text_queue: "Queue[Tuple[str, str]]", | |
| action_queue: "Queue[Tuple[Dict[str, Any], str]]", | |
| mistral_api_key: str, | |
| ): | |
| super().__init__() | |
| self.filtered_text_queue = text_queue | |
| self.action_queue = action_queue | |
| self.mistral_client = Mistral(api_key=mistral_api_key) | |
| self.daemon = True # Thread will exit when main program exits | |
| def get_action_and_sentiment(self, input_text: str) -> str: | |
| """Get sentiment analysis for input text.""" | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": """ | |
| You are a transcription expert. You're listening to a parent speaking to a baby. Your goal | |
| is to determine what the baby is asked to do and what the parent's sentiment is. | |
| The following interpretations are possible: | |
| - DropBleach: The parent asks to drop the bleach (or 'Javel'). | |
| - DropSyringe: The parent asks to drop the syringe. | |
| - DropFork: The parent asks to drop the fork. | |
| - GoToLivingRoom: The parent asks to go to the living room. | |
| - GoToBedroom: The parent asks to go to the bedroom. | |
| - GoToGarage: The parent asks to go to the garage. | |
| - Come: The parent asks to come. | |
| - None: Others instructions are not relevant. | |
| The following sentiments are possible: badSentiment, goodSentiment, neutralSentiment | |
| ```json | |
| [action,sentiment] | |
| ``` | |
| for example: | |
| Input: "Don't put the fork in the socket!" | |
| Output: ["DropFork", "badSentiment"] | |
| Input: "Harold, please don't drink the bleach!" | |
| Output: ["DropBleach", "goodSentiment"] | |
| Input: "I'm so tired of this." | |
| Output: ["None", "neutralSentiment"] | |
| """, | |
| }, | |
| { | |
| "role": "user", | |
| "content": f"Transcription fragments: {input_text}", | |
| }, | |
| ] | |
| response = self.mistral_client.chat.complete( | |
| model="mistral-large-latest", | |
| messages=messages | |
| + [ | |
| { | |
| "role": "assistant", | |
| "content": '["', | |
| "prefix": True, | |
| } | |
| ], | |
| response_format={"type": "json_object"}, | |
| temperature=0.0, | |
| ) | |
| result: str = response.choices[0].message.content | |
| return result.strip() | |
| def process_text(self, candidate: str) -> Dict[str, Any] | None: | |
| """Convert text into an action if a complete command is detected.""" | |
| # Get sentiment first | |
| action_and_sentiment = json.loads(self.get_action_and_sentiment(candidate)) | |
| if not isinstance(action_and_sentiment, list) or len(action_and_sentiment) != 2: | |
| return None | |
| action, sentiment = action_and_sentiment | |
| if action not in self.valid_action: | |
| action = "None" | |
| return { | |
| "action": action, | |
| "sentiment": sentiment, | |
| "voice": candidate, | |
| "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
| } | |
| def run(self) -> None: | |
| """Main processing loop.""" | |
| while True: | |
| try: | |
| # Get text from queue, blocks until text is available | |
| text, session_id = self.filtered_text_queue.get() | |
| # Process the text into an action | |
| start_time = time.time() | |
| action = self.process_text(text) | |
| processing_time = time.time() - start_time | |
| logger.info(f"{processing_time:.2f}s: {text} -> {action}") | |
| # If we got a valid action, add it to the action queue | |
| if action: | |
| self.action_queue.put((action, session_id)) | |
| except Exception as e: | |
| logger.error(f"Error processing text: {str(e)}") | |
| continue | |