"""Live Mobile Camera Navigation Assistant. Connects to phone camera via IP Webcam app and provides real-time obstacle detection, path analysis, slope estimation, and voice guidance. Usage: 1. Install "IP Webcam" app on Android (or similar on iPhone) 2. Start the app, note the URL (e.g., http://192.168.1.5:8080) 3. Run: python app_mobile.py http://192.168.1.5:8080 Or: python app_mobile.py (uses laptop webcam) """ import cv2 import numpy as np import sys import os import time import math sys.path.insert(0, os.path.dirname(__file__)) from config import DEFAULT_DEPTH_EVERY from core import detector, depth, pose, risk_engine from core.depth import SlopeSmoother from core.detector import ObstacleTracker from core.guidance import GuidanceEngine from core.tts import TTSEngine from core.camera import CameraStream def draw_path_zone(frame, obstacles, slope_dir, slope_angle): """Draw safe walking path overlay on the frame.""" h, w = frame.shape[:2] overlay = frame.copy() # Define path corridor (center third, bottom half) path_left = w // 4 path_right = 3 * w // 4 path_top = h // 2 path_bottom = h # Check if path is blocked blocked_zones = [] for ob in obstacles: x1, y1, x2, y2 = ob['box'] if y2 > path_top: # obstacle in lower half blocked_zones.append(ob) if not blocked_zones: # Green path — clear pts = np.array([ [path_left + 30, path_top], [path_right - 30, path_top], [path_right + 20, path_bottom], [path_left - 20, path_bottom], ]) cv2.fillPoly(overlay, [pts], (0, 120, 0)) cv2.addWeighted(overlay, 0.2, frame, 0.8, 0, frame) cv2.polylines(frame, [pts], True, (0, 255, 0), 2) cv2.putText(frame, "CLEAR PATH", (w // 2 - 60, path_top + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) else: # Find which side is clear left_blocked = any(ob['box'][0] < w // 2 for ob in blocked_zones) right_blocked = any(ob['box'][2] > w // 2 for ob in blocked_zones) if not left_blocked: # Suggest left path pts = np.array([[10, path_top], [w // 3, path_top], [w // 3 + 20, path_bottom], [10, path_bottom]]) cv2.fillPoly(overlay, [pts], (0, 120, 0)) cv2.addWeighted(overlay, 0.25, frame, 0.75, 0, frame) cv2.arrowedLine(frame, (w // 2, h // 2), (w // 6, h // 2), (0, 255, 0), 3, tipLength=0.3) cv2.putText(frame, "GO LEFT", (20, path_top + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) elif not right_blocked: # Suggest right path pts = np.array([[2 * w // 3, path_top], [w - 10, path_top], [w - 10, path_bottom], [2 * w // 3 - 20, path_bottom]]) cv2.fillPoly(overlay, [pts], (0, 120, 0)) cv2.addWeighted(overlay, 0.25, frame, 0.75, 0, frame) cv2.arrowedLine(frame, (w // 2, h // 2), (5 * w // 6, h // 2), (0, 255, 0), 3, tipLength=0.3) cv2.putText(frame, "GO RIGHT", (2 * w // 3, path_top + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) else: # Both sides blocked cv2.rectangle(overlay, (0, path_top), (w, path_bottom), (0, 0, 150), -1) cv2.addWeighted(overlay, 0.3, frame, 0.7, 0, frame) cv2.putText(frame, "STOP - PATH BLOCKED", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) # Slope direction arrow at bottom center if abs(slope_angle) > 3: acx, acy = w // 2, h - 40 arad = math.radians(slope_angle) ax = int(acx + math.cos(arad) * 35) ay = int(acy - math.sin(arad) * 35) cv2.arrowedLine(frame, (acx, acy), (ax, ay), (0, 255, 255), 3, tipLength=0.3) cv2.putText(frame, f"{slope_dir} {slope_angle:.0f} deg", (acx - 60, acy + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 255), 1) def draw_obstacles_bold(frame, obstacles): """Draw highly visible obstacle markers.""" for ob in obstacles: x1, y1, x2, y2 = ob['box'] u = ob['proximity'] # Color by urgency if u > 0.7: color = (0, 0, 255) # red = NEAR label_bg = (0, 0, 200) elif u > 0.4: color = (0, 180, 255) # orange = MID label_bg = (0, 140, 200) else: color = (0, 200, 0) # green = FAR label_bg = (0, 160, 0) # Semi-transparent fill overlay = frame.copy() cv2.rectangle(overlay, (x1, y1), (x2, y2), color, -1) alpha = 0.35 if u > 0.7 else 0.2 cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame) # Thick border cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3) # Label label = f"{ob['label'].upper()} {ob['dist']}" if 'track_id' in ob: label = f"#{ob['track_id']} {label}" (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) cv2.rectangle(frame, (x1, y1 - th - 10), (x1 + tw + 6, y1), label_bg, -1) cv2.putText(frame, label, (x1 + 3, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) # Direction arrow cx, cy = ob['center'] if ob['direction'] == "LEFT": cv2.arrowedLine(frame, (cx + 50, cy), (cx - 50, cy), (255, 255, 255), 3, tipLength=0.4) elif ob['direction'] == "RIGHT": cv2.arrowedLine(frame, (cx - 50, cy), (cx + 50, cy), (255, 255, 255), 3, tipLength=0.4) else: cv2.arrowedLine(frame, (cx, cy - 40), (cx, cy + 40), (0, 0, 255), 3, tipLength=0.4) # Distance warning for NEAR if u > 0.7: cv2.putText(frame, "!! CLOSE !!", (cx - 40, y2 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2) def draw_hud(frame, risk_dict, guidance, fps_val): """Draw navigation HUD.""" h, w = frame.shape[:2] # Top bar cv2.rectangle(frame, (0, 0), (w, 80), (0, 0, 0), -1) # Risk badge risk = risk_dict['risk'] rc = {'SAFE': (0, 180, 0), 'LOW': (0, 220, 0), 'MEDIUM': (0, 180, 220), 'HIGH': (0, 0, 220)}[risk] cv2.rectangle(frame, (5, 5), (180, 38), rc, -1) cv2.putText(frame, f"RISK: {risk} ({risk_dict['score']})", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) # Slope cv2.putText(frame, f"Slope: {risk_dict['terrain']} {risk_dict['terrain_slope']:.0f} deg", (190, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1) # Guidance cv2.putText(frame, f"Step: {guidance['step']} | Lean: {guidance['lean']} | Knee: {guidance['knee_rec']} deg", (5, 55), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 255), 1) # FPS cv2.putText(frame, f"{fps_val:.0f}fps", (w - 60, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (100, 100, 100), 1) # Bottom voice bar cv2.rectangle(frame, (0, h - 45), (w, h), (20, 20, 40), -1) cv2.putText(frame, "VOICE:", (8, h - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (100, 180, 255), 1) cv2.putText(frame, guidance['voice'][:60], (75, h - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) def main(): # Parse camera source if len(sys.argv) > 1 and sys.argv[1] != "--help": source = sys.argv[1] # IP Webcam app URLs if source.startswith("http") and not source.endswith("/video"): source = source.rstrip("/") + "/video" print(f"Connecting to: {source}") else: source = 0 print("Using laptop webcam (pass phone URL as argument)") try: cam = CameraStream(source) except RuntimeError as e: print(f"Error: {e}") print("\nUsage:") print(" python app_mobile.py # laptop webcam") print(" python app_mobile.py http://192.168.1.5:8080 # IP Webcam app") return # Init components guide = GuidanceEngine() smoother = SlopeSmoother() tracker = ObstacleTracker() tts = TTSEngine(enabled=True) cached_depth = None cached_depth_mini = None frame_count = 0 fps_val = 0.0 t0 = time.time() print(f"\n{'='*50}") print(f"LIVE NAVIGATION ASSISTANT") print(f"Camera: {cam.w}x{cam.h} @ {cam.native_fps:.0f}fps") print(f"Press 'q' to quit") print(f"{'='*50}\n") while cam.is_open: frame = cam.read() if frame is None: time.sleep(0.01) continue h, w = frame.shape[:2] rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Detect + track obstacles = detector.detect(frame, track=True) new_obs, closing_obs, _ = tracker.update(obstacles) # Pose gait, landmarks, foot_y = pose.analyze(rgb, w, h) # Depth every 5th frame frame_count += 1 if frame_count % DEFAULT_DEPTH_EVERY == 0 or cached_depth is None: cached_depth = depth.estimate_depth(rgb, h, w) raw_s, raw_d, raw_t, _ = depth.estimate_slope(cached_depth, h, w, foot_y) smoother.update(raw_s, raw_d, raw_t) # Risk + guidance risk_dict = risk_engine.assess( smoother.angle, smoother.direction, gait, obstacles, slope_trend=smoother.trend, new_obstacles=new_obs, closing_obstacles=closing_obs) guidance = guide.compute( smoother.angle, smoother.direction, smoother.terrain, obstacles, slope_trend=smoother.trend, new_obstacles=new_obs, closing_obstacles=closing_obs) # Render out = frame.copy() draw_path_zone(out, obstacles, smoother.direction, smoother.angle) draw_obstacles_bold(out, obstacles) draw_hud(out, risk_dict, guidance, fps_val) # TTS if tts.enabled and guide.should_speak(guidance['voice'], smoother.angle): tts.speak(guidance['voice']) # FPS if frame_count % 10 == 0: fps_val = frame_count / (time.time() - t0) cv2.imshow("Navigation Assistant", out) key = cv2.waitKey(1) & 0xFF if key == ord('q'): break cam.release() cv2.destroyAllWindows() tts.shutdown() print(f"\nSession: {frame_count} frames in {time.time() - t0:.1f}s ({fps_val:.1f} fps)") if __name__ == "__main__": main()