# blind-nav / pipeline.py
# Ramkumarnn — Blind navigation MVP (00e634a)
"""Unified video/image/webcam pipeline using core modules."""
import cv2
import numpy as np
import os
import time
import tempfile
from config import DEFAULT_SKIP_FRAMES, DEFAULT_DEPTH_EVERY
from core import detector, depth, pose, risk_engine
from core.detector import ObstacleTracker
from core.depth import SlopeSmoother
from core.guidance import GuidanceEngine
from core.tts import TTSEngine
from renderers import overlay as overlay_renderer
from renderers import blind_nav as blind_nav_renderer
def process_image(image_bgr, mode="overlay"):
    """Analyze one still image and draw the result.

    Runs obstacle detection, pose analysis, depth estimation and slope
    estimation once, derives risk and guidance from them, and renders the
    outcome with the renderer selected by ``mode``.

    Args:
        image_bgr: Input image as an array; converted with
            ``cv2.COLOR_BGR2RGB`` for the pose and depth stages.
        mode: ``"blind_nav"`` selects the blind-navigation renderer; any
            other value selects the overlay renderer.

    Returns:
        Tuple ``(rendered_bgr, risk_dict, guidance_dict)``.
    """
    height, width = image_bgr.shape[:2]
    image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

    found = detector.detect(image_bgr)
    gait, landmarks, foot_y = pose.analyze(image_rgb, width, height)
    depth_map = depth.estimate_depth(image_rgb, height, width)
    # The fourth value returned by estimate_slope is not used here.
    angle, direction, terrain, _ = depth.estimate_slope(
        depth_map, height, width, foot_y)

    # NOTE(review): the obstacle *count* is passed here, whereas the video and
    # webcam paths pass the obstacle list itself — confirm what assess() expects.
    risk = risk_engine.assess(angle, direction, gait, len(found))

    # A fresh engine per call: a single image has no history to carry over.
    guidance = GuidanceEngine().compute(angle, direction, terrain, found)

    if mode != "blind_nav":
        thumbnail = overlay_renderer.render_depth_mini(depth_map, width, height)
        rendered = overlay_renderer.render(
            image_bgr, found, gait, landmarks, risk, guidance, thumbnail)
        return rendered, risk, guidance

    rendered = blind_nav_renderer.render(
        image_bgr, found, angle, direction, terrain, depth_map, guidance)
    return rendered, risk, guidance
def _run_ffmpeg(args, out_file):
    """Run ``ffmpeg`` and report whether it produced a usable *out_file*.

    The command is passed as an argument list (no shell), so input and output
    paths containing spaces or shell metacharacters are handled safely.

    Args:
        args: ffmpeg arguments placed between the global flags and the output
            file (inputs, stream maps, codec options).
        out_file: Path of the file ffmpeg should write.

    Returns:
        True if ffmpeg exited with status 0 and *out_file* exists and is
        non-empty; False otherwise (a partial output file is removed).
    """
    import subprocess  # local import: only needed for post-processing

    cmd = ["ffmpeg", "-y", "-loglevel", "error", *args, out_file]
    try:
        succeeded = subprocess.run(cmd, check=False).returncode == 0
    except OSError:
        # ffmpeg is missing or not executable: treat as a failed conversion
        # so the caller falls back to the file it already has.
        succeeded = False

    if succeeded and os.path.exists(out_file) and os.path.getsize(out_file) > 0:
        return True

    if os.path.exists(out_file):
        try:
            os.remove(out_file)
        except OSError:
            pass
    return False


def _reencode_and_mux(raw_path, source_path):
    """Re-encode *raw_path* to H.264, then add the audio of *source_path*.

    Both steps are best-effort: when a step fails, the file produced by the
    previous step is kept.

    Returns:
        Path of the most processed file that was successfully produced.
    """
    current = raw_path

    h264_path = f"{os.path.splitext(current)[0]}_h264.mp4"
    encode_args = ["-i", current, "-c:v", "libx264", "-preset", "fast",
                   "-crf", "23", "-pix_fmt", "yuv420p"]
    if _run_ffmpeg(encode_args, h264_path):
        os.remove(current)
        current = h264_path

    # Only real filesystem paths can be used as an audio source.
    if isinstance(source_path, (str, os.PathLike)) and os.path.exists(source_path):
        audio_path = f"{os.path.splitext(current)[0]}_audio.mp4"
        # The output keeps the source duration (fewer frames, written at a
        # proportionally lower fps), so the audio is muxed at its original
        # tempo; speeding it up would desync it and, with -shortest,
        # truncate the video.
        mux_args = ["-i", current, "-i", os.fspath(source_path),
                    "-map", "0:v", "-map", "1:a",
                    "-c:v", "copy", "-c:a", "aac", "-shortest"]
        if _run_ffmpeg(mux_args, audio_path):
            os.remove(current)
            current = audio_path

    return current


def process_video(video_path, mode="overlay", skip_frames=DEFAULT_SKIP_FRAMES,
                  depth_every=DEFAULT_DEPTH_EVERY, track=True, tts=False):
    """Process a video file and write an annotated copy.

    Every ``skip_frames + 1``-th frame is analyzed and written at a
    proportionally reduced frame rate. Depth and slope are refreshed every
    ``depth_every`` processed frames and smoothed in between. The result is
    then re-encoded to H.264, muxed with the source audio, and, if any voice
    guidance was triggered, passed to ``merge_voice_into_video``.

    Args:
        video_path: Path of the input video.
        mode: ``"blind_nav"`` for the blind-navigation renderer; any other
            value uses the overlay renderer.
        skip_frames: Number of frames dropped between processed frames
            (negative values are treated as 0).
        depth_every: Depth is recomputed every this many processed frames
            (values below 1 are treated as 1).
        track: Forwarded to ``detector.detect``.
        tts: Passed to ``TTSEngine(enabled=...)``; when enabled, guidance is
            also spoken while processing.

    Returns:
        Path of the output video, or ``None`` if the input cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None

    # Clamp the intervals so the modulo operations below can never divide by zero.
    stride = max(0, int(skip_frames)) + 1
    depth_stride = max(1, int(depth_every)) * stride

    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if not fps > 0:
        # Some containers report 0 or NaN; fall back so timestamps and the
        # writer still get a usable rate.
        fps = 30.0
    out_fps = fps / stride

    out_size = (w * 2, h + 70) if mode == "blind_nav" else (w, h)
    out_path = os.path.join(tempfile.gettempdir(), f"nav_{mode}.mp4")
    writer = cv2.VideoWriter(
        out_path, cv2.VideoWriter_fourcc(*'mp4v'), out_fps, out_size)

    voice_events = []  # (timestamp, text) for offline TTS rendering
    frame_idx = processed = 0
    t0 = time.time()
    tts_engine = None

    try:
        # Stateful components
        guide_engine = GuidanceEngine()
        slope_smoother = SlopeSmoother()
        obs_tracker = ObstacleTracker()
        tts_engine = TTSEngine(enabled=tts)

        cached_depth = np.zeros((h, w), dtype=np.float32)
        cached_depth_mini = None

        print(f"Processing {total} frames ({w}x{h}) mode={mode}...", flush=True)

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_idx += 1
            if frame_idx % stride != 0:
                continue

            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Obstacles with tracking
            obstacles = detector.detect(frame, track=track)
            new_obs, closing_obs, _ = obs_tracker.update(obstacles)

            # Pose
            gait, landmarks, foot_y = pose.analyze(rgb, w, h)

            # Depth: always on the first processed frame, then every N-th.
            if processed == 0 or frame_idx % depth_stride == 0:
                cached_depth = depth.estimate_depth(rgb, h, w)
                raw_slope, raw_dir, raw_terrain, _ = depth.estimate_slope(
                    cached_depth, h, w, foot_y)
                slope_smoother.update(raw_slope, raw_dir, raw_terrain)
                cached_depth_mini = overlay_renderer.render_depth_mini(
                    cached_depth, w, h)

            # Use smoothed values
            s_angle = slope_smoother.angle
            s_dir = slope_smoother.direction
            s_terrain = slope_smoother.terrain
            s_trend = slope_smoother.trend

            # Risk + guidance
            risk = risk_engine.assess(
                s_angle, s_dir, gait, obstacles,
                slope_trend=s_trend,
                new_obstacles=new_obs,
                closing_obstacles=closing_obs)
            guidance = guide_engine.compute(
                s_angle, s_dir, s_terrain, obstacles,
                slope_trend=s_trend,
                new_obstacles=new_obs,
                closing_obstacles=closing_obs)

            # Render
            if mode == "blind_nav":
                rendered = blind_nav_renderer.render(
                    frame, obstacles, s_angle, s_dir, s_terrain,
                    cached_depth, guidance)
            else:
                rendered = overlay_renderer.render(
                    frame, obstacles, gait, landmarks, risk, guidance,
                    cached_depth_mini)
            writer.write(rendered)
            processed += 1

            # TTS: record (and optionally speak) if the throttle allows
            if guide_engine.should_speak(guidance['voice'], s_angle):
                voice_events.append((frame_idx / fps, guidance['voice']))
                if tts_engine.enabled:
                    tts_engine.speak(guidance['voice'])

            if processed % 20 == 0:
                el = max(time.time() - t0, 1e-6)
                new_str = f" new={len(new_obs)}" if new_obs else ""
                close_str = f" closing={len(closing_obs)}" if closing_obs else ""
                print(f" {frame_idx}/{total} | {processed / el:.1f}fps | "
                      f"{s_dir} {s_angle:.0f}° [{s_trend}] | {s_terrain} | "
                      f"obs={len(obstacles)}{new_str}{close_str} | risk={risk['risk']}",
                      flush=True)
    finally:
        # Release handles even if a processing stage raised.
        cap.release()
        writer.release()
        if tts_engine is not None:
            tts_engine.shutdown()

    # Re-encode to H.264 and mux audio from the original
    out_path = _reencode_and_mux(out_path, video_path)
    print(f"Done! {processed} frames in {time.time() - t0:.1f}s", flush=True)

    # Bake TTS voice into video if we have events
    if voice_events:
        from core.tts_render import merge_voice_into_video
        # The reported frame count can be missing or too small; never use a
        # duration shorter than what was actually read.
        duration = max(total, frame_idx) / fps
        print(f"Rendering {len(voice_events)} voice events into video...", flush=True)
        out_path = merge_voice_into_video(out_path, voice_events, duration)

    return out_path
def run_webcam(mode="overlay", camera_id=0, tts=True):
    """Live webcam/RTSP processing loop. Press 'q' to quit.

    Frames are read from a ``CameraStream``, analyzed, rendered into a window
    named "Navigation", and (when ``tts`` is enabled) guidance is spoken. The
    camera, the windows and the TTS engine are released even if the loop is
    interrupted by an exception or Ctrl-C.

    Args:
        mode: ``"blind_nav"`` for the blind-navigation renderer; any other
            value uses the overlay renderer.
        camera_id: 0 for USB webcam, or "rtsp://..." for IP camera.
        tts: Passed to ``TTSEngine(enabled=...)``.

    Returns:
        None. If the camera cannot be opened, the error is printed and the
        function returns immediately.
    """
    from core.camera import CameraStream

    try:
        cam = CameraStream(camera_id)
    except RuntimeError as e:
        print(f"Error: {e}")
        return

    # Guard the modulo below against a zero/negative configured interval.
    depth_every = max(1, DEFAULT_DEPTH_EVERY)

    tts_engine = None
    frame_count = 0
    t0 = time.time()

    try:
        guide_engine = GuidanceEngine()
        slope_smoother = SlopeSmoother()
        obs_tracker = ObstacleTracker()
        tts_engine = TTSEngine(enabled=tts)

        cached_depth = None
        cached_depth_mini = None
        depth_counter = 0
        t0 = time.time()

        print(f"Live mode={mode} tts={tts}. Press 'q' to quit.", flush=True)

        while cam.is_open:
            frame = cam.read()
            if frame is None:
                # No frame available yet: back off briefly instead of spinning.
                time.sleep(0.01)
                continue

            h, w = frame.shape[:2]
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            obstacles = detector.detect(frame, track=True)
            new_obs, closing_obs, _ = obs_tracker.update(obstacles)
            gait, landmarks, foot_y = pose.analyze(rgb, w, h)

            # Depth is refreshed on the first frame and then every N-th frame.
            depth_counter += 1
            if depth_counter % depth_every == 0 or cached_depth is None:
                cached_depth = depth.estimate_depth(rgb, h, w)
                raw_slope, raw_dir, raw_terrain, _ = depth.estimate_slope(
                    cached_depth, h, w, foot_y)
                slope_smoother.update(raw_slope, raw_dir, raw_terrain)
                cached_depth_mini = overlay_renderer.render_depth_mini(
                    cached_depth, w, h)

            risk = risk_engine.assess(
                slope_smoother.angle, slope_smoother.direction, gait, obstacles,
                slope_trend=slope_smoother.trend,
                new_obstacles=new_obs,
                closing_obstacles=closing_obs)
            guidance = guide_engine.compute(
                slope_smoother.angle, slope_smoother.direction, slope_smoother.terrain,
                obstacles, slope_trend=slope_smoother.trend,
                new_obstacles=new_obs, closing_obstacles=closing_obs)

            if mode == "blind_nav":
                depth_for_render = (
                    cached_depth if cached_depth is not None
                    else np.zeros((h, w), dtype=np.float32))
                rendered = blind_nav_renderer.render(
                    frame, obstacles, slope_smoother.angle, slope_smoother.direction,
                    slope_smoother.terrain, depth_for_render, guidance)
            else:
                rendered = overlay_renderer.render(
                    frame, obstacles, gait, landmarks, risk, guidance, cached_depth_mini)

            if tts_engine.enabled and guide_engine.should_speak(
                    guidance['voice'], slope_smoother.angle):
                tts_engine.speak(guidance['voice'])

            frame_count += 1
            if frame_count % 30 == 0:
                elapsed = max(time.time() - t0, 1e-6)
                fps = frame_count / elapsed
                cv2.setWindowTitle(
                    "Navigation", f"Navigation | {fps:.1f} FPS | {risk['risk']}")

            cv2.imshow("Navigation", rendered)
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
    finally:
        # Always free the camera, windows and TTS engine, including on Ctrl-C.
        cam.release()
        cv2.destroyAllWindows()
        if tts_engine is not None:
            tts_engine.shutdown()
        print(f"Session: {frame_count} frames in {time.time() - t0:.1f}s", flush=True)
if __name__ == "__main__":
    # Command-line entry point: process a video file or run the live loop.
    import sys

    argv = sys.argv
    if len(argv) < 2:
        print("Usage:")
        print(" python pipeline.py <video.mp4> [--mode overlay|blind_nav] [--tts]")
        print(" python pipeline.py --webcam [--cam 0] [--mode overlay|blind_nav] [--tts]")
        print(" python pipeline.py --webcam --cam rtsp://user:pass@ip:554/stream [--tts]")
        sys.exit(1)

    def _option_value(flag, default):
        """Return the argument following *flag*, or *default* if absent."""
        if flag not in argv:
            return default
        pos = argv.index(flag) + 1
        return argv[pos] if pos < len(argv) else default

    mode = _option_value("--mode", "overlay")
    use_tts = "--tts" in argv

    cam_source = 0
    if "--cam" in argv:
        cam_value = _option_value("--cam", "0")
        try:
            # Numeric values are local device indices.
            cam_source = int(cam_value)
        except ValueError:
            # Anything else (rtsp://, http://, device path) is passed through
            # as a stream source instead of crashing on int().
            cam_source = cam_value

    if argv[1] == "--webcam":
        run_webcam(mode=mode, camera_id=cam_source, tts=use_tts)
    else:
        out = process_video(argv[1], mode=mode, tts=use_tts)
        if out:
            print(f"Output: {out}")
        else:
            # process_video returns a falsy value when it produced no output;
            # report it instead of exiting silently with status 0.
            print(f"Error: could not process video: {argv[1]}", file=sys.stderr)
            sys.exit(1)