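# Source: clawdbot-dev / recursive_context.py (Executor-Tyrant-Framework)
# Commit 388edc0 (verified): "Update recursive_context.py" - 15.2 kB
# NOTE: the hosting page's UI labels (Raw / History / Blame / Contribute / Delete)
# were captured with this file and are not part of the module.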
import os
import json
import subprocess
import time
import shutil
import ast
import glob
from pathlib import Path
from typing import List, Dict, Any, Optional
from huggingface_hub import HfApi, hf_hub_download, InferenceClient
class RecursiveContextManager:
    """Persistent memory and repository tooling rooted at one repository path.

    The manager keeps three kinds of state:

    * a small JSON "notebook" (``memory/notebook.json``) mirrored to a
      Hugging Face dataset when ``HF_TOKEN`` is set;
    * an optional vector store (``XetVectorStore`` loaded from a sibling
      ``xet_storage.py``) holding embedded conversation turns, periodically
      backed up to the same dataset as one JSON file;
    * nothing else -- the remaining methods are stateless helpers that read,
      write, search and run git/shell commands inside ``repo_path``.

    NOTE(review): several status strings contain mis-encoded emoji
    (e.g. ``βœ…``). They are runtime output and are kept byte-identical so
    callers matching on them keep working; fix them at the source encoding.
    """

    def __init__(self, repo_path: str):
        """Set up paths, credentials and the optional vector store, then restore state.

        Args:
            repo_path: Directory that every relative path handled by this
                object is resolved against.
        """
        self.repo_path = Path(repo_path)
        self.memory_path = self.repo_path / "memory"
        self.notebook_file = self.memory_path / "notebook.json"

        # --- AUTHENTICATION ---
        self.token = os.getenv("HF_TOKEN")
        self.dataset_id = os.getenv("DATASET_ID", "Executor-Tyrant-Framework/clawdbot-memory")
        # Without a token there is no inference client; embeddings fall back to zeros.
        self.client = InferenceClient(token=self.token) if self.token else None

        # --- XET / DATABASE INIT ---
        self.xet_root = self.repo_path / "xet_data"
        self.xet_dataset_file = "xet_vectors.json"
        self.xet_store = None

        # Debounce controls: back up the vector store only every N saved turns.
        self._saves_since_xet_backup = 0
        self.XET_BACKUP_EVERY_N = 5

        try:
            if (self.repo_path / "xet_storage.py").exists():
                import sys

                repo_dir = str(self.repo_path)
                # Avoid growing sys.path when several managers are created.
                if repo_dir not in sys.path:
                    sys.path.append(repo_dir)
                from xet_storage import XetVectorStore

                self.xet_store = XetVectorStore(repo_path=str(self.xet_root))
                print("βœ… Xet Storage Driver Loaded.")
        except Exception as e:
            # The driver is optional: any failure just disables vector memory.
            print(f"⚠️ Xet Driver not loaded: {e}")

        # --- RESTORE MEMORY ---
        self._init_memory()
        self._init_xet_memory()

    # =========================================================================
    # SYNC LOGIC (Notebook + Xet JSON)
    # =========================================================================
    def _init_memory(self):
        """Create the memory directory and, with a token, download the notebook.

        If the download fails, an empty notebook is created only when no local
        notebook exists, so a transient network error cannot wipe saved notes.
        """
        self.memory_path.mkdir(parents=True, exist_ok=True)
        if not self.token:
            return
        try:
            hf_hub_download(
                repo_id=self.dataset_id, filename="notebook.json", repo_type="dataset",
                token=self.token, local_dir=self.memory_path, local_dir_use_symlinks=False
            )
        except Exception as e:
            print(f"Notebook download skipped: {e}")
            if not self.notebook_file.exists():
                self._save_local([])

    def _init_xet_memory(self):
        """Download the vector backup file and replay it into the vector store.

        Does nothing without a token or a loaded store. Failures are reported
        and otherwise ignored (a new dataset has no backup file yet).
        """
        if not self.token or not self.xet_store:
            return
        try:
            local_path = hf_hub_download(
                repo_id=self.dataset_id, filename=self.xet_dataset_file, repo_type="dataset",
                token=self.token, local_dir=self.memory_path, local_dir_use_symlinks=False
            )
            vectors = json.loads(Path(local_path).read_text(encoding="utf-8"))
            for v in vectors:
                self.xet_store.store_vector(v["id"], v["vector"], v["metadata"])
            print(f"🧠 Restored {len(vectors)} vectors from Dataset")
        except Exception as e:
            print(f"⚠️ Xet restore failed (New dataset?): {e}")

    def _backup_xet_to_dataset(self):
        """Upload vectors that are not yet listed in the local manifest.

        Vector files are found under ``xet_store.vectors_path`` three levels
        deep; the file name is used as the manifest key. New vectors are merged
        (deduplicated by ``id``) into the dataset's JSON file, which is then
        re-uploaded, and the manifest is rewritten on success.
        """
        if not self.token or not self.xet_store:
            return

        manifest_path = self.memory_path / "xet_manifest.json"
        try:
            known_hashes = set(json.loads(manifest_path.read_text()))
        except (OSError, ValueError, TypeError):
            # Missing or unusable manifest: treat everything as new.
            known_hashes = set()

        new_vectors = []
        current_hashes = set()
        for f in self.xet_store.vectors_path.glob("*/*/*"):
            if not f.is_file():
                continue
            file_hash = f.name
            if file_hash in known_hashes:
                current_hashes.add(file_hash)
                continue
            try:
                new_vectors.append(json.loads(f.read_text()))
            except (OSError, ValueError):
                # Unreadable now: leave it out of the manifest so it is retried
                # on the next backup instead of being marked as uploaded.
                continue
            current_hashes.add(file_hash)

        if not new_vectors:
            return

        try:
            backup_path = self.memory_path / self.xet_dataset_file
            existing = []
            try:
                local_path = hf_hub_download(
                    repo_id=self.dataset_id,
                    filename=self.xet_dataset_file,
                    repo_type="dataset",
                    token=self.token,
                    local_dir=self.memory_path,
                    local_dir_use_symlinks=False
                )
                existing = json.loads(Path(local_path).read_text(encoding="utf-8"))
            except Exception:
                # Either the file does not exist yet or the download failed.
                # Fall back to the last local copy so a transient failure does
                # not replace the remote history with only the new vectors.
                if backup_path.exists():
                    try:
                        existing = json.loads(backup_path.read_text(encoding="utf-8"))
                    except (OSError, ValueError):
                        existing = []

            # Merge with deduplication by id (also within the new batch).
            existing_ids = {v["id"] for v in existing}
            for v in new_vectors:
                if v["id"] not in existing_ids:
                    existing.append(v)
                    existing_ids.add(v["id"])

            backup_path.write_text(json.dumps(existing, indent=2), encoding="utf-8")
            api = HfApi(token=self.token)
            api.upload_file(
                path_or_fileobj=backup_path,
                path_in_repo=self.xet_dataset_file,
                repo_id=self.dataset_id,
                repo_type="dataset",
                commit_message=f"🧠 Xet: +{len(new_vectors)} vectors (total: {len(existing)})"
            )

            # Only after a successful upload: remember what has been synced.
            manifest_path.write_text(json.dumps(list(current_hashes)))
            print(f"☁️ Backed up {len(new_vectors)} new vectors")
        except Exception as e:
            print(f"⚠️ Xet backup failed: {e}")

    # =========================================================================
    # EMBEDDINGS
    # =========================================================================
    def _get_embedding(self, text: str) -> List[float]:
        """Return an embedding for ``text`` as a flat list of floats.

        Falls back to a 384-element zero vector when no client is configured
        or the request fails.
        """
        if not self.client:
            return [0.0] * 384
        try:
            response = self.client.feature_extraction(text, model="sentence-transformers/all-MiniLM-L6-v2")
            # Array-like results (e.g. numpy arrays) become plain nested lists
            # so the list check below works and the value is JSON-serialisable.
            if hasattr(response, "tolist"):
                response = response.tolist()
            first = response[0]
            return first if isinstance(first, list) else response
        except Exception:
            return [0.0] * 384

    # =========================================================================
    # NOTEBOOK
    # =========================================================================
    def _save_local(self, notes: List[Dict]):
        """Write ``notes`` to the local notebook file as indented JSON."""
        self.memory_path.mkdir(parents=True, exist_ok=True)
        self.notebook_file.write_text(json.dumps(notes, indent=2), encoding='utf-8')

    def _save_notebook(self, notes: List[Dict]):
        """Save ``notes`` locally and, when credentials exist, upload them.

        Upload failures are reported but never raised: the local copy is the
        source of truth for the running process.
        """
        self._save_local(notes)
        if not (self.token and self.dataset_id):
            return
        try:
            api = HfApi(token=self.token)
            api.upload_file(
                path_or_fileobj=self.notebook_file, path_in_repo="notebook.json",
                repo_id=self.dataset_id, repo_type="dataset",
                commit_message=f"Notebook Update: {len(notes)}"
            )
        except Exception as e:
            print(f"Notebook sync failed: {e}")

    def _load_notebook(self) -> List[Dict]:
        """Return the notes stored locally, or ``[]`` if missing or unreadable."""
        if not self.notebook_file.exists():
            return []
        try:
            notes = json.loads(self.notebook_file.read_text(encoding='utf-8'))
        except (OSError, ValueError):
            # ValueError covers both invalid JSON and undecodable bytes.
            return []
        # Callers append to / pop from the result, so it must be a list.
        return notes if isinstance(notes, list) else []

    def notebook_read(self) -> str:
        """Return all notes, one ``[index] timestamp: content`` line each."""
        notes = self._load_notebook()
        if not notes:
            return "Notebook is empty."
        return "\n".join(
            f"[{i}] {n.get('timestamp','')}: {n.get('content','')}" for i, n in enumerate(notes)
        )

    def notebook_add(self, content: str) -> str:
        """Append a timestamped note, keeping only the 50 most recent."""
        notes = self._load_notebook()
        notes.append({"timestamp": time.strftime("%Y-%m-%d %H:%M"), "content": content})
        if len(notes) > 50:
            notes = notes[-50:]
        self._save_notebook(notes)
        return f"βœ… Note added & synced. ({len(notes)} items)"

    def notebook_delete(self, index: int) -> str:
        """Delete the note at ``index`` (as shown by ``notebook_read``).

        ``index`` is coerced with ``int()``; anything that cannot be coerced
        or is out of range yields the invalid-index message instead of raising.
        """
        notes = self._load_notebook()
        try:
            removed = notes.pop(int(index))
        except (IndexError, ValueError, TypeError):
            return "❌ Invalid index."
        self._save_notebook(notes)
        return f"πŸ—‘οΈ Deleted note: '{removed.get('content', '')[:20]}...'"

    # =========================================================================
    # SEARCH & MEMORY
    # =========================================================================
    def save_conversation_turn(self, user_msg, assist_msg, turn_id):
        """Embed one user/assistant exchange and store it in the vector store.

        Every ``XET_BACKUP_EVERY_N`` saved turns a dataset backup is attempted.
        Does nothing when no vector store is loaded.
        """
        if not self.xet_store:
            return
        combined = f"USER: {user_msg}\n\nASSISTANT: {assist_msg}"
        vector = self._get_embedding(combined)
        self.xet_store.store_vector(
            id=f"conv_{turn_id}_{int(time.time())}",
            vector=vector,
            metadata={
                "type": "conversation",
                "user": user_msg[:500],
                "assistant": assist_msg[:500],
                "content": combined,
                "timestamp": time.time()
            }
        )
        # Debounced backup - not every turn.
        self._saves_since_xet_backup += 1
        if self._saves_since_xet_backup >= self.XET_BACKUP_EVERY_N:
            self._backup_xet_to_dataset()
            self._saves_since_xet_backup = 0

    def search_conversations(self, query: str, n: int = 5) -> List[Dict]:
        """Return up to ``n`` stored turns most similar to ``query``.

        Each result is reduced to ``content``, ``similarity`` and ``id``.
        Returns ``[]`` when no vector store is loaded.
        """
        if not self.xet_store:
            return []
        query_vector = self._get_embedding(query)
        results = self.xet_store.similarity_search(query_vector, n)
        return [{
            "content": r.get("metadata", {}).get("content", ""),
            "similarity": r.get("similarity", 0),
            "id": r.get("id", "")
        } for r in results]

    def search_code(self, query: str, n: int = 5) -> List[Dict]:
        """Return up to ``n`` ``*.py`` files whose text contains ``query``.

        The match is case-sensitive. Each hit carries the file name and the
        first 300 characters of the file. Paths containing ``venv`` (relative
        to the repository) are skipped; unreadable files are skipped too.
        """
        results = []
        try:
            for f in self.repo_path.rglob("*.py"):
                # Test the repo-relative path so a repository that merely
                # lives under a "venv"-named parent is still searched.
                if "venv" in str(f.relative_to(self.repo_path)):
                    continue
                try:
                    txt = f.read_text(errors='ignore')
                except OSError:
                    continue
                if query in txt:
                    results.append({"file": f.name, "snippet": txt[:300]})
                    if n > 0 and len(results) >= n:
                        break
        except OSError:
            # Directory traversal itself failed; return what was collected.
            pass
        return results[:n]

    def search_testament(self, query: str, n: int = 5) -> List[Dict]:
        """Return up to ``n`` ``*.md`` files containing ``query`` (case-insensitive).

        Each hit carries the file name and the first 300 characters of the
        file. Unreadable files are skipped.
        """
        results = []
        needle = query.lower()
        try:
            for f in self.repo_path.rglob("*.md"):
                try:
                    txt = f.read_text(errors='ignore')
                except OSError:
                    continue
                if needle in txt.lower():
                    results.append({"file": f.name, "snippet": txt[:300]})
                    if n > 0 and len(results) >= n:
                        break
        except OSError:
            pass
        return results[:n]

    # =========================================================================
    # STANDARD TOOLS
    # =========================================================================
    def read_file(self, path: str, start_line: Optional[int] = None, end_line: Optional[int] = None) -> str:
        """Return the text of ``path`` (relative to the repository).

        Args:
            path: File to read.
            start_line: Zero-based index of the first line to return.
            end_line: Zero-based index one past the last line to return.

        Either bound may be given on its own. On any error the exception text
        is returned instead of raising.
        """
        try:
            target = self.repo_path / path
            content = target.read_text(encoding='utf-8', errors='ignore')
            lines = content.splitlines()
            if start_line is not None or end_line is not None:
                lines = lines[start_line:end_line]
            return "\n".join(lines)
        except Exception as e:
            return str(e)

    def list_files(self, path: str = ".", max_depth: int = 3) -> str:
        """List up to 50 non-hidden files under ``path``, one per line.

        Args:
            path: Directory relative to the repository.
            max_depth: Maximum number of path components below ``path``
                (a file directly inside ``path`` has depth 1). ``None``
                disables the limit.

        Paths are reported relative to the repository. A file is hidden when
        any component of that relative path starts with ``.``.
        """
        try:
            target = self.repo_path / path
            if not target.exists():
                return "Path not found."
            files = []
            for p in target.rglob("*"):
                if not p.is_file():
                    continue
                rel = p.relative_to(self.repo_path)
                # Only components inside the repository decide "hidden", so a
                # repository located under a dotted directory still lists.
                if any(part.startswith(".") for part in rel.parts):
                    continue
                if max_depth is not None and len(p.relative_to(target).parts) > max_depth:
                    continue
                files.append(str(rel))
                if len(files) >= 50:
                    break
            return "\n".join(files[:50])
        except Exception as e:
            return str(e)

    def write_file(self, path: str, content: str) -> str:
        """Write ``content`` to ``path`` (relative to the repository), creating parents."""
        try:
            target = self.repo_path / path
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(content, encoding='utf-8')
            return f"βœ… Written to {path}"
        except Exception as e:
            return str(e)

    def shell_execute(self, command: str) -> str:
        """Run ``command`` through the shell in the repository, 10 s timeout.

        Returns captured stdout and stderr, or an error string.

        SECURITY NOTE(review): the command runs with ``shell=True`` and the
        only guard is a two-entry substring blocklist, which is trivially
        bypassed. Treat this as arbitrary command execution and sandbox the
        process accordingly.
        """
        try:
            if any(x in command for x in ["rm -rf /", ":(){ :|:& };:"]):
                return "❌ Blocked."
            result = subprocess.run(
                command, shell=True, cwd=str(self.repo_path),
                capture_output=True, text=True, timeout=10
            )
            return f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
        except Exception as e:
            return f"Error: {e}"

    def map_repository_structure(self) -> str:
        """Parse every ``*.py`` file and report how many files/definitions were found.

        Files whose repo-relative path contains ``venv`` are skipped. Files
        that fail to parse still count as a file node but contribute no
        definition nodes. The graph itself is not returned, only the summary.
        """
        graph = {"nodes": [], "edges": []}
        try:
            file_count = 0
            for file_path in self.repo_path.rglob('*.py'):
                rel_path = str(file_path.relative_to(self.repo_path))
                if 'venv' in rel_path:
                    continue
                content = file_path.read_text(errors='ignore')
                file_count += 1
                graph["nodes"].append({"id": rel_path, "type": "file"})
                try:
                    tree = ast.parse(content)
                except (SyntaxError, ValueError):
                    # ValueError: source containing null bytes on some versions.
                    continue
                for node in ast.walk(tree):
                    if isinstance(node, ast.ClassDef):
                        kind = "class"
                    elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                        kind = "function"
                    else:
                        continue
                    graph["nodes"].append({"id": f"{rel_path}::{node.name}", "type": kind})
            return f"βœ… Map Generated: {file_count} files, {len(graph['nodes'])} nodes."
        except Exception as e:
            return f"❌ Mapping failed: {e}"

    def push_to_github(self, message: str) -> str:
        """Stage and commit everything in the repository with ``message``.

        Despite the name, nothing is pushed: pushing needs a remote configured
        with credentials, which this method does not set up.
        """
        repo_dir = str(self.repo_path)
        try:
            # Run git inside the managed repository, not the process cwd.
            subprocess.run(["git", "config", "user.email", "clawdbot@system.local"], check=False, cwd=repo_dir)
            subprocess.run(["git", "config", "user.name", "Clawdbot"], check=False, cwd=repo_dir)
            subprocess.run(["git", "add", "."], check=True, cwd=repo_dir)
            subprocess.run(["git", "commit", "-m", message], check=True, cwd=repo_dir)
            return "βœ… Changes committed (Push requires configured remote with token)."
        except Exception as e:
            return f"Git Error: {e}"

    def pull_from_github(self, branch: str) -> str:
        """Run ``git pull origin <branch>`` inside the repository."""
        try:
            subprocess.run(["git", "pull", "origin", branch], check=True, cwd=str(self.repo_path))
            return f"βœ… Pulled {branch}"
        except Exception as e:
            return f"Git Pull Error: {e}"

    def create_shadow_branch(self) -> str:
        """Create and check out a ``shadow_<unix-timestamp>`` branch in the repository."""
        ts = int(time.time())
        try:
            subprocess.run(["git", "checkout", "-b", f"shadow_{ts}"], check=True, cwd=str(self.repo_path))
            return f"βœ… Created branch shadow_{ts}"
        except Exception as e:
            return f"Error: {e}"

    def get_stats(self) -> Dict:
        """Return counts of files in the repository and stored vector files.

        ``total_files`` counts regular files anywhere under the repository
        (directories excluded). ``conversations`` counts files three levels
        below the vector store's ``vectors_path``; it is 0 without a store.
        """
        conv_count = 0
        if self.xet_store:
            try:
                conv_count = sum(
                    1 for p in self.xet_store.vectors_path.glob("*/*/*") if p.is_file()
                )
            except Exception:
                # Best-effort statistic: an unusable store just reports 0.
                conv_count = 0
        total_files = sum(1 for p in self.repo_path.rglob("*") if p.is_file())
        return {"total_files": total_files, "conversations": conv_count}