"""Live Phone Camera → Real-time Navigation Assistant. Phone streams camera on left, annotated output appears on right. Uses HTTPS so browser allows camera access. """ import gradio as gr import cv2 import numpy as np from PIL import Image as PILImage import os, sys, math, time sys.path.insert(0, os.path.dirname(__file__)) from core import detector, depth, pose, risk_engine from core.depth import SlopeSmoother from core.detector import ObstacleTracker from core.guidance import GuidanceEngine # Persistent state across frames _st = {} def _init(): if 'ready' not in _st: _st['sm'] = SlopeSmoother() _st['tr'] = ObstacleTracker() _st['gu'] = GuidanceEngine() _st['dn'] = None _st['n'] = 0 _st['ready'] = True def _reset(): _st.clear() return None, "Reset. Point your camera and start." def process_frame(frame): if frame is None: return None, "Waiting for camera..." _init() img = cv2.cvtColor(np.array(frame), cv2.COLOR_RGB2BGR) h, w = img.shape[:2] rgb = np.array(frame) obs = detector.detect(img, track=True) new_o, close_o, _ = _st['tr'].update(obs) gait, lms, foot_y = pose.analyze(rgb, w, h) _st['n'] += 1 if _st['n'] % 5 == 0 or _st['dn'] is None: _st['dn'] = depth.estimate_depth(rgb, h, w) rs, rd, rt, _ = depth.estimate_slope(_st['dn'], h, w, foot_y) _st['sm'].update(rs, rd, rt) _st['walls'] = depth.detect_walls(_st['dn'], h, w) # Merge wall/door detections into obstacles list walls = _st.get('walls', []) obs = obs + walls sm = _st['sm'] sa, sd = sm.angle, sm.direction risk_d = risk_engine.assess(sa, sd, gait, obs, slope_trend=sm.trend, new_obstacles=new_o, closing_obstacles=close_o) guid = _st['gu'].compute(sa, sd, sm.terrain, obs, slope_trend=sm.trend, new_obstacles=new_o, closing_obstacles=close_o) out = img.copy() # ── Obstacles ── for ob in obs: x1,y1,x2,y2 = ob['box'] u = ob['proximity'] color = (0,0,255) if u>0.7 else (0,180,255) if u>0.4 else (0,200,0) ov = out.copy() cv2.rectangle(ov,(x1,y1),(x2,y2),color,-1) cv2.addWeighted(ov, 0.35 if u>0.7 else 0.2, out, 0.65 if u>0.7 else 0.8, 0, out) cv2.rectangle(out,(x1,y1),(x2,y2),color,3) lbl = f'{ob["label"].upper()} {ob["dist"]}' (tw,th),_ = cv2.getTextSize(lbl, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) cv2.rectangle(out,(x1,y1-th-10),(x1+tw+6,y1),color,-1) cv2.putText(out,lbl,(x1+3,y1-5),cv2.FONT_HERSHEY_SIMPLEX,0.6,(255,255,255),2) cx,cy = ob['center'] if ob['direction']=='LEFT': cv2.arrowedLine(out,(cx+50,cy),(cx-50,cy),(255,255,255),3,tipLength=0.4) elif ob['direction']=='RIGHT': cv2.arrowedLine(out,(cx-50,cy),(cx+50,cy),(255,255,255),3,tipLength=0.4) else: cv2.arrowedLine(out,(cx,cy-40),(cx,cy+40),(0,0,255),3,tipLength=0.4) if u > 0.7: cv2.putText(out,'!! CLOSE !!',(cx-45,y2+18),cv2.FONT_HERSHEY_SIMPLEX,0.5,(0,0,255),2) # ── Path ── blocked = [o for o in obs if o['box'][3] > h//2] if not blocked: pts = np.array([[w//4+30,h//2],[3*w//4-30,h//2],[3*w//4+20,h],[w//4-20,h]]) ov2 = out.copy() cv2.fillPoly(ov2,[pts],(0,120,0)) cv2.addWeighted(ov2,0.15,out,0.85,0,out) cv2.polylines(out,[pts],True,(0,255,0),2) cv2.putText(out,'CLEAR',(w//2-30,h//2+30),cv2.FONT_HERSHEY_SIMPLEX,0.6,(0,255,0),2) else: lb = any(o['box'][0] < w//2 for o in blocked) rb = any(o['box'][2] > w//2 for o in blocked) if not lb: cv2.arrowedLine(out,(w//2,h*3//4),(w//6,h*3//4),(0,255,0),4,tipLength=0.3) cv2.putText(out,'GO LEFT',(20,h//2+30),cv2.FONT_HERSHEY_SIMPLEX,0.8,(0,255,0),2) elif not rb: cv2.arrowedLine(out,(w//2,h*3//4),(5*w//6,h*3//4),(0,255,0),4,tipLength=0.3) cv2.putText(out,'GO RIGHT',(2*w//3-20,h//2+30),cv2.FONT_HERSHEY_SIMPLEX,0.8,(0,255,0),2) else: cv2.putText(out,'STOP',(w//2-40,h//2),cv2.FONT_HERSHEY_SIMPLEX,1.2,(0,0,255),3) # ── Slope arrow ── if abs(sa) > 3: cx2,cy2 = w//2, h-50 ax = int(cx2 + math.cos(math.radians(sa))*40) ay = int(cy2 - math.sin(math.radians(sa))*40) cv2.arrowedLine(out,(cx2,cy2),(ax,ay),(0,255,255),3,tipLength=0.3) # ── HUD ── r = risk_d['risk'] rc = {'SAFE':(0,180,0),'LOW':(0,220,0),'MEDIUM':(0,180,220),'HIGH':(0,0,220)}[r] cv2.rectangle(out,(0,0),(w,55),(0,0,0),-1) cv2.rectangle(out,(5,5),(170,32),rc,-1) cv2.putText(out,f'RISK: {r} ({risk_d["score"]})',(10,26),cv2.FONT_HERSHEY_SIMPLEX,0.5,(255,255,255),2) cv2.putText(out,f'{sd} {sa:.0f}deg {sm.terrain}',(180,26),cv2.FONT_HERSHEY_SIMPLEX,0.38,(200,200,200),1) cv2.putText(out,f'Step:{guid["step"]} Lean:{guid["lean"]} Knee:{guid["knee_rec"]}', (5,48),cv2.FONT_HERSHEY_SIMPLEX,0.33,(0,255,255),1) # ── Voice bar ── cv2.rectangle(out,(0,h-30),(w,h),(20,20,40),-1) cv2.putText(out,guid['voice'][:50],(8,h-10),cv2.FONT_HERSHEY_SIMPLEX,0.38,(255,255,255),1) # Status text status = f"🔊 **{guid['voice']}**\n\n" status += f"Risk: **{r}** | Slope: {sd} {sa:.0f}° | Obstacles: {len(obs)} | Step: {guid['step']} | Lean: {guid['lean']}" return PILImage.fromarray(cv2.cvtColor(out, cv2.COLOR_BGR2RGB)), status with gr.Blocks(title="Live Navigation", head=""" """) as demo: gr.Markdown("# 🦯 Live Navigation Assistant\nPoint your phone camera → see real-time obstacle & path guidance") with gr.Row(): cam_in = gr.Image(sources=["webcam"], streaming=True, type="pil", label="📹 Live Camera") cam_out = gr.Image(label="🎯 Navigation View") status = gr.Markdown("Point camera and start streaming...") cam_in.stream(fn=process_frame, inputs=cam_in, outputs=[cam_out, status]) gr.Button("🔄 Reset").click(fn=_reset, outputs=[cam_out, status]) if __name__ == "__main__": # Preload models print("Preloading models...", flush=True) dummy = np.zeros((100, 100, 3), dtype=np.uint8) detector.detect(dummy) depth.estimate_depth(dummy, 100, 100) pose.analyze(dummy, 100, 100) print("Models ready!", flush=True) demo.launch( server_name="0.0.0.0", server_port=7860, show_error=True, )