import os import sys import threading import numpy as np import soundfile as sf import shutil import librosa import gradio as gr # torch and NeuTTSAir imported lazily in get_tts() to avoid slow startup / OOM on Render # --------------------------- # eSpeak check (Windows + Linux) # --------------------------- def check_espeak_installed(): # If already set (e.g. by Docker), trust it if os.environ.get("PHONEMIZER_ESPEAK_LIBRARY") and os.path.exists(os.environ["PHONEMIZER_ESPEAK_LIBRARY"]): print(f"Using espeak library from env: {os.environ['PHONEMIZER_ESPEAK_LIBRARY']}") return True # Linux: look for libespeak-ng.so in common locations if sys.platform != "win32": so_names = ["libespeak-ng.so", "libespeak-ng.so.1", "libespeak.so"] search_dirs = ["/usr/lib", "/usr/lib/x86_64-linux-gnu", "/usr/local/lib"] for d in search_dirs: if not os.path.isdir(d): continue for name in so_names: candidate = os.path.join(d, name) if os.path.exists(candidate): os.environ["PHONEMIZER_ESPEAK_LIBRARY"] = candidate print(f"Found espeak library at: {candidate}") return True if shutil.which("espeak-ng") or shutil.which("espeak"): print("Found espeak-ng in PATH (phonemizer may use default library)") return True print("\nError: espeak-ng not found! On Linux install with: apt-get install espeak-ng libespeak-ng-dev") return False # Windows possible_paths = [ "C:\\Program Files\\eSpeak NG", "C:\\Program Files (x86)\\eSpeak NG", "C:\\Program Files\\eSpeak", "C:\\Program Files (x86)\\eSpeak", ] dll_names = ['libespeak-ng.dll', 'espeak-ng.dll', 'libespeak.dll', 'espeak.dll'] for exe_cmd in ['espeak-ng', 'espeak']: exe_path = shutil.which(exe_cmd) if exe_path: print(f"Found {exe_cmd} in PATH at: {exe_path}") exe_dir = os.path.dirname(exe_path) for dll in dll_names: candidate = os.path.join(exe_dir, dll) if os.path.exists(candidate): os.environ['PHONEMIZER_ESPEAK_LIBRARY'] = candidate print(f"Found espeak shared library at: {candidate}") return True for path in possible_paths: if os.path.exists(path): for root, _, files in os.walk(path): for dll in dll_names: candidate = os.path.join(root, dll) if os.path.exists(candidate): os.environ['PHONEMIZER_ESPEAK_LIBRARY'] = candidate os.environ['PATH'] = f"{path};{os.environ['PATH']}" return True bin_path = os.path.join(path, 'espeak-ng.exe') if os.path.exists(bin_path): os.environ['PATH'] = f"{path};{os.environ['PATH']}" break print("\nError: espeak-ng not found!") print("Install from https://github.com/espeak-ng/espeak-ng/releases") return False if not check_espeak_installed(): sys.exit(1) # --------------------------- # Model initialization (deferred so server can bind to PORT first for Render) # --------------------------- tts = None _tts_lock = threading.Lock() def get_tts(): """Load TTS model on first use so the Gradio server can start and bind to PORT immediately.""" global tts with _tts_lock: if tts is not None: return tts import torch from neuttsair.neutts import NeuTTSAir print("\nLoading TTS model (first use)...") if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.synchronize() print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB total") project_root = os.path.abspath(os.path.dirname(__file__)) local_backbone = os.path.join(project_root, "Models", "neutts-air") def _resolve_hf_snapshot(root_path: str) -> str: try: for name in os.listdir(root_path): if name.startswith("models--"): models_dir = os.path.join(root_path, name) snapshots_dir = os.path.join(models_dir, "snapshots") if os.path.isdir(snapshots_dir): for snap in os.listdir(snapshots_dir): snap_path = os.path.join(snapshots_dir, snap) if os.path.exists(os.path.join(snap_path, "config.json")): print(f"Found model in snapshots: {snap_path}") return snap_path except Exception as e: print(f"Warning: Error resolving model path: {e}") return root_path # Use full transformers model (neuphonic/neutts-air) to avoid llama-cpp build on cloud backbone_arg = _resolve_hf_snapshot(local_backbone) if os.path.isdir(local_backbone) else "neuphonic/neutts-air" print(f"Using backbone: {backbone_arg}") print(f"Using codec: neuphonic/neucodec") if not torch.cuda.is_available(): backbone_device = "cpu" codec_device = "cpu" print("No CUDA GPU detected. Using CPU for backbone and codec.") else: backbone_device = "cuda" codec_device = "cuda" gpu_memory_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3 if gpu_memory_gb <= 4.5: print(f"Detected {gpu_memory_gb:.2f} GB GPU. Loading codec on CPU to save GPU memory.") codec_device = "cpu" tts = NeuTTSAir( backbone_repo=backbone_arg, backbone_device=backbone_device, codec_repo="neuphonic/neucodec", codec_device=codec_device, ) if torch.cuda.is_available(): torch.cuda.empty_cache() print("TTS model loaded.") return tts # --------------------------- # Voice loading logic # --------------------------- VOICES = {"samples": {}} voice_dir = "samples" os.makedirs(voice_dir, exist_ok=True) for name in os.listdir(voice_dir): if name.endswith(".txt"): base = os.path.splitext(name)[0] txt_path = os.path.join(voice_dir, f"{base}.txt") wav_path = os.path.join(voice_dir, f"{base}.wav") pt_path = os.path.join(voice_dir, f"{base}.pt") if os.path.exists(txt_path) and (os.path.exists(wav_path) or os.path.exists(pt_path)): VOICES["samples"][base] = (txt_path, wav_path if os.path.exists(wav_path) else pt_path) def format_voice_choice(name): return f"Voice: {name}" # --------------------------- # Core functions # --------------------------- def load_reference(voice_name): import torch txt_path, audio_or_pt = VOICES["samples"][voice_name] ref_text = open(txt_path, "r").read().strip() if audio_or_pt.endswith(".pt"): ref_codes = torch.load(audio_or_pt) else: ref_codes = get_tts().encode_reference(audio_or_pt) return ref_text, ref_codes def split_text_into_chunks(text, max_length=150): """Split text into smaller chunks preserving sentence and punctuation structure.""" import re # Clean up the text first text = text.strip() if not text: return [] # Split by sentence-ending punctuation while preserving the punctuation sentence_pattern = r'([.!?]+)' parts = re.split(sentence_pattern, text) # Reconstruct sentences with their punctuation sentences = [] i = 0 while i < len(parts): if parts[i].strip(): sentence = parts[i].strip() # Add punctuation if it exists if i + 1 < len(parts) and parts[i + 1].strip(): sentence += parts[i + 1] i += 2 else: # If no punctuation follows, add a period (only once) if not sentence.endswith(('.', '!', '?')): sentence += '.' i += 1 sentences.append(sentence) else: i += 1 # ✅ FIX: Avoid adding the last part twice when no punctuation present if len(parts) > 0 and parts[-1].strip(): last_part = parts[-1].strip() # Add only if it's not already included if not any(last_part in s or s.startswith(last_part) for s in sentences): if not last_part.endswith(('.', '!', '?')): last_part += '.' sentences.append(last_part) # Group sentences into chunks chunks = [] current_chunk = "" for sentence in sentences: # If single sentence exceeds max_length, split by commas if len(sentence) > max_length: comma_parts = re.split(r'(,)', sentence) temp_sentence = "" i = 0 while i < len(comma_parts): part = comma_parts[i].strip() comma = comma_parts[i + 1] if i + 1 < len(comma_parts) else '' # If part is still too long, split by words if len(part) > max_length: words = part.split() temp_words = [] for word in words: test_chunk = ' '.join(temp_words + [word]) if len(test_chunk) > max_length and temp_words: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = "" chunks.append(' '.join(temp_words)) temp_words = [word] else: temp_words.append(word) if temp_words: part = ' '.join(temp_words) + comma if current_chunk and len(current_chunk + ' ' + part) > max_length: chunks.append(current_chunk.strip()) current_chunk = part else: current_chunk += (' ' if current_chunk else '') + part else: part_with_comma = part + comma if current_chunk and len(current_chunk + ' ' + part_with_comma) > max_length: chunks.append(current_chunk.strip()) current_chunk = part_with_comma else: current_chunk += (' ' if current_chunk else '') + part_with_comma i += 2 if i + 1 < len(comma_parts) else 1 else: # Normal sentence that fits within limit if current_chunk and len(current_chunk + ' ' + sentence) > max_length: chunks.append(current_chunk.strip()) current_chunk = sentence else: current_chunk += (' ' if current_chunk else '') + sentence # CRITICAL: Always add remaining chunk at the end if current_chunk.strip(): chunks.append(current_chunk.strip()) # Filter out empty or duplicate chunks ✅ final_chunks = [] for chunk in chunks: if chunk.strip() and (not final_chunks or chunk.strip() != final_chunks[-1]): final_chunks.append(chunk.strip()) return final_chunks def process_chunk(chunk, ref_codes, ref_text, tts_model): """Process a single chunk of text and return the audio.""" try: return tts_model.infer(chunk, ref_codes, ref_text) except Exception as e: # Swallow individual chunk errors and return None to let caller handle it return None def estimate_generation_time(num_chunks): """Estimate the generation time based on number of chunks.""" # Assuming average of 3 seconds per chunk plus overhead return num_chunks * 3 + 2 def format_time(seconds): """Format seconds into a readable time string.""" if seconds < 60: return f"{seconds:.1f} seconds" minutes = int(seconds // 60) seconds = seconds % 60 return f"{minutes} minute{'s' if minutes != 1 else ''} {seconds:.1f} seconds" def generate_speech(text, voice_name, speed_control="1x"): try: import time # Input validations if not text or not text.strip(): yield 0, None, "❌ Error: Input text cannot be empty.", None return if not voice_name: yield 0, None, "❌ Error: No voice selected. Please select a voice.", None return if voice_name not in VOICES["samples"]: yield 0, None, f"❌ Error: Voice '{voice_name}' not found.", None return # Convert speed control string to float try: speed = float(speed_control.rstrip('x')) except ValueError: speed = 1.0 # Default to 1x if conversion fails # Load TTS model on first use (deferred so server can bind to PORT first) yield 5, None, "Loading TTS model (first time may take a few minutes)...", None try: tts_instance = get_tts() except Exception as e: yield 0, None, f"❌ Failed to load TTS model: {str(e)}", None return start_time = time.time() yield 10, None, "Loading voice reference...", None ref_text, ref_codes = load_reference(voice_name) # Split text into smaller chunks for better processing chunks = split_text_into_chunks(text) total_chunks = len(chunks) if total_chunks == 0: raise ValueError("No text to process") # Estimate total time estimated_time = estimate_generation_time(total_chunks) status = f"Estimated time to completion: {format_time(estimated_time)}\nProcessing {total_chunks} chunks..." yield 15, None, status, None # Process each chunk and store with its index chunk_results = [] for i, chunk in enumerate(chunks, 1): chunk_start = time.time() # Update progress progress = int(15 + (75 * i / total_chunks)) # Calculate and show time statistics elapsed_time = time.time() - start_time if i > 1: avg_time_per_chunk = elapsed_time / (i - 1) remaining_chunks = total_chunks - (i - 1) estimated_remaining = avg_time_per_chunk * remaining_chunks status = ( f"Processing chunk {i}/{total_chunks}\n" f"Progress: {progress}% complete\n" f"Est. remaining: {format_time(estimated_remaining)}" ) else: status = f"Processing chunk {i}/{total_chunks}\nProgress: {progress}% complete" yield progress, None, status, None # Generate audio for this chunk chunk_wav = process_chunk(chunk, ref_codes, ref_text, tts_instance) if chunk_wav is not None: # Store chunk with its index to maintain order chunk_results.append((i-1, chunk_wav)) if not chunk_results: raise ValueError("Failed to generate any audio") # Update status for final processing yield 90, None, "Finalizing audio...\nOrdering and combining chunks...", None # Sort chunks by their original index and extract the audio data chunk_results.sort(key=lambda x: x[0]) # Sort by index processed_chunks = [chunk[1] for chunk in chunk_results] # Extract audio data in order # Create silence once silence = np.zeros(int(24000 * 0.25)) # 0.25 seconds silence between chunks # Concatenate all chunks with silence in between all_wav = processed_chunks[0] for chunk_wav in processed_chunks[1:]: all_wav = np.concatenate([all_wav, silence, chunk_wav]) # Apply speed adjustment if needed (pitch-preserving time-stretching) if speed != 1.0: # Use librosa for pitch-preserving time-stretching # rate > 1 speeds up, rate < 1 slows down all_wav = librosa.effects.time_stretch(all_wav.astype(np.float32), rate=speed) # Save the final audio temp_path = "temp_output.wav" sf.write(temp_path, all_wav, 24000) # Calculate and show total time taken total_time = time.time() - start_time final_status = f"✅ Generation complete!\nTotal time: {format_time(total_time)}" yield 100, temp_path, final_status, None except Exception as e: error_status = f"❌ Error generating speech: {str(e)}" yield 0, None, error_status, None def delete_voice(voice_name): """Deletes a voice and its associated files.""" try: if voice_name not in VOICES["samples"]: return f"❌ Voice '{voice_name}' not found!", gr.update() txt_path = f"samples/{voice_name}.txt" wav_path = f"samples/{voice_name}.wav" pt_path = f"samples/{voice_name}.pt" # Remove files if they exist for path in [txt_path, wav_path, pt_path]: if os.path.exists(path): os.remove(path) # Remove from VOICES dictionary del VOICES["samples"][voice_name] remaining_voices = list(VOICES["samples"].keys()) new_selected = remaining_voices[0] if remaining_voices else None return f"✅ Voice '{voice_name}' deleted successfully!", gr.update(choices=remaining_voices, value=new_selected) except Exception as e: return f"❌ Error deleting voice: {e}", gr.update() def clone_voice(new_name, txt, audio_file): """Encodes a new reference voice and saves its embedding.""" try: # Input validations if not new_name or not new_name.strip(): return "❌ Error: New Voice name cannot be empty.", gr.update() if not txt or not txt.strip(): return "❌ Error: Reference text cannot be empty.", gr.update() if not audio_file: return "❌ Error: No reference audio file provided.", gr.update() if new_name in VOICES["samples"]: return f"❌ Error: Voice '{new_name}' already exists. Please choose a different name.", gr.update() try: tts_instance = get_tts() except Exception as e: return f"❌ Failed to load TTS model: {str(e)}", gr.update() os.makedirs("samples", exist_ok=True) txt_path = f"samples/{new_name}.txt" wav_path = f"samples/{new_name}.wav" pt_path = f"samples/{new_name}.pt" # Save reference text and audio with open(txt_path, "w") as f: f.write(txt.strip()) shutil.copy(audio_file, wav_path) ref_codes = tts_instance.encode_reference(wav_path) import torch torch.save(ref_codes, pt_path) VOICES["samples"][new_name] = (txt_path, pt_path) return f"✅ Voice '{new_name}' cloned and saved successfully!", gr.update(choices=list(VOICES["samples"].keys()), value=new_name) except Exception as e: return f"❌ Error cloning voice: {e}", gr.update() # --------------------------- # UI # --------------------------- # Custom CSS - consistent dark theme custom_css = """ footer {display: none !important;} .footer {display: none !important;} #api-docs-link {display: none !important;} /* Dark theme palette */ :root { --dark-bg: #1e1e2e; --dark-card: #252530; --dark-border: #3d3d4a; --text-primary: #e4e4e7; --text-muted: #a1a1aa; --accent: #818cf8; --accent-secondary: #a78bfa; } /* Modern header - dark gradient */ .heading-container { text-align: center; padding: 2rem 1rem; background: linear-gradient(135deg, #4338ca 0%, #6d28d9 50%, #4c1d95 100%); border-radius: 12px; margin-bottom: 2rem; border: 1px solid var(--dark-border); color: white; } .heading-container h1 { margin: 0; font-size: 2.5rem; font-weight: 700; color: white; } .heading-container h3 { margin: 0.5rem 0 0 0; font-size: 1.1rem; font-weight: 400; color: rgba(255, 255, 255, 0.9); } /* Cards - dark, same as rest of app */ .control-panel { background: var(--dark-card) !important; padding: 1.5rem; border-radius: 12px; border: 1px solid var(--dark-border); margin-bottom: 1rem; color: var(--text-primary) !important; } .control-panel label, .control-panel .label-wrap, .control-panel p, .control-panel h1, .control-panel h2, .control-panel h3, .control-panel h4, .control-panel span, .control-panel div, .control-panel li, .control-panel small, .control-panel .markdown, .control-panel [class*="markdown"], .control-panel * { color: var(--text-primary) !important; } .output-panel { background: var(--dark-card) !important; padding: 1.5rem; border-radius: 12px; border: 1px solid var(--dark-border); box-shadow: 0 2px 12px rgba(0,0,0,0.3); color: var(--text-primary) !important; } .output-panel label, .output-panel .label-wrap, .output-panel p, .output-panel h1, .output-panel h2, .output-panel h3, .output-panel h4, .output-panel span, .output-panel div, .output-panel li, .output-panel small, .output-panel .markdown, .output-panel [class*="markdown"], .output-panel * { color: var(--text-primary) !important; } /* Button styling */ .primary-button { width: 100%; padding: 0.75rem; font-size: 1.1rem; font-weight: 600; border-radius: 8px; margin-top: 1rem; } /* Progress bar styling */ .progress-container { margin: 1rem 0; } /* Status box - dark */ .status-box { background: var(--dark-bg) !important; border-radius: 8px; padding: 1rem; min-height: 80px; border: 1px solid var(--dark-border); color: var(--text-primary) !important; } /* Audio container - dark */ .audio-container { margin-top: 1rem; padding: 1rem; background: var(--dark-card) !important; border-radius: 8px; border: 1px solid var(--dark-border); color: var(--text-primary) !important; } .audio-container label, .audio-container .label-wrap, .audio-container h1, .audio-container h2, .audio-container h3, .audio-container h4, .audio-container span, .audio-container div, .audio-container .markdown, .audio-container [class*="markdown"], .audio-container * { color: var(--text-primary) !important; } /* Upload/drop zone text */ .control-panel [class*="upload"] span, .control-panel [class*="drop"] span, .output-panel [class*="upload"] span, .output-panel [class*="drop"] span { color: var(--text-muted) !important; } .info-text, .info-text * { color: var(--text-muted) !important; } /* Voice selection styling */ .voice-controls { display: flex; gap: 0.5rem; align-items: flex-end; } .tab-nav { margin-bottom: 1.5rem; } /* Instructions - dark card, same as panels */ .instructions-content { background: var(--dark-card) !important; padding: 2rem; border-radius: 12px; border: 1px solid var(--dark-border); line-height: 1.8; max-width: 1200px; margin: 0 auto; color: var(--text-primary) !important; } .instructions-content p, .instructions-content li, .instructions-content span, .instructions-content div { color: var(--text-primary) !important; } .instructions-content *:not(h1):not(h2):not(h3) { color: var(--text-primary) !important; } .instructions-content h1 { color: var(--accent) !important; border-bottom: 3px solid var(--accent); padding-bottom: 0.5rem; margin-bottom: 1.5rem; } .instructions-content h2 { color: var(--accent-secondary) !important; margin-top: 2rem; margin-bottom: 1rem; font-size: 1.5rem; } .instructions-content h3 { color: #93c5fd !important; margin-top: 1.5rem; margin-bottom: 0.75rem; font-size: 1.2rem; } .instructions-content ul, .instructions-content ol { margin-left: 1.5rem; margin-bottom: 1rem; } .instructions-content li { margin-bottom: 0.5rem; color: var(--text-primary) !important; } .instructions-content code { background: var(--dark-bg); color: var(--accent); padding: 0.2rem 0.4rem; border-radius: 4px; font-family: 'Courier New', monospace; font-size: 0.9em; border: 1px solid var(--dark-border); } .instructions-content hr { border: none; border-top: 2px solid var(--dark-border); margin: 2rem 0; } .instructions-content blockquote { border-left: 4px solid var(--accent); padding-left: 1rem; margin-left: 0; color: var(--text-muted) !important; font-style: italic; } """ with gr.Blocks(title="Virtual Lab Voice Cloning") as app: # Modern header with gradient with gr.Row(): with gr.Column(scale=1): gr.Markdown( """