"""PyQt5 GUI that runs particle swarm optimisation on toy stellar-physics objectives.

The optimiser runs in a worker ``QThread`` (``PSOThread``) and reports progress
to the main window (``PSOWindow``) through Qt signals; the window renders the
convergence curve, the particle cloud and, for 2-D problems, the fitness
landscape.
"""

import sys
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
                             QHBoxLayout, QGroupBox, QLabel, QComboBox,
                             QDoubleSpinBox, QSpinBox, QPushButton, QTextEdit,
                             QTabWidget, QGridLayout, QProgressBar)
from PyQt5.QtCore import QThread, pyqtSignal
import random


# Search-space bounds per problem type: one (low, high) pair per dimension.
# Kept at module level so the optimiser and the landscape plot agree on them.
PROBLEM_BOUNDS = {
    # Temperature and density parameters
    "radiative_equilibrium": [(-10, 10), (-10, 10)],
    # Temperature (T7) and density parameters
    "nuclear_reaction_rate": [(0.1, 2.0), (1e-3, 1e-1)],
    # ∇_rad, ∇_ad, ∇_μ
    "convective_stability": [(0.1, 0.5), (0.1, 0.5), (0.1, 0.5)],
    # Density and temperature
    "opacity_optimization": [(1e-3, 1e3), (1e4, 1e8)],
}

# Bounds used for any problem type not listed in PROBLEM_BOUNDS.
DEFAULT_BOUNDS = [(-5, 5), (-5, 5)]


class PSOThread(QThread):
    """Worker thread that runs a particle swarm optimisation.

    Signals:
        update_signal: emitted once per completed iteration with a dict holding
            'iteration', 'global_best', 'position' and 'particles'.
        finished_signal: emitted once when the loop ends (normally or after
            ``stop()``) with 'final_score', 'final_position', 'iterations'
            (iterations actually completed) and 'stopped'.
    """

    update_signal = pyqtSignal(dict)
    finished_signal = pyqtSignal(dict)

    def __init__(self, problem_type, num_particles, max_iter, w, c1, c2):
        """Store the PSO configuration.

        Args:
            problem_type: key selecting the objective and its bounds.
            num_particles: swarm size.
            max_iter: maximum number of iterations.
            w: inertia weight.
            c1: cognitive (personal-best) coefficient.
            c2: social (global-best) coefficient.
        """
        super().__init__()
        self.problem_type = problem_type
        self.num_particles = num_particles
        self.max_iter = max_iter
        self.w = w
        self.c1 = c1
        self.c2 = c2
        self.bounds = PROBLEM_BOUNDS.get(problem_type, DEFAULT_BOUNDS)
        self.running = True

    def run(self):
        """Run the PSO loop, emitting progress after every iteration."""
        bounds = self.bounds
        dim = len(bounds)
        lower = np.array([b[0] for b in bounds], dtype=float)
        upper = np.array([b[1] for b in bounds], dtype=float)

        # PSO initialization
        particles = np.random.uniform(lower, upper, (self.num_particles, dim))
        velocities = np.random.uniform(-1, 1, (self.num_particles, dim))

        personal_best_positions = particles.copy()
        personal_best_scores = np.array(
            [self.fitness(p, self.problem_type) for p in particles]
        )

        global_best_index = np.argmin(personal_best_scores)
        # Copy so the global best is not a view into personal_best_positions.
        global_best_position = personal_best_positions[global_best_index].copy()
        global_best_score = personal_best_scores[global_best_index]

        completed_iterations = 0

        # PSO main loop
        for iteration in range(self.max_iter):
            if not self.running:
                break

            for i in range(self.num_particles):
                # Update velocity
                r1, r2 = random.random(), random.random()
                velocities[i] = (
                    self.w * velocities[i]
                    + self.c1 * r1 * (personal_best_positions[i] - particles[i])
                    + self.c2 * r2 * (global_best_position - particles[i])
                )

                # Update position and clamp it to the search bounds
                particles[i] += velocities[i]
                particles[i] = np.clip(particles[i], lower, upper)

                # Evaluate fitness
                current_fitness = self.fitness(particles[i], self.problem_type)

                # Update personal best
                if current_fitness < personal_best_scores[i]:
                    personal_best_positions[i] = particles[i].copy()
                    personal_best_scores[i] = current_fitness

                    # Update global best
                    if current_fitness < global_best_score:
                        global_best_position = particles[i].copy()
                        global_best_score = current_fitness

            completed_iterations = iteration + 1

            # Emit copies: the receiver lives in another thread and must not
            # share arrays the optimiser keeps using.
            self.update_signal.emit({
                'iteration': iteration,
                'global_best': global_best_score,
                'position': global_best_position.copy(),
                'particles': particles.copy(),
            })

        self.finished_signal.emit({
            'final_score': global_best_score,
            'final_position': global_best_position.copy(),
            'iterations': completed_iterations,
            'stopped': completed_iterations < self.max_iter,
        })

    def fitness(self, x, problem_type):
        """Fitness function based on stellar physics problems from Chapter 5-6.

        Args:
            x: sequence of parameter values (2 or 3 entries depending on the
                problem).
            problem_type: key selecting the objective; unknown keys fall back
                to the sphere function.

        Returns:
            The non-negative objective value to be minimised.
        """
        if problem_type == "radiative_equilibrium":
            # Optimize radiative temperature gradient (Eq. 5.18)
            # We want to minimize deviation from ideal radiative equilibrium
            T, rho = x[0], x[1]
            # Simplified radiative equilibrium condition; non-positive density
            # is penalised with a large constant.
            radiative_flux = (T**3 / rho) if rho > 0 else 1e10
            target_flux = 1.0  # Ideal normalized flux
            return abs(radiative_flux - target_flux)

        elif problem_type == "nuclear_reaction_rate":
            # Optimize nuclear reaction rates (Eq. 6.29)
            T7, density_param = x[0], x[1]  # T7 = T/10^7 K
            # Gamow peak-based reaction rate approximation
            reaction_rate = (T7**(-2/3)) * np.exp(-1/T7**(1/3)) * density_param
            target_rate = 0.5  # Optimal reaction rate
            return abs(reaction_rate - target_rate)

        elif problem_type == "convective_stability":
            # Schwarzschild/Ledoux criterion optimization (Eq. 5.49, 5.50)
            grad_rad, grad_ad, grad_mu = x[0], x[1], x[2]
            # Stability requires: ∇_rad < ∇_ad - (χ_μ/χ_T)∇_μ
            # For ideal gas: χ_μ = -1, χ_T = 1
            stability_condition = grad_ad + grad_mu  # Ledoux criterion
            instability = max(0, grad_rad - stability_condition)
            return instability  # Minimize instability

        elif problem_type == "opacity_optimization":
            # Optimize opacity for efficient energy transport
            rho, T = x[0], x[1]
            # Kramers opacity approximation (Eq. 5.31, 5.32); non-positive
            # temperature is penalised with a large constant.
            opacity = rho * T**(-3.5) if T > 0 else 1e10
            # Target opacity range for efficient transport
            target_opacity = 1.0
            return abs(opacity - target_opacity)

        else:
            # Default sphere function
            return sum(xi**2 for xi in x)

    def stop(self):
        """Request that the optimisation loop exit before its next iteration."""
        self.running = False


class MplCanvas(FigureCanvas):
    """Qt widget wrapping a matplotlib ``Figure`` (exposed as ``self.fig``)."""

    def __init__(self, parent=None, width=5, height=4, dpi=100):
        self.fig = Figure(figsize=(width, height), dpi=dpi)
        super().__init__(self.fig)
        self.setParent(parent)


class PSOWindow(QMainWindow):
    """Main window: PSO controls on the left, live plots on the right."""

    def __init__(self):
        super().__init__()
        self.pso_thread = None
        self.best_scores = []
        self.iterations = []
        # (X, Y, Z) grid of the current run's fitness landscape; None until
        # first needed, reset at the start of every run.
        self._landscape_cache = None
        self.init_ui()

    def init_ui(self):
        """Build the widgets, layouts and signal connections."""
        self.setWindowTitle("Stellar Physics PSO Optimizer - Chapter 5-6")
        self.setGeometry(100, 100, 1200, 800)

        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        layout = QHBoxLayout(central_widget)

        # Left panel - Controls
        left_panel = QWidget()
        left_layout = QVBoxLayout(left_panel)
        left_panel.setMaximumWidth(400)

        # Problem selection
        problem_group = QGroupBox("Stellar Physics Optimization Problem")
        problem_layout = QVBoxLayout(problem_group)

        self.problem_combo = QComboBox()
        self.problem_combo.addItems([
            "Radiative Equilibrium",
            "Nuclear Reaction Rate",
            "Convective Stability",
            "Opacity Optimization"
        ])
        problem_layout.addWidget(QLabel("Select Problem:"))
        problem_layout.addWidget(self.problem_combo)

        # Problem description
        self.problem_desc = QTextEdit()
        self.problem_desc.setMaximumHeight(150)
        self.problem_desc.setReadOnly(True)
        problem_layout.addWidget(QLabel("Problem Description:"))
        problem_layout.addWidget(self.problem_desc)

        left_layout.addWidget(problem_group)

        # PSO parameters
        pso_group = QGroupBox("PSO Parameters")
        pso_layout = QGridLayout(pso_group)

        pso_layout.addWidget(QLabel("Number of Particles:"), 0, 0)
        self.num_particles = QSpinBox()
        self.num_particles.setRange(10, 200)
        self.num_particles.setValue(30)
        pso_layout.addWidget(self.num_particles, 0, 1)

        pso_layout.addWidget(QLabel("Max Iterations:"), 1, 0)
        self.max_iter = QSpinBox()
        self.max_iter.setRange(50, 1000)
        self.max_iter.setValue(100)
        pso_layout.addWidget(self.max_iter, 1, 1)

        pso_layout.addWidget(QLabel("Inertia Weight (w):"), 2, 0)
        self.w_spin = QDoubleSpinBox()
        self.w_spin.setRange(0.1, 1.0)
        self.w_spin.setValue(0.7)
        self.w_spin.setSingleStep(0.1)
        pso_layout.addWidget(self.w_spin, 2, 1)

        pso_layout.addWidget(QLabel("Cognitive Coefficient (c1):"), 3, 0)
        self.c1_spin = QDoubleSpinBox()
        self.c1_spin.setRange(0.1, 3.0)
        self.c1_spin.setValue(1.5)
        self.c1_spin.setSingleStep(0.1)
        pso_layout.addWidget(self.c1_spin, 3, 1)

        pso_layout.addWidget(QLabel("Social Coefficient (c2):"), 4, 0)
        self.c2_spin = QDoubleSpinBox()
        self.c2_spin.setRange(0.1, 3.0)
        self.c2_spin.setValue(1.5)
        self.c2_spin.setSingleStep(0.1)
        pso_layout.addWidget(self.c2_spin, 4, 1)

        left_layout.addWidget(pso_group)

        # Control buttons
        self.run_button = QPushButton("Run PSO")
        self.run_button.clicked.connect(self.run_pso)
        left_layout.addWidget(self.run_button)

        self.stop_button = QPushButton("Stop")
        self.stop_button.clicked.connect(self.stop_pso)
        self.stop_button.setEnabled(False)
        left_layout.addWidget(self.stop_button)

        # Progress
        self.progress = QProgressBar()
        left_layout.addWidget(self.progress)

        # Results
        results_group = QGroupBox("Results")
        results_layout = QVBoxLayout(results_group)
        self.results_text = QTextEdit()
        self.results_text.setMaximumHeight(150)
        results_layout.addWidget(self.results_text)
        left_layout.addWidget(results_group)

        layout.addWidget(left_panel)

        # Right panel - Visualization
        right_panel = QTabWidget()

        # Convergence plot
        self.convergence_canvas = MplCanvas(self, width=6, height=4, dpi=100)
        self.convergence_ax = self.convergence_canvas.fig.add_subplot(111)
        right_panel.addTab(self.convergence_canvas, "Convergence")

        # Particle positions
        self.particles_canvas = MplCanvas(self, width=6, height=4, dpi=100)
        self.particles_ax = self.particles_canvas.fig.add_subplot(111)
        right_panel.addTab(self.particles_canvas, "Particles")

        # Fitness landscape
        self.landscape_canvas = MplCanvas(self, width=6, height=4, dpi=100)
        self.landscape_ax = self.landscape_canvas.fig.add_subplot(111)
        right_panel.addTab(self.landscape_canvas, "Fitness Landscape")

        layout.addWidget(right_panel)

        # Update problem description
        self.update_problem_desc()
        self.problem_combo.currentTextChanged.connect(self.update_problem_desc)

    def update_problem_desc(self):
        """Show the description matching the currently selected problem."""
        problem = self.problem_combo.currentText()
        descriptions = {
            "Radiative Equilibrium":
                "Optimize radiative temperature gradient (Eq. 5.18)\n"
                "Minimize deviation from ideal radiative equilibrium conditions\n"
                "Parameters: Temperature, Density",
            "Nuclear Reaction Rate":
                "Optimize thermonuclear reaction rates (Eq. 6.29)\n"
                "Find optimal conditions for efficient energy generation\n"
                "Based on Gamow peak theory\n"
                "Parameters: Temperature (T7), Density parameter",
            "Convective Stability":
                "Apply Schwarzschild/Ledoux criteria (Eq. 5.49, 5.50)\n"
                "Minimize convective instability in stellar layers\n"
                "Parameters: ∇_rad, ∇_ad, ∇_μ",
            "Opacity Optimization":
                "Optimize opacity for efficient energy transport\n"
                "Based on Kramers opacity law (Eq. 5.31)\n"
                "Find optimal density-temperature conditions\n"
                "Parameters: Density, Temperature"
        }
        self.problem_desc.setText(descriptions.get(problem, ""))

    def run_pso(self):
        """Start a new optimisation run unless one is already in progress."""
        if self.pso_thread and self.pso_thread.isRunning():
            return

        problem_map = {
            "Radiative Equilibrium": "radiative_equilibrium",
            "Nuclear Reaction Rate": "nuclear_reaction_rate",
            "Convective Stability": "convective_stability",
            "Opacity Optimization": "opacity_optimization"
        }
        problem_type = problem_map[self.problem_combo.currentText()]

        self.pso_thread = PSOThread(
            problem_type=problem_type,
            num_particles=self.num_particles.value(),
            max_iter=self.max_iter.value(),
            w=self.w_spin.value(),
            c1=self.c1_spin.value(),
            c2=self.c2_spin.value()
        )
        self.pso_thread.update_signal.connect(self.update_plots)
        self.pso_thread.finished_signal.connect(self.optimization_finished)

        self.run_button.setEnabled(False)
        self.stop_button.setEnabled(True)
        self.progress.setValue(0)
        self.progress.setMaximum(self.max_iter.value())

        self.convergence_ax.clear()
        self.particles_ax.clear()
        self.landscape_ax.clear()

        self.best_scores = []
        self.iterations = []
        # The landscape depends on the problem, so drop the previous run's grid.
        self._landscape_cache = None

        self.pso_thread.start()

    def stop_pso(self):
        """Ask the worker to stop, wait for it, and restore the buttons."""
        if self.pso_thread:
            self.pso_thread.stop()
            self.pso_thread.wait()
        self.run_button.setEnabled(True)
        self.stop_button.setEnabled(False)

    def closeEvent(self, event):
        """Stop and join the worker before closing.

        Destroying a QThread object while its thread is still running aborts
        the application, so the window must not go away first.
        """
        if self.pso_thread and self.pso_thread.isRunning():
            self.pso_thread.stop()
            self.pso_thread.wait()
        super().closeEvent(event)

    def update_plots(self, data):
        """Refresh all plots, the progress bar and the results text.

        Args:
            data: the dict emitted by ``PSOThread.update_signal``.
        """
        iteration = data['iteration']
        best_score = data['global_best']
        position = data['position']
        particles = data['particles']

        # Update convergence plot
        self.best_scores.append(best_score)
        self.iterations.append(iteration)

        self.convergence_ax.clear()
        self.convergence_ax.plot(self.iterations, self.best_scores, 'b-', linewidth=2)
        self.convergence_ax.set_xlabel('Iteration')
        self.convergence_ax.set_ylabel('Best Fitness')
        self.convergence_ax.set_title('PSO Convergence')
        self.convergence_ax.grid(True, alpha=0.3)
        self.convergence_canvas.draw()

        # Update particles plot (2D projection)
        self.particles_ax.clear()
        if particles.shape[1] >= 2:
            self.particles_ax.scatter(particles[:, 0], particles[:, 1],
                                      c='blue', alpha=0.6, s=20)
            self.particles_ax.scatter([position[0]], [position[1]],
                                      c='red', s=100, marker='*', label='Global Best')
        self.particles_ax.set_xlabel('Parameter 1')
        self.particles_ax.set_ylabel('Parameter 2')
        self.particles_ax.set_title('Particle Positions')
        self.particles_ax.legend()
        self.particles_ax.grid(True, alpha=0.3)
        self.particles_canvas.draw()

        # Update fitness landscape for 2D problems
        if particles.shape[1] == 2:
            self.update_fitness_landscape(position, particles)

        # Update progress
        self.progress.setValue(iteration + 1)

        # Update results
        self.results_text.setText(
            f"Iteration: {iteration + 1}\n"
            f"Best Fitness: {best_score:.6f}\n"
            f"Best Position: {position}\n"
        )

    def _compute_landscape(self, resolution=50):
        """Evaluate the current problem's fitness on a grid over its bounds.

        Returns:
            (X, Y, Z) meshgrid arrays of shape (resolution, resolution).
        """
        thread = self.pso_thread
        (x_low, x_high), (y_low, y_high) = thread.bounds[:2]
        x = np.linspace(x_low, x_high, resolution)
        y = np.linspace(y_low, y_high, resolution)
        X, Y = np.meshgrid(x, y)

        # Calculate fitness for each point
        Z = np.zeros_like(X)
        for i in range(X.shape[0]):
            for j in range(X.shape[1]):
                Z[i, j] = thread.fitness([X[i, j], Y[i, j]], thread.problem_type)
        return X, Y, Z

    def update_fitness_landscape(self, best_position, particles):
        """Draw the fitness contours with the swarm and global best on top.

        The grid spans the problem's own search bounds and is evaluated only
        once per run, since the objective does not change between iterations.

        Args:
            best_position: coordinates of the global best (first two used).
            particles: array of particle positions (first two columns used).
        """
        self.landscape_ax.clear()

        if self._landscape_cache is None:
            self._landscape_cache = self._compute_landscape()
        X, Y, Z = self._landscape_cache

        # Plot contour; skipped when the surface has no finite variation,
        # because contouring then has no levels to draw.
        finite = Z[np.isfinite(Z)]
        if finite.size and finite.min() < finite.max():
            self.landscape_ax.contourf(X, Y, Z, levels=50, alpha=0.8)
            self.landscape_ax.contour(X, Y, Z, levels=10, colors='black', alpha=0.3)

        # Plot particles
        self.landscape_ax.scatter(particles[:, 0], particles[:, 1],
                                  c='white', s=20, alpha=0.7)
        self.landscape_ax.scatter([best_position[0]], [best_position[1]],
                                  c='red', s=100, marker='*', label='Global Best')

        self.landscape_ax.set_xlabel('Parameter 1')
        self.landscape_ax.set_ylabel('Parameter 2')
        self.landscape_ax.set_title('Fitness Landscape')
        self.landscape_ax.legend()
        self.landscape_canvas.draw()

    def optimization_finished(self, data):
        """Restore the controls and show the final summary.

        Args:
            data: the dict emitted by ``PSOThread.finished_signal``. The
                'iterations' and 'stopped' keys are optional; without them the
                run is reported as complete.
        """
        self.run_button.setEnabled(True)
        self.stop_button.setEnabled(False)

        # Report what the run actually did, not what the spin boxes show now:
        # the run may have been stopped early or the boxes edited meanwhile.
        completed = data.get('iterations', self.progress.maximum())
        stopped = data.get('stopped', False)
        if self.pso_thread is not None:
            particle_count = self.pso_thread.num_particles
        else:
            particle_count = self.num_particles.value()

        self.progress.setValue(completed)

        headline = "Optimization Stopped!" if stopped else "Optimization Completed!"
        final_text = (
            f"{headline}\n"
            f"Final Fitness: {data['final_score']:.8f}\n"
            f"Optimal Parameters: {data['final_position']}\n"
            f"Total Iterations: {completed}\n"
            f"Number of Particles: {particle_count}"
        )
        self.results_text.setText(final_text)


def main():
    """Create the Qt application, show the main window and run the event loop."""
    app = QApplication(sys.argv)
    window = PSOWindow()
    window.show()
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()