import streamlit as st
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
# scikit-learn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
# torch
import torch
import torch.nn as nn
import torch.nn.functional as F
# Transformers for the text GenAI part
from transformers import pipeline
##################################################
# 1) generate_synthetic_data FUNCTION (DEFINED BEFORE USE)
##################################################
def generate_synthetic_data(n_samples=300, seed=42):
"""
Genera un dataset sintetico con:
- length, width, RUL, margin, shape, weight, thickness
- shape pescata da ["axisymmetric","sheet_metal","alloy_plate","complex_plastic"]
- RUL e margin con maggiore varianza
"""
np.random.seed(seed)
length = np.clip(np.random.normal(100,20,n_samples), 50, 250)
width = np.clip(np.random.normal(50,15,n_samples), 20, 150)
RUL = np.clip(np.random.normal(500,250,n_samples), 0, 1000).astype(int)
margin = np.clip(np.random.normal(150,150,n_samples), -200,600).astype(int)
shapes = np.random.choice(["axisymmetric","sheet_metal","alloy_plate","complex_plastic"],
size=n_samples, p=[0.4,0.3,0.2,0.1])
weight = np.clip(np.random.normal(80,30,n_samples), 10, 250)
thickness = np.clip(np.random.normal(8,4,n_samples), 0.5, 30)
return pd.DataFrame({
'length': length,
'width': width,
'RUL': RUL,
'margin': margin,
'shape': shapes,
'weight': weight,
'thickness': thickness
})
##################################################
# 2) PLACEHOLDER ML MODELS
##################################################
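# DummyTabTransformerClassifier is a lightweight stand-in for a real
# TabTransformer: it exposes the same fit/predict/predict_proba interface
# but delegates to a small sklearn MLP. The input_dim argument is kept
# for API symmetry only; MLPClassifier infers the input size at fit time.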
class DummyTabTransformerClassifier:
def __init__(self, input_dim=8):
self.clf = MLPClassifier(hidden_layer_sizes=(16,8), max_iter=100, random_state=42)
def fit(self, X, y):
self.clf.fit(X,y)
return self
def predict(self, X):
return self.clf.predict(X)
def predict_proba(self, X):
if hasattr(self.clf,"predict_proba"):
return self.clf.predict_proba(X)
else:
preds=self.clf.predict(X)
return np.array([[1.0,0.0] if p==0 else [0.0,1.0] for p in preds])
MODELS_ML = {
"RandomForest": RandomForestClassifier(random_state=42, n_estimators=100),
"LogisticRegression": LogisticRegression(random_state=42, max_iter=500),
"SVM": SVC(probability=True, random_state=42),
"TabTransformer(Dummy)": DummyTabTransformerClassifier()
}
##################################################
# 3) VAE FOR THE GENERATIVE PART (UPCYCLING)
##################################################
class MiniVAE(nn.Module):
def __init__(self, input_dim=5, latent_dim=2):
super().__init__()
self.fc1 = nn.Linear(input_dim,32)
self.fc21= nn.Linear(32,latent_dim)
self.fc22= nn.Linear(32,latent_dim)
self.fc3 = nn.Linear(latent_dim,32)
self.fc4 = nn.Linear(32,input_dim)
def encode(self,x):
h = F.relu(self.fc1(x))
return self.fc21(h), self.fc22(h)
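    # Reparameterization trick: sample z = mu + sigma * eps with
    # eps ~ N(0, I), so gradients can flow through mu and logvar
    # even though z is stochastic.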
def reparameterize(self, mu, logvar):
std = torch.exp(0.5*logvar)
eps = torch.randn_like(std)
return mu + eps*std
def decode(self,z):
h=F.relu(self.fc3(z))
return self.fc4(h)
def forward(self,x):
mu,logvar=self.encode(x)
z=self.reparameterize(mu,logvar)
recon=self.decode(z)
return recon, mu, logvar
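# VAE loss = reconstruction error + KL divergence (the negative ELBO).
# The KL term is the closed form for a diagonal Gaussian q(z|x) against
# the standard normal prior: KL = -0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2).
# With reduction='sum' both terms scale with batch size; the training loop
# divides the accumulated loss by len(dataset) to report a per-sample value.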
def vae_loss(recon_x,x,mu,logvar):
mse = F.mse_loss(recon_x,x,reduction='sum')
kld = -0.5*torch.sum(1+logvar - mu.pow(2)-logvar.exp())
return mse+kld
##################################################
# 4) CONSTANTS AND FEATURE MAPPING
##################################################
SHAPE_MAPPING = {"axisymmetric":0,"sheet_metal":1,"alloy_plate":2,"complex_plastic":3}
ML_FEATURES = ["length","width","shape_code","weight","thickness","RUL","margin","compat_dim"]
VAE_FEATURES = ["length","width","weight","thickness","shape_code"]
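# shape_code is the integer encoding of `shape` via SHAPE_MAPPING;
# compat_dim is the 0/1 dimensional-compatibility flag computed in Step 1.
# Both are derived columns and must exist before ML training in Step 2.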
##################################################
# 5) UTILITIES: dimension_match and assign_class
##################################################
def dimension_match(r, target_len, target_wid, t_shape, t_w, t_th,
tol_len, tol_wid, tol_we, tol_th):
c_len= abs(r["length"]-target_len)<=tol_len
c_wid= abs(r["width"]-target_wid)<= tol_wid
c_shp= (r["shape"]==t_shape)
c_wei= abs(r["weight"]-t_w)<=tol_we
c_thi= abs(r["thickness"]-t_th)<=tol_th
return 1 if (c_len and c_wid and c_shp and c_wei and c_thi) else 0
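# assign_class rescales RUL to [0,1] (nominal max 1000) and margin to [0,1]
# (nominal range -200..600), combines them as alpha*rul_norm + beta*margin_norm,
# and labels a part "Functional Reuse" only if it is dimensionally compatible
# AND its score clears the threshold; otherwise it goes to creative upcycling.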
def assign_class(r, thr_score=0.5, alpha=0.5, beta=0.5):
rul_norm = r["RUL"]/1000.0
margin_norm= (r["margin"]+200)/800.0
score= alpha*rul_norm + beta*margin_norm
if r["compat_dim"]==1 and score>=thr_score:
return "Riutilizzo Funzionale"
else:
return "Upcycling Creativo"
##################################################
# STEP 1: DATASET
##################################################
def step1_dataset():
st.header("Step 1: Dataset")
colA, colB = st.columns(2)
with colA:
        data_opt= st.radio("Data Source", ["Generate","Load CSV"], horizontal=True)
        data=None
        if data_opt=="Generate":
            n= st.slider("Synthetic samples",100,2000,300,step=100)
            if st.button("Generate"):
                data= generate_synthetic_data(n_samples=n)
                st.session_state["data_source"]="generated"
        else:
            upl= st.file_uploader("Upload a CSV with at least the columns [length,width,RUL,margin,shape,weight,thickness]", type=["csv"])
            if upl:
                df= pd.read_csv(upl)
                needed=["length","width","RUL","margin","shape","weight","thickness"]
                if not all(c in df.columns for c in needed):
                    st.error("Invalid CSV: one or more required columns are missing.")
                else:
                    data=df
                    st.session_state["data_source"]="uploaded"
with colB:
st.markdown("**Parametri Compatibilità**")
t_len= st.number_input("Lunghezza target",50.0,300.0,100.0)
t_wid= st.number_input("Larghezza target",20.0,200.0,50.0)
t_shp= st.selectbox("Forma target", list(SHAPE_MAPPING.keys()))
t_wei= st.number_input("Peso target (kg)",5.0,300.0,80.0)
t_thi= st.number_input("Spessore target (mm)",0.5,50.0,8.0)
st.markdown("**Tolleranze**")
tol_len= st.slider("Tol len ±",0.0,30.0,5.0)
tol_wid= st.slider("Tol wid ±",0.0,20.0,3.0)
tol_wei= st.slider("Tol weight ±",0.0,50.0,10.0)
tol_thi= st.slider("Tol thick ±",0.0,5.0,1.0)
st.markdown("**Score RUL & Margin**")
thr= st.slider("Soglia Score",0.0,1.0,0.5)
alpha= st.slider("Peso RUL(α)",0.0,1.0,0.5)
beta= st.slider("Peso Margin(β)",0.0,1.0,0.5)
if data is not None:
data['shape_code']= data['shape'].map(SHAPE_MAPPING).fillna(-1).astype(int)
data['compat_dim']= data.apply(lambda r: dimension_match(r, t_len, t_wid, t_shp, t_wei, t_thi,
tol_len, tol_wid, tol_wei, tol_thi),
axis=1)
data['Target'] = data.apply(lambda r: assign_class(r, thr_score=thr, alpha=alpha, beta=beta), axis=1)
st.dataframe(data.head(10))
st.write("Distrib. Target:", data["Target"].value_counts())
st.session_state["data"]= data
csv=data.to_csv(index=False).encode('utf-8')
st.download_button("Scarica dataset elaborato", csv, "dataset_processed.csv")
##################################################
# STEP 2: ML TRAINING
##################################################
def step2_trainML():
st.header("Step 2: Addestramento ML")
data= st.session_state.get("data",None)
if data is None:
st.error("Devi completare Step 1.")
return
if "Target" not in data.columns:
st.error("Manca colonna 'Target'. Rivedi Step 1.")
return
features_ml=[f for f in ML_FEATURES if f in data.columns]
if not features_ml:
st.error("Mancano feature minime ML.")
return
X= data[features_ml]
y= data["Target"].map({"Riutilizzo Funzionale":0,"Upcycling Creativo":1})
if len(y.unique())<2:
st.error("Dataset ha una sola classe. Impossibile train.")
return
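    # Stratified split preserves the Reuse/Upcycling class ratio in both
    # train and test sets, which matters when the classes are imbalanced.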
X_train,X_test,y_train,y_test= train_test_split(X,y,test_size=0.25,random_state=42,stratify=y)
st.write(f"Train={len(X_train)}, Test={len(X_test)}")
trained={}
results=[]
for nome,model in MODELS_ML.items():
        st.subheader(f"Model: {nome}")
pipe= Pipeline([
("scaler",StandardScaler()),
("clf",model)
])
try:
pipe.fit(X_train,y_train)
y_pred= pipe.predict(X_test)
acc= accuracy_score(y_test,y_pred)
f1= f1_score(y_test,y_pred,average='weighted')
            results.append({"Model":nome,"Accuracy":acc,"F1":f1})
trained[nome]= pipe
cm= confusion_matrix(y_test,y_pred)
fig, ax= plt.subplots()
sns.heatmap(cm, annot=True, fmt='d', cmap="Greens", ax=ax)
plt.xlabel("Pred")
plt.ylabel("True")
st.pyplot(fig)
st.metric("Accuracy",f"{acc:.3f}")
st.metric("F1 Score",f"{f1:.3f}")
except Exception as e:
st.error(f"Errore training {nome}: {e}")
if results:
df_r= pd.DataFrame(results).sort_values(by="Accuracy", ascending=False)
st.dataframe(df_r)
st.session_state["models"]= trained
st.session_state["ml_results"]= df_r
else:
st.error("Nessun modello addestrato.")
st.session_state["models"]=None
##################################################
# STEP 2B: TRAIN VAE
##################################################
def step2b_trainVAE():
st.header("Step 2B: Training VAE per Upcycling")
data= st.session_state.get("data",None)
if data is None:
st.error("Completa Step 1.")
return
feats= [f for f in VAE_FEATURES if f in data.columns]
if not feats:
st.error(f"Mancano feature per VAE: {VAE_FEATURES}")
return
st.write("Useremo le feature:", feats)
lat_dim= st.slider("Dim latente VAE",2,10,2)
ep= st.number_input("Epochs",10,300,50)
lr= st.number_input("Learning Rate",1e-5,1e-2,1e-3, format="%e")
bs= st.selectbox("Batch size",[16,32,64],index=1)
if not st.session_state.get("vae_trained",False):
st.warning("VAE non addestrato")
if st.button("Allena VAE"):
st.session_state["vae"]= MiniVAE(input_dim=len(feats), latent_dim=lat_dim)
X_vae= data[feats].copy()
for c in X_vae.columns:
if X_vae[c].isnull().any():
                    X_vae[c] = X_vae[c].fillna(X_vae[c].median())
scaler= StandardScaler()
X_s= scaler.fit_transform(X_vae)
st.session_state["vae_scaler"]= scaler
dataset= torch.utils.data.TensorDataset(torch.tensor(X_s,dtype=torch.float32))
loader= torch.utils.data.DataLoader(dataset,batch_size=bs,shuffle=True)
vae= st.session_state["vae"]
opt= torch.optim.Adam(vae.parameters(),lr=lr)
losses=[]
vae.train()
for epoch in range(int(ep)):
ep_loss=0
for (batch,) in loader:
opt.zero_grad()
recon, mu, logvar= vae(batch)
loss= vae_loss(recon,batch,mu,logvar)
loss.backward()
opt.step()
ep_loss+=loss.item()
avgL= ep_loss/len(dataset)
losses.append(avgL)
st.progress((epoch+1)/ep)
st.success(f"VAE addestrato (Loss ~ {avgL:.2f})")
st.line_chart(losses)
st.session_state["vae_trained"]= True
else:
st.success("VAE già addestrato.")
if st.button("Riallena"):
st.session_state["vae_trained"]=False
st.experimental_rerun()
##################################################
# STEP 3: GENERATIVE UPCYCLING
##################################################
def step3_upcycling_generative():
st.header("Step 3: Upcycling Generative - VAE + GenAI")
if not st.session_state.get("vae_trained",False):
st.error("Devi addestrare il VAE in Step 2B prima.")
return
vae= st.session_state.get("vae",None)
vae_scaler= st.session_state.get("vae_scaler",None)
if vae is None or vae_scaler is None:
st.error("Mancano vae o scaler.")
return
lat_dim= vae.fc21.out_features
st.write(f"VAE con lat_dim={lat_dim}. Generiamo idee upcycling.")
n_ideas= st.number_input("Quante idee generare",1,10,3)
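    # Generation step: sample latent vectors z from the standard normal prior
    # (the distribution the KL term regularizes toward) and decode them into
    # new, unseen feature configurations.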
if st.button("Genera Upcycling"):
vae.eval()
with torch.no_grad():
z=torch.randn(n_ideas,lat_dim)
recon= vae.decode(z)
arr= recon.numpy()
try:
df_gen= pd.DataFrame(vae_scaler.inverse_transform(arr), columns=vae_scaler.feature_names_in_)
# shape_code -> shape
if 'shape_code' in df_gen.columns:
df_gen['shape_code']= df_gen['shape_code'].round().astype(int)
inv_map={0:"axisymmetric",1:"sheet_metal",2:"alloy_plate",3:"complex_plastic"}
df_gen['shape']= df_gen['shape_code'].map(inv_map).fillna('unknown')
st.subheader("Configurazioni Generate (VAE)")
st.dataframe(df_gen.round(2))
            # Add text-based GenAI suggestions
            st.markdown("### Textual Upcycling Suggestions (distilgpt2)")
text_generator = pipeline("text-generation",
model="distilgpt2",
device=0 if torch.cuda.is_available() else -1)
def gen_upcycle_text(shape, thick, wei):
                prompt = (
                    f"I have an EoL component with shape {shape}, thickness {thick:.1f} mm, weight {wei:.1f} kg.\n"
                    "Give me a creative upcycling idea, with the main steps:"
                )
out= text_generator(prompt, max_new_tokens=50, do_sample=True, top_k=50)
return out[0]["generated_text"]
for i, row in df_gen.iterrows():
sh= row.get("shape","unknown")
tk= row.get("thickness",1.0)
we= row.get("weight",10.0)
text_sugg= gen_upcycle_text(sh, tk, we)
st.write(f"**Idea {i+1}**: shape={sh}, thickness={tk:.1f}, weight={we:.1f}")
st.info(text_sugg)
st.markdown("---")
except Exception as e:
st.error(f"Errore decodifica VAE: {e}")
##################################################
# DASHBOARD
##################################################
def show_dashboard():
st.header("Dashboard")
data= st.session_state.get("data",None)
if data is None:
st.error("Nessun dataset.")
return
st.write("Distribuzione classi Target:", data["Target"].value_counts())
if "ml_results" in st.session_state:
st.subheader("Risultati ML")
st.dataframe(st.session_state["ml_results"])
else:
st.info("Nessun risultato ML")
if st.session_state.get("vae_trained",False):
st.success("VAE addestrato.")
else:
st.warning("VAE non addestrato.")
##################################################
# HELP
##################################################
def show_help():
st.header("ℹ️ Guida Quattro Step")
st.markdown("""
1. **Step 1: Dataset**
Generi o carichi CSV, definisci compatibilità, e assegni 'Riutilizzo' vs 'Upcycling'.
2. **Step 2: Addestramento ML**
Allena modelli (RandomForest, ecc.) su [Riutilizzo vs Upcycling].
3. **Step 2B: Training VAE**
Allena VAE sulle feature geometriche (length, width, weight, thickness, shape_code).
4. **Step 3: Upcycling Generative**
Genera N configurazioni col VAE e, per ognuna, ottieni un testo creativo di upcycling con un modello HF (distilgpt2).
**Dashboard**: metriche.
**Reset**: pulsante nella sidebar che cancella lo state.
""")
##################################################
# RESET
##################################################
def reset_app():
for k in ["data","models","ml_results","vae","vae_trained","vae_scaler","data_source","params_dim"]:
if k in st.session_state:
del st.session_state[k]
st.success("App resettata.")
st.experimental_rerun()
##################################################
# MAIN
##################################################
def main():
st.sidebar.title("WEEKO – 4 Step Flow")
step= st.sidebar.radio("Fasi:",[
"Step 1: Dataset",
"Step 2: Addestramento ML",
"Step 2B: Training VAE",
"Step 3: Upcycling Generative",
"Dashboard",
"Help"
])
if st.sidebar.button("Reset App"):
reset_app()
if step=="Step 1: Dataset":
step1_dataset()
elif step=="Step 2: Addestramento ML":
step2_trainML()
elif step=="Step 2B: Training VAE":
step2b_trainVAE()
elif step=="Step 3: Upcycling Generative":
step3_upcycling_generative()
elif step=="Dashboard":
show_dashboard()
elif step=="Help":
show_help()
if __name__=="__main__":
main()