Rhodham96's picture
Update preprocessing/pipeline_components.py
933cc13 verified
import os
os.environ["PGEOCODE_CACHE_DIR"] = "/tmp/pgeocode"
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from scipy.stats import gaussian_kde
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from datetime import datetime
import pgeocode
# IMMEDIATELY after importing pgeocode, force its STORAGE_DIR
# This must be done before any pgeocode.Nominatim() calls
try:
pgeocode.STORAGE_DIR = "/tmp/pgeocode"
# Ensure the directory exists as pgeocode might not create it if overridden this way
os.makedirs(pgeocode.STORAGE_DIR, exist_ok=True)
print(f"DEBUG: Successfully forced pgeocode.STORAGE_DIR to '{pgeocode.STORAGE_DIR}' and ensured directory exists.")
except Exception as e:
print(f"ERROR: Failed to force pgeocode.STORAGE_DIR or create directory: {e}")
class DataCleaner(BaseEstimator, TransformerMixin):
def __init__(self):
self.col_types = {
'id': 'int', 'type': 'str', 'subtype': 'str', 'bedroomCount': 'int',
'bathroomCount': 'int', 'province': 'str', 'locality': 'str',
'postCode': 'int', 'habitableSurface': 'float', 'hasBasement': 'int',
'buildingCondition': 'str', 'buildingConstructionYear': 'int',
'hasLift': 'int', 'floodZoneType': 'str', 'heatingType': 'str',
'hasHeatPump': 'int', 'hasPhotovoltaicPanels': 'int',
'hasThermicPanels': 'int', 'kitchenType': 'str', 'landSurface': 'float',
'hasLivingRoom': 'int', 'livingRoomSurface': 'float', 'hasGarden': 'int',
'gardenSurface': 'float', 'parkingCountIndoor': 'int',
'parkingCountOutdoor': 'int', 'hasAirConditioning': 'int',
'hasArmoredDoor': 'int', 'hasVisiophone': 'int', 'hasOffice': 'int',
'toiletCount': 'int', 'hasSwimmingPool': 'int', 'hasFireplace': 'int',
'hasTerrace': 'int', 'terraceSurface': 'float', 'terraceOrientation': 'str',
'epcScore': 'str', 'facadeCount': 'int'
}
self.kitchenType_mode = None
def fit(self, X, y=None):
#self.kitchenType_mode = X['kitchenType'].mode()[0]
return self
def transform(self, X):
df = X.copy()
# Drop unnecessary columns
df = df.drop(columns=[col for col in ["Unnamed: 0", "url"] if col in df.columns])
df = df.drop(columns=[col for col in ['monthlyCost', 'hasBalcony', 'accessibleDisabledPeople',
'roomCount', 'diningRoomSurface', 'streetFacadeWidth',
'gardenOrientation', 'kitchenSurface', 'floorCount',
'hasDiningRoom', 'hasDressingRoom'] if col in df.columns])
# Handle binary columns
binary_cols = [
'hasBasement', 'hasLift', 'hasHeatPump', 'hasPhotovoltaicPanels',
'hasAirConditioning', 'hasArmoredDoor', 'hasVisiophone', 'hasOffice',
'hasSwimmingPool', 'hasFireplace', 'parkingCountIndoor', 'parkingCountOutdoor',
'hasAttic', 'hasThermicPanels'
]
for col in binary_cols:
df[col] = df[col].map({True: 1, False: 0, 'True': 1, 'False': 0, 'YES': 1, 'NO': 0}).fillna(0).astype(int)
# Handle dependent columns
df['hasLivingRoom'] = df['hasLivingRoom'].map({True: 1, False: 0, 'True': 1, 'False': 0, 'YES': 1, 'NO': 0})
df.loc[df['hasLivingRoom'].isna(), 'hasLivingRoom'] = df['livingRoomSurface'].notnull().astype(int)
df['hasGarden'] = df['hasGarden'].map({True: 1, False: 0, 'True': 1, 'False': 0, 'YES': 1, 'NO': 0})
df.loc[df['hasGarden'].isna(), 'hasGarden'] = df['gardenSurface'].notnull().astype(int)
df['hasTerrace'] = df['hasTerrace'].map({True: 1, False: 0, 'True': 1, 'False': 0, 'YES': 1, 'NO': 0})
df.loc[df['hasTerrace'].isna(), 'hasTerrace'] = df['terraceSurface'].notnull().astype(int)
# Set surfaces to 0 when feature is not present
df.loc[df['hasLivingRoom'] == 0, 'livingRoomSurface'] = 0
df.loc[df['hasGarden'] == 0, 'gardenSurface'] = 0
df.loc[df['hasTerrace'] == 0, 'terraceSurface'] = 0
df.loc[df['hasTerrace'] == 0, 'terraceOrientation'] = 0
# Handle facade count
df['facadeCount'] = df['facadeCount'].fillna(-1)
# Fill missing values
df['bedroomCount'] = df['bedroomCount'].fillna(-1).astype(float)
df['bathroomCount'] = df['bathroomCount'].fillna(-1).astype(float)
df['toiletCount'] = df['toiletCount'].fillna(-1).astype(float)
# Drop habitable surface na
df = df.dropna(subset=['habitableSurface'])
# Fill other missing values
df['buildingCondition'] = df['buildingCondition'].fillna('NOT_MENTIONED')
df['floodZoneType'] = df['floodZoneType'].fillna('NON_FLOOD_ZONE')
df['heatingType'] = df['heatingType'].fillna(df['heatingType'].mode()[0])
df['hasThermicPanels'] = df['hasThermicPanels'].fillna(0.0)
df['kitchenType'] = df['kitchenType'].fillna(df['kitchenType'].mode()[0])
df['landSurface'] = df['landSurface'].fillna(df['landSurface'].median())
df['livingRoomSurface'] = df['livingRoomSurface'].fillna(df['livingRoomSurface'].median())
# Transform building construction year into age and fillna(-1)
current_year = datetime.now().year
df['buildingAge'] = current_year - df['buildingConstructionYear']
df['buildingAge'] = df['buildingAge'].fillna(-1)
# Handle terrace surface and orientation
median_terrace = df.loc[(df['hasTerrace'] == 1) & (df['terraceSurface'].notnull()), 'terraceSurface'].median()
df.loc[(df['hasTerrace'] == 1) & (df['terraceSurface'].isna()), 'terraceSurface'] = -1
df.loc[(df['hasTerrace'] != 1) & (df['terraceSurface'].isna()), 'terraceSurface'] = 0
mode_terrace = df.loc[(df['hasTerrace'] == 1), 'terraceOrientation'].mode()[0]
df.loc[(df['hasTerrace'] == 1) & (df['terraceOrientation'].isna()), 'terraceOrientation'] = 'NOT_MENTIONED'
df.loc[(df['hasTerrace'] != 1) & (df['terraceOrientation'].isna()), 'terraceOrientation'] = 'NO_TERRACE'
# Convert data types
for col, dtype in self.col_types.items():
if col in df.columns:
if pd.api.types.is_integer_dtype(dtype):
df[col] = df[col].fillna(0).astype(dtype)
else:
df[col] = df[col].astype(dtype)
return df
class FeatureEngineer(BaseEstimator, TransformerMixin):
def __init__(self):
self.epc_mapping = {
'Flanders': {
'A++': 0, 'A+': 0, 'A': 100, 'B': 200, 'C': 300,
'D': 400, 'E': 500, 'F': 600, 'G': 700
},
'Wallonia': {
'A++': 0, 'A+': 50, 'A': 90, 'B': 170, 'C': 250,
'D': 330, 'E': 420, 'F': 510, 'G': 600
},
'Bruxelles': {
'A++': 0, 'A+': 0, 'A': 45, 'B': 95, 'C': 145,
'D': 210, 'E': 275, 'F': 345, 'G': 450
}
}
def fit(self, X, y=None):
return self
def transform(self, X):
df = X.copy()
if 'price' in df.columns:
# Filter out extremely high prices
high_price_count = (df['price'] > 1500000).sum()
df = df[df['price'] <= 1500000]
# Check for problematic values
zero_price = (df['price'] <= 0).sum()
zero_surface = (df['habitableSurface'] <= 0).sum()
# Handle problematic values
if zero_price > 0:
df.loc[df['price'] <= 0, 'price'] = np.nan
if zero_surface > 0:
df.loc[df['habitableSurface'] <= 0, 'habitableSurface'] = np.nan
# Add isHouse feature
df['isHouse'] = (df['type'] == 'HOUSE').astype(int)
# Add region information first
def get_region(zip_code):
if 1000 <= zip_code <= 1299:
return "Bruxelles"
elif 1300 <= zip_code <= 1499 or 4000 <= zip_code <= 7999:
return "Wallonia"
else:
return "Flanders"
df['region'] = df['postCode'].apply(get_region)
if 'price' in df.columns:
# Now add price per m2
df['pricePerM2'] = df['price'] / df['habitableSurface']
# Handle inf values
df['pricePerM2'] = df['pricePerM2'].replace([np.inf, -np.inf], np.nan)
# Fill NaN values with median by region
df['pricePerM2'] = df['pricePerM2'].fillna(-1)
# Convert EPC score
df['epcScore'] = df.apply(lambda row: self.epc_mapping.get(row['region'], {}).get(row['epcScore'], None), axis=1)
df['epcScore'] = df['epcScore'].fillna(-1)
# Convert building condition
condition_rating = {
'to restore': 0, 'to renovate': 1, 'to be done up': 2,
'good': 3, 'just renovated': 4, 'as new': 5
}
df['buildingCondition'] = (df['buildingCondition'].astype(str).str.strip().str.lower()
.map(condition_rating).fillna(-1).astype(int))
# Convert flood zone type
df['floodZoneType'] = (df['floodZoneType'] != 'NON_FLOOD_ZONE').astype(int)
return df
class CategoricalEncoder(BaseEstimator, TransformerMixin):
def __init__(self):
self.categorical_columns = ['province', 'heatingType', 'kitchenType', 'subtype', 'terraceOrientation']
def fit(self, X, y=None):
return self
def transform(self, X):
df = X.copy()
# One-hot encode categorical columns
for col in self.categorical_columns:
if col in df.columns:
df = pd.get_dummies(df, columns=[col], prefix=col, dtype=int)
return df
class CoordinateGetter(BaseEstimator, TransformerMixin):
def __init__(self):
pass
def fit(self, X, y=None):
return self
def transform(self, X):
df = X.copy()
if 'id' in df.columns:
df_giraffe = pd.read_csv('data/Giraffe.csv')
df_giraffe = df_giraffe[['propertyId', 'latitude', 'longitude']]
df_giraffe['id'] = df_giraffe['propertyId']
cols = df_giraffe.columns.tolist()
cols.remove('id')
new_order = ['id'] + cols
df_giraffe = df_giraffe[new_order]
df_giraffe = df_giraffe.drop(columns='propertyId')
df = df.merge(df_giraffe, on='id', how='left')
df = df.dropna(subset=['latitude', 'longitude'])
else :
print(f"DEBUG: PGEOCODE_CACHE_DIR from os.environ: '{os.environ.get('PGEOCODE_CACHE_DIR')}'")
print(f"DEBUG: XDG_CACHE_HOME from os.environ: '{os.environ.get('XDG_CACHE_HOME')}'")
print(f"DEBUG: os.path.expanduser('~'): '{os.path.expanduser('~')}'")
print(f"DEBUG: Current Working Directory: '{os.getcwd()}'")
# Try to create the /tmp/pgeocode directory to check permissions there
try:
temp_cache_dir = "/tmp/pgeocode"
os.makedirs(temp_cache_dir, exist_ok=True)
print(f"DEBUG: Successfully created/ensured existence of '{temp_cache_dir}'.")
except Exception as e:
print(f"DEBUG: Failed to create '{temp_cache_dir}': {e}")
nomi = pgeocode.Nominatim('be')
df['postCode'] = df['postCode'].astype(str)
unique_postcodes = df["postCode"].astype(str).unique()
geo_df = nomi.query_postal_code(list(unique_postcodes))
geo_df = geo_df[['postal_code', 'latitude', 'longitude']]
geo_df = geo_df.rename(columns={'postal_code': 'postCode'})
geo_df['postCode'] = geo_df['postCode'].astype(str)
df = df.merge(geo_df, on='postCode', how='left')
return df
class KDEKNNFeatureCreator(BaseEstimator, TransformerMixin):
def __init__(self, k=20):
self.k = k
self.scaler = StandardScaler()
self.knn = NearestNeighbors(n_neighbors=k)
self.train_prices = None
def fit(self, X, y=None):
if 'latitude' not in X.columns or 'longitude' not in X.columns:
print("Warning: Missing latitude/longitude columns")
return self
coords_scaled = self.scaler.fit_transform(X[['latitude', 'longitude']])
self.knn.fit(coords_scaled)
# Store training prices
self.train_prices = X['pricePerM2'].values
return self
def transform(self, X):
df = X.copy()
if 'latitude' not in df.columns or 'longitude' not in df.columns:
print("Warning: Missing latitude/longitude columns")
df['kde_price_per_m2_knn'] = np.nan
return df
coords_scaled = self.scaler.transform(df[['latitude', 'longitude']])
distances, indices = self.knn.kneighbors(coords_scaled)
kde_scores = []
invalid_kde_count = 0
for i in range(len(df)):
neighbor_idxs = indices[i]
# Use stored training prices for neighbors
neighbor_prices = self.train_prices[neighbor_idxs]
neighbor_prices = neighbor_prices[~np.isnan(neighbor_prices)]
if len(neighbor_prices) < 2:
kde_scores.append(np.nan)
invalid_kde_count += 1
continue
try:
kde = gaussian_kde(neighbor_prices)
value_to_evaluate = neighbor_prices.mean()
kde_score = kde(value_to_evaluate)[0]
if np.isfinite(kde_score):
kde_scores.append(kde_score)
else:
kde_scores.append(np.nan)
invalid_kde_count += 1
except Exception as e:
print(f"Error in KDE calculation for row {i}: {str(e)}")
kde_scores.append(np.nan)
invalid_kde_count += 1
df['kde_price_per_m2_knn'] = kde_scores
# Fill NaN values with median by region
df['kde_price_per_m2_knn'] = df['kde_price_per_m2_knn'].fillna(-1)
return df.drop(columns=['latitude', 'longitude'], errors='ignore')
class ColumnCleaner(BaseEstimator, TransformerMixin):
def __init__(self):
self.columns_to_drop = [
'id', 'postCode', 'buildingConstructionYear', 'type', 'locality', 'region',
'latitude', 'longitude', 'buildingConstructionYear'
]
def fit(self, X, y=None):
return self
def transform(self, X):
df = X.copy()
# Drop columns that are no longer needed
columns_to_drop = [col for col in self.columns_to_drop if col in df.columns]
df = df.drop(columns=columns_to_drop)
if 'pricePerM2' in df.columns:
df = df.drop(columns=['pricePerM2'])
# Ensure all remaining columns are numeric
non_numeric_cols = df.select_dtypes(include=['object', 'category']).columns
if len(non_numeric_cols) > 0:
# Convert any remaining categorical columns to numeric
for col in non_numeric_cols:
if col != 'price': # Don't encode the target variable
df[col] = pd.Categorical(df[col]).codes
# Reorganize columns to put price at the end
cols = df.columns.tolist()
if 'price' in cols:
cols.remove('price')
cols.append('price')
df = df[cols]
return df