# listen.py
"""Flask blueprint for the "listen" feature.

Stores uploaded videos (local disk or S3), extracts their audio track,
transcribes it with Google Cloud Speech-to-Text and asks the Cohere v2 Chat
API to generate multiple-choice questions from the transcript.
"""
import io  # for streaming S3 bytes in HF/AWS mode
import json  # for JSON creds parsing
import os
import re
import uuid

import ffmpeg
import requests
from flask import Flask, Blueprint, jsonify, send_file, abort, request, send_from_directory
from flask_cors import CORS
from google.cloud import speech
from moviepy.editor import VideoFileClip
from pydub import AudioSegment

# Optional (only used in AWS mode)
try:
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError
except Exception:
    boto3 = None
    BotoCoreError = ClientError = Exception

print(f"GOOGLE_APPLICATION_CREDENTIALS: {os.getenv('GOOGLE_APPLICATION_CREDENTIALS')}")

# ---------- Blueprint ----------
listen_bp = Blueprint("listen", __name__)


# ---------------------- storage mode helpers ----------------------
def _is_aws_video_mode() -> bool:
    """
    Switch to S3 on Hugging Face / prod. Local stays on disk.

    Returns True when any of these holds: USE_AWS_VIDEO == "1", SPACE_ID is
    set (Hugging Face Spaces), or ENV == "prod" (case-insensitive).
    """
    if os.getenv("USE_AWS_VIDEO", "0") == "1":
        return True
    if os.getenv("SPACE_ID"):  # set on Hugging Face Spaces
        return True
    if os.getenv("ENV", "dev").lower() == "prod":
        return True
    return False


def _s3_clients():
    """Return a boto3 S3 client for AWS_DEFAULT_REGION (default "eu-north-1").

    Raises:
        RuntimeError: if boto3 could not be imported.
    """
    if boto3 is None:
        raise RuntimeError("boto3 is required in AWS video mode but not available")
    region = os.getenv("AWS_DEFAULT_REGION", "eu-north-1")
    s3 = boto3.client("s3", region_name=region)
    return s3


def _video_s3_bucket():
    """Return the bucket name from S3_BUCKET_NAME.

    Raises:
        RuntimeError: if S3_BUCKET_NAME is unset or empty.
    """
    bucket = os.getenv("S3_BUCKET_NAME")
    if not bucket:
        raise RuntimeError("S3_BUCKET_NAME is not set")
    return bucket


def _video_s3_key(filename: str) -> str:
    """Return the S3 object key for *filename* under LISTEN_S3_PREFIX."""
    # Prefix under which listen.py stores videos in the same bucket
    prefix = os.getenv("LISTEN_S3_PREFIX", "listen")
    prefix = prefix.strip().strip("/")
    return f"{prefix}/{filename}"


# ---------- writable working directories ----------
# Base working dir: /tmp on HF/AWS; local stays under ./static (or override via LISTEN_WORKDIR)
_BASE_WORKDIR = os.getenv(
    "LISTEN_WORKDIR",
    "/tmp/listen" if _is_aws_video_mode() else os.path.abspath("static"),
)
VIDEO_FOLDER = os.path.join(_BASE_WORKDIR, "videos")
AUDIO_FOLDER = os.path.join(_BASE_WORKDIR, "audio")
TRANSCRIPT_FOLDER = os.path.join(_BASE_WORKDIR, "transcripts")

# Ensure directories exist (with hard fallback to /tmp if needed)
for _pname in ("videos", "audio", "transcripts"):
    _p = os.path.join(_BASE_WORKDIR, _pname)
    try:
        os.makedirs(_p, exist_ok=True)
    except OSError:
        # Directory creation failures (read-only FS, permissions, ...) are all
        # OSError subclasses; fall back to a location that should be writable.
        _fallback_base = "/tmp/listen"
        os.makedirs(os.path.join(_fallback_base, _pname), exist_ok=True)
        if _pname == "videos":
            VIDEO_FOLDER = os.path.join(_fallback_base, "videos")
        elif _pname == "audio":
            AUDIO_FOLDER = os.path.join(_fallback_base, "audio")
        else:
            TRANSCRIPT_FOLDER = os.path.join(_fallback_base, "transcripts")

# ---------------- Cohere configuration (migrated to v2 Chat) ----------------
COHERE_API_KEY = os.getenv("COHERE_API_KEY", "")
COHERE_API_URL = 'https://api.cohere.com/v2/chat'
# ---------------------------------------------------------------------------


# --- Google Cloud Speech-to-Text client init (prefers HF secret JSON) ---
def _make_speech_client():
    """Build the Speech-to-Text client.

    Uses the service-account JSON in GOOGLE_APPLICATION_CREDENTIALS_JSON when
    it is set and parses; otherwise falls back to the default
    `speech.SpeechClient()` constructor.
    """
    sa_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
    if sa_json:
        try:
            info = json.loads(sa_json)
            return speech.SpeechClient.from_service_account_info(info)
        except Exception as e:
            print(f"Failed to parse GOOGLE_APPLICATION_CREDENTIALS_JSON: {e}")
    # fall through to default ADC
    return speech.SpeechClient()


speech_client = _make_speech_client()
# -------------------------------------------------------------------------


# ------------- Cohere v2 helper (extract text from chat response) -------------
def _extract_text_v2(resp_json: dict) -> str:
    """
    Cohere v2 /chat returns:
    {
      "message": {
        "content": [ { "type": "text", "text": "..." }, ... ]
      }
    }
    This pulls the first non-empty text block.

    Returns "" when the payload does not have that shape (missing or null
    "message", non-list "content", non-string "text") instead of raising.
    """
    if not isinstance(resp_json, dict):
        return ""
    msg = resp_json.get("message")
    if not isinstance(msg, dict):
        return ""
    content = msg.get("content")
    if not isinstance(content, list):
        return ""
    for block in content:
        if not isinstance(block, dict) or block.get("type") != "text":
            continue
        text = block.get("text")
        if isinstance(text, str) and text.strip():
            return text.strip()
    return ""
# -----------------------------------------------------------------------------


# Convert video to audio
def convert_video_to_audio(video_path, audio_path):
    """Extract the audio track of *video_path* into an MP3 at *audio_path*.

    Returns:
        *audio_path* on success, or None on any failure (including a video
        that has no audio track). Errors are printed, never raised.
    """
    video = None
    try:
        # Using moviepy to extract audio from video
        video = VideoFileClip(video_path)
        if video.audio is None:
            print("Error converting video to audio: video has no audio track")
            return None
        video.audio.write_audiofile(audio_path, codec='mp3')
        return audio_path
    except Exception as e:
        print(f"Error converting video to audio: {str(e)}")
        return None
    finally:
        # Always release the clip's reader resources, otherwise every request
        # leaks open file handles / ffmpeg subprocesses.
        if video is not None:
            try:
                video.close()
            except Exception as e:
                print(f"Error closing video clip: {str(e)}")


# Re-encode MP3 to ensure proper format
def reencode_mp3(input_audio_path, output_audio_path):
    """Re-encode an MP3 with libmp3lame at best VBR quality (-q:a 0).

    Returns:
        *output_audio_path* on success, None on failure (error is printed).
    """
    try:
        # Using pydub to convert and re-encode MP3 (ensuring correct encoding)
        audio = AudioSegment.from_mp3(input_audio_path)
        audio.export(
            output_audio_path,
            format="mp3",
            codec="libmp3lame",
            parameters=["-q:a", "0"],
        )
        return output_audio_path
    except Exception as e:
        print(f"Error re-encoding MP3: {str(e)}")
        return None


# Helper function to convert audio to the proper MP3 encoding if necessary
def convert_audio_to_mp3(input_file_path, output_file_path):
    """
    Converts the audio file to a valid MP3 format with proper encoding.

    Runs ffmpeg with libmp3lame at 128k. Returns True on success, False on
    failure (error is printed).
    """
    try:
        ffmpeg.input(input_file_path).output(
            output_file_path, acodec='libmp3lame', audio_bitrate='128k'
        ).run()
        return True
    except Exception as e:
        print(f"Error during audio conversion: {e}")
        return False


# Function to compress audio dynamically
def compress_audio(input_file_path, output_file_path, target_bitrate="128k"):
    """Export *input_file_path* as MP3 at *target_bitrate*; return the output path.

    Unlike the helpers above, errors from pydub propagate to the caller.
    """
    audio = AudioSegment.from_file(input_file_path)
    audio.export(output_file_path, format="mp3", bitrate=target_bitrate)
    return output_file_path


# ---------------------------- Routes (Blueprint) ----------------------------
@listen_bp.route('/', methods=['GET'])
def home():
    """Liveness endpoint: returns a static greeting."""
    return "Welcome to the Flask app! The server is running."
# Optional option label ("A.", "b)", "C ") followed by the option text.
_OPTION_LINE_RE = re.compile(r"^(?:[a-dA-D][\).]?\s)?(.+)$")
# Leading A-D answer letter, tolerating wrappers such as "(B)" or "**B**" and
# trailing text such as "B. Paris"; rejects words ("Both", "Dog").
_ANSWER_LETTER_RE = re.compile(r"^\W*([A-Da-d])(?![A-Za-z0-9])")


def _is_safe_video_filename(filename) -> bool:
    """Return True if *filename* is a plain file name with no path components.

    Client-supplied names are joined onto VIDEO_FOLDER and used in S3 keys,
    so anything containing a path separator, a NUL byte, or equal to
    "." / ".." is rejected to prevent path traversal.
    """
    if not isinstance(filename, str) or filename in ("", ".", ".."):
        return False
    return not any(ch in filename for ch in ("/", "\\", "\x00"))


def _remove_quietly(path):
    """Delete a temporary file; a missing file is fine, other errors are logged."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        print(f"Could not remove temporary file {path}: {e}")


def _build_question_prompt(transcription_text):
    """Return the Cohere prompt asking for three MCQs about *transcription_text*."""
    return (
        "Generate exactly three multiple-choice questions based on this text:\n"
        f"{transcription_text}\n\n"
        "Rules:\n"
        "- Each question starts with a number and a period (e.g., 1.)\n"
        "- Each question has exactly four options labeled A., B., C., and D.\n"
        # parse_questions() maps the answer by letter, so ask for one explicitly.
        "- After the options, add a line 'Correct answer: <letter>'\n"
        "- Output plain text only."
    )


def _transcribe_audio(audio_path):
    """Transcribe the MP3 at *audio_path* and return the joined transcript text.

    Returns "" when the service yields no usable alternatives.
    """
    with open(audio_path, 'rb') as audio_file:
        audio_content = audio_file.read()

    audio = speech.RecognitionAudio(content=audio_content)
    # NOTE(review): sample_rate_hertz is hard-coded to 16000 while the MP3 is
    # written with moviepy's default settings -- confirm the two agree.
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.MP3,
        sample_rate_hertz=16000,
        language_code="en-US",
    )
    response = speech_client.recognize(config=config, audio=audio)
    # Skip results without alternatives instead of raising IndexError.
    transcripts = [
        result.alternatives[0].transcript
        for result in response.results
        if result.alternatives
    ]
    return " ".join(transcripts).strip()


@listen_bp.route('/videos', methods=['GET'])
def list_videos():
    """
    List available videos for users to watch.
    """
    # If you maintain a VIDEOS list elsewhere, return it here.
    # Returning empty list so the endpoint stays valid.
    return jsonify([]), 200


@listen_bp.route('/videos/<filename>')
def serve_video(filename):
    """
    Local: serve file from disk.
    HF/AWS: fetch object from S3 and stream bytes (no redirect).

    Responds 404 for unsafe names, missing files and S3 failures.
    """
    if not _is_safe_video_filename(filename):
        print(f"Rejected unsafe video filename: {filename!r}")
        abort(404)

    if _is_aws_video_mode():
        try:
            s3 = _s3_clients()
            bucket = _video_s3_bucket()
            key = _video_s3_key(filename)
            obj = s3.get_object(Bucket=bucket, Key=key)
            data = obj["Body"].read()
            return send_file(
                io.BytesIO(data),
                mimetype="video/mp4",
                download_name=filename,
                as_attachment=False,
            )
        except Exception as e:  # covers BotoCoreError / ClientError as well
            print(f"S3 fetch failed for {filename}: {e}")
            abort(404)

    # Local
    video_path = os.path.join(VIDEO_FOLDER, filename)
    if not os.path.isfile(video_path):
        print(f"Video file not found: {filename}")
        abort(404)
    return send_file(video_path, mimetype='video/mp4')


@listen_bp.route('/upload-video', methods=['POST'])
def upload_video():
    """
    Local: save to static/videos or /tmp/listen/videos (depending on mode).
    HF/AWS: upload to S3 (no local original).

    Expects a multipart form field named "video". The stored name is a fresh
    UUID with an .mp4 suffix, returned to the client as "filename".
    """
    print("Received upload request.")

    if 'video' not in request.files:
        print("No video file provided in the request.")
        return jsonify({'error': 'No video file provided'}), 400

    video = request.files['video']
    if video.filename == '':
        print("Empty filename detected.")
        return jsonify({'error': 'No selected file'}), 400

    try:
        # The client-supplied name is never used for storage.
        filename = f"{uuid.uuid4()}.mp4"

        if _is_aws_video_mode():
            try:
                s3 = _s3_clients()
                bucket = _video_s3_bucket()
                key = _video_s3_key(filename)
                s3.put_object(
                    Bucket=bucket,
                    Key=key,
                    Body=video.stream.read(),
                    ContentType="video/mp4",
                )
                print(f"Uploaded to S3: s3://{bucket}/{key}")
            except Exception as e:  # covers BotoCoreError / ClientError as well
                print(f"S3 upload error: {e}")
                return jsonify({'error': 'Failed to upload to S3'}), 500
        else:
            # Save locally
            video_path = os.path.join(VIDEO_FOLDER, filename)
            print(f"Saving video: {filename}")
            video.save(video_path)
            print(f"Video saved successfully at {video_path}")

        return jsonify({'message': 'Video uploaded successfully!', 'filename': filename}), 200
    except Exception as e:
        print(f"Error saving video: {str(e)}")
        return jsonify({'error': 'Failed to save video'}), 500


@listen_bp.route('/generate-questions-dynamicvideo', methods=['POST'])
def generate_questions():
    """Generate multiple-choice questions for a previously uploaded video.

    Expects a JSON body {"filename": "<name returned by /upload-video>"}.
    Pipeline: resolve the video (local disk or S3 download) -> extract MP3 ->
    transcribe -> ask Cohere for questions -> parse into structured form.

    Returns {"questions": [...]} with 200, or {"error": "..."} with
    400 (missing/invalid filename), 404 (video not found) or 500.
    """
    audio_path = None
    downloaded_video_path = None  # only set for the temporary S3 copy
    try:
        # silent=True: a missing or malformed JSON body becomes a 400 below
        # instead of an exception that would surface as a 500.
        data = request.get_json(silent=True)
        video_filename = data.get('filename') if isinstance(data, dict) else None

        if not video_filename:
            print("Error: No filename provided in request.")
            return jsonify({"error": "Filename is required"}), 400

        if not _is_safe_video_filename(video_filename):
            print(f"Error: Unsafe filename rejected: {video_filename!r}")
            return jsonify({"error": "Invalid filename"}), 400

        # Resolve a local readable path for processing
        if _is_aws_video_mode():
            # Download object bytes to a local working file path. A unique
            # prefix keeps concurrent requests for the same video from
            # clobbering (or deleting) each other's working copy.
            video_path = os.path.join(VIDEO_FOLDER, f"{uuid.uuid4()}_{video_filename}")
            try:
                s3 = _s3_clients()
                bucket = _video_s3_bucket()
                key = _video_s3_key(video_filename)
                obj = s3.get_object(Bucket=bucket, Key=key)
                data_bytes = obj["Body"].read()
                downloaded_video_path = video_path
                with open(video_path, "wb") as f:
                    f.write(data_bytes)
            except Exception as e:  # covers BotoCoreError / ClientError as well
                print(f"S3 download error for {video_filename}: {e}")
                return jsonify({"error": "Video file not found"}), 404
        else:
            video_path = os.path.join(VIDEO_FOLDER, video_filename)
            if not os.path.isfile(video_path):
                print(f"Error: Video file {video_filename} not found at {video_path}")
                return jsonify({"error": "Video file not found"}), 404

        print(f"Processing video: {video_filename}")

        # Convert video to audio
        audio_filename = f"{uuid.uuid4()}.mp3"
        audio_path = os.path.join(AUDIO_FOLDER, audio_filename)
        if not convert_video_to_audio(video_path, audio_path):
            print("Error: Video to audio conversion failed.")
            return jsonify({"error": "Failed to convert video to audio"}), 500

        # Transcribe audio using Google Cloud Speech-to-Text
        transcription_text = _transcribe_audio(audio_path)
        if not transcription_text:
            print("Error: No transcription results found.")
            return jsonify({"error": "No transcription results found"}), 500

        print(f"Transcription successful: {transcription_text[:200]}...")  # Print first 200 chars

        # ---------------- Cohere v2 Chat call ----------------
        headers = {
            "Authorization": f"Bearer {COHERE_API_KEY}",
            "Content-Type": "application/json",
        }
        cohere_payload = {
            "model": "command-r-08-2024",
            "messages": [
                {"role": "user", "content": _build_question_prompt(transcription_text)}
            ],
            "max_tokens": 300,
            "temperature": 0.9,
        }

        cohere_response = requests.post(
            COHERE_API_URL,
            json=cohere_payload,
            headers=headers,
            timeout=60,
        )

        if cohere_response.status_code != 200:
            print(f"Error: Cohere API response failed: {cohere_response.text}")
            return jsonify({"error": "Failed to generate questions"}), 500

        raw_text = _extract_text_v2(cohere_response.json())
        if not raw_text:
            print("Error: No questions text returned by Cohere Chat API.")
            return jsonify({"error": "No questions generated"}), 500
        # ------------------------------------------------------

        # Extract raw text and parse questions
        structured_questions = parse_questions(raw_text)

        return jsonify({"questions": structured_questions}), 200

    except Exception as e:
        print(f"Critical Error: {e}")
        return jsonify({"error": "An error occurred while generating questions"}), 500
    finally:
        # Intermediate files are never referenced again; remove them so the
        # working directory does not grow with every request.
        _remove_quietly(audio_path)
        _remove_quietly(downloaded_video_path)


def parse_questions(response_text):
    """Parse the model's plain-text answer into structured questions.

    The text is split into blocks on blank lines. In each block the first
    line is the question (a leading "N. " prefix is dropped), the following
    lines are options (a leading "A."/"b)" style label is dropped) and a
    "Correct answer: X" line names the correct option by letter.

    Returns:
        A list of {"question": str, "options": [str, ...], "answer": str}.
        "answer" is the text of the referenced option, or "" when the letter
        is missing, unrecognised or out of range. Blocks with fewer than two
        lines or without options are skipped.
    """
    # Normalise Windows line endings so the blank-line split still works.
    normalized = (response_text or "").replace("\r\n", "\n")
    # Split the text into individual question blocks
    question_blocks = normalized.split("\n\n")
    questions = []

    # Process each question block
    for block in question_blocks:
        print("\nProcessing Block:", block)  # Debug: Log each question block

        # Split the block into lines
        lines = block.strip().split("\n")
        print("Split Lines:", lines)  # Debug: Log split lines of the block

        # Ensure the block contains a question
        if len(lines) < 2:
            print("Skipping Invalid Block")  # Debug: Log invalid blocks
            continue

        # Extract the question text
        question_line = lines[0]
        question_text = question_line.split(". ", 1)[1] if ". " in question_line else question_line
        print("Question Text:", question_text)  # Debug: Log extracted question text

        # Extract the options and find the correct answer
        options = []
        correct_answer_letter = None
        for line in lines[1:]:
            line = line.strip()

            if line.lower().startswith("correct answer:"):
                # Keep only a leading A-D letter: the model may answer "B",
                # "B.", "(B)" or "B. Paris", and passing anything other than
                # a single character to ord() below would raise TypeError.
                answer_field = line.split(":", 1)[1]
                letter_match = _ANSWER_LETTER_RE.match(answer_field)
                correct_answer_letter = letter_match.group(1).upper() if letter_match else None
                continue

            # Handle A., B., C., D. and also a) / A) formats
            match = _OPTION_LINE_RE.match(line)
            if match:
                options.append(match.group(1).strip())

        print("Extracted Options:", options)  # Debug: Log extracted options
        print("Correct Answer Letter:", correct_answer_letter)  # Debug: Log the correct answer letter

        # Map the correct answer text
        correct_answer_text = ""
        if correct_answer_letter:
            option_index = ord(correct_answer_letter) - ord('A')  # Convert 'A'→0, 'B'→1, etc.
            if 0 <= option_index < len(options):
                correct_answer_text = options[option_index]

        print("Mapped Correct Answer Text:", correct_answer_text)  # Debug: Log mapped answer

        # Append the parsed question to the list
        if question_text and options:
            questions.append({
                "question": question_text,
                "options": options,
                "answer": correct_answer_text,  # Use the full answer text
            })

    print("\nFinal Questions:", questions)  # Debug: Log final parsed questions
    return questions


# ---------- Standalone (local testing) ----------
if __name__ == '__main__':
    app = Flask(__name__)
    CORS(app)
    app.config["COHERE_API_KEY"] = os.getenv("COHERE_API_KEY", COHERE_API_KEY)
    app.register_blueprint(listen_bp, url_prefix='')
    # NOTE(review): debug=True together with host 0.0.0.0 exposes the
    # interactive debugger on the network -- keep this for local testing only.
    app.run(host='0.0.0.0', port=5012, debug=True)