"""Standalone web UI for LTX-2.3 video generation. Designed to be distributed inside a HuggingFace model repo. Place this file anywhere in the downloaded repo directory (or a sibling directory) and run: python webapp_standalone.py python webapp_standalone.py --port 8080 python webapp_standalone.py --compare-dir /path/to/second/model python webapp_standalone.py --model-name "LTX-2.3 24 GB RAM" The script auto-detects its own directory as the primary model. A second model can be supplied via --compare-dir for A/B comparison. Requirements: pip install flask pip install mlx mlx-lm ltx-core-mlx ltx-pipelines-mlx """ import argparse import json import subprocess import sys import threading import time import uuid from collections import defaultdict from pathlib import Path from flask import Flask, Response, jsonify, request, send_file # --------------------------------------------------------------------------- # CLI args — parsed at import time so the constants below can reference them # --------------------------------------------------------------------------- def _build_arg_parser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( description="LTX-2.3 standalone video-generation web UI" ) p.add_argument("--port", type=int, default=7860, help="Port to listen on (default: 7860)") p.add_argument("--compare-dir", type=str, default=None, help="Optional path to a second model directory for A/B comparison") p.add_argument("--model-name", type=str, default=None, help="Display name for the primary model (default: directory name)") return p # Parse only our own args; anything unrecognised is left alone so Flask's own # dev-server reloader doesn't choke on our flags. 
_parser = _build_arg_parser()
_args, _unknown = _parser.parse_known_args()

# ---------------------------------------------------------------------------
# Model discovery
# ---------------------------------------------------------------------------

# The primary model IS the directory that contains this script — i.e. the
# downloaded HuggingFace repo root.
PRIMARY_DIR = Path(__file__).parent.resolve()
PRIMARY_NAME = _args.model_name or PRIMARY_DIR.name

COMPARE_DIR: Path | None = (
    Path(_args.compare_dir).resolve() if _args.compare_dir else None
)
COMPARE_NAME: str | None = COMPARE_DIR.name if COMPARE_DIR else None

# Required files that signal a valid, ready model directory.
_REQUIRED_FILES = [
    "transformer-distilled.safetensors",
    "connector.safetensors",
    "vae_decoder.safetensors",
    "audio_vae.safetensors",
    "vocoder.safetensors",
]


def _model_ready(path: Path) -> bool:
    """Return True when *path* is a directory holding every required file."""
    if not path.is_dir():
        return False
    for filename in _REQUIRED_FILES:
        if not (path / filename).exists():
            return False
    return True


def _model_missing_files(path: Path) -> list[str]:
    """Return the required file names that do not exist under *path*."""
    absent: list[str] = []
    for filename in _REQUIRED_FILES:
        if not (path / filename).exists():
            absent.append(filename)
    return absent


# Build the static model list once at startup.
_primary_ok = _model_ready(PRIMARY_DIR)
MODELS: list[dict] = [
    {
        "id": "primary",
        "label": PRIMARY_NAME,
        "dir": str(PRIMARY_DIR),
        "ready": _primary_ok,
        "missing": [] if _primary_ok else _model_missing_files(PRIMARY_DIR),
    }
]

if COMPARE_DIR is not None:
    _compare_ok = _model_ready(COMPARE_DIR)
    MODELS.append(
        {
            "id": "compare",
            "label": COMPARE_NAME,
            "dir": str(COMPARE_DIR),
            "ready": _compare_ok,
            "missing": [] if _compare_ok else _model_missing_files(COMPARE_DIR),
        }
    )

# Convenience lookup: id → dir
MODEL_DIRS: dict[str, str] = {entry["id"]: entry["dir"] for entry in MODELS}

# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------

# Videos are saved alongside this script (which lives in the model repo).
RESULTS_DIR = PRIMARY_DIR / "webapp_videos"
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

# generate_ltx.py is located relative to the RAM/RUN working tree. The
# GENERATE_SCRIPT env var, when set, takes precedence; otherwise we search
# upward from this file and finally fall back to a fixed relative location.
import os as _os


def _find_generate_script() -> Path:
    """Locate generate_ltx.py.

    Lookup order:
      1. the ``GENERATE_SCRIPT`` environment variable, returned as-is without
         checking that the file exists;
      2. ``experiments/flux_phase1/generate_ltx.py`` under this file's
         directory or one of the next five ancestors;
      3. the same relative path two levels above this file's directory,
         returned even if it does not exist (callers check ``.exists()``).
    """
    env_override = _os.environ.get("GENERATE_SCRIPT")
    if env_override:
        return Path(env_override)

    relative = Path("experiments") / "flux_phase1" / "generate_ltx.py"

    # Walk up looking for experiments/flux_phase1/generate_ltx.py
    cur = Path(__file__).parent
    for _ in range(6):
        candidate = cur / relative
        if candidate.exists():
            return candidate
        cur = cur.parent

    # Last resort: assume this script is inside RAM/RUN/results/<model_dir>/
    # so climb two levels to RAM/RUN/
    return Path(__file__).parent.parent.parent / relative


GENERATE_SCRIPT = _find_generate_script()

# ---------------------------------------------------------------------------
# Job state
# ---------------------------------------------------------------------------

# job_id → {status, log_lines, video_path, started, finished, pid, params}
JOBS: dict = {}
JOBS_LOCK = threading.Lock()

# ---------------------------------------------------------------------------
# apply_mixed_precision_quantization
# (kept here so the script is self-contained; also used by generate_ltx.py
# which is invoked as a subprocess — but having it here lets us surface the
# logic for anyone reading this file)
# ---------------------------------------------------------------------------


def apply_mixed_precision_quantization(model, weights, group_size: int = 64) -> None:
    """Per-layer mixed-precision quantization from a weight dict.

    Unlike ltx_core_mlx's apply_quantization (which uses a single detected
    bit width for all layers), this version detects each layer's bits from
    its packed weight shape and applies nn.quantize once per unique bit
    width.

    Args:
        model: module tree handed to ``mlx.nn.quantize``.
        weights: mapping of parameter name to array; only entries with a
            ``<layer>.scales`` key and a matching ``<layer>.weight`` key are
            considered.
        group_size: quantization group size, used both to infer the bit
            width and as the ``group_size`` passed to ``nn.quantize``.

    Returns nothing. If no layer yields a supported bit width the model is
    left untouched and nothing is printed.
    """
    import mlx.nn as nn

    layer_bits: dict[str, int] = {}
    for key in weights:
        if not key.endswith(".scales"):
            continue
        layer = key[: -len(".scales")]
        w_key = layer + ".weight"
        if w_key not in weights:
            continue
        w_cols = weights[w_key].shape[-1]
        s_cols = weights[key].shape[-1]
        # NOTE(review): the factor 32 presumably reflects weights packed into
        # 32-bit words — confirm against the exporter.
        bits = round(w_cols * 32 / (s_cols * group_size))
        # Anything outside this set is ignored rather than treated as an error.
        if bits in (2, 3, 4, 5, 6, 8):
            layer_bits[layer] = bits

    if not layer_bits:
        return

    bits_to_layers: dict[int, set] = defaultdict(set)
    for layer, b in layer_bits.items():
        bits_to_layers[b].add(layer)

    for bits, layers in sorted(bits_to_layers.items()):
        # Bind the current layer set as a default argument so each predicate
        # keeps its own set instead of the loop's final value.
        def _predicate(path: str, module, _layers=layers) -> bool:
            return path in _layers and isinstance(module, nn.Linear)

        nn.quantize(model, group_size=group_size, bits=bits, class_predicate=_predicate)

    total = sum(len(v) for v in bits_to_layers.values())
    dist = {b: len(v) for b, v in sorted(bits_to_layers.items())}
    print(f" Mixed-precision quantization: {total} layers — {dist}", flush=True)


# ---------------------------------------------------------------------------
# Job runner
# ---------------------------------------------------------------------------


def _run_job(job_id: str, cmd: list[str], video_path: Path, cwd: str) -> None:
    """Run *cmd* as a subprocess and record its progress in ``JOBS[job_id]``.

    Marks the job "running", stores the child's pid, appends each line of the
    child's combined stdout/stderr to ``log_lines``, and finally sets the
    status to "done" (exit code 0 and *video_path* exists) or "error".
    ``finished`` is stamped in both cases; ``video_path`` only on success.
    Any exception is caught, appended to the log as a ``[webapp error]``
    line, and reported as status "error".

    Args:
        job_id: key of an entry already present in ``JOBS``.
        cmd: argument vector passed to ``subprocess.Popen`` (no shell).
        video_path: file the command is expected to create.
        cwd: working directory for the child process.
    """
    with JOBS_LOCK:
        JOBS[job_id]["status"] = "running"

    try:
        # The context manager closes the pipe and waits for the child even if
        # reading fails part-way. errors="replace" keeps one undecodable byte
        # in the child's output from aborting the log reader.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors="replace",
            cwd=cwd,
        ) as proc:
            with JOBS_LOCK:
                JOBS[job_id]["pid"] = proc.pid

            for line in proc.stdout:
                line = line.rstrip("\n")
                with JOBS_LOCK:
                    JOBS[job_id]["log_lines"].append(line)

        # Leaving the with-block has waited for the child, so returncode is set.
        success = proc.returncode == 0 and video_path.exists()
        with JOBS_LOCK:
            JOBS[job_id]["status"] = "done" if success else "error"
            JOBS[job_id]["finished"] = time.time()
            if success:
                JOBS[job_id]["video_path"] = str(video_path)

    except Exception as exc:
        # Worker-thread boundary: surface the failure in the job log instead
        # of letting the thread die silently.
        with JOBS_LOCK:
            JOBS[job_id]["log_lines"].append(f"[webapp error] {exc}")
            JOBS[job_id]["status"] = "error"
            JOBS[job_id]["finished"] = time.time()


# ---------------------------------------------------------------------------
Flask app # --------------------------------------------------------------------------- app = Flask(__name__) @app.post("/generate") def generate(): data = request.get_json(force=True) prompt = data.get("prompt", "").strip() if not prompt: return jsonify(error="prompt required"), 400 model_id = data.get("model", MODELS[0]["id"]) if model_id not in MODEL_DIRS: return jsonify(error=f"unknown model: {model_id}"), 400 model_dir = MODEL_DIRS[model_id] if not Path(model_dir).exists(): return jsonify(error=f"model directory not found: {model_dir}"), 400 if not _model_ready(Path(model_dir)): missing = _model_missing_files(Path(model_dir)) return jsonify(error=f"model not ready, missing: {missing}"), 400 height = int(data.get("height", 480)) width = int(data.get("width", 704)) num_frames = int(data.get("num_frames", 65)) frame_rate = float(data.get("frame_rate", 24.0)) seed = int(data.get("seed", 42)) stage1 = data.get("stage1_steps") stage2 = data.get("stage2_steps") job_id = uuid.uuid4().hex[:8] video_path = RESULTS_DIR / f"gen_{job_id}.mp4" if not GENERATE_SCRIPT.exists(): return jsonify(error=( f"generate_ltx.py not found at {GENERATE_SCRIPT}. " "Set the GENERATE_SCRIPT env var to its absolute path." )), 500 # Determine the cwd for the subprocess. generate_ltx.py expects to be # run from the RAM/RUN/ directory so its relative imports resolve. 
script_cwd = str(GENERATE_SCRIPT.parent.parent.parent) cmd = [ sys.executable, str(GENERATE_SCRIPT), "--model-dir", model_dir, "--prompt", prompt, "--output", str(video_path), "--height", str(height), "--width", str(width), "--num-frames", str(num_frames), "--frame-rate", str(frame_rate), "--seed", str(seed), ] if stage1: cmd += ["--stage1-steps", str(stage1)] if stage2: cmd += ["--stage2-steps", str(stage2)] # Derive a friendly label for the model in job params model_label = next((m["label"] for m in MODELS if m["id"] == model_id), model_id) with JOBS_LOCK: JOBS[job_id] = { "status": "queued", "log_lines": [], "video_path": None, "started": time.time(), "finished": None, "pid": None, "params": { "prompt": prompt, "model": model_id, "model_label": model_label, "height": height, "width": width, "num_frames": num_frames, "frame_rate": frame_rate, "seed": seed, }, } t = threading.Thread( target=_run_job, args=(job_id, cmd, video_path, script_cwd), daemon=True ) t.start() return jsonify(job_id=job_id) @app.get("/stream/") def stream(job_id: str): """SSE live log stream for a running job.""" if job_id not in JOBS: return jsonify(error="not found"), 404 def generate_events(): sent = 0 while True: with JOBS_LOCK: lines = JOBS[job_id]["log_lines"] status = JOBS[job_id]["status"] new_lines = lines[sent:] sent += len(new_lines) for line in new_lines: yield f"data: {json.dumps({'line': line})}\n\n" if status in ("done", "error") and not new_lines: with JOBS_LOCK: final_status = JOBS[job_id]["status"] video = JOBS[job_id]["video_path"] yield f"data: {json.dumps({'done': True, 'status': final_status, 'video': video})}\n\n" return time.sleep(0.25) return Response( generate_events(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) @app.get("/video/") def video(job_id: str): with JOBS_LOCK: job = JOBS.get(job_id) if not job or not job["video_path"]: return jsonify(error="not found"), 404 p = Path(job["video_path"]) if not 
p.exists(): return jsonify(error="file missing"), 404 return send_file(str(p), mimetype="video/mp4", conditional=True) @app.get("/models") def list_models(): """Return the static model list (no polling needed — models are local).""" return jsonify([ { "id": m["id"], "label": m["label"], "dir": m["dir"], "ready": m["ready"], "missing": m["missing"], } for m in MODELS ]) @app.get("/jobs") def list_jobs(): with JOBS_LOCK: out = [] for jid, j in reversed(list(JOBS.items())): out.append({ "id": jid, "status": j["status"], "params": j["params"], "started": j["started"], "finished": j["finished"], "has_video": bool(j["video_path"]), }) return jsonify(out) # --------------------------------------------------------------------------- # HTML — single-file UI # --------------------------------------------------------------------------- def _build_html(models: list[dict]) -> str: # Build the model selector: single static label if one model,
""" else: options = "\n ".join( f'' for m in models ) # Default selection: first ready model default_id = next((m["id"] for m in models if m["ready"]), models[0]["id"]) model_block = f"""
Model
""" # Startup warning if primary model is not ready startup_warn = "" if not models[0]["ready"]: missing_list = ", ".join(models[0]["missing"]) startup_warn = f"""
Model directory is missing required files: {missing_list}. Run reformat_ltx_for_pipeline.py first.
""" # Models JSON for JS models_json = json.dumps([{"id": m["id"], "ready": m["ready"], "label": m["label"]} for m in models]) return f""" {models[0]['label']} — Video Generator

{models[0]['label']}

RAM Mixed-Precision
{startup_warn}
Video will appear here after generation
Output log
Recent generations
""" @app.get("/") def index(): return _build_html(MODELS) # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- if __name__ == "__main__": import webbrowser import threading as _threading port = _args.port # Print startup summary print(f"\nLTX-2.3 Standalone Web UI") print(f" Primary model : {PRIMARY_NAME}") print(f" Directory : {PRIMARY_DIR}") print(f" Ready : {_model_ready(PRIMARY_DIR)}") if COMPARE_DIR: print(f" Compare model : {COMPARE_NAME}") print(f" Compare dir : {COMPARE_DIR}") print(f" Compare ready : {_model_ready(COMPARE_DIR)}") print(f" Generate script: {GENERATE_SCRIPT} ({'found' if GENERATE_SCRIPT.exists() else 'NOT FOUND — set GENERATE_SCRIPT env var'})") print(f" Videos saved to: {RESULTS_DIR}") print(f"\n http://localhost:{port}\n") def _open(): time.sleep(1.0) webbrowser.open(f"http://localhost:{port}") _threading.Thread(target=_open, daemon=True).start() app.run(host="0.0.0.0", port=port, debug=False, threaded=True)