Biofuel-Optimiser / molecule_generator.py
carrotcake3's picture
Update molecule_generator.py
a248f60 verified
import os
import sys
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Tuple, Callable
import joblib
import numpy as np
import pandas as pd
import random
from rdkit import Chem
from crem.crem import mutate_mol
from sklearn.base import BaseEstimator, RegressorMixin
from huggingface_hub import snapshot_download
os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"
# === Project Setup ===
PROJECT_ROOT = Path(__file__).resolve().parent
sys.path.append(str(PROJECT_ROOT))
from shared_features import FeatureSelector, featurize_df
from data_prep import df
class GenericPredictor:
"""Generic predictor that works for any property model."""
def __init__(self, model_dir: Path, property_name: str):
"""
Initialize predictor from a model directory.
Args:
model_dir: Path to the model directory containing artifacts/
property_name: Name of the property (for display purposes)
"""
print(f"Loading {property_name} Predictor...")
model_path = model_dir / "model.joblib"
selector_path = model_dir / "selector.joblib"
# Debug info
print(f">>> MODEL PATH: {model_path}")
print(f">>> SELECTOR PATH: {selector_path}")
print(f">>> MODEL EXISTS: {model_path.exists()}")
print(f">>> SELECTOR EXISTS: {selector_path.exists()}")
# Load artifacts
self.model = joblib.load(model_path)
self.selector = FeatureSelector.load(selector_path)
self.property_name = property_name
print(f"✓ {property_name} Predictor ready!\n")
def predict(self, smiles_list):
"""Inference on a list of SMILES strings."""
if isinstance(smiles_list, str):
smiles_list = [smiles_list]
X_full = featurize_df(smiles_list, return_df=False)
if X_full is None:
print(f"⚠ Warning: No valid molecules found for {self.property_name}!")
return []
X_selected = self.selector.transform(X_full)
predictions = self.model.predict(X_selected)
return predictions.tolist()
def predict_with_details(self, smiles_list):
"""Inference with valid/invalid info."""
if isinstance(smiles_list, str):
smiles_list = [smiles_list]
df = pd.DataFrame({"SMILES": smiles_list})
X_full, df_valid = featurize_df(df, return_df=True)
col_name = f"Predicted_{self.property_name}"
if X_full is None:
return pd.DataFrame(columns=["SMILES", col_name, "Valid"])
X_selected = self.selector.transform(X_full)
predictions = self.model.predict(X_selected)
df_valid[col_name] = predictions
df_valid["Valid"] = True
all_results = pd.DataFrame({"SMILES": smiles_list})
all_results = all_results.merge(
df_valid[["SMILES", col_name, "Valid"]],
on="SMILES", how="left"
)
all_results["Valid"] = all_results["Valid"].fillna(False)
return all_results
# Predictor paths relative to project root
# === Hugging Face Predictor Paths ===
HF_MODELS = {
"cn": "SalZa2004/Cetane_Number_Predictor",
"ysi": "SalZa2004/YSI_Predictor",
"bp": "SalZa2004/Boiling_Point_Predictor",
"density": "SalZa2004/Density_Predictor",
"lhv": "SalZa2004/LHV_Predictor",
"dynamic_viscosity": "SalZa2004/Dynamic_Viscosity_Predictor",
}
PREDICTOR_PATHS = {
key: Path(
snapshot_download(
repo_id=repo,
repo_type="model"
)
)
for key, repo in HF_MODELS.items()
}
@dataclass
class EvolutionConfig:
"""Configuration for evolutionary algorithm."""
target_cn: float
minimize_ysi: bool = True
generations: int = 6
population_size: int = 100
mutations_per_parent: int = 5
survivor_fraction: float = 0.5
min_bp: float = 60
max_bp: float = 250
min_dynamic_viscosity: float = 0.0
max_dynamic_viscosity: float = 2.0
min_density: float = 720
min_lhv: float = 30
use_bp_filter: bool = True
use_density_filter: bool = True
use_lhv_filter: bool = True
use_dynamic_viscosity_filter: bool = True
batch_size: int = 50
max_offspring_attempts: int = 10
@dataclass
class Molecule:
"""Represents a molecule with its properties."""
smiles: str
cn: float
cn_error: float
bp: Optional[float] = None
ysi: Optional[float] = None
density: Optional[float] = None
lhv: Optional[float] = None
dynamic_viscosity: Optional[float] = None
def dominates(self, other: 'Molecule') -> bool:
"""Check if this molecule Pareto-dominates another."""
better_cn = self.cn_error <= other.cn_error
better_ysi = self.ysi <= other.ysi if self.ysi is not None else True
strictly_better = (self.cn_error < other.cn_error or
(self.ysi is not None and self.ysi < other.ysi))
return better_cn and better_ysi and strictly_better
def to_dict(self) -> Dict:
"""Convert to dictionary for DataFrame creation."""
return {k: v for k, v in asdict(self).items() if v is not None}
class PropertyPredictor:
"""Handles batch prediction for all molecular properties."""
def __init__(self, config: EvolutionConfig):
self.config = config
# Initialize only the predictors we need
self.predictors = {}
# Always need CN predictor
self.predictors['cn'] = GenericPredictor(
PREDICTOR_PATHS['cn'],
'Cetane Number'
)
# Conditional predictors
if config.minimize_ysi:
self.predictors['ysi'] = GenericPredictor(
PREDICTOR_PATHS['ysi'],
'YSI'
)
if config.use_bp_filter:
self.predictors['bp'] = GenericPredictor(
PREDICTOR_PATHS['bp'],
'Boiling Point'
)
if config.use_density_filter:
self.predictors['density'] = GenericPredictor(
PREDICTOR_PATHS['density'],
'Density'
)
if config.use_lhv_filter:
self.predictors['lhv'] = GenericPredictor(
PREDICTOR_PATHS['lhv'],
'Lower Heating Value'
)
if config.use_dynamic_viscosity_filter:
self.predictors['dynamic_viscosity'] = GenericPredictor(
PREDICTOR_PATHS['dynamic_viscosity'],
'Dynamic Viscosity'
)
# Define validation rules
self.validators = {
'bp': lambda v: self.config.min_bp <= v <= self.config.max_bp,
'density': lambda v: v > self.config.min_density,
'lhv': lambda v: v > self.config.min_lhv,
'dynamic_viscosity': lambda v: self.config.min_dynamic_viscosity < v <= self.config.max_dynamic_viscosity
}
def _safe_predict(self, predictions: List) -> List[Optional[float]]:
"""Safely convert predictions, handling None/NaN/inf values."""
return [
float(pred) if pred is not None and np.isfinite(pred) else None
for pred in predictions
]
def _predict_batch(self, property_name: str, smiles_list: List[str]) -> List[Optional[float]]:
"""Generic batch prediction method."""
predictor = self.predictors.get(property_name)
if not smiles_list or predictor is None:
return [None] * len(smiles_list)
try:
predictions = predictor.predict(smiles_list)
return self._safe_predict(predictions)
except Exception as e:
print(f"Warning: Batch {property_name.upper()} prediction failed: {e}")
return [None] * len(smiles_list)
def predict_all_properties(self, smiles_list: List[str]) -> Dict[str, List[Optional[float]]]:
"""Predict all properties for a batch of SMILES."""
return {
prop: self._predict_batch(prop, smiles_list)
for prop in ['cn', 'ysi', 'bp', 'density', 'lhv', 'dynamic_viscosity'] # Check all possible properties
if prop in self.predictors # Only predict if predictor exists
}
def is_valid(self, property_name: str, value: Optional[float]) -> bool:
"""Check if a property value is valid according to config rules."""
if value is None:
return True
validator = self.validators.get(property_name)
return validator(value) if validator else True
class Population:
"""Manages the population of molecules."""
def __init__(self, config: EvolutionConfig):
self.config = config
self.molecules: List[Molecule] = []
self.seen_smiles: set = set()
def add_molecule(self, mol: Molecule) -> bool:
"""Add a molecule if it's not already in the population."""
if mol.smiles in self.seen_smiles:
return False
self.molecules.append(mol)
self.seen_smiles.add(mol.smiles)
return True
def add_molecules(self, molecules: List[Molecule]) -> int:
"""Add multiple molecules, return count added."""
return sum(self.add_molecule(mol) for mol in molecules)
def pareto_front(self) -> List[Molecule]:
"""Extract the Pareto front from the population."""
if not self.config.minimize_ysi:
return []
return [
mol for mol in self.molecules
if not any(other.dominates(mol) for other in self.molecules if other is not mol)
]
def get_survivors(self) -> List[Molecule]:
"""Select survivors for the next generation."""
target_size = int(self.config.population_size * self.config.survivor_fraction)
if self.config.minimize_ysi:
survivors = self.pareto_front()
# Sort key for combined objectives
sort_key = lambda m: m.cn_error + m.ysi
if len(survivors) > target_size:
survivors = sorted(survivors, key=sort_key)[:target_size]
elif len(survivors) < target_size:
remainder = [m for m in self.molecules if m not in survivors]
remainder = sorted(remainder, key=sort_key)
survivors.extend(remainder[:target_size - len(survivors)])
else:
survivors = sorted(self.molecules, key=lambda m: m.cn_error)[:target_size]
return survivors
def to_dataframe(self) -> pd.DataFrame:
"""Convert population to DataFrame."""
df = pd.DataFrame([m.to_dict() for m in self.molecules])
sort_cols = ["cn_error", "ysi"] if self.config.minimize_ysi else ["cn_error"]
df = df.sort_values(sort_cols, ascending=True)
df.insert(0, 'rank', range(1, len(df) + 1))
return df
class MolecularEvolution:
"""Main evolutionary algorithm coordinator."""
REP_DB_PATH = PROJECT_ROOT / "frag_db" / "diesel_fragments.db"
def __init__(self, config: EvolutionConfig):
self.config = config
self.predictor = PropertyPredictor(config)
self.population = Population(config)
def _mutate_molecule(self, mol: Chem.Mol) -> List[str]:
"""Generate mutations for a molecule using CREM."""
try:
mutants = list(mutate_mol(
mol,
db_name=str(self.REP_DB_PATH),
max_size=2,
return_mol=False
))
return [m for m in mutants if m and m not in self.population.seen_smiles]
except Exception:
return []
def _create_molecules(self, smiles_list: List[str]) -> List[Molecule]:
"""Create Molecule objects from SMILES with predictions."""
if not smiles_list:
return []
# Get all predictions at once
predictions = self.predictor.predict_all_properties(smiles_list)
molecules = []
for i, smiles in enumerate(smiles_list):
# Extract predictions for this molecule
props = {k: v[i] for k, v in predictions.items()}
# Validate required properties
if props['cn'] is None:
continue
if self.config.minimize_ysi and props['ysi'] is None:
continue
# Validate filtered properties
if not all(self.predictor.is_valid(k, props[k]) for k in ['bp', 'density', 'lhv', 'dynamic_viscosity']):
continue
molecules.append(Molecule(
smiles=smiles,
cn=props['cn'],
cn_error=abs(props['cn'] - self.config.target_cn),
bp=props['bp'],
ysi=props['ysi'],
density=props['density'],
lhv=props['lhv'],
dynamic_viscosity=props['dynamic_viscosity']
))
return molecules
def initialize_population(self, initial_smiles: List[str]) -> int:
"""Initialize the population from initial SMILES."""
print("Predicting properties for initial population...")
molecules = self._create_molecules(initial_smiles)
return self.population.add_molecules(molecules)
def _log_generation_stats(self, generation: int):
"""Log statistics for the current generation."""
mols = self.population.molecules
best_cn = min(mols, key=lambda m: m.cn_error)
avg_cn_err = np.mean([m.cn_error for m in mols])
log_dict = {
"generation": generation,
"best_cn_error": best_cn.cn_error,
"population_size": len(mols),
"avg_cn_error": avg_cn_err,
}
print_msg = (f"Gen {generation}/{self.config.generations} | "
f"Pop {len(mols)} | "
f"Best CN err: {best_cn.cn_error:.3f} | "
f"Avg CN err: {avg_cn_err:.3f}")
if self.config.minimize_ysi:
front = self.population.pareto_front()
best_ysi = min(mols, key=lambda m: m.ysi)
avg_ysi = np.mean([m.ysi for m in mols])
log_dict.update({
"best_ysi": best_ysi.ysi,
"pareto_size": len(front),
"avg_ysi": avg_ysi,
})
print_msg += (f" | Best YSI: {best_ysi.ysi:.3f} | "
f"Avg YSI: {avg_ysi:.3f} | "
f"Pareto size: {len(front)}")
print(print_msg)
def _generate_offspring(self, survivors: List[Molecule]) -> List[Molecule]:
"""Generate offspring from survivors."""
target_count = self.config.population_size - len(survivors)
max_attempts = target_count * self.config.max_offspring_attempts
all_children = []
new_molecules = []
for attempt in range(max_attempts):
if len(new_molecules) >= target_count:
break
# Generate mutations
parent = random.choice(survivors)
mol = Chem.MolFromSmiles(parent.smiles)
if mol is None:
continue
children = self._mutate_molecule(mol)
all_children.extend(children[:self.config.mutations_per_parent])
# Process in batches
if len(all_children) >= self.config.batch_size:
print(f" → Evaluating batch of {len(all_children)} offspring...")
new_molecules.extend(self._create_molecules(all_children))
all_children = []
# Process remaining children
if all_children:
print(f" → Evaluating final batch of {len(all_children)} offspring...")
new_molecules.extend(self._create_molecules(all_children))
return new_molecules
def _run_evolution_loop(self):
"""Run the main evolution loop."""
for gen in range(1, self.config.generations + 1):
self._log_generation_stats(gen)
survivors = self.population.get_survivors()
offspring = self._generate_offspring(survivors)
# Create new population
new_pop = Population(self.config)
new_pop.add_molecules(survivors + offspring)
self.population = new_pop
def _generate_results(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Generate final results DataFrames."""
final_df = self.population.to_dataframe()
if self.config.minimize_ysi and "ysi" in final_df.columns:
final_df = final_df[
(final_df["cn_error"] < 5) &
(final_df["ysi"] < 50)
].sort_values(["cn_error", "ysi"], ascending=True)
# overwrite rank safely
final_df["rank"] = range(1, len(final_df) + 1)
if self.config.minimize_ysi:
pareto_mols = self.population.pareto_front()
pareto_df = pd.DataFrame([m.to_dict() for m in pareto_mols])
if not pareto_df.empty:
pareto_df = pareto_df[
(pareto_df['cn_error'] < 5) & (pareto_df['ysi'] < 50)
].sort_values(["cn_error", "ysi"], ascending=True)
pareto_df.insert(0, 'rank', range(1, len(pareto_df) + 1))
else:
pareto_df = pd.DataFrame()
return final_df, pareto_df
def evolve(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Run the evolutionary algorithm."""
# Initialize
df_bins = pd.qcut(df["cn"], q=30)
initial_smiles = (
df.groupby(df_bins)
.apply(lambda x: x.sample(20, random_state=42))
.reset_index(drop=True)["SMILES"]
.tolist()
)
init_count = self.initialize_population(initial_smiles)
if init_count == 0:
print("❌ No valid initial molecules")
return pd.DataFrame(), pd.DataFrame()
print(f"✓ Initial population size: {init_count}")
# Evolution
self._run_evolution_loop()
# Results
return self._generate_results()
def get_user_config() -> EvolutionConfig:
"""Get configuration from user input."""
print("\n" + "="*70)
print("MOLECULAR EVOLUTION WITH GENETIC ALGORITHM")
print("="*70)
while True:
target = float(input("Enter target CN: ") or "50")
if target > 40:
break
print("⚠️ Target CN is too low, optimization may be challenging.")
print("Consider using a higher target CN for better results.\n")
minimize_ysi = input("Minimise YSI (y/n): ").strip().lower() in ['y', 'yes']
return EvolutionConfig(target_cn=target, minimize_ysi=minimize_ysi)
def save_results(final_df: pd.DataFrame, pareto_df: pd.DataFrame, minimize_ysi: bool):
"""Save results to CSV files."""
results_dir = Path("results")
results_dir.mkdir(exist_ok=True)
final_df.to_csv(results_dir / "final_population.csv", index=False)
if minimize_ysi and not pareto_df.empty:
pareto_df.to_csv(results_dir / "pareto_front.csv", index=False)
print("\nSaved to results/")
def display_results(final_df: pd.DataFrame, pareto_df: pd.DataFrame, minimize_ysi: bool):
"""Display results to console."""
cols = (["rank", "smiles", "cn", "cn_error", "ysi", "bp", "density", "lhv", "dynamic_viscosity"])
print("\n=== Best Candidates ===")
print(final_df.head(10)[cols].to_string(index=False))
if minimize_ysi and not pareto_df.empty:
print("\n=== PARETO FRONT (ranked) ===")
print(pareto_df[["rank", "smiles", "cn", "cn_error", "ysi", "bp", "density", "lhv", "dynamic_viscosity"]]
.head(20).to_string(index=False))
def main():
"""Main execution function."""
config = get_user_config()
evolution = MolecularEvolution(config)
final_df, pareto_df = evolution.evolve()
# Display and save results
display_results(final_df, pareto_df, config.minimize_ysi)
save_results(final_df, pareto_df, config.minimize_ysi)
#-----------------------
# For flask application
#-----------------------
def run_generation(target_cn: float, minimize_ysi: bool = True):
config = EvolutionConfig(
target_cn=float(target_cn),
minimize_ysi=bool(minimize_ysi),
)
evolution = MolecularEvolution(config)
final_df, pareto_df = evolution.evolve()
return final_df, pareto_df
if __name__ == "__main__":
main()