import streamlit as st import pandas as pd import numpy as np import seaborn as sns import matplotlib.pyplot as plt from statistics import mode, StatisticsError # scikit-learn from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler from sklearn.pipeline import Pipeline from sklearn.metrics import confusion_matrix, accuracy_score, f1_score from sklearn.ensemble import RandomForestClassifier from sklearn.linear_model import LogisticRegression from sklearn.svm import SVC from sklearn.neural_network import MLPClassifier # torch import torch import torch.nn as nn import torch.nn.functional as F # Transformers per la GenAI testuale from transformers import pipeline ################################################## # 1) FUNZIONE generate_synthetic_data (PRIMA DELL'USO) ################################################## def generate_synthetic_data(n_samples=300, seed=42): """ Genera un dataset sintetico con: - length, width, RUL, margin, shape, weight, thickness - shape pescata da ["axisymmetric","sheet_metal","alloy_plate","complex_plastic"] - RUL e margin con maggiore varianza """ np.random.seed(seed) length = np.clip(np.random.normal(100,20,n_samples), 50, 250) width = np.clip(np.random.normal(50,15,n_samples), 20, 150) RUL = np.clip(np.random.normal(500,250,n_samples), 0, 1000).astype(int) margin = np.clip(np.random.normal(150,150,n_samples), -200,600).astype(int) shapes = np.random.choice(["axisymmetric","sheet_metal","alloy_plate","complex_plastic"], size=n_samples, p=[0.4,0.3,0.2,0.1]) weight = np.clip(np.random.normal(80,30,n_samples), 10, 250) thickness = np.clip(np.random.normal(8,4,n_samples), 0.5, 30) return pd.DataFrame({ 'length': length, 'width': width, 'RUL': RUL, 'margin': margin, 'shape': shapes, 'weight': weight, 'thickness': thickness }) ################################################## # 2) MODELLI ML PLACEHOLDER ################################################## class DummyTabTransformerClassifier: def __init__(self, input_dim=8): self.clf = MLPClassifier(hidden_layer_sizes=(16,8), max_iter=100, random_state=42) def fit(self, X, y): self.clf.fit(X,y) return self def predict(self, X): return self.clf.predict(X) def predict_proba(self, X): if hasattr(self.clf,"predict_proba"): return self.clf.predict_proba(X) else: preds=self.clf.predict(X) return np.array([[1.0,0.0] if p==0 else [0.0,1.0] for p in preds]) MODELS_ML = { "RandomForest": RandomForestClassifier(random_state=42, n_estimators=100), "LogisticRegression": LogisticRegression(random_state=42, max_iter=500), "SVM": SVC(probability=True, random_state=42), "TabTransformer(Dummy)": DummyTabTransformerClassifier() } ################################################## # 3) VAE PER LA PARTE GENERATIVA (UPCYCLING) ################################################## class MiniVAE(nn.Module): def __init__(self, input_dim=5, latent_dim=2): super().__init__() self.fc1 = nn.Linear(input_dim,32) self.fc21= nn.Linear(32,latent_dim) self.fc22= nn.Linear(32,latent_dim) self.fc3 = nn.Linear(latent_dim,32) self.fc4 = nn.Linear(32,input_dim) def encode(self,x): h = F.relu(self.fc1(x)) return self.fc21(h), self.fc22(h) def reparameterize(self, mu, logvar): std = torch.exp(0.5*logvar) eps = torch.randn_like(std) return mu + eps*std def decode(self,z): h=F.relu(self.fc3(z)) return self.fc4(h) def forward(self,x): mu,logvar=self.encode(x) z=self.reparameterize(mu,logvar) recon=self.decode(z) return recon, mu, logvar def vae_loss(recon_x,x,mu,logvar): mse = F.mse_loss(recon_x,x,reduction='sum') kld = -0.5*torch.sum(1+logvar - mu.pow(2)-logvar.exp()) return mse+kld ################################################## # 4) COSTANTI E MAPPING FEATURE ################################################## SHAPE_MAPPING = {"axisymmetric":0,"sheet_metal":1,"alloy_plate":2,"complex_plastic":3} ML_FEATURES = ["length","width","shape_code","weight","thickness","RUL","margin","compat_dim"] VAE_FEATURES = ["length","width","weight","thickness","shape_code"] ################################################## # 5) UTILITY: dimension_match e assign_class ################################################## def dimension_match(r, target_len, target_wid, t_shape, t_w, t_th, tol_len, tol_wid, tol_we, tol_th): c_len= abs(r["length"]-target_len)<=tol_len c_wid= abs(r["width"]-target_wid)<= tol_wid c_shp= (r["shape"]==t_shape) c_wei= abs(r["weight"]-t_w)<=tol_we c_thi= abs(r["thickness"]-t_th)<=tol_th return 1 if (c_len and c_wid and c_shp and c_wei and c_thi) else 0 def assign_class(r, thr_score=0.5, alpha=0.5, beta=0.5): rul_norm = r["RUL"]/1000.0 margin_norm= (r["margin"]+200)/800.0 score= alpha*rul_norm + beta*margin_norm if r["compat_dim"]==1 and score>=thr_score: return "Riutilizzo Funzionale" else: return "Upcycling Creativo" ################################################## # STEP 1: DATASET ################################################## def step1_dataset(): st.header("Step 1: Dataset") colA, colB = st.columns(2) with colA: data_opt= st.radio("Fonte Dati", ["Genera","Carica CSV"], horizontal=True) data=None if data_opt=="Genera": n= st.slider("Campioni sintetici",100,2000,300,step=100) if st.button("Genera"): data= generate_synthetic_data(n_samples=n) st.session_state["data_source"]="generated" else: upl= st.file_uploader("Carica CSV con col. minime [length,width,RUL,margin,shape,weight,thickness]", type=["csv"]) if upl: df= pd.read_csv(upl) needed=["length","width","RUL","margin","shape","weight","thickness"] if not all(c in df.columns for c in needed): st.error("CSV non valido. Manca qualche colonna.") else: data=df st.session_state["data_source"]="uploaded" with colB: st.markdown("**Parametri Compatibilità**") t_len= st.number_input("Lunghezza target",50.0,300.0,100.0) t_wid= st.number_input("Larghezza target",20.0,200.0,50.0) t_shp= st.selectbox("Forma target", list(SHAPE_MAPPING.keys())) t_wei= st.number_input("Peso target (kg)",5.0,300.0,80.0) t_thi= st.number_input("Spessore target (mm)",0.5,50.0,8.0) st.markdown("**Tolleranze**") tol_len= st.slider("Tol len ±",0.0,30.0,5.0) tol_wid= st.slider("Tol wid ±",0.0,20.0,3.0) tol_wei= st.slider("Tol weight ±",0.0,50.0,10.0) tol_thi= st.slider("Tol thick ±",0.0,5.0,1.0) st.markdown("**Score RUL & Margin**") thr= st.slider("Soglia Score",0.0,1.0,0.5) alpha= st.slider("Peso RUL(α)",0.0,1.0,0.5) beta= st.slider("Peso Margin(β)",0.0,1.0,0.5) if data is not None: data['shape_code']= data['shape'].map(SHAPE_MAPPING).fillna(-1).astype(int) data['compat_dim']= data.apply(lambda r: dimension_match(r, t_len, t_wid, t_shp, t_wei, t_thi, tol_len, tol_wid, tol_wei, tol_thi), axis=1) data['Target'] = data.apply(lambda r: assign_class(r, thr_score=thr, alpha=alpha, beta=beta), axis=1) st.dataframe(data.head(10)) st.write("Distrib. Target:", data["Target"].value_counts()) st.session_state["data"]= data csv=data.to_csv(index=False).encode('utf-8') st.download_button("Scarica dataset elaborato", csv, "dataset_processed.csv") ################################################## # STEP 2: ADD ML ################################################## def step2_trainML(): st.header("Step 2: Addestramento ML") data= st.session_state.get("data",None) if data is None: st.error("Devi completare Step 1.") return if "Target" not in data.columns: st.error("Manca colonna 'Target'. Rivedi Step 1.") return features_ml=[f for f in ML_FEATURES if f in data.columns] if not features_ml: st.error("Mancano feature minime ML.") return X= data[features_ml] y= data["Target"].map({"Riutilizzo Funzionale":0,"Upcycling Creativo":1}) if len(y.unique())<2: st.error("Dataset ha una sola classe. Impossibile train.") return X_train,X_test,y_train,y_test= train_test_split(X,y,test_size=0.25,random_state=42,stratify=y) st.write(f"Train={len(X_train)}, Test={len(X_test)}") trained={} results=[] for nome,model in MODELS_ML.items(): st.subheader(f"Modello: {nome}") from sklearn.pipeline import Pipeline pipe= Pipeline([ ("scaler",StandardScaler()), ("clf",model) ]) try: pipe.fit(X_train,y_train) y_pred= pipe.predict(X_test) acc= accuracy_score(y_test,y_pred) f1= f1_score(y_test,y_pred,average='weighted') results.append({"Modello":nome,"Accuracy":acc,"F1":f1}) trained[nome]= pipe cm= confusion_matrix(y_test,y_pred) fig, ax= plt.subplots() sns.heatmap(cm, annot=True, fmt='d', cmap="Greens", ax=ax) plt.xlabel("Pred") plt.ylabel("True") st.pyplot(fig) st.metric("Accuracy",f"{acc:.3f}") st.metric("F1 Score",f"{f1:.3f}") except Exception as e: st.error(f"Errore training {nome}: {e}") if results: df_r= pd.DataFrame(results).sort_values(by="Accuracy", ascending=False) st.dataframe(df_r) st.session_state["models"]= trained st.session_state["ml_results"]= df_r else: st.error("Nessun modello addestrato.") st.session_state["models"]=None ################################################## # STEP 2B: TRAIN VAE ################################################## def step2b_trainVAE(): st.header("Step 2B: Training VAE per Upcycling") data= st.session_state.get("data",None) if data is None: st.error("Completa Step 1.") return feats= [f for f in VAE_FEATURES if f in data.columns] if not feats: st.error(f"Mancano feature per VAE: {VAE_FEATURES}") return st.write("Useremo le feature:", feats) lat_dim= st.slider("Dim latente VAE",2,10,2) ep= st.number_input("Epochs",10,300,50) lr= st.number_input("Learning Rate",1e-5,1e-2,1e-3, format="%e") bs= st.selectbox("Batch size",[16,32,64],index=1) if not st.session_state.get("vae_trained",False): st.warning("VAE non addestrato") if st.button("Allena VAE"): st.session_state["vae"]= MiniVAE(input_dim=len(feats), latent_dim=lat_dim) from sklearn.preprocessing import StandardScaler X_vae= data[feats].copy() for c in X_vae.columns: if X_vae[c].isnull().any(): X_vae[c].fillna(X_vae[c].median(), inplace=True) scaler= StandardScaler() X_s= scaler.fit_transform(X_vae) st.session_state["vae_scaler"]= scaler dataset= torch.utils.data.TensorDataset(torch.tensor(X_s,dtype=torch.float32)) loader= torch.utils.data.DataLoader(dataset,batch_size=bs,shuffle=True) vae= st.session_state["vae"] opt= torch.optim.Adam(vae.parameters(),lr=lr) losses=[] vae.train() for epoch in range(int(ep)): ep_loss=0 for (batch,) in loader: opt.zero_grad() recon, mu, logvar= vae(batch) loss= vae_loss(recon,batch,mu,logvar) loss.backward() opt.step() ep_loss+=loss.item() avgL= ep_loss/len(dataset) losses.append(avgL) st.progress((epoch+1)/ep) st.success(f"VAE addestrato (Loss ~ {avgL:.2f})") st.line_chart(losses) st.session_state["vae_trained"]= True else: st.success("VAE già addestrato.") if st.button("Riallena"): st.session_state["vae_trained"]=False st.experimental_rerun() ################################################## # STEP 3: Upcycling Generative ################################################## def step3_upcycling_generative(): st.header("Step 3: Upcycling Generative - VAE + GenAI") if not st.session_state.get("vae_trained",False): st.error("Devi addestrare il VAE in Step 2B prima.") return vae= st.session_state.get("vae",None) vae_scaler= st.session_state.get("vae_scaler",None) if vae is None or vae_scaler is None: st.error("Mancano vae o scaler.") return lat_dim= vae.fc21.out_features st.write(f"VAE con lat_dim={lat_dim}. Generiamo idee upcycling.") n_ideas= st.number_input("Quante idee generare",1,10,3) if st.button("Genera Upcycling"): vae.eval() with torch.no_grad(): z=torch.randn(n_ideas,lat_dim) recon= vae.decode(z) arr= recon.numpy() try: df_gen= pd.DataFrame(vae_scaler.inverse_transform(arr), columns=vae_scaler.feature_names_in_) # shape_code -> shape if 'shape_code' in df_gen.columns: df_gen['shape_code']= df_gen['shape_code'].round().astype(int) inv_map={0:"axisymmetric",1:"sheet_metal",2:"alloy_plate",3:"complex_plastic"} df_gen['shape']= df_gen['shape_code'].map(inv_map).fillna('unknown') st.subheader("Configurazioni Generate (VAE)") st.dataframe(df_gen.round(2)) # Aggiungiamo GenAI test st.markdown("### Suggerimenti Testuali Upcycling (distilgpt2)") text_generator = pipeline("text-generation", model="distilgpt2", device=0 if torch.cuda.is_available() else -1) def gen_upcycle_text(shape, thick, wei): prompt = ( f"Ho un componente EoL con forma {shape}, spessore {thick:.1f} mm, peso {wei:.1f} kg.\n" "Dammi un'idea creativa di upcycling in italiano, con passaggi principali:" ) out= text_generator(prompt, max_new_tokens=50, do_sample=True, top_k=50) return out[0]["generated_text"] for i, row in df_gen.iterrows(): sh= row.get("shape","unknown") tk= row.get("thickness",1.0) we= row.get("weight",10.0) text_sugg= gen_upcycle_text(sh, tk, we) st.write(f"**Idea {i+1}**: shape={sh}, thickness={tk:.1f}, weight={we:.1f}") st.info(text_sugg) st.markdown("---") except Exception as e: st.error(f"Errore decodifica VAE: {e}") ################################################## # DASHBOARD ################################################## def show_dashboard(): st.header("Dashboard") data= st.session_state.get("data",None) if data is None: st.error("Nessun dataset.") return st.write("Distribuzione classi Target:", data["Target"].value_counts()) if "ml_results" in st.session_state: st.subheader("Risultati ML") st.dataframe(st.session_state["ml_results"]) else: st.info("Nessun risultato ML") if st.session_state.get("vae_trained",False): st.success("VAE addestrato.") else: st.warning("VAE non addestrato.") ################################################## # HELP ################################################## def show_help(): st.header("ℹ️ Guida Quattro Step") st.markdown(""" 1. **Step 1: Dataset** Generi o carichi CSV, definisci compatibilità, e assegni 'Riutilizzo' vs 'Upcycling'. 2. **Step 2: Addestramento ML** Allena modelli (RandomForest, ecc.) su [Riutilizzo vs Upcycling]. 3. **Step 2B: Training VAE** Allena VAE sulle feature geometriche (length, width, weight, thickness, shape_code). 4. **Step 3: Upcycling Generative** Genera N configurazioni col VAE e, per ognuna, ottieni un testo creativo di upcycling con un modello HF (distilgpt2). **Dashboard**: metriche. **Reset**: pulsante nella sidebar che cancella lo state. """) ################################################## # RESET ################################################## def reset_app(): for k in ["data","models","ml_results","vae","vae_trained","vae_scaler","data_source","params_dim"]: if k in st.session_state: del st.session_state[k] st.success("App resettata.") st.experimental_rerun() ################################################## # MAIN ################################################## def main(): st.sidebar.title("WEEKO – 4 Step Flow") step= st.sidebar.radio("Fasi:",[ "Step 1: Dataset", "Step 2: Addestramento ML", "Step 2B: Training VAE", "Step 3: Upcycling Generative", "Dashboard", "Help" ]) if st.sidebar.button("Reset App"): reset_app() if step=="Step 1: Dataset": step1_dataset() elif step=="Step 2: Addestramento ML": step2_trainML() elif step=="Step 2B: Training VAE": step2b_trainVAE() elif step=="Step 3: Upcycling Generative": step3_upcycling_generative() elif step=="Dashboard": show_dashboard() elif step=="Help": show_help() if __name__=="__main__": main()