| |
| """ |
| Benchmark script for mosaic generation performance analysis. |
| """ |
|
|
| import time |
| import numpy as np |
| from PIL import Image |
| import matplotlib.pyplot as plt |
| from typing import Dict, List |
| import argparse |
| import os |
|
|
| from src.config import Config, Implementation |
| from src.pipeline import MosaicPipeline |
| from src.utils import pil_to_np, np_to_pil |
|
|
|
|
| def create_test_image(width: int = 512, height: int = 512) -> Image.Image: |
| """Create a test image with various features for benchmarking.""" |
| |
| img_array = np.zeros((height, width, 3), dtype=np.float32) |
| |
| |
| for y in range(height): |
| for x in range(width): |
| |
| img_array[y, x, 0] = x / width |
| |
| |
| img_array[y, x, 1] = y / height |
| |
| |
| img_array[y, x, 2] = (x + y) / (width + height) |
| |
| |
| center_x, center_y = width // 2, height // 2 |
| radius = min(width, height) // 4 |
| |
| for y in range(height): |
| for x in range(width): |
| |
| dist = np.sqrt((x - center_x)**2 + (y - center_y)**2) |
| if dist < radius: |
| img_array[y, x] = [1.0, 0.5, 0.2] |
| |
| return np_to_pil(img_array) |
|
|
|
|
| def benchmark_grid_sizes(pipeline: MosaicPipeline, test_image: Image.Image, |
| grid_sizes: List[int]) -> Dict: |
| """Benchmark performance across different grid sizes.""" |
| print("Benchmarking grid sizes...") |
| results = {} |
| |
| for grid_size in grid_sizes: |
| print(f"Testing grid size {grid_size}x{grid_size}...") |
| |
| |
| pipeline.config.grid = grid_size |
| pipeline.config.out_w = (test_image.width // grid_size) * grid_size |
| pipeline.config.out_h = (test_image.height // grid_size) * grid_size |
| |
| |
| start_time = time.time() |
| pipeline_results = pipeline.run_full_pipeline(test_image) |
| total_time = time.time() - start_time |
| |
| results[grid_size] = { |
| 'processing_time': total_time, |
| 'total_tiles': grid_size * grid_size, |
| 'tiles_per_second': (grid_size * grid_size) / total_time, |
| 'mse': pipeline_results['metrics']['mse'], |
| 'ssim': pipeline_results['metrics']['ssim'], |
| 'output_resolution': f"{pipeline_results['outputs']['mosaic'].width}x{pipeline_results['outputs']['mosaic'].height}" |
| } |
| |
| print(f" Processing time: {total_time:.3f}s") |
| print(f" Tiles per second: {results[grid_size]['tiles_per_second']:.1f}") |
| |
| return results |
|
|
|
|
| def benchmark_implementations(pipeline: MosaicPipeline, test_image: Image.Image) -> Dict: |
| """Compare vectorized vs loop-based implementations.""" |
| print("Benchmarking implementations...") |
| |
| results = {} |
| |
| |
| print("Testing vectorized implementation...") |
| pipeline.config.impl = Implementation.VECT |
| start_time = time.time() |
| vec_results = pipeline.run_full_pipeline(test_image) |
| vec_time = time.time() - start_time |
| |
| results['vectorized'] = { |
| 'processing_time': vec_time, |
| 'mse': vec_results['metrics']['mse'], |
| 'ssim': vec_results['metrics']['ssim'] |
| } |
| |
| |
| print("Testing loop-based implementation...") |
| pipeline.config.impl = Implementation.LOOPS |
| start_time = time.time() |
| loop_results = pipeline.run_full_pipeline(test_image) |
| loop_time = time.time() - start_time |
| |
| results['loop_based'] = { |
| 'processing_time': loop_time, |
| 'mse': loop_results['metrics']['mse'], |
| 'ssim': loop_results['metrics']['ssim'] |
| } |
| |
| |
| speedup = loop_time / vec_time if vec_time > 0 else 0 |
| results['comparison'] = { |
| 'speedup_factor': speedup, |
| 'vectorized_faster': vec_time < loop_time |
| } |
| |
| print(f"Vectorized: {vec_time:.3f}s") |
| print(f"Loop-based: {loop_time:.3f}s") |
| print(f"Speedup factor: {speedup:.2f}x") |
| |
| return results |
|
|
|
|
| def plot_benchmark_results(grid_results: Dict, impl_results: Dict, output_dir: str = "images"): |
| """Create plots of benchmark results.""" |
| os.makedirs(output_dir, exist_ok=True) |
| |
| |
| plt.figure(figsize=(10, 6)) |
| grid_sizes = sorted(grid_results.keys()) |
| processing_times = [grid_results[gs]['processing_time'] for gs in grid_sizes] |
| total_tiles = [grid_results[gs]['total_tiles'] for gs in grid_sizes] |
| |
| plt.subplot(1, 2, 1) |
| plt.plot(grid_sizes, processing_times, 'bo-', linewidth=2, markersize=8) |
| plt.xlabel('Grid Size') |
| plt.ylabel('Processing Time (seconds)') |
| plt.title('Processing Time vs Grid Size') |
| plt.grid(True, alpha=0.3) |
| |
| plt.subplot(1, 2, 2) |
| plt.plot(total_tiles, processing_times, 'ro-', linewidth=2, markersize=8) |
| plt.xlabel('Total Number of Tiles') |
| plt.ylabel('Processing Time (seconds)') |
| plt.title('Processing Time vs Number of Tiles') |
| plt.grid(True, alpha=0.3) |
| |
| plt.tight_layout() |
| plt.savefig(f"{output_dir}/processing_time_analysis.png", dpi=300, bbox_inches='tight') |
| plt.close() |
| |
| |
| plt.figure(figsize=(12, 5)) |
| |
| plt.subplot(1, 2, 1) |
| mse_values = [grid_results[gs]['mse'] for gs in grid_sizes] |
| plt.plot(grid_sizes, mse_values, 'go-', linewidth=2, markersize=8) |
| plt.xlabel('Grid Size') |
| plt.ylabel('MSE') |
| plt.title('Mean Squared Error vs Grid Size') |
| plt.grid(True, alpha=0.3) |
| plt.yscale('log') |
| |
| plt.subplot(1, 2, 2) |
| ssim_values = [grid_results[gs]['ssim'] for gs in grid_sizes] |
| plt.plot(grid_sizes, ssim_values, 'mo-', linewidth=2, markersize=8) |
| plt.xlabel('Grid Size') |
| plt.ylabel('SSIM') |
| plt.title('Structural Similarity vs Grid Size') |
| plt.grid(True, alpha=0.3) |
| |
| plt.tight_layout() |
| plt.savefig(f"{output_dir}/quality_metrics_analysis.png", dpi=300, bbox_inches='tight') |
| plt.close() |
| |
| |
| plt.figure(figsize=(8, 6)) |
| impl_names = ['Vectorized', 'Loop-based'] |
| impl_times = [ |
| impl_results['vectorized']['processing_time'], |
| impl_results['loop_based']['processing_time'] |
| ] |
| |
| bars = plt.bar(impl_names, impl_times, color=['skyblue', 'lightcoral']) |
| plt.ylabel('Processing Time (seconds)') |
| plt.title('Implementation Performance Comparison') |
| plt.grid(True, alpha=0.3, axis='y') |
| |
| |
| for bar, time_val in zip(bars, impl_times): |
| plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01, |
| f'{time_val:.3f}s', ha='center', va='bottom') |
| |
| plt.tight_layout() |
| plt.savefig(f"{output_dir}/implementation_comparison.png", dpi=300, bbox_inches='tight') |
| plt.close() |
|
|
|
|
| def generate_benchmark_report(grid_results: Dict, impl_results: Dict, output_file: str = "benchmark_report.txt"): |
| """Generate a comprehensive benchmark report.""" |
| with open(output_file, 'w') as f: |
| f.write("MOSAIC GENERATION BENCHMARK REPORT\n") |
| f.write("=" * 50 + "\n\n") |
| |
| |
| f.write("GRID SIZE PERFORMANCE ANALYSIS\n") |
| f.write("-" * 30 + "\n") |
| for grid_size in sorted(grid_results.keys()): |
| result = grid_results[grid_size] |
| f.write(f"Grid {grid_size}x{grid_size}:\n") |
| f.write(f" Processing Time: {result['processing_time']:.3f}s\n") |
| f.write(f" Total Tiles: {result['total_tiles']}\n") |
| f.write(f" Tiles per Second: {result['tiles_per_second']:.1f}\n") |
| f.write(f" MSE: {result['mse']:.6f}\n") |
| f.write(f" SSIM: {result['ssim']:.4f}\n") |
| f.write(f" Output Resolution: {result['output_resolution']}\n\n") |
| |
| |
| grid_sizes = sorted(grid_results.keys()) |
| if len(grid_sizes) >= 2: |
| first_result = grid_results[grid_sizes[0]] |
| last_result = grid_results[grid_sizes[-1]] |
| |
| tile_ratio = last_result['total_tiles'] / first_result['total_tiles'] |
| time_ratio = last_result['processing_time'] / first_result['processing_time'] |
| |
| f.write("SCALING ANALYSIS\n") |
| f.write("-" * 20 + "\n") |
| f.write(f"Tile increase ratio: {tile_ratio:.2f}x\n") |
| f.write(f"Time increase ratio: {time_ratio:.2f}x\n") |
| f.write(f"Scaling efficiency: {tile_ratio/time_ratio:.2f}\n") |
| f.write(f"Linear scaling: {'Yes' if abs(time_ratio - tile_ratio) / tile_ratio < 0.1 else 'No'}\n\n") |
| |
| |
| f.write("IMPLEMENTATION COMPARISON\n") |
| f.write("-" * 25 + "\n") |
| f.write(f"Vectorized processing time: {impl_results['vectorized']['processing_time']:.3f}s\n") |
| f.write(f"Loop-based processing time: {impl_results['loop_based']['processing_time']:.3f}s\n") |
| f.write(f"Speedup factor: {impl_results['comparison']['speedup_factor']:.2f}x\n") |
| f.write(f"Vectorized is faster: {'Yes' if impl_results['comparison']['vectorized_faster'] else 'No'}\n\n") |
| |
| |
| f.write("QUALITY COMPARISON\n") |
| f.write("-" * 18 + "\n") |
| f.write(f"Vectorized MSE: {impl_results['vectorized']['mse']:.6f}\n") |
| f.write(f"Loop-based MSE: {impl_results['loop_based']['mse']:.6f}\n") |
| f.write(f"Vectorized SSIM: {impl_results['vectorized']['ssim']:.4f}\n") |
| f.write(f"Loop-based SSIM: {impl_results['loop_based']['ssim']:.4f}\n") |
|
|
|
|
| def main(): |
| """Main benchmark function.""" |
| parser = argparse.ArgumentParser(description='Benchmark mosaic generation performance') |
| parser.add_argument('--grid-sizes', nargs='+', type=int, default=[16, 32, 48, 64], |
| help='Grid sizes to test (default: 16 32 48 64)') |
| parser.add_argument('--output-dir', default='images', help='Output directory for plots') |
| parser.add_argument('--test-image', help='Path to test image (optional)') |
| args = parser.parse_args() |
| |
| print("Starting mosaic generation benchmark...") |
| |
| |
| if args.test_image and os.path.exists(args.test_image): |
| test_image = Image.open(args.test_image) |
| print(f"Using test image: {args.test_image}") |
| else: |
| test_image = create_test_image() |
| print("Using generated test image") |
| |
| |
| config = Config(grid=32) |
| pipeline = MosaicPipeline(config) |
| |
| |
| print("\n" + "="*50) |
| grid_results = benchmark_grid_sizes(pipeline, test_image, args.grid_sizes) |
| |
| print("\n" + "="*50) |
| impl_results = benchmark_implementations(pipeline, test_image) |
| |
| |
| print("\nGenerating plots and report...") |
| plot_benchmark_results(grid_results, impl_results, args.output_dir) |
| generate_benchmark_report(grid_results, impl_results) |
| |
| print(f"\nBenchmark complete!") |
| print(f"Plots saved to: {args.output_dir}/") |
| print(f"Report saved to: benchmark_report.txt") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|