import os, asyncio, httpx, re, json as _json from io import BytesIO from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, JSONResponse, FileResponse from fastapi.middleware.cors import CORSMiddleware from datetime import datetime from pathlib import Path from docx import Document as DocxDocument from docx.shared import Inches, Pt, RGBColor from docx.enum.text import WD_ALIGN_PARAGRAPH from docx.oxml.ns import qn from docx.oxml import OxmlElement import openpyxl from openpyxl.styles import Font, PatternFill, Alignment, Border, Side app = FastAPI() app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY","") OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY","") GROQ_API_KEY = os.getenv("GROQ_API_KEY","") PEXELS_API_KEY = os.getenv("PEXELS_API_KEY","") HF_API_KEY = os.getenv("HF_TOKEN","") DOCS_DIR = Path("docs") DOCS_DIR.mkdir(exist_ok=True) PROVIDERS = { "gemini": {"name":"Google Gemini","type":"gemini","key":GOOGLE_API_KEY}, "openrouter": {"name":"OpenRouter","type":"openai_compat","key":OPENROUTER_API_KEY, "base_url":"https://openrouter.ai/api/v1/chat/completions", "headers":{"HTTP-Referer":"https://huggingface.co/spaces/vfven/mission-control-ui","X-Title":"Mission Control AI"}}, "groq": {"name":"Groq","type":"openai_compat","key":GROQ_API_KEY, "base_url":"https://api.groq.com/openai/v1/chat/completions","headers":{}}, } DEFAULT_AGENTS = [ {"key":"manager","name":"Manager","provider":"gemini", "role":( "Eres el gerente de proyecto. Analiza la solicitud y decide qué agentes trabajarán. " "NUNCA hagas el trabajo tú mismo. Saluda en 1 línea y delega siempre con JSON al final: " '{"delegate":["key1","key2"]}\n' "REGLAS:\n" "- imagen/foto/gato/perro/dibujo → image_agent\n" "- informe/reporte/word/documento → writer + analyst\n" "- excel/planilla/hoja/spreadsheet/registrar → backend_dev\n" "- python/script/groovy/jenkins/api/devops → backend_dev\n" "- html/css/web/interfaz/frontend → frontend_dev\n" "- app completa full-stack → backend_dev + frontend_dev\n" "- analisis/viabilidad/evaluar → analyst" ), "models":["gemini-2.5-flash-preview-04-17","gemini-2.0-flash","gemini-1.5-flash"]}, {"key":"backend_dev","name":"Backend","provider":"openrouter", "role":( "Eres programador backend senior. REGLAS ABSOLUTAS:\n" "1. Entrega SOLO el código pedido, sin explicaciones innecesarias.\n" "2. Excel/planilla → responde con EXCEL_TEMPLATE:{\"title\":\"...\",\"sheet_name\":\"...\"," "\"headers\":[...],\"sample_rows\":[[...],[...]]}\n" "3. Python → entrega código Python puro y funcional.\n" "4. Groovy/Jenkins → entrega el script completo.\n" "5. Si hay frontend_dev en el equipo, TÚ haces servidor/backend, él hace HTML.\n" "6. Si la tarea no requiere backend → responde: {\"skip\":\"no backend needed\"}" ), "models":["google/gemma-3-27b-it:free","google/gemma-3-12b-it:free", "meta-llama/llama-3.3-70b-instruct:free","mistralai/mistral-small-3.1-24b-instruct:free"]}, {"key":"frontend_dev","name":"Frontend","provider":"openrouter", "role":( "Eres desarrollador frontend senior. REGLAS ABSOLUTAS:\n" "1. Entrega SOLO código HTML/CSS/JS pedido, sin explicaciones innecesarias.\n" "2. Si hay backend_dev, TÚ haces HTML/interfaz, él hace servidor/lógica.\n" "3. Si la tarea NO requiere frontend → responde: {\"skip\":\"no frontend needed\"}\n" "4. Entrega siempre HTML completo y funcional con los estilos incluidos." ), "models":["google/gemma-3-12b-it:free","google/gemma-3-27b-it:free", "meta-llama/llama-3.3-70b-instruct:free","mistralai/mistral-small-3.1-24b-instruct:free"]}, {"key":"analyst","name":"Analyst","provider":"openrouter", "role":( "Eres analista de negocios. REGLAS:\n" "1. Solo haz lo que el manager delegó: revisar documentos, evaluar viabilidad, analizar riesgos.\n" "2. NUNCA describas imágenes ni hagas trabajo de otros agentes.\n" "3. Si la tarea no requiere análisis → responde: {\"skip\":\"no analysis needed\"}" ), "models":["google/gemma-3-27b-it:free","google/gemma-3-12b-it:free", "meta-llama/llama-3.3-70b-instruct:free","mistralai/mistral-small-3.1-24b-instruct:free"]}, {"key":"writer","name":"Writer","provider":"openrouter", "role":( "Eres redactor experto. Escribe SOLO contenido real y extenso (500+ palabras). " "Sin placeholders. Usa ## para secciones y ### para subsecciones. " "Secciones: ## Resumen Ejecutivo, ### Introducción, ### Desarrollo, " "### Hallazgos, ### Conclusiones, ### Recomendaciones" ), "models":["google/gemma-3-12b-it:free","google/gemma-3-27b-it:free", "meta-llama/llama-3.3-70b-instruct:free","mistralai/mistral-small-3.1-24b-instruct:free"]}, {"key":"image_agent","name":"ImageAgent","provider":"gemini", "role":( "Cuando se te pida imágenes, responde SOLO con: " "{\"image_queries\":[\"english term 1\",\"english term 2\",\"english term 3\"]} " "Los términos deben ser específicos en inglés para encontrar buenas imágenes." ), "models":["gemini-2.0-flash","gemini-1.5-flash"]}, ] # ── CHAT ROLES — conversational versions for direct chat (not mission mode) ── CHAT_ROLES = { "manager": ( "Eres el Manager de Mission Control AI, gerente de proyectos con acceso real a un equipo de agentes IA. " "Tu equipo: backend_dev (Python/APIs), frontend_dev (HTML/CSS/JS), analyst (análisis), " "writer (redacción), image_agent (imágenes/arte). " "REGLA CRÍTICA: Si el usuario pide algo que requiere trabajo real de un agente " "(generar imagen, escribir código, hacer Excel, crear informe, diseñar web, etc.), " "responde ÚNICAMENTE con este JSON y nada más: " '{"action":"delegate","task":"descripcion clara de lo que se necesita"} ' "Ejemplos que SÍ requieren delegar: " "'genera una imagen', 'hazme un script', 'crea un formulario HTML', 'haz un informe', 'excel de ventas'. " "Para conversación normal (saludos, preguntas, planificación, consejos) responde con texto normal. " "Recuerda el historial y refiérete a él." ), "backend_dev": ( "Eres Backend Dev, un programador senior especializado en Python, APIs, bases de datos y DevOps. " "Puedes conversar libremente, explicar código, debuggear problemas, sugerir arquitecturas. " "Cuando el usuario pide código, entrégalo limpio y funcional. " "Recuerda el contexto de la conversación — si antes hablaron de un proyecto, continúa desde ahí." ), "frontend_dev": ( "Eres Frontend Dev, un desarrollador web senior especializado en HTML, CSS, JavaScript y UX. " "Puedes conversar libremente, revisar diseños, sugerir mejoras visuales, escribir código frontend. " "Recuerda el historial — si el usuario ya te mostró algo, refiérete a ello." ), "analyst": ( "Eres el Analyst, un analista de negocios y datos experimentado. " "Puedes analizar situaciones, evaluar riesgos, hacer proyecciones, discutir estrategias. " "Conversa de forma natural. Usa los datos y contexto que el usuario te ha dado antes en esta conversación." ), "writer": ( "Eres Writer, un redactor y escritor creativo experto. " "Puedes redactar textos, corregir gramática, brainstormear ideas, adaptar tonos y estilos. " "Habla de forma fluida y creativa. Si el usuario ya compartió algo antes, úsalo como base." ), "image_agent": ( "Eres ImageAgent, especialista en imágenes y arte visual con IA. " "REGLA CRÍTICA: Si el usuario pide generar, crear, buscar, dibujar o mostrar UNA imagen o foto de CUALQUIER cosa, " "responde ÚNICAMENTE con este JSON sin ningún texto adicional: " '{"action":"generate_image","queries":["detailed english prompt 1","detailed english prompt 2","english prompt 3"]} ' "Los queries deben ser DESCRIPTIVOS en inglés (colores, estilo, composición). Ej: " '{"action":"generate_image","queries":["flying cat with angel wings pastel colors","cute cat flying through clouds digital art","cat with wings fantasy illustration"]} ' "Para preguntas sobre fotografía, diseño o arte SIN pedir imágenes, conversa normalmente." ), } def get_chat_role(agent_key: str, agent: dict) -> str: """Return conversational role for chat mode.""" base = CHAT_ROLES.get(agent_key, "") if base: return base # Custom agents fallback return ( f"Eres {agent['name']}, un asistente especializado. " f"Tu especialidad: {agent.get('role','propósito general')}. " "Conversa de forma natural y recuerda el historial de esta conversación." ) agent_registry = {a["key"]: dict(a) for a in DEFAULT_AGENTS} mission_history = [] async def call_gemini(model,system,user,key): url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={key}" async with httpx.AsyncClient(timeout=90) as c: r = await c.post(url,json={"contents":[{"role":"user","parts":[{"text":f"{system}\n\n{user}"}]}],"generationConfig":{"maxOutputTokens":2048,"temperature":0.4}}) r.raise_for_status() return r.json()["candidates"][0]["content"]["parts"][0]["text"] async def call_compat(base_url,model,system,user,key,headers): h = {"Authorization":f"Bearer {key}","Content-Type":"application/json",**headers} async with httpx.AsyncClient(timeout=90) as c: r = await c.post(base_url,json={"model":model,"messages":[{"role":"system","content":system},{"role":"user","content":user}],"max_tokens":2048,"temperature":0.4},headers=h) r.raise_for_status() return r.json()["choices"][0]["message"]["content"] async def call_compat_multiturn(base_url, model, system, messages, key, extra_headers): """OpenAI-compatible chat with full message history for multi-turn conversations.""" h = {"Authorization": f"Bearer {key}", "Content-Type": "application/json", **extra_headers} payload = { "model": model, "messages": [{"role": "system", "content": system}] + messages, "max_tokens": 2048, "temperature": 0.6, } async with httpx.AsyncClient(timeout=90) as c: r = await c.post(base_url, json=payload, headers=h) r.raise_for_status() return r.json()["choices"][0]["message"]["content"] async def call_gemini_multiturn(model, system, messages, key): """Gemini multi-turn conversation.""" url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={key}" # Convert messages to Gemini format contents = [] for m in messages: role = "user" if m["role"] == "user" else "model" contents.append({"role": role, "parts": [{"text": m["content"]}]}) # Prepend system as first user message if contents start with model full_system = system + "\n\n" + (contents[0]["parts"][0]["text"] if contents and contents[0]["role"] == "user" else "") if contents and contents[0]["role"] == "user": contents[0]["parts"][0]["text"] = full_system payload = { "contents": contents, "generationConfig": {"maxOutputTokens": 2048, "temperature": 0.6}, } async with httpx.AsyncClient(timeout=90) as c: r = await c.post(url, json=payload) r.raise_for_status() return r.json()["candidates"][0]["content"]["parts"][0]["text"] async def call_llm_multiturn(agent, messages): """Multi-turn LLM call with full conversation history. Cascades through providers.""" system = agent["role"] last_err = None # 1. Primary provider p = PROVIDERS[agent["provider"]] for m in agent["models"]: try: if p["type"] == "gemini": return await call_gemini_multiturn(m, system, messages, p["key"]) else: return await call_compat_multiturn(p["base_url"], m, system, messages, p["key"], p.get("headers", {})) except Exception as e: last_err = str(e) if is_rate_limit(last_err): break # 2. OpenRouter fallback (Gemma 3 first) if OPENROUTER_API_KEY and agent["provider"] != "openrouter": or_prov = PROVIDERS["openrouter"] for m in ["google/gemma-3-27b-it:free", "google/gemma-3-12b-it:free", "meta-llama/llama-3.3-70b-instruct:free", "mistralai/mistral-small-3.1-24b-instruct:free"]: try: return await call_compat_multiturn(or_prov["base_url"], m, system, messages, or_prov["key"], or_prov.get("headers", {})) except Exception as e: last_err = str(e) if is_rate_limit(last_err): break # 3. Groq fallback if GROQ_API_KEY and agent["provider"] != "groq": groq = PROVIDERS["groq"] for m in ["llama-3.1-8b-instant", "gemma2-9b-it"]: try: return await call_compat_multiturn(groq["base_url"], m, system, messages, GROQ_API_KEY, {}) except Exception as e: last_err = str(e) raise Exception(f"All providers exhausted. Last: {last_err}") def is_rate_limit(err: str) -> bool: e = err.lower() return any(x in e for x in ["429","rate limit","quota","resource exhausted","too many requests","ratelimit"]) async def call_llm(agent, task): """Provider cascade: Primary → OpenRouter → Groq""" system = agent["role"] last_err = None # 1. Primary provider p = PROVIDERS[agent["provider"]] for m in agent["models"]: try: if p["type"] == "gemini": return await call_gemini(m, system, task, p["key"]) else: return await call_compat(p["base_url"], m, system, task, p["key"], p.get("headers", {})) except Exception as e: last_err = str(e) if is_rate_limit(last_err): break # rate limited — skip to next provider immediately # 2. OpenRouter fallback if OPENROUTER_API_KEY and agent["provider"] != "openrouter": or_prov = PROVIDERS["openrouter"] for m in [ "google/gemma-3-27b-it:free", "google/gemma-3-12b-it:free", "meta-llama/llama-3.3-70b-instruct:free", "mistralai/mistral-small-3.1-24b-instruct:free", "qwen/qwen3-4b:free", "qwen/qwen-2.5-72b-instruct:free", "deepseek/deepseek-r1-distill-llama-70b:free", ]: try: return await call_compat(or_prov["base_url"], m, system, task, or_prov["key"], or_prov.get("headers", {})) except Exception as e: last_err = str(e) if is_rate_limit(last_err): break # 3. Groq fallback if GROQ_API_KEY and agent["provider"] != "groq": for m in ["llama-3.1-8b-instant", "gemma2-9b-it"]: try: return await call_compat(PROVIDERS["groq"]["base_url"], m, system, task, GROQ_API_KEY, {}) except Exception as e: last_err = str(e) raise Exception(f"All providers exhausted. Last: {last_err}") async def fetch_pexels(q): if not PEXELS_API_KEY: return None try: async with httpx.AsyncClient(timeout=20) as c: r = await c.get("https://api.pexels.com/v1/search",params={"query":q,"per_page":1,"orientation":"landscape"},headers={"Authorization":PEXELS_API_KEY}) d = r.json() if d.get("photos"): ir = await c.get(d["photos"][0]["src"]["medium"]) return ir.content except: pass return None async def gen_hf_image(prompt: str) -> bytes | None: """Try HuggingFace free image generation models. FLUX first (best quality).""" if not HF_API_KEY: return None # Ordered by quality/availability — FLUX.1 schnell is fastest free model models = [ ("black-forest-labs/FLUX.1-schnell", {"inputs": prompt}), ("black-forest-labs/FLUX.1-dev", {"inputs": prompt}), ("stabilityai/stable-diffusion-xl-base-1.0", {"inputs": prompt, "parameters": {"width": 512, "height": 512}}), ("stabilityai/stable-diffusion-2-1", {"inputs": prompt, "parameters": {"width": 512, "height": 512}}), ] headers = {"Authorization": f"Bearer {HF_API_KEY}"} for model_id, payload in models: try: async with httpx.AsyncClient(timeout=120) as c: r = await c.post( f"https://api-inference.huggingface.co/models/{model_id}", headers=headers, json=payload, ) ct = r.headers.get("content-type", "") if r.status_code == 200 and ("image" in ct or r.content[:4] in (b"\xff\xd8\xff\xe0", b"\x89PNG")): return r.content # 503/loading → try next model immediately if r.status_code in (503, 500): continue # 429 rate limit → stop trying HF if r.status_code == 429: break except Exception: continue return None CREATIVE_KEYWORDS = [ "flying","volador","volando","astronaut","astronauta","dragon","dragón", "fantasy","fantasia","magical","mágico","cartoon","anime","pixel art", "robot","alien","extraterrestre","superhero","unicorn","unicornio", "watercolor","illustration","ilustración","3d","render","sci-fi", "futuristic","futurista","cyberpunk","cute","kawaii", "gato volador","flying cat","space cat","gato espacial", "imagina","genera","crea","diseña","dibuja", ] def is_creative(q: str) -> bool: return any(k in q.lower() for k in CREATIVE_KEYWORDS) async def get_image(q: str, force_generate: bool = False): """Fetch or generate an image for query q. force_generate=True: skip Pexels, go straight to HF image generation. """ if force_generate or is_creative(q): result = await gen_hf_image(q) if result: return result # Real-world → Pexels first, AI fallback return await fetch_pexels(q) or await gen_hf_image(q) def classify(task): lo = task.lower() return { "img": any(w in lo for w in ["imagen","image","foto","picture","gato","cat","dog","perro","dibuja","genera una imagen","crea una imagen","ilustra"]), "excel": any(w in lo for w in ["excel","xlsx","planilla","hoja de calculo","hoja de cálculo","spreadsheet","registro de","registrar alumnos","tabla de alumnos","plantilla de"]), "word": any(w in lo for w in ["informe","reporte","report","documento word","docx","redacta un informe","escribe un informe"]), "anal": any(w in lo for w in ["analiza","evalua","evalúa","viabilidad","riesgo","revisar"]), "back": any(w in lo for w in ["python","script","groovy","jenkins","api","backend","fastapi","flask","devops","pipeline",".py"]), "front": any(w in lo for w in ["html","css","frontend","web page","página web","pagina web","sitio web","interfaz web"]), "both": any(w in lo for w in ["hola mundo","full stack","fullstack","app completa"]) and any(w in lo for w in ["python","backend",".py"]) and any(w in lo for w in ["html","web","ver en","interfaz"]), } def parse_delegates(text): m = re.search(r'\{"delegate"\s*:\s*\[([^\]]*)\]\}',text) return re.findall(r'"(\w+)"',m.group(1)) if m else [] def clean(text): text = re.sub(r'\{"delegate"[^}]*\}','',text) text = re.sub(r'\{"image_queries"[^}]*\}','',text) return text.strip() def is_skip(text): return '"skip"' in text.lower() def build_excel(task,backend_text): m = re.search(r'EXCEL_TEMPLATE:\s*(\{.*?\})\s*$',backend_text,re.DOTALL|re.MULTILINE) if not m: m = re.search(r'EXCEL_TEMPLATE:\s*(\{.*)',backend_text,re.DOTALL) if not m: return None try: structure = _json.loads(m.group(1)) except: return None wb = openpyxl.Workbook(); ws = wb.active ws.title = structure.get("sheet_name","Datos")[:31] headers = structure.get("headers",[]) rows = structure.get("sample_rows",[]) title = structure.get("title",task[:50]) thin = Side(style="thin",color="D1D5DB") bdr = Border(left=thin,right=thin,top=thin,bottom=thin) row_off = 1 if title and headers: ws.merge_cells(f"A1:{chr(64+len(headers))}1") c = ws["A1"]; c.value=title c.font=Font(bold=True,size=13,color="FFFFFF") c.fill=PatternFill("solid",fgColor="1a56db") c.alignment=Alignment(horizontal="center",vertical="center") ws.row_dimensions[1].height=28; row_off=2 for col,h in enumerate(headers,1): c=ws.cell(row=row_off,column=col,value=h) c.font=Font(bold=True,color="FFFFFF",size=10) c.fill=PatternFill("solid",fgColor="2563eb") c.alignment=Alignment(horizontal="center",vertical="center") c.border=bdr ws.column_dimensions[c.column_letter].width=max(len(str(h))+6,14) ws.row_dimensions[row_off].height=20 alt=PatternFill("solid",fgColor="EFF6FF") for ri,row in enumerate(rows,row_off+1): for col,val in enumerate(row,1): c=ws.cell(row=ri,column=col,value=val) c.border=bdr; c.alignment=Alignment(vertical="center") if ri%2==0: c.fill=alt ws.row_dimensions[ri].height=18 ws.freeze_panes=f"A{row_off+1}" buf=BytesIO(); wb.save(buf); buf.seek(0) return buf.read() def build_docx(title,writer_text,images,analyst_text): doc=DocxDocument() for s in doc.sections: s.top_margin=s.bottom_margin=Inches(1); s.left_margin=s.right_margin=Inches(1.2) tp=doc.add_heading(title,0); tp.alignment=WD_ALIGN_PARAGRAPH.CENTER if tp.runs: tp.runs[0].font.color.rgb=RGBColor(0x1a,0x56,0xdb) sub=doc.add_paragraph(); sub.alignment=WD_ALIGN_PARAGRAPH.CENTER sub.add_run(f"Mission Control AI — {datetime.now().strftime('%B %d, %Y')}").italic=True doc.add_paragraph() p=doc.add_paragraph(); pPr=p._p.get_or_add_pPr(); pBdr=OxmlElement("w:pBdr") bot=OxmlElement("w:bottom"); bot.set(qn("w:val"),"single"); bot.set(qn("w:sz"),"6"); bot.set(qn("w:color"),"1a56db") pBdr.append(bot); pPr.append(pBdr) ii=0; pending=[] def flush(): nonlocal pending t=" ".join(pending).strip() if t: p2=doc.add_paragraph(t); p2.paragraph_format.space_after=Pt(6) pending.clear() for line in writer_text.split("\n"): s=line.strip() if not s: flush(); continue if s.startswith("## "): flush(); doc.add_heading(s[3:],level=1) if ii 300 else ""), "model": agent["models"][0], } log("Backend done") elif key == "frontend_dev": ctx = backend_text[:500] if backend_text else "" p = ( "Tarea: " + task + "\n" + ("Backend hizo:\n" + ctx + "\n" if ctx else "") + "Entrega SOLO HTML/CSS/JS completo. " + 'Si no se necesita frontend: {"skip":"no frontend needed"}' ) frontend_text = await call_llm(agent, p) if is_skip(frontend_text): results["frontend_dev"] = {"status": "idle", "message": "", "model": ""} else: results["frontend_dev"] = { "status": "active", "message": frontend_text[:300] + ("..." if len(frontend_text) > 300 else ""), "model": agent["models"][0], } log("Frontend done") else: raw = await call_llm(agent, task) results[key] = {"status": "active", "message": raw, "model": agent["models"][0]} log(agent["name"] + " done") except Exception as e: results[key] = {"status": "resting", "message": str(e), "model": ""} log(key + " error: " + str(e)) # Execution order be = [k for k in delegates if k == "backend_dev"] par = [k for k in delegates if k in ("image_agent", "writer")] seq = [k for k in delegates if k in ("frontend_dev", "analyst")] oth = [k for k in delegates if k not in be + par + seq] for k in be: await run_one(k) if par + oth: await asyncio.gather(*[run_one(k) for k in par + oth]) for k in seq: await run_one(k) # ── Build output files ─────────────────────────────────────────────────── safe = re.sub(r"[^\w\-]", "_", task[:40]) ts = datetime.now().strftime("%Y%m%d_%H%M%S") if writer_text: try: db = build_docx(task, writer_text, image_bytes, analyst_text) fn = safe + "_" + ts + ".docx" (DOCS_DIR / fn).write_bytes(db) doc_file = fn results["manager"]["doc_file"] = fn log("Docx: " + fn) except Exception as e: log("Docx error: " + str(e)) if backend_text and tc["excel"]: try: xb = build_excel(task, backend_text) if xb: fn = safe + "_" + ts + ".xlsx" (DOCS_DIR / fn).write_bytes(xb) doc_file = fn results["backend_dev"]["doc_file"] = fn log("Excel: " + fn) except Exception as e: log("Excel error: " + str(e)) if backend_text and not tc["excel"] and not is_skip(backend_text): try: code = re.sub(r"```\w*\n?", "", backend_text) code = re.sub(r"```", "", code).strip() if len(code) > 30: ext = ".groovy" if ("groovy" in backend_text.lower() or "jenkins" in task.lower()) else ".py" fn = safe + "_" + ts + ext (DOCS_DIR / fn).write_text(code, encoding="utf-8") results["backend_dev"]["doc_file"] = fn log("Code: " + fn) except Exception as e: log("Code error: " + str(e)) if frontend_text and not is_skip(frontend_text): try: html = re.sub(r"```\w*\n?", "", frontend_text) html = re.sub(r"```", "", html).strip() if len(html) > 30: fn = safe + "_frontend_" + ts + ".html" (DOCS_DIR / fn).write_text(html, encoding="utf-8") results["frontend_dev"]["doc_file"] = fn if not doc_file: doc_file = fn log("HTML: " + fn) except Exception as e: log("HTML error: " + str(e)) for k in agent_registry: if k not in results: results[k] = {"status": "idle", "message": "", "model": ""} final = results.get("manager", {}).get("message", "")[:300] entry = { "id": len(mission_history) + 1, "task": task, "started_at": started, "ended_at": datetime.now().isoformat(), "results": results, "final": final, "doc_file": doc_file, "events": events, } mission_history.append(entry) # Cache agent outputs so direct chat can reference them for _k, _r in results.items(): if _r.get("status") == "active" and _r.get("message"): mission_context_cache[_k] = ( "En una mision reciente trabajaste en: " + task[:80] + "\n" + _r["message"][:600] ) return JSONResponse({ "success": True, "task": task, "results": results, "final": final, "doc_file": doc_file, "events": events, "mission_id": entry["id"], }) # ── SHARED MISSION CONTEXT (injected into chat after missions) ───────────── # Stores last mission output per agent so chat knows what was done mission_context_cache: dict = {} # {agent_key: "last output summary"} chat_sessions: dict = {} # {session_id: [{role, content}]} @app.post("/api/chat") async def chat_with_agent(request: Request): body = await request.json() agent_key = body.get("agent", "").strip() message = body.get("message", "").strip() session_id = body.get("session_id", agent_key) clear = body.get("clear", False) if not agent_key or not message: return JSONResponse({"error": "agent and message required"}, status_code=400) if agent_key not in agent_registry: return JSONResponse({"error": f"Agent '{agent_key}' not found"}, status_code=404) if clear: chat_sessions[session_id] = [] if session_id not in chat_sessions: chat_sessions[session_id] = [] # Build message history (last 20 turns) history = list(chat_sessions[session_id][-40:]) history.append({"role": "user", "content": message}) # Build system role: conversational + inject mission context if available _today = datetime.now().strftime("%A %d de %B de %Y, %H:%M") base_role = get_chat_role(agent_key, agent_registry[agent_key]) prev_work = mission_context_cache.get(agent_key, "") if prev_work: base_role += ( "\n\n--- CONTEXTO DE MISIONES ANTERIORES ---\n" + prev_work + "\nPuedes referenciar este trabajo cuando el usuario haga preguntas o pida ajustes." ) agent = dict(agent_registry[agent_key]) agent["role"] = f"HOY ES: {_today}.\n{base_role}" try: response = await call_llm_multiturn(agent, history) # Intercept special actions from agents img_result = None img_files = [] delegate_result = None import json as _json2 try: parsed = _json2.loads(response.strip()) # Manager delegates to team if isinstance(parsed, dict) and parsed.get("action") == "delegate" and agent_key == "manager": delegate_task = parsed.get("task", message) # Run as a collab mission sub_results = {} sub_context = "TAREA: " + delegate_task + "\n" sub_today = datetime.now().strftime("%A %d de %B de %Y, %H:%M") # Detect which agents to use lo = delegate_task.lower() sub_delegates = [] if any(w in lo for w in ["imagen","image","foto","dibujo","genera","crea una imagen"]): sub_delegates = ["image_agent"] elif any(w in lo for w in ["excel","planilla","spreadsheet"]): sub_delegates = ["backend_dev"] elif any(w in lo for w in ["informe","reporte","documento"]): sub_delegates = ["writer","analyst"] elif any(w in lo for w in ["html","web","formulario","interfaz"]) and any(w in lo for w in ["api","backend","python"]): sub_delegates = ["backend_dev","frontend_dev"] elif any(w in lo for w in ["html","web","formulario","interfaz"]): sub_delegates = ["frontend_dev"] elif any(w in lo for w in ["python","api","script","backend","codigo","código"]): sub_delegates = ["backend_dev"] else: sub_delegates = ["analyst"] # Execute each sub-agent sub_imgs = [] for sub_key in sub_delegates: if sub_key not in agent_registry: continue sub_agent = dict(agent_registry[sub_key]) sub_role = get_chat_role(sub_key, sub_agent) sub_agent["role"] = "HOY ES: " + sub_today + ".\n" + sub_role + "\n\nContexto: " + sub_context try: if sub_key == "image_agent": ip = ("Find images for: " + delegate_task + ' Respond ONLY: {"image_queries":["t1","t2","t3"]}') sub_raw = await call_llm(sub_agent, ip) m3 = re.search(r'"image_queries"\s*:\s*\[([^\]]*)\]', sub_raw) qs = re.findall(r'"([^"]+)"', m3.group(1)) if m3 else [delegate_task[:40]] sub_imgs_r = await asyncio.gather(*[get_image(q, force_generate=True) for q in qs[:3]]) _safe2 = re.sub(r"[^\w]","_",delegate_task[:28]) _ts2 = datetime.now().strftime("%H%M%S") _base2 = "mgr_" + _safe2 + "_" + _ts2 for idx2, im2 in enumerate(sub_imgs_r): if im2: fn2 = _base2 + "_img" + str(idx2+1) + ".jpg" (DOCS_DIR / fn2).write_bytes(im2) sub_imgs.append(fn2) sub_results[sub_key] = { "status": "active", "message": str(len(sub_imgs)) + " imagen(es) para: " + ", ".join(qs[:3]), "img_base": _base2 if sub_imgs else None, "img_count": len(sub_imgs), } sub_context += "\n=== IMAGE AGENT ===\n" + str(len(sub_imgs)) + " images\n" else: sub_prompt = sub_context + "\nINSTRUCCION: " + delegate_task sub_resp = await call_llm_multiturn(sub_agent, [{"role":"user","content":sub_prompt}]) sub_results[sub_key] = {"status":"active","message":sub_resp,"model":sub_agent["models"][0]} sub_context += "\n=== " + sub_key.upper() + " ===\n" + sub_resp[:500] + "\n" # Save files _ts3 = datetime.now().strftime("%Y%m%d_%H%M%S") _safe3 = re.sub(r"[^\w\-]","_",delegate_task[:35]) if sub_key == "frontend_dev" and not is_skip(sub_resp): html3 = re.sub(r"```\w*\n?","",sub_resp); html3 = re.sub(r"```","",html3).strip() if len(html3)>80: fn3 = _safe3+"_frontend_"+_ts3+".html" (DOCS_DIR/fn3).write_text(html3,encoding="utf-8") sub_results[sub_key]["doc_file"]=fn3 elif sub_key == "backend_dev" and not is_skip(sub_resp): code3 = re.sub(r"```\w*\n?","",sub_resp); code3 = re.sub(r"```","",code3).strip() if len(code3)>80: fn3 = _safe3+"_backend_"+_ts3+".py" (DOCS_DIR/fn3).write_text(code3,encoding="utf-8") sub_results[sub_key]["doc_file"]=fn3 mission_context_cache[sub_key] = "En tarea reciente: " + delegate_task[:60] + "\n" + sub_resp[:400] except Exception as sub_e: sub_results[sub_key] = {"status":"resting","message":str(sub_e)} # Build manager response text parts = ["Delegué la tarea a mi equipo:"] for k,r in sub_results.items(): parts.append("• " + k + ": " + (r.get("message","")[:120] or "done")) response = "\n".join(parts) delegate_result = {"sub_results": sub_results, "sub_imgs": sub_imgs, "img_base": sub_results.get("image_agent",{}).get("img_base"), "img_count": sub_results.get("image_agent",{}).get("img_count",0)} # ImageAgent: generate image elif isinstance(parsed, dict) and parsed.get("action") == "generate_image": queries = parsed.get("queries", [message])[:3] _safe = re.sub(r"[^\w]", "_", message[:28]) _ts = datetime.now().strftime("%H%M%S") _base = "chat_" + _safe + "_" + _ts imgs = await asyncio.gather(*[get_image(q, force_generate=True) for q in queries]) for i, img in enumerate(imgs): if img: fname = _base + "_img" + str(i+1) + ".jpg" (DOCS_DIR / fname).write_bytes(img) img_files.append(fname) if img_files: response = "✓ " + str(len(img_files)) + " imagen(es) generada(s) — haz clic para ver" img_result = {"img_base": _base, "img_count": len(img_files)} else: response = "No pude generar la imagen. Verifica HF_TOKEN en los Secrets del Space." except (ValueError, TypeError, KeyError): pass # Save turn chat_sessions[session_id].append({"role": "user", "content": message}) chat_sessions[session_id].append({"role": "assistant", "content": response}) if len(chat_sessions[session_id]) > 100: chat_sessions[session_id] = chat_sessions[session_id][-80:] used_model = agent["models"][0] if agent.get("models") else "" result = { "success": True, "agent": agent_key, "response": response, "model": used_model, "turn": len(chat_sessions[session_id]) // 2, } if img_result: result.update(img_result) if delegate_result: result["delegate_result"] = delegate_result if delegate_result.get("img_base"): result["img_base"] = delegate_result["img_base"] result["img_count"] = delegate_result["img_count"] return JSONResponse(result) except Exception as e: return JSONResponse({"error": str(e)}, status_code=500) @app.delete("/api/chat/{session_id}") async def clear_chat_session(session_id: str): chat_sessions.pop(session_id, None) return {"success": True} @app.post("/api/collab") async def collaborative_chat(request: Request): """ Multi-agent collaborative task from chat. Manager reads the request and coordinates Backend + Frontend (or any combo) in a pipeline with shared context. Returns each agent's output separately. """ body = await request.json() task = body.get("task", "").strip() agents = body.get("agents", []) # optional override: ["backend_dev","frontend_dev"] if not task: return JSONResponse({"error": "task required"}, status_code=400) _today = datetime.now().strftime("%A %d de %B de %Y, %H:%M") events = [] results = {} def log(m): events.append({"time": datetime.now().strftime("%H:%M:%S"), "msg": m}) # Manager decides which agents to use (unless caller specified them) if agents: delegates = [a for a in agents if a in agent_registry] agent_tasks = {a: task for a in delegates} log("Caller-specified agents: " + str(delegates)) else: log("Manager planning collab task...") mgr = dict(agent_registry["manager"]) mgr["role"] = ( "HOY ES: " + _today + ". " + mgr["role"] + "\nResponde SOLO con JSON: " '{"plan":[{"agent":"key","task":"instruccion especifica"}],"summary":"resumen"}' "\nAgentes: " + ", ".join(agent_registry.keys()) ) try: mgr_raw = await call_llm(mgr, task) mgr_clean = re.sub(r"```json|```", "", mgr_raw).strip() m = re.search(r"\{.*\}", mgr_clean, re.DOTALL) plan_data = {} if m: try: import json as _jj plan_data = _jj.loads(m.group()) except Exception: pass plan_steps = plan_data.get("plan", []) delegates = [s["agent"] for s in plan_steps if s.get("agent") in agent_registry] agent_tasks = {s["agent"]: s.get("task", task) for s in plan_steps} if not delegates: delegates = parse_delegates(mgr_raw) agent_tasks = {a: task for a in delegates} results["manager"] = { "status": "active", "message": plan_data.get("summary", clean(mgr_raw)), "model": agent_registry["manager"]["models"][0], } log("Manager plan: " + str(delegates)) except Exception as e: log("Manager error: " + str(e)) delegates = [] agent_tasks = {} if not delegates: return JSONResponse({"error": "No agents to run"}, status_code=400) # Run pipeline with shared context shared_ctx = "TAREA: " + task + "\n" for key in delegates: if key not in agent_registry: continue agent = dict(agent_registry[key]) specific = agent_tasks.get(key, task) chat_role = get_chat_role(key, agent) # Inject previous context so agents can collaborate agent["role"] = ( "HOY ES: " + _today + ".\n" + chat_role + "\n\n--- CONTEXTO DEL EQUIPO ---\n" + shared_ctx + "\nTu instruccion especifica: " + specific ) log(key + " working...") try: agent_history = [{"role": "user", "content": specific}] response = await call_llm_multiturn(agent, agent_history) results[key] = { "status": "active", "message": response, "model": agent["models"][0] if agent.get("models") else "", } # Accumulate context for next agents shared_ctx += "\n=== " + key.upper() + " ===\n" + response[:800] + "\n" # Save to mission cache so individual chat can reference it mission_context_cache[key] = ( "En una tarea reciente hiciste lo siguiente para: " + task[:80] + "\n" + response[:600] ) log(key + " done") # Save files if code was generated ts = datetime.now().strftime("%Y%m%d_%H%M%S") safe = re.sub(r"[^\w\-]", "_", task[:35]) if key == "frontend_dev" and not is_skip(response): html = re.sub(r"```\w*\n?", "", response) html = re.sub(r"```", "", html).strip() if len(html) > 80: fn = safe + "_frontend_" + ts + ".html" (DOCS_DIR / fn).write_text(html, encoding="utf-8") results[key]["doc_file"] = fn elif key == "backend_dev" and not is_skip(response): code = re.sub(r"```\w*\n?", "", response) code = re.sub(r"```", "", code).strip() if len(code) > 80: fn = safe + "_backend_" + ts + ".py" (DOCS_DIR / fn).write_text(code, encoding="utf-8") results[key]["doc_file"] = fn except Exception as e: results[key] = {"status": "resting", "message": str(e), "model": ""} log(key + " error: " + str(e)) return JSONResponse({ "success": True, "task": task, "delegates": delegates, "results": results, "events": events, }) @app.get("/api/archive") async def list_archive(): files = [] for f in sorted(DOCS_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True): if f.is_file() and not f.name.startswith('.'): files.append({ "name": f.name, "size": f.stat().st_size, "modified": datetime.fromtimestamp(f.stat().st_mtime).strftime("%Y-%m-%d %H:%M"), "ext": f.suffix.lower().lstrip('.'), }) return {"files": files} @app.delete("/api/archive/{filename}") async def delete_archive_file(filename: str): path = DOCS_DIR / filename if path.exists() and path.is_file(): path.unlink() return {"success": True} @app.get("/api/health") async def health(): return {"status":"ok","providers":{"gemini":"ok" if GOOGLE_API_KEY else "missing","openrouter":"ok" if OPENROUTER_API_KEY else "missing","groq":"ok" if GROQ_API_KEY else "missing","pexels":"ok" if PEXELS_API_KEY else "optional","hf_images":"ok" if HF_API_KEY else "optional"}}