Spaces:
Sleeping
Sleeping
| """Unified video/image/webcam pipeline using core modules.""" | |
| import cv2 | |
| import numpy as np | |
| import os | |
| import time | |
| import tempfile | |
| from config import DEFAULT_SKIP_FRAMES, DEFAULT_DEPTH_EVERY | |
| from core import detector, depth, pose, risk_engine | |
| from core.detector import ObstacleTracker | |
| from core.depth import SlopeSmoother | |
| from core.guidance import GuidanceEngine | |
| from core.tts import TTSEngine | |
| from renderers import overlay as overlay_renderer | |
| from renderers import blind_nav as blind_nav_renderer | |
def process_image(image_bgr, mode="overlay"):
    """Analyse one still image and draw the result onto it.

    Runs obstacle detection, pose analysis, depth estimation and slope
    estimation once, then derives a risk assessment and guidance from them.

    Args:
        image_bgr: Input image; it is converted with ``cv2.COLOR_BGR2RGB``
            for the pose and depth stages, and passed unchanged to the
            detector and renderers.
        mode: ``"blind_nav"`` selects the blind-navigation renderer; any
            other value selects the overlay renderer.

    Returns:
        ``(rendered_bgr, risk_dict, guidance_dict)``.
    """
    height, width = image_bgr.shape[:2]
    image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

    # Perception stages (same order as the video pipeline).
    found = detector.detect(image_bgr)
    gait, landmarks, foot_y = pose.analyze(image_rgb, width, height)
    depth_map = depth.estimate_depth(image_rgb, height, width)
    angle, direction, terrain, _ = depth.estimate_slope(
        depth_map, height, width, foot_y)

    # NOTE(review): the obstacle *count* is passed here, whereas the video
    # and webcam paths in this file pass the obstacle list itself as the
    # fourth argument -- confirm which form risk_engine.assess expects.
    risk = risk_engine.assess(angle, direction, gait, len(found))

    # A fresh engine per call: a single image carries no history.
    guidance = GuidanceEngine().compute(angle, direction, terrain, found)

    if mode == "blind_nav":
        canvas = blind_nav_renderer.render(
            image_bgr, found, angle, direction, terrain, depth_map, guidance)
        return canvas, risk, guidance

    thumbnail = overlay_renderer.render_depth_mini(depth_map, width, height)
    canvas = overlay_renderer.render(
        image_bgr, found, gait, landmarks, risk, guidance, thumbnail)
    return canvas, risk, guidance
def _run_ffmpeg(args):
    """Run ``ffmpeg -y -loglevel error <args>``; return True on exit status 0.

    The command is given as an argument list (no shell), so file names with
    spaces or shell metacharacters are passed through literally.
    """
    import subprocess  # function-scope import keeps this block self-contained

    try:
        completed = subprocess.run(
            ["ffmpeg", "-y", "-loglevel", "error", *args], check=False)
    except OSError as exc:  # e.g. ffmpeg is not installed / not on PATH
        print(f"ffmpeg could not be started: {exc}", flush=True)
        return False
    return completed.returncode == 0


def process_video(video_path, mode="overlay", skip_frames=DEFAULT_SKIP_FRAMES,
                  depth_every=DEFAULT_DEPTH_EVERY, track=True, tts=False):
    """Process a video file and return the path of the rendered output.

    Every ``skip_frames + 1``-th frame is analysed and written at
    ``fps / (skip_frames + 1)``, so the output keeps the source duration.
    Afterwards the result is re-encoded to H.264, the source audio is muxed
    in, and any collected voice prompts are baked into the file. Each of
    these post-processing steps is best-effort: if it fails, the previous
    file is kept.

    Args:
        video_path: Path of the input video.
        mode: ``"blind_nav"`` uses the blind-navigation renderer (output
            canvas ``(2 * w, h + 70)``); any other value uses the overlay
            renderer (output canvas ``(w, h)``).
        skip_frames: Number of frames dropped between processed frames.
        depth_every: Depth/slope is re-estimated on every
            ``depth_every``-th processed frame; cached values are used
            in between.
        track: Forwarded to ``detector.detect(..., track=track)``.
        tts: Enables live speech through ``TTSEngine``. Voice events are
            recorded for the output file regardless of this flag.

    Returns:
        Path of the final video, or ``None`` if the input could not be
        opened or the output writer could not be created.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None

    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if not fps > 0:
        # Some containers report no frame rate; avoid a zero-fps writer and
        # division by zero when computing timestamps.
        print("Warning: source FPS unknown, assuming 30", flush=True)
        fps = 30.0

    # Guard against zero/negative settings, which would make the modulo
    # tests below divide by zero.
    step = max(1, skip_frames + 1)
    depth_period = max(1, depth_every) * step
    out_fps = fps / step

    out_size = (w * 2, h + 70) if mode == "blind_nav" else (w, h)
    out_path = os.path.join(tempfile.gettempdir(), f"nav_{mode}.mp4")
    writer = cv2.VideoWriter(
        out_path, cv2.VideoWriter_fourcc(*'mp4v'), out_fps, out_size)
    if not writer.isOpened():
        cap.release()
        writer.release()
        return None

    voice_events = []  # (timestamp, text) for offline TTS rendering
    frame_idx = processed = 0
    t0 = time.time()
    tts_engine = None
    try:
        # Stateful components
        guide_engine = GuidanceEngine()
        slope_smoother = SlopeSmoother()
        obs_tracker = ObstacleTracker()
        tts_engine = TTSEngine(enabled=tts)
        cached_depth = np.zeros((h, w), dtype=np.float32)
        cached_depth_mini = None

        print(f"Processing {total} frames ({w}x{h}) mode={mode}...", flush=True)
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_idx += 1
            if frame_idx % step != 0:
                continue
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Obstacles with tracking
            obstacles = detector.detect(frame, track=track)
            new_obs, closing_obs, _ = obs_tracker.update(obstacles)

            # Pose
            gait, landmarks, foot_y = pose.analyze(rgb, w, h)

            # Depth: on the first processed frame, then every N processed frames
            if frame_idx % depth_period == 0 or frame_idx <= step:
                cached_depth = depth.estimate_depth(rgb, h, w)
                raw_slope, raw_dir, raw_terrain, _ = depth.estimate_slope(
                    cached_depth, h, w, foot_y)
                slope_smoother.update(raw_slope, raw_dir, raw_terrain)
                cached_depth_mini = overlay_renderer.render_depth_mini(
                    cached_depth, w, h)

            # Use smoothed values
            s_angle = slope_smoother.angle
            s_dir = slope_smoother.direction
            s_terrain = slope_smoother.terrain
            s_trend = slope_smoother.trend

            # Risk + guidance
            risk = risk_engine.assess(
                s_angle, s_dir, gait, obstacles,
                slope_trend=s_trend,
                new_obstacles=new_obs,
                closing_obstacles=closing_obs)
            guidance = guide_engine.compute(
                s_angle, s_dir, s_terrain, obstacles,
                slope_trend=s_trend,
                new_obstacles=new_obs,
                closing_obstacles=closing_obs)

            # Render
            if mode == "blind_nav":
                rendered = blind_nav_renderer.render(
                    frame, obstacles, s_angle, s_dir, s_terrain,
                    cached_depth, guidance)
            else:
                rendered = overlay_renderer.render(
                    frame, obstacles, gait, landmarks, risk, guidance,
                    cached_depth_mini)
            writer.write(rendered)
            processed += 1

            # TTS: speak if throttle allows. Timestamps are on the source
            # timeline, which the output shares (see out_fps above).
            if guide_engine.should_speak(guidance['voice'], s_angle):
                voice_events.append((frame_idx / fps, guidance['voice']))
                if tts_engine.enabled:
                    tts_engine.speak(guidance['voice'])

            if processed % 20 == 0:
                el = max(time.time() - t0, 1e-9)
                new_str = f" new={len(new_obs)}" if new_obs else ""
                close_str = f" closing={len(closing_obs)}" if closing_obs else ""
                print(f" {frame_idx}/{total} | {processed / el:.1f}fps | "
                      f"{s_dir} {s_angle:.0f}° [{s_trend}] | {s_terrain} | "
                      f"obs={len(obstacles)}{new_str}{close_str} | risk={risk['risk']}",
                      flush=True)
    finally:
        # Always release native handles, even if a stage raised mid-video.
        cap.release()
        writer.release()
        if tts_engine is not None:
            tts_engine.shutdown()

    # Re-encode to H.264 (mp4v is poorly supported by browsers/players).
    base, ext = os.path.splitext(out_path)
    h264 = f"{base}_h264{ext}"
    encoded = _run_ffmpeg([
        "-i", out_path,
        "-c:v", "libx264", "-preset", "fast", "-crf", "23",
        "-pix_fmt", "yuv420p",
        h264,
    ])
    if encoded and os.path.exists(h264) and os.path.getsize(h264) > 0:
        os.remove(out_path)
        out_path = h264

    # Mux the original audio. The output already has the source duration
    # (fewer frames at a proportionally lower frame rate), so the audio must
    # NOT be sped up: doing so together with -shortest would cut the video.
    if video_path and os.path.exists(video_path):
        base, ext = os.path.splitext(out_path)
        with_audio = f"{base}_audio{ext}"
        muxed = _run_ffmpeg([
            "-i", out_path, "-i", str(video_path),
            "-map", "0:v:0", "-map", "1:a:0",
            "-c:v", "copy", "-c:a", "aac", "-shortest",
            with_audio,
        ])
        # Fails (and is skipped) when the source has no audio stream.
        if muxed and os.path.exists(with_audio) and os.path.getsize(with_audio) > 0:
            os.remove(out_path)
            out_path = with_audio

    print(f"Done! {processed} frames in {time.time() - t0:.1f}s", flush=True)

    # Bake TTS voice into video if we have events
    if voice_events:
        from core.tts_render import merge_voice_into_video
        # Fall back to the number of frames actually read when the container
        # does not report a frame count.
        duration = (total if total > 0 else frame_idx) / fps
        print(f"Rendering {len(voice_events)} voice events into video...", flush=True)
        out_path = merge_voice_into_video(out_path, voice_events, duration)
    return out_path
def run_webcam(mode="overlay", camera_id=0, tts=True):
    """Live webcam/RTSP processing loop. Press 'q' to quit.

    Frames are read from ``CameraStream``, analysed, rendered and shown in
    a window named "Navigation". The camera, the GUI windows and the TTS
    engine are released when the loop ends for any reason, including an
    exception or Ctrl+C.

    Args:
        mode: ``"blind_nav"`` selects the blind-navigation renderer; any
            other value selects the overlay renderer.
        camera_id: 0 for USB webcam, or "rtsp://..." for IP camera.
        tts: When true, guidance is spoken through ``TTSEngine``.

    Returns:
        None. If the camera cannot be opened (``RuntimeError`` from
        ``CameraStream``), the error is printed and the function returns.
    """
    from core.camera import CameraStream

    try:
        cam = CameraStream(camera_id)
    except RuntimeError as e:
        print(f"Error: {e}")
        return

    tts_engine = None
    frame_count = 0
    t0 = time.time()
    try:
        guide_engine = GuidanceEngine()
        slope_smoother = SlopeSmoother()
        obs_tracker = ObstacleTracker()
        tts_engine = TTSEngine(enabled=tts)
        cached_depth = None
        cached_depth_mini = None
        depth_counter = 0
        t0 = time.time()  # start timing once the components are ready
        print(f"Live mode={mode} tts={tts}. Press 'q' to quit.", flush=True)

        while cam.is_open:
            frame = cam.read()
            if frame is None:
                # No frame available yet; back off briefly instead of spinning.
                time.sleep(0.01)
                continue
            h, w = frame.shape[:2]
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            obstacles = detector.detect(frame, track=True)
            new_obs, closing_obs, _ = obs_tracker.update(obstacles)
            gait, landmarks, foot_y = pose.analyze(rgb, w, h)

            # Depth is refreshed on the first frame and then every
            # DEFAULT_DEPTH_EVERY frames; cached values are used in between.
            depth_counter += 1
            if depth_counter % DEFAULT_DEPTH_EVERY == 0 or cached_depth is None:
                cached_depth = depth.estimate_depth(rgb, h, w)
                raw_slope, raw_dir, raw_terrain, _ = depth.estimate_slope(
                    cached_depth, h, w, foot_y)
                slope_smoother.update(raw_slope, raw_dir, raw_terrain)
                cached_depth_mini = overlay_renderer.render_depth_mini(
                    cached_depth, w, h)

            risk = risk_engine.assess(
                slope_smoother.angle, slope_smoother.direction, gait, obstacles,
                slope_trend=slope_smoother.trend,
                new_obstacles=new_obs,
                closing_obstacles=closing_obs)
            guidance = guide_engine.compute(
                slope_smoother.angle, slope_smoother.direction,
                slope_smoother.terrain, obstacles,
                slope_trend=slope_smoother.trend,
                new_obstacles=new_obs, closing_obstacles=closing_obs)

            if mode == "blind_nav":
                # Defensive fallback kept from the original: render on an
                # all-zero depth map if no depth estimate is available.
                if cached_depth is not None:
                    depth_for_render = cached_depth
                else:
                    depth_for_render = np.zeros((h, w), dtype=np.float32)
                rendered = blind_nav_renderer.render(
                    frame, obstacles, slope_smoother.angle,
                    slope_smoother.direction, slope_smoother.terrain,
                    depth_for_render, guidance)
            else:
                rendered = overlay_renderer.render(
                    frame, obstacles, gait, landmarks, risk, guidance,
                    cached_depth_mini)

            # should_speak is only consulted when TTS is enabled.
            if tts_engine.enabled and guide_engine.should_speak(
                    guidance['voice'], slope_smoother.angle):
                tts_engine.speak(guidance['voice'])

            frame_count += 1
            if frame_count % 30 == 0:
                fps = frame_count / (time.time() - t0)
                cv2.setWindowTitle(
                    "Navigation", f"Navigation | {fps:.1f} FPS | {risk['risk']}")
            cv2.imshow("Navigation", rendered)
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
    finally:
        # Release everything even if a stage raised or the user hit Ctrl+C.
        cam.release()
        cv2.destroyAllWindows()
        if tts_engine is not None:
            tts_engine.shutdown()
        print(f"Session: {frame_count} frames in {time.time() - t0:.1f}s", flush=True)
if __name__ == "__main__":
    # Minimal command-line front end; see the usage text below for the
    # accepted forms. Flags may appear anywhere, but "--webcam" or the video
    # path must be the first argument.
    import sys

    if len(sys.argv) < 2:
        print("Usage:")
        print(" python pipeline.py <video.mp4> [--mode overlay|blind_nav] [--tts]")
        print(" python pipeline.py --webcam [--cam 0] [--mode overlay|blind_nav] [--tts]")
        print(" python pipeline.py --webcam --cam rtsp://user:pass@ip:554/stream [--tts]")
        sys.exit(1)

    mode = "overlay"
    if "--mode" in sys.argv:
        idx = sys.argv.index("--mode")
        mode = sys.argv[idx + 1] if idx + 1 < len(sys.argv) else "overlay"
    if mode not in ("overlay", "blind_nav"):
        # The pipeline treats every value other than "blind_nav" as overlay;
        # say so instead of silently ignoring a typo.
        print(f"Warning: unknown mode {mode!r}; overlay rendering will be used",
              file=sys.stderr)

    use_tts = "--tts" in sys.argv

    cam_source = 0
    if "--cam" in sys.argv:
        idx = sys.argv.index("--cam")
        val = sys.argv[idx + 1] if idx + 1 < len(sys.argv) else "0"
        try:
            # Numeric values are device indices.
            cam_source = int(val)
        except ValueError:
            # Anything else (rtsp:// URL, other URL, device path) is handed
            # to the camera layer as-is rather than crashing here.
            cam_source = val

    if sys.argv[1] == "--webcam":
        run_webcam(mode=mode, camera_id=cam_source, tts=use_tts)
    else:
        out = process_video(sys.argv[1], mode=mode, tts=use_tts)
        if out:
            print(f"Output: {out}")
        else:
            # process_video returns None when the input cannot be opened.
            print(f"Error: could not process video: {sys.argv[1]}", file=sys.stderr)
            sys.exit(1)