"""
HF Spaces application for WebAI verification worker.

Uses ZeroGPU for on-demand GPU processing with webAI-ColVec1-4b.
"""

import gc
import hashlib
import io
import json
import os
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List

import gradio as gr
import spaces  # kept ahead of torch, matching the original import order
import torch
from transformers import AutoModel, AutoProcessor, BitsAndBytesConfig

import psycopg
from psycopg.rows import dict_row
import httpx
from PIL import Image


def env_bool(name: str, default: bool = False) -> bool:
    """Parse a boolean flag from the environment variable *name*.

    "true", "1", "yes" and "on" (case-insensitive, surrounding whitespace
    ignored) mean True; any other non-blank value means False.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unset or blank.

    Returns:
        The parsed boolean.
    """
    value = os.getenv(name, "").strip().lower()
    if not value:
        return default
    return value in ("true", "1", "yes", "on")


# Configuration
@dataclass
class WebAIConfig:
    """Configuration for the WebAI verification worker.

    The environment-backed defaults below are dataclass field defaults, so
    they are evaluated once, when this class is defined (module import time).

    Raises:
        ValueError: from ``__post_init__`` when the database URL or Supabase
            credentials are missing, or ``pdf_dpi`` is not positive.
    """

    # --- Connection settings (environment-backed) ---
    database_url: str = os.getenv("DATABASE_URL", "").strip()
    # Strip whitespace before the trailing "/" so a secret pasted with a
    # newline still produces a usable base URL for storage requests.
    supabase_url: str = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "").strip().rstrip("/")
    supabase_service_role_key: str = os.getenv("SUPABASE_SERVICE_ROLE_KEY", "").strip()
    # A blank variable falls back to the default bucket instead of "".
    storage_bucket: str = os.getenv("SUPABASE_STORAGE_BUCKET", "").strip() or "tender-documents"

    # --- WebAI model configuration ---
    # Using 4b model weights with 9b processor (shared Qwen 3.5 architecture)
    model_name: str = "webAI-Official/webAI-ColVec1-4b"
    processor_name: str = "webAI-Official/webAI-ColVec1-9b"  # Has working config
    use_8bit_quantization: bool = True
    use_4bit_quantization: bool = False
    use_flash_attention_2: bool = True
    max_new_tokens: int = 512
    # Side length (px) of the square image fed to the model.
    # NOTE(review): original note says webAI models typically use 336x336.
    image_size: int = 336
    # Upper bound on PDF rasterisation DPI; a blank PDF_DPI falls back to 200.
    pdf_dpi: int = int(os.getenv("PDF_DPI", "").strip() or "200")
    # When True, the effective DPI is scaled down based on available memory.
    adaptive_dpi: bool = env_bool("ADAPTIVE_DPI", True)

    # --- Processing configuration ---
    # NOTE(review): batch_size and timeout_seconds are not referenced
    # elsewhere in this file.
    batch_size: int = 1
    timeout_seconds: int = 300

    def __post_init__(self):
        """Validate required settings; raise ValueError on the first problem."""
        if not self.database_url:
            raise ValueError("DATABASE_URL is required")
        if not self.supabase_url or not self.supabase_service_role_key:
            raise ValueError("Supabase credentials are required")
        if self.pdf_dpi <= 0:
            raise ValueError("PDF_DPI must be a positive integer")
# Global configuration
CONFIG = WebAIConfig()

# Populated by setup_model() at startup; None until then.
model = None
processor = None


def log_event(event: str, payload: Dict[str, Any]) -> None:
    """Print one structured JSON log record to stdout.

    The record holds a UTC ISO-8601 timestamp, the event name, a fixed service
    tag, and every key of *payload*. Non-JSON values are rendered with ``str``.
    """
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event": event,
        "service": "webai-verification-worker",
        **payload,
    }
    print(json.dumps(record, default=str), flush=True)


def setup_model():
    """Load the WebAI processor and model into the module-level globals.

    The processor is loaded from ``CONFIG.processor_name`` and the weights from
    ``CONFIG.model_name``. 8-bit quantization takes precedence over 4-bit when
    both flags are set. FlashAttention-2 is requested only if ``flash_attn``
    can be imported.

    Raises:
        Exception: any loading failure is logged as ``model.setup.failed``
            and re-raised.
    """
    global model, processor

    try:
        log_event("model.setup.start", {"model": CONFIG.model_name})

        # Configure quantization for memory efficiency.
        quantization_config = None
        quantization_type = "none"
        if CONFIG.use_8bit_quantization:
            quantization_config = BitsAndBytesConfig(load_in_8bit=True)
            quantization_type = "8bit"
        elif CONFIG.use_4bit_quantization:
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_use_double_quant=True,
            )
            quantization_type = "4bit"

        # Load processor from 9b repo (has working config) + model from 4b repo.
        log_event("model.setup.using_processor_workaround", {
            "model": CONFIG.model_name,
            "processor_source": CONFIG.processor_name,
        })
        processor = AutoProcessor.from_pretrained(
            CONFIG.processor_name,
            trust_remote_code=True,
        )

        # NOTE(review): the original comment said device_map="auto" maps the
        # model to CPU initially under ZeroGPU — confirm.
        model_kwargs = {
            "quantization_config": quantization_config,
            "device_map": "auto",
            "trust_remote_code": True,
            "low_cpu_mem_usage": True,
        }

        # Set only after flash_attn imports successfully. A globals() lookup
        # cannot see this function-local import, so track it explicitly.
        flash_attention_enabled = False
        if CONFIG.use_flash_attention_2:
            try:
                import flash_attn
            except ImportError:
                log_event("flash_attention.unavailable", {"reason": "flash_attn not installed"})
                # Fallback to standard attention
                model_kwargs["torch_dtype"] = torch.float16
            else:
                model_kwargs.update({
                    "attn_implementation": "flash_attention_2",
                    "torch_dtype": torch.float16,
                })
                flash_attention_enabled = True
                log_event("flash_attention.enabled", {
                    "version": getattr(flash_attn, "__version__", "unknown"),
                })

        model = AutoModel.from_pretrained(CONFIG.model_name, **model_kwargs)

        memory_optimization = (
            f"{quantization_type}_quantization"
            if quantization_config is not None
            else "no_quantization"
        )
        if flash_attention_enabled:
            memory_optimization += "_flash_attention"

        log_event("model.setup.success", {
            "model": CONFIG.model_name,
            "quantization": quantization_type,
            "flash_attention_2": flash_attention_enabled,
            "memory_optimization": memory_optimization,
        })
    except Exception as e:
        log_event("model.setup.failed", {"error": str(e)})
        raise


def connect_database() -> psycopg.Connection:
    """Open a non-autocommit PostgreSQL connection to ``CONFIG.database_url``.

    Server-side prepared statements are disabled (``prepare_threshold=None``).
    Connection errors are logged as ``db.connect.failed`` and re-raised.
    """
    try:
        return psycopg.connect(
            CONFIG.database_url,
            autocommit=False,
            prepare_threshold=None,
        )
    except Exception as e:
        log_event("db.connect.failed", {"error": str(e)})
        raise


def download_document(storage_path: str) -> bytes:
    """Download an object from the configured Supabase storage bucket.

    Args:
        storage_path: Object key inside ``CONFIG.storage_bucket``; each path
            segment is URL-quoted, and "/" separators are preserved.

    Returns:
        The raw response body.

    Raises:
        RuntimeError: if the storage API answers with a non-200 status.
        httpx.HTTPError: on network failures or the 90 s timeout.
    """
    from urllib.parse import quote

    encoded_path = quote(storage_path, safe="/")
    url = f"{CONFIG.supabase_url}/storage/v1/object/{CONFIG.storage_bucket}/{encoded_path}"
    headers = {
        "Authorization": f"Bearer {CONFIG.supabase_service_role_key}",
        "apikey": CONFIG.supabase_service_role_key,
    }

    with httpx.Client(timeout=90.0, follow_redirects=True) as client:
        response = client.get(url, headers=headers)
        if response.status_code != 200:
            raise RuntimeError(f"Failed to download document: {response.status_code}")
        return response.content


def prepare_image_for_model(document_bytes: bytes, mime_type: str) -> Image.Image:
    """Convert a downloaded document into a square RGB image for the model.

    For PDFs, only the first page is rasterised. Other inputs are opened with
    Pillow. Every path returns a ``CONFIG.image_size`` x ``CONFIG.image_size``
    image. On any failure a placeholder image is returned instead of raising.
    """
    if mime_type == "application/pdf":
        return _pdf_first_page_to_image(document_bytes)

    try:
        img = Image.open(io.BytesIO(document_bytes))
        original_size = img.size
        if max(original_size) < 1000:  # Small image, might be low quality
            log_event("image.low_resolution", {"size": original_size})
        # Convert first: Pillow resizes "P"/"1" mode images with nearest
        # neighbour regardless of the requested filter.
        img = img.convert("RGB")
        return img.resize((CONFIG.image_size, CONFIG.image_size), Image.Resampling.LANCZOS)
    except Exception as e:
        log_event("image.convert.failed", {"error": str(e)})
        return create_placeholder_image()


def _pdf_first_page_to_image(document_bytes: bytes) -> Image.Image:
    """Rasterise page 1 of a PDF and resize it for the model.

    The DPI comes from get_optimal_dpi(). If conversion fails, it is retried
    once at 150 DPI. A placeholder image is returned when pdf2image is
    missing or both attempts fail.
    """
    try:
        from pdf2image import convert_from_bytes
    except ImportError:
        log_event("pdf.fallback", {"reason": "pdf2image not available"})
        return create_placeholder_image()

    target_size = (CONFIG.image_size, CONFIG.image_size)
    try:
        optimal_dpi = get_optimal_dpi(CONFIG)
        log_event("pdf.conversion.start", {
            "dpi": optimal_dpi,
            "adaptive": CONFIG.adaptive_dpi,
        })
        # first_page/last_page are 1-based and inclusive. Without last_page,
        # pdf2image rasterises every remaining page at full DPI.
        images = convert_from_bytes(
            document_bytes,
            dpi=optimal_dpi,
            first_page=1,
            last_page=1,
            fmt="JPEG",
            thread_count=1,  # Reduce memory usage
        )
        if images:
            img = images[0]
            original_size = img.size
            estimated_size_mb = (original_size[0] * original_size[1] * 3) / (1024 ** 2)
            log_event("pdf.converted", {
                "original_size": original_size,
                "estimated_size_mb": estimated_size_mb,
                "dpi_used": optimal_dpi,
            })
            return img.convert("RGB").resize(target_size, Image.Resampling.LANCZOS)
        log_event("pdf.conversion.empty", {"dpi": optimal_dpi})
    except Exception as e:
        log_event("pdf.conversion.failed", {"error": str(e)})
        # Retry once at a lower DPI before giving up.
        try:
            images = convert_from_bytes(document_bytes, dpi=150, first_page=1, last_page=1)
            if images:
                return images[0].convert("RGB").resize(target_size, Image.Resampling.LANCZOS)
        except Exception as retry_error:
            log_event("pdf.conversion.retry_failed", {"error": str(retry_error)})
    return create_placeholder_image()


def create_placeholder_image() -> Image.Image:
    """Return a white square image with "Document Processing Error" drawn centred.

    If text drawing fails, the blank white image is returned.
    """
    from PIL import Image, ImageDraw, ImageFont

    img = Image.new('RGB', (CONFIG.image_size, CONFIG.image_size), color='white')
    draw = ImageDraw.Draw(img)

    try:
        font = ImageFont.load_default()
        text = "Document\nProcessing\nError"
        bbox = draw.textbbox((0, 0), text, font=font)
        text_width = bbox[2] - bbox[0]
        text_height = bbox[3] - bbox[1]
        x = (CONFIG.image_size - text_width) // 2
        y = (CONFIG.image_size - text_height) // 2
        draw.text((x, y), text, fill='black', font=font)
    except Exception:
        pass  # Best-effort decoration; the blank image is still usable.

    return img


@spaces.GPU
def process_with_webai(image: Image.Image, prompt: str) -> Dict[str, Any]:
    """Run the WebAI model on one image and prompt inside a ZeroGPU allocation.

    Returns:
        ``{"response": text, "status": "success"}`` on success, or
        ``{"response": "", "status": "failed", "error": message}`` if anything
        raises. Memory cleanup is attempted on both paths.
    """
    try:
        if model is None or processor is None:
            raise RuntimeError("Model is not loaded; call setup_model() first")

        inputs = processor(
            images=image,
            text=prompt,
            return_tensors="pt",
        ).to("cuda")

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=CONFIG.max_new_tokens,
                do_sample=True,
                temperature=0.1,
                pad_token_id=processor.tokenizer.eos_token_id,
            )

        generated = outputs[0]
        # Decoder-only generate() returns the prompt tokens followed by the
        # new ones. Strip the echoed prompt so its JSON template is not parsed
        # as the answer, and only when the prefix really matches the input.
        if "input_ids" in inputs:
            prompt_ids = inputs["input_ids"][0].to(generated.device)
            prompt_len = prompt_ids.shape[-1]
            if generated.shape[-1] >= prompt_len and torch.equal(generated[:prompt_len], prompt_ids):
                generated = generated[prompt_len:]

        response_text = processor.tokenizer.decode(generated, skip_special_tokens=True)

        # Drop tensor references before cleanup so their memory can be freed.
        del inputs, outputs, generated
        torch.cuda.empty_cache()
        aggressive_memory_cleanup()

        return {"response": response_text, "status": "success"}
    except Exception as e:
        log_event("webai.process.failed", {"error": str(e)})
        try:
            aggressive_memory_cleanup()
        except Exception:
            pass  # Cleanup is best-effort; report the original error.
        return {"response": "", "status": "failed", "error": str(e)}


def build_webai_prompt(tender_context: Dict[str, Any]) -> str:
    """Build the extraction prompt for one tender.

    Reads ``source_filename`` and ``organization_id`` from *tender_context*.
    Missing or null values render as "Unknown".
    """
    source = tender_context.get('source_filename') or 'Unknown'
    organization = tender_context.get('organization_id') or 'Unknown'
    return f"""
Analyze this tender document and provide structured JSON output.

Context:
- Source: {source}
- Organization: {organization}

Provide analysis in this exact JSON format:
{{
    "tenderTitle": "string",
    "procuringEntity": "string",
    "tenderCategory": "string",
    "submissionDeadline": "YYYY-MM-DD",
    "mandatoryDocuments": ["string"],
    "eligibilityCriteria": ["string"],
    "hardBlockers": ["string"],
    "risks": ["string"],
    "ambiguities": ["string"],
    "confidence": 0.0,
    "summary": "string",
    "complianceItems": [
        {{
            "title": "string",
            "requirementText": "string",
            "status": "PASS|FAIL|UNKNOWN",
            "severity": "LOW|MEDIUM|HIGH|CRITICAL",
            "remediation": "string",
            "pageReference": 1
        }}
    ],
    "bidScore": {{
        "score": 0,
        "decision": "BID|NO_BID|REVIEW",
        "blockers": ["string"],
        "confidence": 0.0,
        "explanation": "string"
    }}
}}

Focus on accuracy and completeness. Extract all visible requirements and criteria.
"""


def parse_webai_response(response_text: str) -> Dict[str, Any]:
    """Extract the first JSON object from the model's response.

    The whole (stripped) response is tried first. Failing that, each "{" is
    tried as the start of an object. ``raw_decode`` stops at the end of the
    first complete value, so surrounding prose or markdown fences do not break
    parsing.

    Returns:
        The parsed object. If no JSON object is found, returns a dict with
        ``"error": "JSON_PARSE_ERROR"``, the first 500 characters of the
        response, ``confidence`` 0.0 and a summary message.
    """
    text = response_text or ""

    parsed: Optional[Dict[str, Any]] = None
    try:
        candidate = json.loads(text.strip())
    except json.JSONDecodeError:
        candidate = None
    if isinstance(candidate, dict):
        parsed = candidate
    else:
        decoder = json.JSONDecoder()
        start = text.find("{")
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict):
                parsed = candidate
                break
            start = text.find("{", start + 1)

    if parsed is None:
        return {
            "error": "JSON_PARSE_ERROR",
            "raw_response": text[:500],  # First 500 chars
            "confidence": 0.0,
            "summary": "Failed to parse WebAI response",
        }
    return parsed


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_float(value: Any, default: float = 0.0) -> float:
    """Coerce a JSON number (or numeric string) to a finite float, else *default*."""
    import math

    try:
        result = float(value)
    except (TypeError, ValueError):
        return default
    return result if math.isfinite(result) else default


def _as_text(value: Any) -> str:
    """Return *value* as a string, mapping None to ""."""
    return "" if value is None else str(value)


def compare_analyses(primary_analysis: Dict[str, Any], webai_analysis: Dict[str, Any]) -> Dict[str, Any]:
    """Score agreement between the primary worker's analysis and WebAI's.

    ``agreement_score`` is a weighted average of four factors:
    bid decision match (0.4), confidence similarity (0.3), category match
    (0.2; any mismatch gets 0.5 partial credit), and deadline match (0.1).
    Null or malformed nested fields are treated as missing.

    If either analysis is not a dict, the error is logged, recorded under
    ``"error"``, and ``agreement_score`` stays 0.0.
    """
    comparison = {
        "agreement_score": 0.0,
        "differences": [],
        "confidence_comparison": {},
        "recommendation_comparison": {},
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

    try:
        # Bid decisions
        primary_decision = _as_dict(primary_analysis.get("bidScore")).get("decision", "REVIEW")
        webai_decision = _as_dict(webai_analysis.get("bidScore")).get("decision", "REVIEW")
        comparison["recommendation_comparison"] = {
            "primary": primary_decision,
            "webai": webai_decision,
            "agrees": primary_decision == webai_decision,
        }

        # Confidence scores
        primary_conf = _as_float(primary_analysis.get("confidence", 0.0))
        webai_conf = _as_float(webai_analysis.get("confidence", 0.0))
        conf_difference = abs(primary_conf - webai_conf)
        comparison["confidence_comparison"] = {
            "primary": primary_conf,
            "webai": webai_conf,
            "difference": conf_difference,
        }

        # Category and deadline
        primary_cat = _as_text(primary_analysis.get("tenderCategory")).lower()
        webai_cat = _as_text(webai_analysis.get("tenderCategory")).lower()
        primary_deadline = _as_text(primary_analysis.get("submissionDeadline"))
        webai_deadline = _as_text(webai_analysis.get("submissionDeadline"))

        weighted_factors = [
            (1.0 if primary_decision == webai_decision else 0.0, 0.4),
            (1.0 - min(conf_difference, 1.0), 0.3),
            (1.0 if primary_cat == webai_cat else 0.5, 0.2),
            (1.0 if primary_deadline == webai_deadline else 0.0, 0.1),
        ]
        comparison["agreement_score"] = sum(factor * weight for factor, weight in weighted_factors)

        # Identify key differences
        if primary_decision != webai_decision:
            comparison["differences"].append({
                "type": "bid_decision",
                "primary": primary_decision,
                "webai": webai_decision,
                "severity": "HIGH",
            })

        if conf_difference > 0.3:
            comparison["differences"].append({
                "type": "confidence",
                "primary": primary_conf,
                "webai": webai_conf,
                "severity": "MEDIUM",
            })
    except Exception as e:
        log_event("comparison.failed", {"error": str(e)})
        comparison["error"] = str(e)

    return comparison


def store_verification_result(
    conn: psycopg.Connection,
    tender_id: str,
    webai_analysis: Dict[str, Any],
    comparison: Dict[str, Any],
) -> None:
    """Insert a webai_verifications row and mark the tender's verification COMPLETED.

    Both statements run inside ``conn.transaction()``. That is a savepoint if
    the caller already has a transaction open. Errors are logged and re-raised.
    """
    try:
        with conn.transaction():
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO public.webai_verifications
                        (tender_id, analysis, comparison, created_at)
                    VALUES (%s, %s::jsonb, %s::jsonb, now())
                """, (
                    tender_id,
                    json.dumps(webai_analysis),
                    json.dumps(comparison),
                ))

                cur.execute("""
                    UPDATE public.tenders
                    SET verification_status = 'COMPLETED',
                        verification_score = %s,
                        updated_at = now()
                    WHERE id = %s
                """, (comparison["agreement_score"], tender_id))
    except Exception as e:
        log_event("verification.store.failed", {"error": str(e), "tender_id": tender_id})
        raise


def claim_verification_job(conn: psycopg.Connection) -> Optional[Dict[str, Any]]:
    """Atomically claim the oldest runnable VERIFY job, or return None.

    A job is eligible when it is QUEUED, ``available_at`` has passed, and its
    tender is ANALYSIS_READY. The claimed row is set to RUNNING and its
    attempt count is incremented in the same transaction. ``FOR UPDATE OF j``
    locks only the job row, so concurrent workers skip each other's jobs
    without also locking the joined tender row.
    """
    with conn.transaction():
        with conn.cursor(row_factory=dict_row) as cur:
            cur.execute("""
                SELECT j.id, j.tender_id, j.payload
                FROM public.processing_jobs j
                JOIN public.tenders t ON j.tender_id = t.id
                WHERE j.job_type = 'VERIFY'
                  AND j.status = 'QUEUED'
                  AND j.available_at <= now()
                  AND t.status = 'ANALYSIS_READY'
                ORDER BY j.created_at ASC
                FOR UPDATE OF j SKIP LOCKED
                LIMIT 1
            """)
            job = cur.fetchone()

            if job:
                cur.execute("""
                    UPDATE public.processing_jobs
                    SET status = 'RUNNING',
                        locked_at = now(),
                        attempt_count = attempt_count + 1,
                        updated_at = now()
                    WHERE id = %s
                """, (job["id"],))

            return job


def _load_tender_context(tender_id: Any) -> Dict[str, Any]:
    """Fetch the tender, its storage path and primary analysis; close the connection.

    Raises:
        RuntimeError: if the tender does not exist or has no primary analysis.
    """
    with connect_database() as conn:
        with conn.cursor(row_factory=dict_row) as cur:
            # NOTE(review): the LEFT JOINs can yield several rows per tender
            # (multiple documents/extractions); fetchone() takes an arbitrary
            # one. Confirm whether an ORDER BY on recency is needed.
            cur.execute("""
                SELECT
                    t.id,
                    t.organization_id,
                    t.source_filename,
                    COALESCE(d.storage_path, t.storage_path) as storage_path,
                    e.structured_output as primary_analysis
                FROM public.tenders t
                LEFT JOIN public.documents d ON d.tender_id = t.id
                LEFT JOIN public.extractions e ON e.tender_id = t.id
                WHERE t.id = %s
            """, (tender_id,))
            context = cur.fetchone()

    if not context:
        raise RuntimeError(f"Tender {tender_id} not found")
    if not context.get("primary_analysis"):
        raise RuntimeError(f"Primary analysis not available for tender {tender_id}")
    return context


def _mark_job_failed(job_id: Any, error_message: str) -> None:
    """Best-effort: set the job to FAILED with a truncated error; log if that fails."""
    try:
        with connect_database() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE public.processing_jobs
                    SET status = 'FAILED',
                        last_error = %s,
                        updated_at = now()
                    WHERE id = %s
                """, (error_message[:2000], job_id))
            conn.commit()
    except Exception as mark_error:
        log_event("verification.mark_failed.failed", {
            "job_id": job_id,
            "error": str(mark_error),
        })


def process_verification_job(job: Dict[str, Any]) -> None:
    """Run WebAI verification for one claimed job and record the outcome.

    The steps are: load the tender context, download the document, run the
    model, then parse the output and compare it with the primary analysis.
    The stored result and the job's SUCCEEDED status are written in a single
    transaction. No DB connection is held during download or GPU inference.
    Any failure is logged and the job is marked FAILED on a best-effort basis.
    Exceptions are not propagated.
    """
    job_id = job["id"]
    tender_id = job["tender_id"]

    try:
        context = _load_tender_context(tender_id)

        storage_path = context.get("storage_path")
        if not storage_path:
            raise RuntimeError(f"No storage path recorded for tender {tender_id}")

        document_bytes = download_document(storage_path)
        mime_type = infer_mime_type(storage_path)
        image = prepare_image_for_model(document_bytes, mime_type)

        prompt = build_webai_prompt(context)
        webai_result = process_with_webai(image, prompt)
        if webai_result["status"] != "success":
            raise RuntimeError(f"WebAI processing failed: {webai_result.get('error', 'Unknown error')}")

        webai_analysis = parse_webai_response(webai_result["response"])
        if "error" in webai_analysis:
            raise RuntimeError(f"WebAI response parsing failed: {webai_analysis['error']}")

        comparison = compare_analyses(context["primary_analysis"], webai_analysis)

        # The result row and the job status must commit together. The outer
        # transaction turns store_verification_result's block into a savepoint.
        with connect_database() as conn:
            with conn.transaction():
                store_verification_result(conn, tender_id, webai_analysis, comparison)
                with conn.cursor() as cur:
                    cur.execute("""
                        UPDATE public.processing_jobs
                        SET status = 'SUCCEEDED',
                            updated_at = now()
                        WHERE id = %s
                    """, (job_id,))

        log_event("verification.completed", {
            "job_id": job_id,
            "tender_id": tender_id,
            "agreement_score": comparison["agreement_score"],
        })
    except Exception as e:
        log_event("verification.failed", {
            "job_id": job_id,
            "tender_id": tender_id,
            "error": str(e),
        })
        _mark_job_failed(job_id, str(e))
    finally:
        # Cleanup after every job so tensors from this run are released.
        try:
            aggressive_memory_cleanup()
        except Exception:
            pass


def get_optimal_dpi(config: WebAIConfig) -> int:
    """Pick a PDF rasterisation DPI from the currently available memory.

    When ``config.adaptive_dpi`` is off, returns ``config.pdf_dpi`` unchanged.
    Otherwise free GPU memory (device 0) is used, or free system RAM via
    psutil when CUDA is unavailable. That amount picks a tier:
    >=12 GB -> 300, >=8 GB -> 250, >=4 GB -> 200, else 150.
    The result is capped at ``config.pdf_dpi``, so with the default of 200
    the higher tiers never apply. Any error falls back to ``config.pdf_dpi``.
    """
    if not config.adaptive_dpi:
        return config.pdf_dpi

    try:
        if torch.cuda.is_available():
            gpu_memory_gb = torch.cuda.get_device_properties(0).total_memory / (1024 ** 3)
            gpu_memory_used_gb = torch.cuda.memory_allocated(0) / (1024 ** 3)
            available_memory_gb = gpu_memory_gb - gpu_memory_used_gb
        else:
            import psutil
            available_memory_gb = psutil.virtual_memory().available / (1024 ** 3)

        if available_memory_gb >= 12:
            optimal_dpi = 300
        elif available_memory_gb >= 8:
            optimal_dpi = 250
        elif available_memory_gb >= 4:
            optimal_dpi = 200
        else:
            optimal_dpi = 150

        # Ensure we don't exceed configured maximum
        optimal_dpi = min(optimal_dpi, config.pdf_dpi)

        log_event("dpi.adaptive", {
            "configured_dpi": config.pdf_dpi,
            "optimal_dpi": optimal_dpi,
            "available_memory_gb": available_memory_gb,
        })
        return optimal_dpi
    except Exception as e:
        log_event("dpi.adaptive.failed", {"error": str(e)})
        return config.pdf_dpi


def aggressive_memory_cleanup():
    """Release cached CUDA memory and run full garbage collection, then log stats.

    The CUDA cache is emptied again after ``gc.collect()``, so tensors freed by
    the collections are handed back to the driver too. Process RSS is reported
    only if psutil is importable (0.0 otherwise). If anything fails, a plain
    ``empty_cache()`` + ``gc.collect()`` is attempted, and that fallback never
    raises.
    """
    gib = 1024 ** 3
    cleanup_start = time.perf_counter()

    try:
        try:
            import psutil
        except ImportError:
            process = None
        else:
            process = psutil.Process()

        cuda_available = torch.cuda.is_available()

        def gpu_usage():
            """Return (allocated, reserved) CUDA memory in GiB, or zeros without CUDA."""
            if not cuda_available:
                return 0.0, 0.0
            return torch.cuda.memory_allocated() / gib, torch.cuda.memory_reserved() / gib

        def rss_usage():
            """Return process resident memory in GiB, or 0.0 without psutil."""
            return process.memory_info().rss / gib if process is not None else 0.0

        gpu_memory_before, gpu_memory_cached_before = gpu_usage()
        memory_before = rss_usage()

        if cuda_available:
            # Wait for queued kernels so the buffers they use can be released.
            torch.cuda.synchronize()
            torch.cuda.empty_cache()

        # Repeated full collections catch objects freed by earlier passes.
        collected_objects = [gc.collect() for _ in range(3)]

        if cuda_available:
            # Return blocks released by the collections above to the driver.
            torch.cuda.empty_cache()

        gpu_memory_after, gpu_memory_cached_after = gpu_usage()
        memory_after = rss_usage()

        cleanup_time = (time.perf_counter() - cleanup_start) * 1000
        log_event("memory.cleanup", {
            "memory_freed_gb": round(memory_before - memory_after, 3),
            "gpu_memory_freed_gb": round(gpu_memory_before - gpu_memory_after, 3),
            "gpu_cache_freed_gb": round(gpu_memory_cached_before - gpu_memory_cached_after, 3),
            "cleanup_time_ms": round(cleanup_time, 2),
            "collected_objects": collected_objects,
            "memory_before_gb": round(memory_before, 3),
            "memory_after_gb": round(memory_after, 3),
            "gpu_memory_before_gb": round(gpu_memory_before, 3),
            "gpu_memory_after_gb": round(gpu_memory_after, 3),
        })
    except Exception as e:
        log_event("memory.cleanup.failed", {"error": str(e)})
        try:
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()
        except Exception:
            pass


# Lower-case file suffix -> MIME type understood by prepare_image_for_model.
_MIME_TYPES_BY_SUFFIX = {
    ".pdf": "application/pdf",
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".tif": "image/tiff",
    ".tiff": "image/tiff",
}


def infer_mime_type(storage_path: str) -> str:
    """Infer a MIME type from the path's extension (case-insensitive).

    Unknown extensions map to "application/octet-stream".
    """
    from pathlib import Path

    return _MIME_TYPES_BY_SUFFIX.get(Path(storage_path.lower()).suffix, "application/octet-stream")


# Gradio Interface
def create_interface():
    """Build the Gradio Blocks UI for manual, one-off document verification."""

    def verify_document(file_obj):
        """Run WebAI on an uploaded file and return (status, analysis_json)."""
        if file_obj is None:
            return "Please upload a document", ""

        try:
            # Gradio 4+ passes a filepath string; older versions pass a
            # tempfile wrapper that exposes the path as .name.
            if isinstance(file_obj, (str, os.PathLike)):
                file_path = os.fspath(file_obj)
            else:
                file_path = file_obj.name

            with open(file_path, "rb") as f:
                document_bytes = f.read()

            mime_type = infer_mime_type(file_path)
            image = prepare_image_for_model(document_bytes, mime_type)

            context = {
                "source_filename": os.path.basename(file_path),
                "organization_id": "demo",
            }
            prompt = build_webai_prompt(context)
            result = process_with_webai(image, prompt)

            if result["status"] == "success":
                analysis = parse_webai_response(result["response"])
                return "Analysis Complete", json.dumps(analysis, indent=2)
            return "Processing Failed", result.get("error", "Unknown error")
        except Exception as e:
            return f"Error: {str(e)}", ""
        finally:
            try:
                aggressive_memory_cleanup()
            except Exception:
                pass

    with gr.Blocks(title="TenderHub WebAI Verification") as interface:
        gr.Markdown("# TenderHub WebAI Verification Worker")
        gr.Markdown("Upload tender documents for AI-powered verification analysis")

        with gr.Row():
            with gr.Column():
                file_input = gr.File(
                    label="Upload Document",
                    file_types=[".pdf", ".png", ".jpg", ".jpeg", ".tiff"],
                )
                verify_btn = gr.Button("Verify Document", variant="primary")

            with gr.Column():
                status_output = gr.Textbox(label="Status")
                analysis_output = gr.Textbox(
                    label="Analysis Result",
                    lines=20,
                    max_lines=30,
                )

        verify_btn.click(
            verify_document,
            inputs=[file_input],
            outputs=[status_output, analysis_output],
        )

    return interface


# Worker loop for background processing
def worker_loop():
    """Poll for VERIFY jobs forever, processing one at a time.

    The claim connection is closed before the job runs. The loop sleeps 10 s
    when the queue is empty and 30 s after an unexpected error.
    """
    log_event("worker.start", {})

    while True:
        try:
            with connect_database() as conn:
                job = claim_verification_job(conn)

            if job is None:
                time.sleep(10)
                continue

            log_event("job.claimed", {"job_id": job["id"], "tender_id": job["tender_id"]})
            process_verification_job(job)
        except Exception as e:
            log_event("worker.error", {"error": str(e)})
            time.sleep(30)


# Main execution
if __name__ == "__main__":
    setup_model()

    import threading

    worker_thread = threading.Thread(target=worker_loop, daemon=True)
    worker_thread.start()

    interface = create_interface()
    interface.launch(
        server_name="0.0.0.0",
        server_port=int(os.getenv("PORT", "7860")),
        share=False,
    )