import streamlit as st
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
# scikit-learn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
# torch
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
# Transformers for the text GenAI part
from transformers import pipeline
##################################################
# 1) generate_synthetic_data (DEFINED BEFORE USE)
##################################################
def generate_synthetic_data(n_samples=300, seed=42):
    """
    Generates a synthetic dataset with:
    - length, width, RUL, margin, shape, weight, thickness
    - shape drawn from ["axisymmetric","sheet_metal","alloy_plate","complex_plastic"]
    - RUL and margin with higher variance
    """
    np.random.seed(seed)
    length = np.clip(np.random.normal(100, 20, n_samples), 50, 250)
    width = np.clip(np.random.normal(50, 15, n_samples), 20, 150)
    RUL = np.clip(np.random.normal(500, 250, n_samples), 0, 1000).astype(int)
    margin = np.clip(np.random.normal(150, 150, n_samples), -200, 600).astype(int)
    shapes = np.random.choice(["axisymmetric", "sheet_metal", "alloy_plate", "complex_plastic"],
                              size=n_samples, p=[0.4, 0.3, 0.2, 0.1])
    weight = np.clip(np.random.normal(80, 30, n_samples), 10, 250)
    thickness = np.clip(np.random.normal(8, 4, n_samples), 0.5, 30)
    return pd.DataFrame({
        'length': length,
        'width': width,
        'RUL': RUL,
        'margin': margin,
        'shape': shapes,
        'weight': weight,
        'thickness': thickness
    })
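# Minimal usage sketch (illustrative only; nothing in the app calls this directly):
#   df = generate_synthetic_data(n_samples=5)
#   list(df.columns) -> ['length', 'width', 'RUL', 'margin', 'shape', 'weight', 'thickness']
# All numeric columns are clipped, so e.g. every length lands in [50, 250].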
##################################################
# 2) PLACEHOLDER ML MODELS
##################################################
class DummyTabTransformerClassifier:
    """Stand-in for a real TabTransformer: a small MLP with the same interface."""
    def __init__(self, input_dim=8):
        self.clf = MLPClassifier(hidden_layer_sizes=(16, 8), max_iter=100, random_state=42)
    def fit(self, X, y):
        self.clf.fit(X, y)
        return self
    def predict(self, X):
        return self.clf.predict(X)
    def predict_proba(self, X):
        if hasattr(self.clf, "predict_proba"):
            return self.clf.predict_proba(X)
        else:
            # Fall back to hard 0/1 probabilities when the wrapped
            # estimator does not expose predict_proba.
            preds = self.clf.predict(X)
            return np.array([[1.0, 0.0] if p == 0 else [0.0, 1.0] for p in preds])

MODELS_ML = {
    "RandomForest": RandomForestClassifier(random_state=42, n_estimators=100),
    "LogisticRegression": LogisticRegression(random_state=42, max_iter=500),
    "SVM": SVC(probability=True, random_state=42),
    "TabTransformer(Dummy)": DummyTabTransformerClassifier()
}
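# Extending the benchmark is a one-line change; a sketch, assuming scikit-learn's
# GradientBoostingClassifier (any estimator exposing fit/predict works in the pipeline):
#   from sklearn.ensemble import GradientBoostingClassifier
#   MODELS_ML["GradientBoosting"] = GradientBoostingClassifier(random_state=42)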
##################################################
# 3) VAE FOR THE GENERATIVE (UPCYCLING) PART
##################################################
class MiniVAE(nn.Module):
    def __init__(self, input_dim=5, latent_dim=2):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 32)
        self.fc21 = nn.Linear(32, latent_dim)  # mu head
        self.fc22 = nn.Linear(32, latent_dim)  # logvar head
        self.fc3 = nn.Linear(latent_dim, 32)
        self.fc4 = nn.Linear(32, input_dim)
    def encode(self, x):
        h = F.relu(self.fc1(x))
        return self.fc21(h), self.fc22(h)
    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std
    def decode(self, z):
        h = F.relu(self.fc3(z))
        return self.fc4(h)
    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        recon = self.decode(z)
        return recon, mu, logvar

def vae_loss(recon_x, x, mu, logvar):
    mse = F.mse_loss(recon_x, x, reduction='sum')
    kld = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return mse + kld
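# vae_loss is the negative ELBO with a closed-form Gaussian KL term:
#   KL( N(mu, sigma^2) || N(0, I) ) = -0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
# which is exactly the `kld` expression above, with logvar = log(sigma^2); the `mse`
# term acts as the (unnormalized) reconstruction log-likelihood.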
##################################################
# 4) CONSTANTS AND FEATURE MAPPINGS
##################################################
SHAPE_MAPPING = {"axisymmetric": 0, "sheet_metal": 1, "alloy_plate": 2, "complex_plastic": 3}
ML_FEATURES = ["length", "width", "shape_code", "weight", "thickness", "RUL", "margin", "compat_dim"]
VAE_FEATURES = ["length", "width", "weight", "thickness", "shape_code"]
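# `shape` is categorical, so both the classifiers and the VAE consume its integer
# encoding `shape_code` (produced in Step 1 via SHAPE_MAPPING) rather than the raw label.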
##################################################
# 5) UTILITIES: dimension_match and assign_class
##################################################
def dimension_match(r, target_len, target_wid, t_shape, t_w, t_th,
                    tol_len, tol_wid, tol_we, tol_th):
    c_len = abs(r["length"] - target_len) <= tol_len
    c_wid = abs(r["width"] - target_wid) <= tol_wid
    c_shp = (r["shape"] == t_shape)
    c_wei = abs(r["weight"] - t_w) <= tol_we
    c_thi = abs(r["thickness"] - t_th) <= tol_th
    return 1 if (c_len and c_wid and c_shp and c_wei and c_thi) else 0
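# Worked example: with target_len=100 and tol_len=5, a part of length 103 passes the
# length check (|103 - 100| <= 5); the row is compatible (returns 1) only if the
# width, shape, weight, and thickness checks all pass as well.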
def assign_class(r, thr_score=0.5, alpha=0.5, beta=0.5):
    rul_norm = r["RUL"] / 1000.0
    margin_norm = (r["margin"] + 200) / 800.0
    score = alpha * rul_norm + beta * margin_norm
    if r["compat_dim"] == 1 and score >= thr_score:
        return "Functional Reuse"
    else:
        return "Creative Upcycling"
##################################################
# STEP 1: DATASET
##################################################
def step1_dataset():
    st.header("Step 1: Dataset")
    colA, colB = st.columns(2)
    with colA:
        data_opt = st.radio("Data source", ["Generate", "Upload CSV"], horizontal=True)
        data = None
        if data_opt == "Generate":
            n = st.slider("Synthetic samples", 100, 2000, 300, step=100)
            if st.button("Generate"):
                data = generate_synthetic_data(n_samples=n)
                st.session_state["data_source"] = "generated"
        else:
            upl = st.file_uploader("Upload a CSV with at least the columns [length,width,RUL,margin,shape,weight,thickness]", type=["csv"])
            if upl:
                df = pd.read_csv(upl)
                needed = ["length", "width", "RUL", "margin", "shape", "weight", "thickness"]
                if not all(c in df.columns for c in needed):
                    st.error("Invalid CSV: one or more required columns are missing.")
                else:
                    data = df
                    st.session_state["data_source"] = "uploaded"
    with colB:
        st.markdown("**Compatibility Parameters**")
        t_len = st.number_input("Target length", 50.0, 300.0, 100.0)
        t_wid = st.number_input("Target width", 20.0, 200.0, 50.0)
        t_shp = st.selectbox("Target shape", list(SHAPE_MAPPING.keys()))
        t_wei = st.number_input("Target weight (kg)", 5.0, 300.0, 80.0)
        t_thi = st.number_input("Target thickness (mm)", 0.5, 50.0, 8.0)
        st.markdown("**Tolerances**")
        tol_len = st.slider("Length tol ±", 0.0, 30.0, 5.0)
        tol_wid = st.slider("Width tol ±", 0.0, 20.0, 3.0)
        tol_wei = st.slider("Weight tol ±", 0.0, 50.0, 10.0)
        tol_thi = st.slider("Thickness tol ±", 0.0, 5.0, 1.0)
        st.markdown("**RUL & Margin Score**")
        thr = st.slider("Score threshold", 0.0, 1.0, 0.5)
        alpha = st.slider("RUL weight (α)", 0.0, 1.0, 0.5)
        beta = st.slider("Margin weight (β)", 0.0, 1.0, 0.5)
    if data is not None:
        data['shape_code'] = data['shape'].map(SHAPE_MAPPING).fillna(-1).astype(int)
        data['compat_dim'] = data.apply(lambda r: dimension_match(r, t_len, t_wid, t_shp, t_wei, t_thi,
                                                                  tol_len, tol_wid, tol_wei, tol_thi),
                                        axis=1)
        data['Target'] = data.apply(lambda r: assign_class(r, thr_score=thr, alpha=alpha, beta=beta), axis=1)
        st.dataframe(data.head(10))
        st.write("Target distribution:", data["Target"].value_counts())
        st.session_state["data"] = data  # kept in session state for the later steps
        csv = data.to_csv(index=False).encode('utf-8')
        st.download_button("Download processed dataset", csv, "dataset_processed.csv")
##################################################
# STEP 2: ML TRAINING
##################################################
def step2_trainML():
    st.header("Step 2: ML Training")
    data = st.session_state.get("data", None)
    if data is None:
        st.error("You must complete Step 1 first.")
        return
    if "Target" not in data.columns:
        st.error("Missing 'Target' column. Revisit Step 1.")
        return
    features_ml = [f for f in ML_FEATURES if f in data.columns]
    if not features_ml:
        st.error("The minimum ML features are missing.")
        return
    X = data[features_ml]
    y = data["Target"].map({"Functional Reuse": 0, "Creative Upcycling": 1})
    if len(y.unique()) < 2:
        st.error("The dataset contains a single class; training is impossible.")
        return
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42, stratify=y)
    st.write(f"Train={len(X_train)}, Test={len(X_test)}")
    trained = {}
    results = []
    for nome, model in MODELS_ML.items():
        st.subheader(f"Model: {nome}")
        pipe = Pipeline([
            ("scaler", StandardScaler()),
            ("clf", model)
        ])
        try:
            pipe.fit(X_train, y_train)
            y_pred = pipe.predict(X_test)
            acc = accuracy_score(y_test, y_pred)
            f1 = f1_score(y_test, y_pred, average='weighted')
            results.append({"Model": nome, "Accuracy": acc, "F1": f1})
            trained[nome] = pipe
            cm = confusion_matrix(y_test, y_pred)
            fig, ax = plt.subplots()
            sns.heatmap(cm, annot=True, fmt='d', cmap="Greens", ax=ax)
            ax.set_xlabel("Predicted")
            ax.set_ylabel("True")
            st.pyplot(fig)
            st.metric("Accuracy", f"{acc:.3f}")
            st.metric("F1 Score", f"{f1:.3f}")
        except Exception as e:
            st.error(f"Error while training {nome}: {e}")
    if results:
        df_r = pd.DataFrame(results).sort_values(by="Accuracy", ascending=False)
        st.dataframe(df_r)
        st.session_state["models"] = trained
        st.session_state["ml_results"] = df_r
    else:
        st.error("No model was trained.")
        st.session_state["models"] = None
##################################################
# STEP 2B: VAE TRAINING
##################################################
def step2b_trainVAE():
    st.header("Step 2B: VAE Training for Upcycling")
    data = st.session_state.get("data", None)
    if data is None:
        st.error("Complete Step 1 first.")
        return
    feats = [f for f in VAE_FEATURES if f in data.columns]
    if not feats:
        st.error(f"Missing VAE features: {VAE_FEATURES}")
        return
    st.write("Features used:", feats)
    lat_dim = st.slider("VAE latent dim", 2, 10, 2)
    ep = st.number_input("Epochs", 10, 300, 50)
    lr = st.number_input("Learning rate", 1e-5, 1e-2, 1e-3, format="%e")
    bs = st.selectbox("Batch size", [16, 32, 64], index=1)
    if not st.session_state.get("vae_trained", False):
        st.warning("VAE not trained yet")
        if st.button("Train VAE"):
            st.session_state["vae"] = MiniVAE(input_dim=len(feats), latent_dim=lat_dim)
            X_vae = data[feats].copy()
            for c in X_vae.columns:
                if X_vae[c].isnull().any():
                    X_vae[c] = X_vae[c].fillna(X_vae[c].median())
            scaler = StandardScaler()
            X_s = scaler.fit_transform(X_vae)
            st.session_state["vae_scaler"] = scaler
            dataset = TensorDataset(torch.tensor(X_s, dtype=torch.float32))
            loader = DataLoader(dataset, batch_size=bs, shuffle=True)
            vae = st.session_state["vae"]
            opt = torch.optim.Adam(vae.parameters(), lr=lr)
            losses = []
            vae.train()
            progress = st.progress(0.0)  # one bar, updated in place each epoch
            for epoch in range(int(ep)):
                ep_loss = 0
                for (batch,) in loader:
                    opt.zero_grad()
                    recon, mu, logvar = vae(batch)
                    loss = vae_loss(recon, batch, mu, logvar)
                    loss.backward()
                    opt.step()
                    ep_loss += loss.item()
                avgL = ep_loss / len(dataset)  # mean loss per sample
                losses.append(avgL)
                progress.progress((epoch + 1) / int(ep))
            st.success(f"VAE trained (loss ≈ {avgL:.2f})")
            st.line_chart(losses)
            st.session_state["vae_trained"] = True
    else:
        st.success("VAE already trained.")
        if st.button("Retrain"):
            st.session_state["vae_trained"] = False
            st.rerun()
##################################################
# STEP 3: GENERATIVE UPCYCLING
##################################################
def step3_upcycling_generative():
    st.header("Step 3: Generative Upcycling - VAE + GenAI")
    if not st.session_state.get("vae_trained", False):
        st.error("You must train the VAE in Step 2B first.")
        return
    vae = st.session_state.get("vae", None)
    vae_scaler = st.session_state.get("vae_scaler", None)
    if vae is None or vae_scaler is None:
        st.error("The VAE or its scaler is missing.")
        return
    lat_dim = vae.fc21.out_features
    st.write(f"VAE with lat_dim={lat_dim}. Generating upcycling ideas.")
    n_ideas = st.number_input("Number of ideas to generate", 1, 10, 3)
    if st.button("Generate Upcycling"):
        vae.eval()
        with torch.no_grad():
            z = torch.randn(n_ideas, lat_dim)  # sample from the latent prior
            recon = vae.decode(z)
            arr = recon.numpy()
        try:
            df_gen = pd.DataFrame(vae_scaler.inverse_transform(arr), columns=vae_scaler.feature_names_in_)
            # shape_code -> shape
            if 'shape_code' in df_gen.columns:
                df_gen['shape_code'] = df_gen['shape_code'].round().astype(int)
                inv_map = {0: "axisymmetric", 1: "sheet_metal", 2: "alloy_plate", 3: "complex_plastic"}
                df_gen['shape'] = df_gen['shape_code'].map(inv_map).fillna('unknown')
            st.subheader("Generated Configurations (VAE)")
            st.dataframe(df_gen.round(2))
            # Text suggestions via GenAI
            st.markdown("### Textual Upcycling Suggestions (distilgpt2)")
            text_generator = pipeline("text-generation",
                                      model="distilgpt2",
                                      device=0 if torch.cuda.is_available() else -1)
            def gen_upcycle_text(shape, thick, wei):
                prompt = (
                    f"I have an EoL component with shape {shape}, thickness {thick:.1f} mm, weight {wei:.1f} kg.\n"
                    "Give me a creative upcycling idea, with the main steps:"
                )
                out = text_generator(prompt, max_new_tokens=50, do_sample=True, top_k=50)
                return out[0]["generated_text"]
            for i, row in df_gen.iterrows():
                sh = row.get("shape", "unknown")
                tk = row.get("thickness", 1.0)
                we = row.get("weight", 10.0)
                text_sugg = gen_upcycle_text(sh, tk, we)
                st.write(f"**Idea {i+1}**: shape={sh}, thickness={tk:.1f}, weight={we:.1f}")
                st.info(text_sugg)
                st.markdown("---")
        except Exception as e:
            st.error(f"VAE decoding error: {e}")
##################################################
# DASHBOARD
##################################################
def show_dashboard():
    st.header("Dashboard")
    data = st.session_state.get("data", None)
    if data is None:
        st.error("No dataset.")
        return
    st.write("Target class distribution:", data["Target"].value_counts())
    if "ml_results" in st.session_state:
        st.subheader("ML Results")
        st.dataframe(st.session_state["ml_results"])
    else:
        st.info("No ML results yet")
    if st.session_state.get("vae_trained", False):
        st.success("VAE trained.")
    else:
        st.warning("VAE not trained.")
##################################################
# HELP
##################################################
def show_help():
    st.header("ℹ️ Four-Step Guide")
    st.markdown("""
1. **Step 1: Dataset**
   Generate or upload a CSV, define compatibility, and assign 'Reuse' vs 'Upcycling'.
2. **Step 2: ML Training**
   Train models (RandomForest, etc.) on [Reuse vs Upcycling].
3. **Step 2B: VAE Training**
   Train the VAE on the geometric features (length, width, weight, thickness, shape_code).
4. **Step 3: Generative Upcycling**
   Generate N configurations with the VAE and, for each one, get a creative upcycling text from an HF model (distilgpt2).
**Dashboard**: metrics.
**Reset**: sidebar button that clears the state.
""")
##################################################
# RESET
##################################################
def reset_app():
    for k in ["data", "models", "ml_results", "vae", "vae_trained", "vae_scaler", "data_source", "params_dim"]:
        if k in st.session_state:
            del st.session_state[k]
    st.success("App reset.")
    st.rerun()
##################################################
# MAIN
##################################################
def main():
    st.sidebar.title("WEEKO – 4 Step Flow")
    step = st.sidebar.radio("Steps:", [
        "Step 1: Dataset",
        "Step 2: ML Training",
        "Step 2B: VAE Training",
        "Step 3: Generative Upcycling",
        "Dashboard",
        "Help"
    ])
    if st.sidebar.button("Reset App"):
        reset_app()
    if step == "Step 1: Dataset":
        step1_dataset()
    elif step == "Step 2: ML Training":
        step2_trainML()
    elif step == "Step 2B: VAE Training":
        step2b_trainVAE()
    elif step == "Step 3: Generative Upcycling":
        step3_upcycling_generative()
    elif step == "Dashboard":
        show_dashboard()
    elif step == "Help":
        show_help()

if __name__ == "__main__":
    main()