import gradio as gr
import random
import math
import heapq
import time
from collections import deque
from enum import Enum


# Environment and Direction Enums
class CellType(Enum):
    CLEAN = 0
    DIRTY = 1
    OBSTACLE = 2
    EXPLORED = 3


class Direction(Enum):
    UP = 0
    RIGHT = 1
    DOWN = 2
    LEFT = 3

    @classmethod
    def from_movement(cls, current_pos, next_pos):
        """Return the heading of a move. Row changes take precedence,
        so diagonal moves map to UP or DOWN."""
        current_row, current_col = current_pos
        next_row, next_col = next_pos
        if next_row < current_row:
            return cls.UP
        elif next_row > current_row:
            return cls.DOWN
        elif next_col > current_col:
            return cls.RIGHT
        elif next_col < current_col:
            return cls.LEFT
        return None


# Environment Class
class Environment:
    def __init__(self, rows, cols, obstacle_density=0.2, dirt_density=0.3):
        self.rows = rows
        self.cols = cols
        self.grid = [[CellType.CLEAN for _ in range(cols)] for _ in range(rows)]
        self.obstacle_density = obstacle_density
        self.dirt_density = dirt_density
        self.vacuum_pos = None
        self.dirty_cells = set()
        self.generate_environment()

    def generate_environment(self):
        # Place obstacles
        for i in range(self.rows):
            for j in range(self.cols):
                if random.random() < self.obstacle_density:
                    self.grid[i][j] = CellType.OBSTACLE

        # Place dirt on clean cells only
        for i in range(self.rows):
            for j in range(self.cols):
                if self.grid[i][j] == CellType.CLEAN and random.random() < self.dirt_density:
                    self.grid[i][j] = CellType.DIRTY
                    self.dirty_cells.add((i, j))

        # Place vacuum at a random clean position
        clean_positions = [(i, j) for i in range(self.rows) for j in range(self.cols)
                           if self.grid[i][j] == CellType.CLEAN]
        if clean_positions:
            self.vacuum_pos = random.choice(clean_positions)

    def reset(self):
        self.grid = [[CellType.CLEAN for _ in range(self.cols)] for _ in range(self.rows)]
        self.dirty_cells = set()
        # Clear the old position so it cannot point at an obstacle in the new grid
        self.vacuum_pos = None
        self.generate_environment()

    def is_valid_position(self, row, col):
        return (0 <= row < self.rows and 0 <= col < self.cols
                and self.grid[row][col] != CellType.OBSTACLE)

    def clean_cell(self, row, col):
        if (row, col) in self.dirty_cells:
            self.grid[row][col] = CellType.CLEAN
            self.dirty_cells.remove((row, col))
            return True
        return False

    def mark_explored(self, row, col):
        if self.grid[row][col] == CellType.CLEAN:
            self.grid[row][col] = CellType.EXPLORED

    def get_dirty_count(self):
        return len(self.dirty_cells)

    def is_clean(self):
        return len(self.dirty_cells) == 0


# Search Algorithms
class Node:
    def __init__(self, position, parent=None, direction=None):
        self.position = position
        self.parent = parent
        self.direction = direction
        self.g = 0  # cost from the start
        self.h = 0  # heuristic estimate to the nearest goal
        self.f = 0  # g + h, used to order the priority queue

    def __eq__(self, other):
        return self.position == other.position

    def __lt__(self, other):
        return self.f < other.f


class SearchAlgorithms:
    def __init__(self, environment, turn_cost_enabled=False):
        self.env = environment
        self.turn_cost_enabled = turn_cost_enabled

    def calculate_turn_cost(self, current_dir, new_dir):
        """Return 0.5 for a 90-degree turn, 1.0 for a 180-degree turn, else 0."""
        if not self.turn_cost_enabled or current_dir is None or new_dir is None:
            return 0
        if current_dir == new_dir:
            return 0
        diff = abs(current_dir.value - new_dir.value)
        if diff == 3:  # UP <-> LEFT wraps around and is a single 90-degree turn
            diff = 1
        if diff == 1:
            return 0.5
        elif diff == 2:
            return 1.0
        return 0

    def get_neighbors(self, position, direction=None):
        row, col = position
        neighbors = []
        moves = [
            (-1, 0, Direction.UP),
            (0, 1, Direction.RIGHT),
            (1, 0, Direction.DOWN),
            (0, -1, Direction.LEFT)
        ]
        for dr, dc, new_dir in moves:
            new_row, new_col = row + dr, col + dc
            if self.env.is_valid_position(new_row, new_col):
                turn_cost = self.calculate_turn_cost(direction, new_dir)
                move_cost = 1 + turn_cost
                neighbors.append(((new_row, new_col), new_dir, move_cost))
        return neighbors

    def get_diagonal_neighbors(self, position, direction=None):
        row, col = position
        neighbors = []
        # Diagonal moves cost sqrt(2) and are treated as UP or DOWN headings
        moves = [
            (-1, 0, Direction.UP, 1),
            (0, 1, Direction.RIGHT, 1),
            (1, 0, Direction.DOWN, 1),
            (0, -1, Direction.LEFT, 1),
            (-1, -1, Direction.UP, math.sqrt(2)),
            (-1, 1, Direction.UP, math.sqrt(2)),
            (1, -1, Direction.DOWN, math.sqrt(2)),
            (1, 1, Direction.DOWN, math.sqrt(2))
        ]
        for dr, dc, new_dir, base_cost in moves:
            new_row, new_col = row + dr, col + dc
            if self.env.is_valid_position(new_row, new_col):
                turn_cost = self.calculate_turn_cost(direction, new_dir)
                move_cost = base_cost + turn_cost
                neighbors.append(((new_row, new_col), new_dir, move_cost))
        return neighbors

    def manhattan_distance(self, pos1, pos2):
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])

    def euclidean_distance(self, pos1, pos2):
        return math.sqrt((pos1[0] - pos2[0])**2 + (pos1[1] - pos2[1])**2)

    def chebyshev_distance(self, pos1, pos2):
        return max(abs(pos1[0] - pos2[0]), abs(pos1[1] - pos2[1]))

    @staticmethod
    def _reconstruct_path(node):
        """Follow parent links back to the start and return the path in forward order."""
        path = []
        while node:
            path.append(node.position)
            node = node.parent
        return path[::-1]

    def bfs(self, start, goals):
        start_time = time.perf_counter()
        if not goals:
            return None, set(), 0, 0
        goals = set(goals)  # O(1) membership tests
        queue = deque([Node(start)])
        visited = {start}
        explored = {start}
        nodes_expanded = 0

        while queue:
            current_node = queue.popleft()
            nodes_expanded += 1

            if current_node.position in goals:
                computation_time = time.perf_counter() - start_time
                return (self._reconstruct_path(current_node), explored,
                        nodes_expanded, computation_time)

            for neighbor_pos, new_dir, move_cost in self.get_neighbors(current_node.position):
                if neighbor_pos not in visited:
                    visited.add(neighbor_pos)
                    explored.add(neighbor_pos)
                    queue.append(Node(neighbor_pos, current_node, new_dir))

        computation_time = time.perf_counter() - start_time
        return None, explored, nodes_expanded, computation_time

    def a_star(self, start, goals, heuristic_type="manhattan", allow_diagonals=False):
        start_time = time.perf_counter()
        if not goals:
            return None, set(), 0, 0

        heuristics = {
            "manhattan": self.manhattan_distance,
            "euclidean": self.euclidean_distance,
            "chebyshev": self.chebyshev_distance,
        }
        if heuristic_type not in heuristics:
            raise ValueError(f"Unknown heuristic: {heuristic_type!r}")
        heuristic = heuristics[heuristic_type]
        goals = set(goals)

        open_list = []
        heapq.heappush(open_list, Node(start))
        closed_list = set()
        explored = {start}
        nodes_expanded = 0
        g_costs = {start: 0}
        directions = {start: None}

        while open_list:
            current_node = heapq.heappop(open_list)
            if current_node.position in closed_list:
                continue  # stale heap entry: this position was already expanded
            nodes_expanded += 1

            if current_node.position in goals:
                computation_time = time.perf_counter() - start_time
                return (self._reconstruct_path(current_node), explored,
                        nodes_expanded, computation_time)

            closed_list.add(current_node.position)
            current_dir = directions[current_node.position]

            if allow_diagonals:
                neighbors = self.get_diagonal_neighbors(current_node.position, current_dir)
            else:
                neighbors = self.get_neighbors(current_node.position, current_dir)

            for neighbor_pos, direction, move_cost in neighbors:
                if neighbor_pos in closed_list:
                    continue
                new_g = g_costs[current_node.position] + move_cost
                if neighbor_pos not in g_costs or new_g < g_costs[neighbor_pos]:
                    # Heuristic is the distance to the closest remaining goal
                    min_h = min(heuristic(neighbor_pos, goal) for goal in goals)
                    new_node = Node(neighbor_pos, current_node, direction)
                    new_node.g = new_g
                    new_node.h = min_h
                    new_node.f = new_g + min_h
                    g_costs[neighbor_pos] = new_g
                    directions[neighbor_pos] = direction
                    explored.add(neighbor_pos)
                    heapq.heappush(open_list, new_node)

        computation_time = time.perf_counter() - start_time
        return None, explored, nodes_expanded, computation_time

    def find_path_to_nearest_dirty(self, vacuum_pos, algorithm="a_star", heuristic="manhattan"):
        dirty_cells = list(self.env.dirty_cells)
        if not dirty_cells:
            return None, set(), 0, 0
        if algorithm == "bfs":
            return self.bfs(vacuum_pos, dirty_cells)
        else:
            # Diagonal moves are only enabled for heuristics that stay admissible with them
            allow_diagonals = heuristic in ["euclidean", "chebyshev"]
            return self.a_star(vacuum_pos, dirty_cells, heuristic, allow_diagonals)


# Gradio Simulation
class GradioVacuumSimulation:
    def __init__(self, rows=15, cols=15):
        self.rows = rows
        self.cols = cols
        self.env = Environment(rows, cols)
        self.search = SearchAlgorithms(self.env)
        self.current_path = []
        self.explored_cells = set()
        self.steps_taken = 0
        self.total_cost = 0
        self.current_direction = None
        self.is_running = False
        self.total_nodes_expanded = 0
        self.total_computation_time = 0
        self.algorithm_stats = {
            "BFS": {"runs": 0, "total_nodes": 0, "total_time": 0},
            "A* Manhattan": {"runs": 0, "total_nodes": 0, "total_time": 0},
            "A* Euclidean": {"runs": 0, "total_nodes": 0, "total_time": 0},
            "A* Chebyshev": {"runs": 0, "total_nodes": 0, "total_time": 0}
        }
        self.turn_cost_enabled = False
        self.current_algorithm = "A* Manhattan"

    def create_grid_image(self):
        """Create an HTML representation of the grid"""
        html = f"""
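        <table style="border-collapse: collapse;" title="{self.rows} x {self.cols} grid">
        """
        # NOTE: the original markup of this method did not survive; the table below
        # is an assumed minimal reconstruction that follows the colors in the UI legend.
        colors = {
            CellType.CLEAN: "#ffeb3b",     # yellow
            CellType.DIRTY: "#f44336",     # red
            CellType.OBSTACLE: "#2196f3",  # blue
            CellType.EXPLORED: "#4caf50",  # green
        }
        path_cells = set(self.current_path)
        for i in range(self.rows):
            html += "<tr>"
            for j in range(self.cols):
                color = colors[self.env.grid[i][j]]
                marker = ""
                if (i, j) == self.env.vacuum_pos:
                    color = "#9c27b0"  # purple: vacuum
                elif (i, j) in path_cells:
                    marker = '<span style="color: orange;">&#9679;</span>'  # planned path
                html += (f'<td style="width: 24px; height: 24px; text-align: center; '
                         f'border: 1px solid #999; background: {color};">{marker}</td>')
            html += "</tr>"
        html += "</table>"
        return html

    def get_stats_html(self):
        # NOTE: assumed reconstruction; only the tracked counters are known from the source.
        return (
            "<div>"
            f"<p>Steps taken: {self.steps_taken}</p>"
            f"<p>Total cost: {self.total_cost:.1f}</p>"
            f"<p>Dirty cells remaining: {self.env.get_dirty_count()}</p>"
            f"<p>Nodes expanded: {self.total_nodes_expanded}</p>"
            f"<p>Computation time: {self.total_computation_time:.4f}s</p>"
            "</div>"
        )

    def get_algorithm_stats_html(self):
        # Column names and row formats come from the source; the tags are assumed.
        html = ("<table><tr><th>Algorithm</th><th>Runs</th>"
                "<th>Avg Nodes</th><th>Avg Time</th></tr>")
        for algo, stats in self.algorithm_stats.items():
            if stats["runs"] > 0:
                avg_nodes = stats["total_nodes"] / stats["runs"]
                avg_time = stats["total_time"] / stats["runs"]
                html += (f"<tr><td>{algo}</td><td>{stats['runs']}</td>"
                         f"<td>{avg_nodes:.1f}</td><td>{avg_time:.4f}s</td></tr>")
            else:
                html += f"<tr><td>{algo}</td><td>0</td><td>-</td><td>-</td></tr>"
        html += "</table>"

        # Assumed condition: show the comparison only once both heuristics have run
        euclid = self.algorithm_stats["A* Euclidean"]
        cheby = self.algorithm_stats["A* Chebyshev"]
        if euclid["runs"] > 0 and cheby["runs"] > 0 and euclid["total_nodes"] > 0:
            node_ratio = ((cheby["total_nodes"] / cheby["runs"])
                          / (euclid["total_nodes"] / euclid["runs"]))
            html += (f"<p><b>Analysis:</b> Chebyshev explores {node_ratio:.1f}x "
                     "more nodes than Euclidean</p>")
        return html

    def reset_simulation(self):
        self.env.reset()
        self.current_path = []
        self.explored_cells = set()
        self.steps_taken = 0
        self.total_cost = 0
        self.current_direction = None
        self.total_nodes_expanded = 0
        self.total_computation_time = 0
        self.is_running = False

    def next_step(self):
        # Nothing to do without a vacuum position or once every cell is clean
        if self.env.vacuum_pos is None or self.env.is_clean():
            self.is_running = False
            return

        if not self.current_path:
            self.find_path()
            if not self.current_path:
                # No remaining dirty cell is reachable
                self.is_running = False
                return

        next_pos = self.current_path.pop(0)
        current_pos = self.env.vacuum_pos

        # Charge the same costs the planner uses: sqrt(2) for diagonal moves,
        # 0.5 per 90-degree turn and 1.0 per 180-degree turn.
        is_diagonal = current_pos[0] != next_pos[0] and current_pos[1] != next_pos[1]
        move_cost = math.sqrt(2) if is_diagonal else 1
        new_direction = Direction.from_movement(current_pos, next_pos)
        self.search.turn_cost_enabled = self.turn_cost_enabled
        move_cost += self.search.calculate_turn_cost(self.current_direction, new_direction)

        self.current_direction = new_direction
        self.env.vacuum_pos = next_pos
        self.steps_taken += 1
        self.total_cost += move_cost

        if self.env.clean_cell(next_pos[0], next_pos[1]):
            # Goal reached; the next call plans a path to the next dirty cell
            self.current_path = []
        self.env.mark_explored(next_pos[0], next_pos[1])

    def find_path(self):
        if self.current_algorithm == "BFS":
            algorithm = "bfs"
            heuristic = "manhattan"  # unused by BFS
        else:
            algorithm = "a_star"
            if self.current_algorithm == "A* Manhattan":
                heuristic = "manhattan"
            elif self.current_algorithm == "A* Euclidean":
                heuristic = "euclidean"
            else:
                heuristic = "chebyshev"

        self.search.turn_cost_enabled = self.turn_cost_enabled
        path, explored, nodes_expanded, computation_time = self.search.find_path_to_nearest_dirty(
            self.env.vacuum_pos, algorithm, heuristic)

        self.total_nodes_expanded += nodes_expanded
        self.total_computation_time += computation_time
        stats = self.algorithm_stats[self.current_algorithm]
        stats["runs"] += 1
        stats["total_nodes"] += nodes_expanded
        stats["total_time"] += computation_time

        if path:
            self.current_path = path[1:]  # drop the vacuum's current cell

        self.explored_cells.update(explored)
        for row, col in explored:
            if (row, col) != self.env.vacuum_pos and self.env.grid[row][col] == CellType.CLEAN:
                self.env.grid[row][col] = CellType.EXPLORED


# Gradio Interface
def create_vacuum_simulator():
    simulator = GradioVacuumSimulation()

    def update_display(algorithm, turn_cost, action):
        # Apply the selected settings first so that they affect this action
        simulator.current_algorithm = algorithm
        simulator.turn_cost_enabled = turn_cost

        if action == "Reset":
            simulator.reset_simulation()
        elif action == "Next Step":
            simulator.next_step()
        elif action == "Run/Pause":
            simulator.is_running = not simulator.is_running
            if simulator.is_running:
                # Run 5 steps at a time for better visualization
                for _ in range(5):
                    if simulator.is_running and not simulator.env.is_clean():
                        simulator.next_step()
                    else:
                        simulator.is_running = False
                        break

        grid_html = simulator.create_grid_image()
        stats_html = simulator.get_stats_html()
        algo_stats_html = simulator.get_algorithm_stats_html()
        return grid_html, stats_html, algo_stats_html

    with gr.Blocks(title="Vacuum Cleaner Search Simulation", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🏠 Vacuum Cleaner Search Simulation")
        gr.Markdown("""
        Compare different search algorithms as a vacuum cleaner navigates to clean dirty cells.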
        - **Yellow**: Clean | **Red**: Dirty | **Blue**: Obstacle | **Green**: Explored
        - **Purple**: Vacuum | **Orange dots**: Planned path
        """)

        with gr.Row():
            with gr.Column(scale=2):
                grid_output = gr.HTML(label="Grid Visualization")
                with gr.Row():
                    algorithm_dropdown = gr.Dropdown(
                        choices=["BFS", "A* Manhattan", "A* Euclidean", "A* Chebyshev"],
                        value="A* Manhattan",
                        label="Search Algorithm"
                    )
                    turn_cost_checkbox = gr.Checkbox(
                        label="Enable Turn Cost (0.5 per 90° turn)", value=False)
                with gr.Row():
                    reset_btn = gr.Button("Reset", variant="secondary")
                    next_btn = gr.Button("Next Step", variant="secondary")
                    run_btn = gr.Button("Run/Pause", variant="primary")
            with gr.Column(scale=1):
                stats_output = gr.HTML(label="Current Statistics")
                algorithm_stats_output = gr.HTML(label="Algorithm Performance")

        # Event handlers
        inputs = [algorithm_dropdown, turn_cost_checkbox]
        outputs = [grid_output, stats_output, algorithm_stats_output]

        reset_btn.click(
            update_display,
            inputs=inputs + [gr.State("Reset")],
            outputs=outputs
        )
        next_btn.click(
            update_display,
            inputs=inputs + [gr.State("Next Step")],
            outputs=outputs
        )
        run_btn.click(
            update_display,
            inputs=inputs + [gr.State("Run/Pause")],
            outputs=outputs
        )

        # Initial display
        demo.load(
            update_display,
            inputs=inputs + [gr.State("Reset")],
            outputs=outputs
        )

    return demo


# Run the application
if __name__ == "__main__":
    demo = create_vacuum_simulator()
    demo.launch()