|
|
import sys |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas |
|
|
from matplotlib.figure import Figure |
|
|
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, |
|
|
QHBoxLayout, QGroupBox, QLabel, QComboBox, |
|
|
QDoubleSpinBox, QSpinBox, QPushButton, QTextEdit, |
|
|
QTabWidget, QGridLayout, QProgressBar) |
|
|
from PyQt5.QtCore import QThread, pyqtSignal |
|
|
import random |
|
|
|
|
|
class PSOThread(QThread): |
|
|
update_signal = pyqtSignal(dict) |
|
|
finished_signal = pyqtSignal(dict) |
|
|
|
|
|
def __init__(self, problem_type, num_particles, max_iter, w, c1, c2): |
|
|
super().__init__() |
|
|
self.problem_type = problem_type |
|
|
self.num_particles = num_particles |
|
|
self.max_iter = max_iter |
|
|
self.w = w |
|
|
self.c1 = c1 |
|
|
self.c2 = c2 |
|
|
self.running = True |
|
|
|
|
|
def run(self): |
|
|
|
|
|
if self.problem_type == "radiative_equilibrium": |
|
|
bounds = [(-10, 10), (-10, 10)] |
|
|
dim = 2 |
|
|
elif self.problem_type == "nuclear_reaction_rate": |
|
|
bounds = [(0.1, 2.0), (1e-3, 1e-1)] |
|
|
dim = 2 |
|
|
elif self.problem_type == "convective_stability": |
|
|
bounds = [(0.1, 0.5), (0.1, 0.5), (0.1, 0.5)] |
|
|
dim = 3 |
|
|
elif self.problem_type == "opacity_optimization": |
|
|
bounds = [(1e-3, 1e3), (1e4, 1e8)] |
|
|
dim = 2 |
|
|
else: |
|
|
bounds = [(-5, 5), (-5, 5)] |
|
|
dim = 2 |
|
|
|
|
|
|
|
|
particles = np.random.uniform([b[0] for b in bounds], [b[1] for b in bounds], |
|
|
(self.num_particles, dim)) |
|
|
velocities = np.random.uniform(-1, 1, (self.num_particles, dim)) |
|
|
personal_best_positions = particles.copy() |
|
|
personal_best_scores = np.array([self.fitness(p, self.problem_type) for p in particles]) |
|
|
global_best_index = np.argmin(personal_best_scores) |
|
|
global_best_position = personal_best_positions[global_best_index] |
|
|
global_best_score = personal_best_scores[global_best_index] |
|
|
|
|
|
|
|
|
for iteration in range(self.max_iter): |
|
|
if not self.running: |
|
|
break |
|
|
|
|
|
for i in range(self.num_particles): |
|
|
|
|
|
r1, r2 = random.random(), random.random() |
|
|
velocities[i] = (self.w * velocities[i] + |
|
|
self.c1 * r1 * (personal_best_positions[i] - particles[i]) + |
|
|
self.c2 * r2 * (global_best_position - particles[i])) |
|
|
|
|
|
|
|
|
particles[i] += velocities[i] |
|
|
|
|
|
|
|
|
for d in range(dim): |
|
|
if particles[i, d] < bounds[d][0]: |
|
|
particles[i, d] = bounds[d][0] |
|
|
elif particles[i, d] > bounds[d][1]: |
|
|
particles[i, d] = bounds[d][1] |
|
|
|
|
|
|
|
|
current_fitness = self.fitness(particles[i], self.problem_type) |
|
|
|
|
|
|
|
|
if current_fitness < personal_best_scores[i]: |
|
|
personal_best_positions[i] = particles[i].copy() |
|
|
personal_best_scores[i] = current_fitness |
|
|
|
|
|
|
|
|
if current_fitness < global_best_score: |
|
|
global_best_position = particles[i].copy() |
|
|
global_best_score = current_fitness |
|
|
|
|
|
|
|
|
self.update_signal.emit({ |
|
|
'iteration': iteration, |
|
|
'global_best': global_best_score, |
|
|
'position': global_best_position, |
|
|
'particles': particles.copy() |
|
|
}) |
|
|
|
|
|
self.finished_signal.emit({ |
|
|
'final_score': global_best_score, |
|
|
'final_position': global_best_position |
|
|
}) |
|
|
|
|
|
def fitness(self, x, problem_type): |
|
|
"""Fitness function based on stellar physics problems from Chapter 5-6""" |
|
|
if problem_type == "radiative_equilibrium": |
|
|
|
|
|
|
|
|
T, rho = x[0], x[1] |
|
|
|
|
|
radiative_flux = (T**3 / rho) if rho > 0 else 1e10 |
|
|
target_flux = 1.0 |
|
|
return abs(radiative_flux - target_flux) |
|
|
|
|
|
elif problem_type == "nuclear_reaction_rate": |
|
|
|
|
|
T7, density_param = x[0], x[1] |
|
|
|
|
|
reaction_rate = (T7**(-2/3)) * np.exp(-1/T7**(1/3)) * density_param |
|
|
target_rate = 0.5 |
|
|
return abs(reaction_rate - target_rate) |
|
|
|
|
|
elif problem_type == "convective_stability": |
|
|
|
|
|
grad_rad, grad_ad, grad_mu = x[0], x[1], x[2] |
|
|
|
|
|
|
|
|
stability_condition = grad_ad + grad_mu |
|
|
instability = max(0, grad_rad - stability_condition) |
|
|
return instability |
|
|
|
|
|
elif problem_type == "opacity_optimization": |
|
|
|
|
|
rho, T = x[0], x[1] |
|
|
|
|
|
opacity = rho * T**(-3.5) if T > 0 else 1e10 |
|
|
|
|
|
target_opacity = 1.0 |
|
|
return abs(opacity - target_opacity) |
|
|
|
|
|
else: |
|
|
|
|
|
return sum(xi**2 for xi in x) |
|
|
|
|
|
def stop(self): |
|
|
self.running = False |
|
|
|
|
|
class MplCanvas(FigureCanvas): |
|
|
def __init__(self, parent=None, width=5, height=4, dpi=100): |
|
|
self.fig = Figure(figsize=(width, height), dpi=dpi) |
|
|
super().__init__(self.fig) |
|
|
self.setParent(parent) |
|
|
|
|
|
class PSOWindow(QMainWindow): |
|
|
def __init__(self): |
|
|
super().__init__() |
|
|
self.pso_thread = None |
|
|
self.init_ui() |
|
|
|
|
|
def init_ui(self): |
|
|
self.setWindowTitle("Stellar Physics PSO Optimizer - Chapter 5-6") |
|
|
self.setGeometry(100, 100, 1200, 800) |
|
|
|
|
|
central_widget = QWidget() |
|
|
self.setCentralWidget(central_widget) |
|
|
layout = QHBoxLayout(central_widget) |
|
|
|
|
|
|
|
|
left_panel = QWidget() |
|
|
left_layout = QVBoxLayout(left_panel) |
|
|
left_panel.setMaximumWidth(400) |
|
|
|
|
|
|
|
|
problem_group = QGroupBox("Stellar Physics Optimization Problem") |
|
|
problem_layout = QVBoxLayout(problem_group) |
|
|
|
|
|
self.problem_combo = QComboBox() |
|
|
self.problem_combo.addItems([ |
|
|
"Radiative Equilibrium", |
|
|
"Nuclear Reaction Rate", |
|
|
"Convective Stability", |
|
|
"Opacity Optimization" |
|
|
]) |
|
|
problem_layout.addWidget(QLabel("Select Problem:")) |
|
|
problem_layout.addWidget(self.problem_combo) |
|
|
|
|
|
|
|
|
self.problem_desc = QTextEdit() |
|
|
self.problem_desc.setMaximumHeight(150) |
|
|
self.problem_desc.setReadOnly(True) |
|
|
problem_layout.addWidget(QLabel("Problem Description:")) |
|
|
problem_layout.addWidget(self.problem_desc) |
|
|
|
|
|
left_layout.addWidget(problem_group) |
|
|
|
|
|
|
|
|
pso_group = QGroupBox("PSO Parameters") |
|
|
pso_layout = QGridLayout(pso_group) |
|
|
|
|
|
pso_layout.addWidget(QLabel("Number of Particles:"), 0, 0) |
|
|
self.num_particles = QSpinBox() |
|
|
self.num_particles.setRange(10, 200) |
|
|
self.num_particles.setValue(30) |
|
|
pso_layout.addWidget(self.num_particles, 0, 1) |
|
|
|
|
|
pso_layout.addWidget(QLabel("Max Iterations:"), 1, 0) |
|
|
self.max_iter = QSpinBox() |
|
|
self.max_iter.setRange(50, 1000) |
|
|
self.max_iter.setValue(100) |
|
|
pso_layout.addWidget(self.max_iter, 1, 1) |
|
|
|
|
|
pso_layout.addWidget(QLabel("Inertia Weight (w):"), 2, 0) |
|
|
self.w_spin = QDoubleSpinBox() |
|
|
self.w_spin.setRange(0.1, 1.0) |
|
|
self.w_spin.setValue(0.7) |
|
|
self.w_spin.setSingleStep(0.1) |
|
|
pso_layout.addWidget(self.w_spin, 2, 1) |
|
|
|
|
|
pso_layout.addWidget(QLabel("Cognitive Coefficient (c1):"), 3, 0) |
|
|
self.c1_spin = QDoubleSpinBox() |
|
|
self.c1_spin.setRange(0.1, 3.0) |
|
|
self.c1_spin.setValue(1.5) |
|
|
self.c1_spin.setSingleStep(0.1) |
|
|
pso_layout.addWidget(self.c1_spin, 3, 1) |
|
|
|
|
|
pso_layout.addWidget(QLabel("Social Coefficient (c2):"), 4, 0) |
|
|
self.c2_spin = QDoubleSpinBox() |
|
|
self.c2_spin.setRange(0.1, 3.0) |
|
|
self.c2_spin.setValue(1.5) |
|
|
self.c2_spin.setSingleStep(0.1) |
|
|
pso_layout.addWidget(self.c2_spin, 4, 1) |
|
|
|
|
|
left_layout.addWidget(pso_group) |
|
|
|
|
|
|
|
|
self.run_button = QPushButton("Run PSO") |
|
|
self.run_button.clicked.connect(self.run_pso) |
|
|
left_layout.addWidget(self.run_button) |
|
|
|
|
|
self.stop_button = QPushButton("Stop") |
|
|
self.stop_button.clicked.connect(self.stop_pso) |
|
|
self.stop_button.setEnabled(False) |
|
|
left_layout.addWidget(self.stop_button) |
|
|
|
|
|
|
|
|
self.progress = QProgressBar() |
|
|
left_layout.addWidget(self.progress) |
|
|
|
|
|
|
|
|
results_group = QGroupBox("Results") |
|
|
results_layout = QVBoxLayout(results_group) |
|
|
self.results_text = QTextEdit() |
|
|
self.results_text.setMaximumHeight(150) |
|
|
results_layout.addWidget(self.results_text) |
|
|
left_layout.addWidget(results_group) |
|
|
|
|
|
layout.addWidget(left_panel) |
|
|
|
|
|
|
|
|
right_panel = QTabWidget() |
|
|
|
|
|
|
|
|
self.convergence_canvas = MplCanvas(self, width=6, height=4, dpi=100) |
|
|
self.convergence_ax = self.convergence_canvas.fig.add_subplot(111) |
|
|
right_panel.addTab(self.convergence_canvas, "Convergence") |
|
|
|
|
|
|
|
|
self.particles_canvas = MplCanvas(self, width=6, height=4, dpi=100) |
|
|
self.particles_ax = self.particles_canvas.fig.add_subplot(111) |
|
|
right_panel.addTab(self.particles_canvas, "Particles") |
|
|
|
|
|
|
|
|
self.landscape_canvas = MplCanvas(self, width=6, height=4, dpi=100) |
|
|
self.landscape_ax = self.landscape_canvas.fig.add_subplot(111) |
|
|
right_panel.addTab(self.landscape_canvas, "Fitness Landscape") |
|
|
|
|
|
layout.addWidget(right_panel) |
|
|
|
|
|
|
|
|
self.update_problem_desc() |
|
|
self.problem_combo.currentTextChanged.connect(self.update_problem_desc) |
|
|
|
|
|
def update_problem_desc(self): |
|
|
problem = self.problem_combo.currentText() |
|
|
descriptions = { |
|
|
"Radiative Equilibrium": |
|
|
"Optimize radiative temperature gradient (Eq. 5.18)\n" |
|
|
"Minimize deviation from ideal radiative equilibrium conditions\n" |
|
|
"Parameters: Temperature, Density", |
|
|
|
|
|
"Nuclear Reaction Rate": |
|
|
"Optimize thermonuclear reaction rates (Eq. 6.29)\n" |
|
|
"Find optimal conditions for efficient energy generation\n" |
|
|
"Based on Gamow peak theory\n" |
|
|
"Parameters: Temperature (T7), Density parameter", |
|
|
|
|
|
"Convective Stability": |
|
|
"Apply Schwarzschild/Ledoux criteria (Eq. 5.49, 5.50)\n" |
|
|
"Minimize convective instability in stellar layers\n" |
|
|
"Parameters: ∇_rad, ∇_ad, ∇_μ", |
|
|
|
|
|
"Opacity Optimization": |
|
|
"Optimize opacity for efficient energy transport\n" |
|
|
"Based on Kramers opacity law (Eq. 5.31)\n" |
|
|
"Find optimal density-temperature conditions\n" |
|
|
"Parameters: Density, Temperature" |
|
|
} |
|
|
self.problem_desc.setText(descriptions.get(problem, "")) |
|
|
|
|
|
def run_pso(self): |
|
|
if self.pso_thread and self.pso_thread.isRunning(): |
|
|
return |
|
|
|
|
|
problem_map = { |
|
|
"Radiative Equilibrium": "radiative_equilibrium", |
|
|
"Nuclear Reaction Rate": "nuclear_reaction_rate", |
|
|
"Convective Stability": "convective_stability", |
|
|
"Opacity Optimization": "opacity_optimization" |
|
|
} |
|
|
|
|
|
problem_type = problem_map[self.problem_combo.currentText()] |
|
|
|
|
|
self.pso_thread = PSOThread( |
|
|
problem_type=problem_type, |
|
|
num_particles=self.num_particles.value(), |
|
|
max_iter=self.max_iter.value(), |
|
|
w=self.w_spin.value(), |
|
|
c1=self.c1_spin.value(), |
|
|
c2=self.c2_spin.value() |
|
|
) |
|
|
|
|
|
self.pso_thread.update_signal.connect(self.update_plots) |
|
|
self.pso_thread.finished_signal.connect(self.optimization_finished) |
|
|
|
|
|
self.run_button.setEnabled(False) |
|
|
self.stop_button.setEnabled(True) |
|
|
self.progress.setValue(0) |
|
|
self.progress.setMaximum(self.max_iter.value()) |
|
|
|
|
|
self.convergence_ax.clear() |
|
|
self.particles_ax.clear() |
|
|
self.landscape_ax.clear() |
|
|
|
|
|
self.best_scores = [] |
|
|
self.iterations = [] |
|
|
|
|
|
self.pso_thread.start() |
|
|
|
|
|
def stop_pso(self): |
|
|
if self.pso_thread: |
|
|
self.pso_thread.stop() |
|
|
self.pso_thread.wait() |
|
|
self.run_button.setEnabled(True) |
|
|
self.stop_button.setEnabled(False) |
|
|
|
|
|
def update_plots(self, data): |
|
|
iteration = data['iteration'] |
|
|
best_score = data['global_best'] |
|
|
position = data['position'] |
|
|
particles = data['particles'] |
|
|
|
|
|
|
|
|
self.best_scores.append(best_score) |
|
|
self.iterations.append(iteration) |
|
|
|
|
|
self.convergence_ax.clear() |
|
|
self.convergence_ax.plot(self.iterations, self.best_scores, 'b-', linewidth=2) |
|
|
self.convergence_ax.set_xlabel('Iteration') |
|
|
self.convergence_ax.set_ylabel('Best Fitness') |
|
|
self.convergence_ax.set_title('PSO Convergence') |
|
|
self.convergence_ax.grid(True, alpha=0.3) |
|
|
self.convergence_canvas.draw() |
|
|
|
|
|
|
|
|
self.particles_ax.clear() |
|
|
if particles.shape[1] >= 2: |
|
|
self.particles_ax.scatter(particles[:, 0], particles[:, 1], |
|
|
c='blue', alpha=0.6, s=20) |
|
|
self.particles_ax.scatter([position[0]], [position[1]], |
|
|
c='red', s=100, marker='*', label='Global Best') |
|
|
self.particles_ax.set_xlabel('Parameter 1') |
|
|
self.particles_ax.set_ylabel('Parameter 2') |
|
|
self.particles_ax.set_title('Particle Positions') |
|
|
self.particles_ax.legend() |
|
|
self.particles_ax.grid(True, alpha=0.3) |
|
|
self.particles_canvas.draw() |
|
|
|
|
|
|
|
|
if particles.shape[1] == 2: |
|
|
self.update_fitness_landscape(position, particles) |
|
|
|
|
|
|
|
|
self.progress.setValue(iteration + 1) |
|
|
|
|
|
|
|
|
self.results_text.setText( |
|
|
f"Iteration: {iteration + 1}\n" |
|
|
f"Best Fitness: {best_score:.6f}\n" |
|
|
f"Best Position: {position}\n" |
|
|
) |
|
|
|
|
|
def update_fitness_landscape(self, best_position, particles): |
|
|
self.landscape_ax.clear() |
|
|
|
|
|
|
|
|
x = np.linspace(-5, 5, 50) |
|
|
y = np.linspace(-5, 5, 50) |
|
|
X, Y = np.meshgrid(x, y) |
|
|
|
|
|
|
|
|
Z = np.zeros_like(X) |
|
|
for i in range(X.shape[0]): |
|
|
for j in range(X.shape[1]): |
|
|
Z[i, j] = self.pso_thread.fitness([X[i, j], Y[i, j]], |
|
|
self.pso_thread.problem_type) |
|
|
|
|
|
|
|
|
contour = self.landscape_ax.contourf(X, Y, Z, levels=50, alpha=0.8) |
|
|
self.landscape_ax.contour(X, Y, Z, levels=10, colors='black', alpha=0.3) |
|
|
|
|
|
|
|
|
self.landscape_ax.scatter(particles[:, 0], particles[:, 1], |
|
|
c='white', s=20, alpha=0.7) |
|
|
self.landscape_ax.scatter([best_position[0]], [best_position[1]], |
|
|
c='red', s=100, marker='*', label='Global Best') |
|
|
|
|
|
self.landscape_ax.set_xlabel('Parameter 1') |
|
|
self.landscape_ax.set_ylabel('Parameter 2') |
|
|
self.landscape_ax.set_title('Fitness Landscape') |
|
|
self.landscape_canvas.draw() |
|
|
|
|
|
def optimization_finished(self, data): |
|
|
self.run_button.setEnabled(True) |
|
|
self.stop_button.setEnabled(False) |
|
|
self.progress.setValue(self.max_iter.value()) |
|
|
|
|
|
final_text = ( |
|
|
f"Optimization Completed!\n" |
|
|
f"Final Fitness: {data['final_score']:.8f}\n" |
|
|
f"Optimal Parameters: {data['final_position']}\n" |
|
|
f"Total Iterations: {self.max_iter.value()}\n" |
|
|
f"Number of Particles: {self.num_particles.value()}" |
|
|
) |
|
|
self.results_text.setText(final_text) |
|
|
|
|
|
def main(): |
|
|
app = QApplication(sys.argv) |
|
|
window = PSOWindow() |
|
|
window.show() |
|
|
sys.exit(app.exec_()) |
|
|
|
|
|
if __name__ == '__main__': |
|
|
main() |