"""Benchmark each pipeline component to identify bottlenecks. Usage: python benchmark.py [image_path] [--depth-model small|base|v2-small|v2-base] [--size 256|384|512] """ import cv2 import time import sys import os import numpy as np def bench(label, fn, runs=3): """Run fn multiple times, print avg latency.""" times = [] for i in range(runs): t0 = time.time() result = fn() times.append(time.time() - t0) avg = sum(times) / len(times) fps = 1.0 / avg if avg > 0 else 999 print(f" {label:30s} {avg*1000:7.1f}ms ({fps:.1f} fps)") return result, avg def main(): # Parse args img_path = None for a in sys.argv[1:]: if not a.startswith("--") and os.path.exists(a): img_path = a if "--depth-model" in sys.argv: idx = sys.argv.index("--depth-model") os.environ["NAV_DEPTH_MODEL"] = sys.argv[idx + 1] if "--size" in sys.argv: idx = sys.argv.index("--size") os.environ["NAV_DEPTH_INPUT_SIZE"] = sys.argv[idx + 1] # Use a test image or generate one if img_path: img = cv2.imread(img_path) else: test_dir = "/mnt/c/Visual/test_images" candidates = [f for f in os.listdir(test_dir) if f.endswith('.jpg') and '_pose' not in f] if candidates: img = cv2.imread(os.path.join(test_dir, candidates[0])) else: print("No test image found. Pass an image path.") return h, w = img.shape[:2] rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) from config import DEPTH_MODEL, DEPTH_INPUT_SIZE, DEVICE print(f"Image: {w}x{h}") print(f"Depth model: {DEPTH_MODEL} input_size: {DEPTH_INPUT_SIZE} device: {DEVICE}") print(f"{'='*60}") # Warm up + benchmark each component from core import detector, depth, pose, risk_engine print("\n[1] YOLO Obstacle Detection") obstacles, t_yolo = bench("yolo detect", lambda: detector.detect(img)) print("\n[2] YOLO + ByteTrack") _, t_track = bench("yolo track", lambda: detector.detect(img, track=True)) print("\n[3] BlazePose Gait Analysis") pose_result, t_pose = bench("pose analyze", lambda: pose.analyze(rgb, w, h)) gait, landmarks, foot_y = pose_result print("\n[4] Depth Estimation") depth_norm, t_depth = bench("depth estimate", lambda: depth.estimate_depth(rgb, h, w)) print("\n[5] Slope Analysis (on cached depth)") _, t_slope = bench("slope estimate", lambda: depth.estimate_slope(depth_norm, h, w, foot_y)) print("\n[6] Risk Assessment") _, t_risk = bench("risk assess", lambda: risk_engine.assess(0.0, "FLAT", gait, len(obstacles))) print(f"\n{'='*60}") total = t_yolo + t_pose + t_depth + t_slope + t_risk print(f" {'TOTAL (per frame)':30s} {total*1000:7.1f}ms ({1.0/total:.1f} fps)") print(f" {'Without depth':30s} {(total-t_depth)*1000:7.1f}ms ({1.0/(total-t_depth):.1f} fps)") print(f"\nDepth is {t_depth/total*100:.0f}% of total latency.") if t_depth > 0.15: print("\nTips to speed up depth:") print(f" - Current input size: {DEPTH_INPUT_SIZE}. Try: NAV_DEPTH_INPUT_SIZE=256") print(f" - Current model: {DEPTH_MODEL}. 'small' is fastest.") if DEVICE == "cpu": print(" - Running on CPU. Set NAV_DEVICE=cuda if GPU available.") print(" - Export to ONNX: set NAV_DEPTH_ONNX=/path/to/model.onnx") if __name__ == "__main__": main()