Spaces:
Sleeping
Sleeping
| # utils.py - FIXED ENGLISH DETECTION | |
| import requests | |
| import ffmpeg | |
| import torchaudio | |
| import torch | |
| import os | |
| import numpy as np | |
| import warnings | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
# Silence UserWarning / FutureWarning messages for the whole process.
warnings.filterwarnings(action="ignore", category=UserWarning)
warnings.filterwarnings(action="ignore", category=FutureWarning)

# Dedicated directory (relative to the current working directory) that holds
# every downloaded model; created at import time if it is missing.
CACHE_DIR = Path("model_cache")
CACHE_DIR.mkdir(exist_ok=True)

# Environment variables used to steer model caching into sub-folders of
# CACHE_DIR.
os.environ.update(
    {
        'HUGGINGFACE_HUB_CACHE': str(CACHE_DIR / "huggingface"),
        'TRANSFORMERS_CACHE': str(CACHE_DIR / "transformers"),
    }
)
def download_video(url, output_path=None):
    """Download the video at ``url`` to a local file.

    Args:
        url: Address passed to ``requests.get``.
        output_path: Destination file. When ``None`` a temporary ``.mp4`` file
            is created with ``delete=False``, so the caller is responsible for
            removing it.

    Returns:
        The path of the written file, or ``None`` when the request fails or
        the resulting file is empty. In both failure cases ``cleanup_files``
        is called on the destination path.
    """
    print("π₯ Downloading video...")
    if output_path is None:
        # Only the unique name is needed; leaving the ``with`` block closes the
        # handle so the path can be reopened for writing below.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
            output_path = temp_file.name
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        # A streamed response keeps its connection open until it is closed;
        # the context manager releases it even if writing fails part-way.
        with requests.get(url, stream=True, headers=headers, timeout=30) as response:
            response.raise_for_status()
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            print(f"β Video downloaded successfully ({os.path.getsize(output_path):,} bytes)")
            return output_path
        print("β Downloaded file is empty")
        cleanup_files(output_path)
        return None
    except Exception as e:
        # Best-effort API: report the problem and signal failure with None.
        print(f"β Download failed: {e}")
        cleanup_files(output_path)
        return None
def extract_audio(video_path, audio_path=None):
    """Extract the audio track of ``video_path`` into a WAV file.

    FFmpeg is invoked with ``ac=1``, ``ar='16000'`` and ``acodec='pcm_s16le'``
    (one channel, 16 kHz, 16-bit PCM).

    Args:
        video_path: Path of the input video; must exist.
        audio_path: Destination file. When ``None`` a temporary ``.wav`` file
            is created with ``delete=False``, so the caller is responsible for
            removing it.

    Returns:
        The path of the audio file, or ``None`` when the input is missing,
        FFmpeg fails, or the output file is empty. On failure after the
        destination was chosen, ``cleanup_files`` is called on it.
    """
    print("π΅ Extracting audio...")
    if not video_path or not os.path.exists(video_path):
        print("β Video file not found")
        return None
    if audio_path is None:
        # Only the unique name is needed; FFmpeg overwrites the empty file.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_file:
            audio_path = temp_file.name
    try:
        # stdout/stderr are captured so that stderr is available on
        # ffmpeg.Error; the captured output of a successful run is not needed.
        (
            ffmpeg
            .input(video_path)
            .output(audio_path, ac=1, ar='16000', acodec='pcm_s16le')
            .run(overwrite_output=True, capture_stdout=True, capture_stderr=True)
        )
        if os.path.exists(audio_path) and os.path.getsize(audio_path) > 0:
            print(f"β Audio extracted successfully ({os.path.getsize(audio_path):,} bytes)")
            return audio_path
        print("β Audio extraction produced empty file")
        cleanup_files(audio_path)
        return None
    except ffmpeg.Error as e:
        # errors="replace": FFmpeg output is not guaranteed to be valid UTF-8
        # and a decode failure here would hide the real error.
        detail = e.stderr.decode(errors="replace") if e.stderr else str(e)
        print(f"β FFmpeg failed: {detail}")
        cleanup_files(audio_path)
        return None
    except Exception as e:
        print(f"β Audio extraction error: {e}")
        cleanup_files(audio_path)
        return None
def is_english_language(language_code):
    """Return True when ``language_code`` denotes English.

    The value is lower-cased and stripped, then checked in two steps:

    1. exact match against a list of known English codes and names;
    2. token match: the value is split on non-alphanumeric characters and is
       accepted when its first token is ``en``/``eng`` or any token is an
       English name (``english``, ``american``, ``british``, ``australian``).

    Matching whole tokens instead of substrings keeps values such as
    ``"fr: french"``, ``"bengali"`` or ``"non-english"`` from being accepted
    merely because they contain the letters ``en``.

    Args:
        language_code: Language label; any object, converted with ``str``.

    Returns:
        bool: True for English, False otherwise (including empty/None input).
    """
    if not language_code:
        return False

    import re  # local import keeps this function self-contained

    language_code = str(language_code).lower().strip()
    # Known spellings of English codes and names.
    english_codes = {
        'en',          # Standard English
        'english',     # Full word
        'eng',         # 3-letter code
        'en-us',       # American English
        'en-gb',       # British English
        'en-au',       # Australian English
        'en-ca',       # Canadian English
        'en-in',       # Indian English
        'en-ie',       # Irish English
        'en-za',       # South African English
        'en-nz',       # New Zealand English
        'en-sg',       # Singapore English
        'american',    # Sometimes returns full names
        'british',
        'australian',
    }
    # Check exact matches first
    if language_code in english_codes:
        print(f"β Detected English: {language_code}")
        return True

    # Fall back to whole-token matching for composite labels such as
    # "en: english" or "en_gb".
    tokens = [tok for tok in re.split(r"[^a-z0-9]+", language_code) if tok]
    iso_prefixes = {'en', 'eng'}
    english_names = {'english', 'american', 'british', 'australian'}
    # Labels like "non-english" / "not english" / "unknown" explicitly say the
    # speech is NOT English even though they may contain an English name.
    negations = {'non', 'not', 'unknown'}
    is_negated = any(tok in negations for tok in tokens)
    if tokens and not is_negated:
        if tokens[0] in iso_prefixes or any(tok in english_names for tok in tokens):
            print(f"β Detected English variant: {language_code}")
            return True
    print(f"β Not English: {language_code}")
    return False
def detect_language_speechbrain(audio_path):
    """Method 1: Language detection using SpeechBrain VoxLingua107.

    Loads ``speechbrain/lang-id-voxlingua107-ecapa`` (stored under
    ``CACHE_DIR``) and classifies the file.

    Args:
        audio_path: Path of the audio file handed to ``classify_file``.

    Returns:
        tuple: ``(language, confidence)`` where ``language`` is the
        lower-cased label returned by the model and ``confidence`` is a
        percentage capped at 100.

    Raises:
        Exception: whatever the import, model loading or classification
        raises; the error is printed and re-raised unchanged.
    """
    import math  # local import keeps this function self-contained

    print("π Method 1: Using SpeechBrain language detection...")
    try:
        from speechbrain.pretrained import EncoderClassifier
        print("π¦ Loading language detection model...")
        language_id = EncoderClassifier.from_hparams(
            source="speechbrain/lang-id-voxlingua107-ecapa",
            savedir=str(CACHE_DIR / "lang-id-voxlingua107-ecapa")
        )
        print("β Language detection model loaded")
        print("π Detecting language...")
        _, score, _, text_lab = language_id.classify_file(audio_path)
        if torch.is_tensor(score):
            raw_score = float(score.max().item())
        else:
            raw_score = float(np.max(score))
        # The model card for this checkpoint describes the score as a
        # log-likelihood (<= 0) and obtains the linear likelihood with
        # ``.exp()``. Multiplying the raw value by 100 would give a negative
        # "confidence". A positive value is treated as already linear.
        likelihood = math.exp(raw_score) if raw_score <= 0 else raw_score
        confidence = min(likelihood * 100, 100.0)
        language = text_lab[0] if isinstance(text_lab, (list, tuple)) else str(text_lab)
        # DEBUG: Print what we actually got
        print(f"π DEBUG - Raw model output: {text_lab}")
        print(f"π DEBUG - Processed language: '{language}'")
        print(f"π DEBUG - Confidence: {confidence:.1f}%")
        print(f"π Language detected: {language} ({confidence:.1f}%)")
        return language.lower(), confidence
    except Exception as e:
        print(f"β SpeechBrain language detection failed: {e}")
        # Bare raise keeps the original traceback for the caller.
        raise
def detect_language_whisper(audio_path):
    """Method 2: Language detection using Whisper.

    Loads ``openai/whisper-base`` (cached under ``CACHE_DIR``), generates a
    transcription with ``max_length=30`` and counts how many common English
    function words occur in it as whole words.

    NOTE(review): the returned confidences are fixed heuristic values, not
    probabilities produced by the model.

    Args:
        audio_path: Path of the audio file, loaded with librosa at 16 kHz mono.

    Returns:
        tuple: ``("unknown", 50.0)`` for an empty transcription,
        ``("en", confidence)`` with confidence between 89.0 and 95.0 when at
        least two indicator words are found, ``("non-english", 70.0)``
        otherwise.

    Raises:
        Exception: whatever the imports, model loading, audio loading or
        generation raise; the error is printed and re-raised unchanged.
    """
    import re  # local import keeps this function self-contained

    print("π Method 2: Using Whisper language detection...")
    try:
        from transformers import WhisperProcessor, WhisperForConditionalGeneration
        import librosa
        print("π¦ Loading Whisper model...")
        processor = WhisperProcessor.from_pretrained(
            "openai/whisper-base",
            cache_dir=str(CACHE_DIR / "whisper")
        )
        model = WhisperForConditionalGeneration.from_pretrained(
            "openai/whisper-base",
            cache_dir=str(CACHE_DIR / "whisper")
        )
        print("β Whisper loaded")
        # Load audio
        audio, _ = librosa.load(audio_path, sr=16000, mono=True)
        # Process audio
        input_features = processor(audio, sampling_rate=16000, return_tensors="pt").input_features
        # Generate with language detection
        print("π Detecting language with Whisper...")
        predicted_ids = model.generate(input_features, max_length=30)
        transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
        print(f"π DEBUG - Whisper transcription: '{transcription}'")
        # Simple heuristic based on transcription
        if not transcription.strip():
            return "unknown", 50.0
        # Check if transcription contains English words. Whole-word matching
        # is required: as substrings, 'i', 'is', 'me' or 'we' occur inside
        # words of almost any Latin-script language.
        english_indicators = ['the', 'and', 'is', 'are', 'was', 'were', 'have', 'has', 'this', 'that', 'you', 'i', 'me', 'we', 'they']
        spoken_words = set(re.findall(r"[a-z]+", transcription.lower()))
        english_count = sum(1 for word in english_indicators if word in spoken_words)
        print(f"π DEBUG - English words found: {english_count}")
        if english_count >= 2:
            return "en", min(85.0 + english_count * 2, 95.0)
        return "non-english", 70.0
    except Exception as e:
        print(f"β Whisper language detection failed: {e}")
        # Bare raise keeps the original traceback for the caller.
        raise
def detect_language_fallback(audio_path):
    """Fallback: Simple acoustic analysis for language detection.

    Computes tempo, mean spectral centroid and MFCC variance with librosa and
    adds up a score from fixed value ranges.

    NOTE(review): these thresholds are a rough heuristic; the returned
    confidences are fixed values, not probabilities.

    Args:
        audio_path: Path of the audio file, loaded with librosa at 16 kHz mono.

    Returns:
        tuple: ``("en", confidence)`` when the score reaches 50 (confidence is
        ``score + 20`` capped at 80.0), ``("non-english", 60.0)`` otherwise,
        and ``("unknown", 40.0)`` when anything fails. This function never
        raises.
    """
    print("π Fallback: Using acoustic analysis for language detection...")
    try:
        import librosa
        # Load audio
        audio, sr = librosa.load(audio_path, sr=16000, mono=True)
        # Extract basic features
        tempo, _ = librosa.beat.beat_track(y=audio, sr=sr)
        # Depending on the librosa version, tempo may be a Python float or a
        # NumPy array; an array cannot be formatted with ":.1f", so reduce it
        # to a plain float first.
        tempo = float(np.atleast_1d(tempo)[0])
        spectral_centroids = librosa.feature.spectral_centroid(y=audio, sr=sr)[0]
        avg_spectral = float(np.mean(spectral_centroids))
        mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13)
        mfcc_var = float(np.var(mfccs))
        print(f"π DEBUG - Acoustic features: tempo={tempo:.1f}, spectral={avg_spectral:.1f}, mfcc_var={mfcc_var:.1f}")
        # Basic heuristic for English detection
        english_score = 0
        if 90 < tempo < 150:
            english_score += 30
        if 1200 < avg_spectral < 2500:
            english_score += 25
        if 50 < mfcc_var < 200:
            english_score += 25
        print(f"π DEBUG - English score: {english_score}")
        # Confidences are returned as floats, like the other detectors do.
        if english_score >= 50:
            return "en", float(min(english_score + 20, 80))
        return "non-english", 60.0
    except Exception as e:
        # Last resort in the detection chain: report and return a neutral
        # answer instead of propagating.
        print(f"β Fallback language detection failed: {e}")
        return "unknown", 40.0
def detect_language(audio_path):
    """Detect the spoken language, trying each detector in order.

    SpeechBrain is attempted first, then Whisper; if both raise, the acoustic
    fallback detector produces the answer.

    Args:
        audio_path: Path of an existing audio file.

    Returns:
        The ``(language, confidence)`` pair of the first detector that
        succeeds.

    Raises:
        ValueError: if ``audio_path`` is empty or does not exist.
    """
    print(f"π Starting language detection: {audio_path}")
    audio_missing = not audio_path or not os.path.exists(audio_path)
    if audio_missing:
        raise ValueError(f"Audio file not found: {audio_path}")

    # Detectors that may raise, in order of preference.
    primary_detectors = (
        ("SpeechBrain", detect_language_speechbrain),
        ("Whisper", detect_language_whisper),
    )
    for label, detector in primary_detectors:
        try:
            return detector(audio_path)
        except Exception as failure:
            # Only the first 100 characters of the error are shown.
            print(f"β οΈ {label} language detection failed: {str(failure)[:100]}...")

    # Both primary detectors failed.
    print("π Using fallback language detection...")
    return detect_language_fallback(audio_path)
def classify_english_accent_speechbrain(audio_path):
    """Classify the English accent of ``audio_path`` with SpeechBrain.

    Loads ``Jzuluaga/accent-id-commonaccent_ecapa`` (stored under
    ``CACHE_DIR``), classifies the file and translates the model label into a
    readable name.

    NOTE(review): when anything fails, this returns a RANDOM accent from a
    short list with a fixed 65.0 confidence, so callers cannot tell a real
    prediction from the fallback. Confirm this is intended.

    Args:
        audio_path: Path of the audio file handed to ``classify_file``.

    Returns:
        tuple: ``(accent_name, confidence)``; confidence is the maximum score
        times 100, capped at 95.0 and rounded to one decimal.
    """
    print("π― Using SpeechBrain for English accent detection...")
    try:
        from speechbrain.pretrained import EncoderClassifier

        print("π¦ Loading English accent classifier...")
        model_dir = CACHE_DIR / "accent-id-commonaccent_ecapa"
        accent_model = EncoderClassifier.from_hparams(
            source="Jzuluaga/accent-id-commonaccent_ecapa",
            savedir=str(model_dir)
        )
        print("β Accent model loaded successfully")

        print("π Classifying English accent...")
        _, score, _, text_lab = accent_model.classify_file(audio_path)
        top_score = score.max().item() if torch.is_tensor(score) else np.max(score)
        confidence = float(top_score) * 100

        if isinstance(text_lab, list):
            accent = text_lab[0]
        else:
            accent = str(text_lab)

        # DEBUG
        print(f"π DEBUG - Accent raw output: {text_lab}")
        print(f"π DEBUG - Processed accent: '{accent}'")

        # Model label -> display name. The key spellings (including
        # 'southatlandtic') are kept exactly as written; presumably they
        # mirror the model's own labels - verify before changing.
        display_names = dict(
            us='American',
            england='British (England)',
            australia='Australian',
            indian='Indian',
            canada='Canadian',
            bermuda='Bermudian',
            scotland='Scottish',
            african='South African',
            ireland='Irish',
            newzealand='New Zealand',
            wales='Welsh',
            malaysia='Malaysian',
            philippines='Filipino',
            singapore='Singaporean',
            hongkong='Hong Kong',
            southatlandtic='South Atlantic',
        )
        # Unknown labels are shown title-cased.
        title_cased = accent.title()
        readable_accent = display_names.get(accent.lower(), title_cased)

        confidence = min(confidence, 95.0)
        print(f"π― English accent: {readable_accent} ({confidence:.1f}%)")
        return readable_accent, round(confidence, 1)
    except Exception as e:
        print(f"β English accent detection failed: {e}")
        fallback_accents = ["American", "British (England)", "Australian", "Indian", "Canadian"]
        return np.random.choice(fallback_accents), 65.0
def analyze_speech(audio_path):
    """Run the full analysis: language detection, then accent detection.

    Accent classification runs only when the detected language is judged to
    be English by ``is_english_language``.

    Args:
        audio_path: Path of an existing audio file.

    Returns:
        tuple: ``(is_english, language, accent, lang_confidence,
        accent_confidence)``. For English speech ``language`` is the literal
        ``"English"``; otherwise it is the detected label and both ``accent``
        and ``accent_confidence`` are ``None``.

    Raises:
        ValueError: if ``audio_path`` is empty or does not exist.
    """
    print(f"π€ Starting complete speech analysis: {audio_path}")
    if not (audio_path and os.path.exists(audio_path)):
        raise ValueError(f"Audio file not found: {audio_path}")

    divider = "=" * 50

    # Step 1: Detect Language
    print("\n" + divider)
    print("STEP 1: LANGUAGE DETECTION")
    print(divider)
    language, lang_confidence = detect_language(audio_path)
    is_english = is_english_language(language)

    print(f"\nπ DEBUG - Final language check:")
    print(f" - Detected language: '{language}'")
    print(f" - Is English: {is_english}")
    print(f" - Confidence: {lang_confidence:.1f}%")

    if is_english:
        # Step 2: English Accent Detection
        print(f"\nβ Language is English! Proceeding to accent detection...")
        print("\n" + divider)
        print("STEP 2: ENGLISH ACCENT DETECTION")
        print(divider)
        accent, accent_confidence = classify_english_accent_speechbrain(audio_path)

        print(f"\nπ― FINAL RESULT:")
        print(f" Language: English ({lang_confidence:.1f}% confidence)")
        print(f" English Accent: {accent} ({accent_confidence:.1f}% confidence)")
        return True, "English", accent, lang_confidence, accent_confidence

    # Not English: report and stop without running the accent model.
    print(f"\nβ RESULT: Speaker is NOT speaking English")
    print(f" Detected language: {language}")
    print(f" Confidence: {lang_confidence:.1f}%")
    return False, language, None, lang_confidence, None
def cleanup_files(*file_paths):
    """Remove temporary files on a best-effort basis.

    Empty/``None`` entries and files that no longer exist are skipped
    silently. Any other failure is printed; this function never raises.

    Args:
        *file_paths: Paths of the files to delete.
    """
    for file_path in file_paths:
        if not file_path:
            continue
        try:
            # Remove directly instead of checking existence first: a file that
            # vanishes between the check and the removal would otherwise be
            # reported as a failure.
            os.remove(file_path)
        except FileNotFoundError:
            # Already gone - nothing to clean up.
            continue
        except Exception as e:
            print(f"β οΈ Failed to cleanup {file_path}: {e}")
        else:
            print(f"ποΈ Cleaned up: {file_path}")
def cleanup_cache():
    """Delete the whole model cache directory (``CACHE_DIR``) if it exists.

    Failures are printed rather than raised. Intended to be called
    periodically.
    """
    try:
        if not CACHE_DIR.exists():
            return
        shutil.rmtree(CACHE_DIR)
        print(f"ποΈ Cleaned up model cache directory")
    except Exception as err:
        print(f"β οΈ Failed to cleanup cache: {err}")
# Legacy entry point kept for backward compatibility
def classify_accent(audio_path):
    """Legacy wrapper around ``analyze_speech`` returning a 2-tuple.

    Returns:
        ``(accent, accent_confidence)`` for English speech, otherwise
        ``("Not English (detected: <language>)", language_confidence)``.
    """
    analysis = analyze_speech(audio_path)
    spoken_english, detected, accent_name, detected_conf, accent_score = analysis
    if spoken_english:
        return accent_name, accent_score
    return f"Not English (detected: {detected})", detected_conf