|
|
import os

# pgeocode resolves its data folder when it is imported, so the environment
# must be prepared before `import pgeocode` below. The documented variable is
# PGEOCODE_DATA_DIR; PGEOCODE_CACHE_DIR is still set because diagnostics
# further down in this file print it.
os.environ["PGEOCODE_CACHE_DIR"] = "/tmp/pgeocode"
os.environ["PGEOCODE_DATA_DIR"] = "/tmp/pgeocode"

import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from scipy.stats import gaussian_kde
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from datetime import datetime
import pgeocode

# Belt and braces: also override the module-level storage folder directly and
# make sure it exists. Failure is reported but never aborts the import.
try:
    pgeocode.STORAGE_DIR = "/tmp/pgeocode"
    os.makedirs(pgeocode.STORAGE_DIR, exist_ok=True)
    print(f"DEBUG: Successfully forced pgeocode.STORAGE_DIR to '{pgeocode.STORAGE_DIR}' and ensured directory exists.")
except OSError as e:
    # Only the directory creation can fail here (permissions, a regular file
    # already occupying the path, read-only file system, ...).
    print(f"ERROR: Failed to force pgeocode.STORAGE_DIR or create directory: {e}")
|
|
|
|
|
class DataCleaner(BaseEstimator, TransformerMixin):
    """Clean the raw property-listing DataFrame.

    ``transform`` drops unused columns, converts boolean-like flags to 0/1,
    fills missing values and casts columns to the dtypes listed in
    ``col_types``. ``fit`` learns nothing: every fill statistic (mode,
    median) is computed from the batch handed to ``transform``.
    """

    # Spellings recognised for boolean-like flags; anything else maps to NaN.
    _FLAG_VALUES = {True: 1, False: 0, 'True': 1, 'False': 0, 'YES': 1, 'NO': 0}

    # Columns removed up front when present.
    _UNUSED_COLUMNS = (
        'Unnamed: 0', 'url',
        'monthlyCost', 'hasBalcony', 'accessibleDisabledPeople',
        'roomCount', 'diningRoomSurface', 'streetFacadeWidth',
        'gardenOrientation', 'kitchenSurface', 'floorCount',
        'hasDiningRoom', 'hasDressingRoom',
    )

    # Columns pushed through the flag mapping, with unknown/missing -> 0.
    # NOTE(review): parkingCountIndoor / parkingCountOutdoor are counts, yet
    # they go through the flag mapping too, so any value the mapping does not
    # recognise ends up as 0 - confirm this is intended.
    _BINARY_COLUMNS = (
        'hasBasement', 'hasLift', 'hasHeatPump', 'hasPhotovoltaicPanels',
        'hasAirConditioning', 'hasArmoredDoor', 'hasVisiophone', 'hasOffice',
        'hasSwimmingPool', 'hasFireplace', 'parkingCountIndoor',
        'parkingCountOutdoor', 'hasAttic', 'hasThermicPanels',
    )

    # (flag column, surface column): a missing flag is inferred from whether
    # the surface is known, and the surface is zeroed when the flag is 0.
    _FLAG_SURFACE_PAIRS = (
        ('hasLivingRoom', 'livingRoomSurface'),
        ('hasGarden', 'gardenSurface'),
        ('hasTerrace', 'terraceSurface'),
    )

    def __init__(self):
        # Target dtype per column, applied at the end of ``transform``.
        self.col_types = {
            'id': 'int', 'type': 'str', 'subtype': 'str', 'bedroomCount': 'int',
            'bathroomCount': 'int', 'province': 'str', 'locality': 'str',
            'postCode': 'int', 'habitableSurface': 'float', 'hasBasement': 'int',
            'buildingCondition': 'str', 'buildingConstructionYear': 'int',
            'hasLift': 'int', 'floodZoneType': 'str', 'heatingType': 'str',
            'hasHeatPump': 'int', 'hasPhotovoltaicPanels': 'int',
            'hasThermicPanels': 'int', 'kitchenType': 'str', 'landSurface': 'float',
            'hasLivingRoom': 'int', 'livingRoomSurface': 'float', 'hasGarden': 'int',
            'gardenSurface': 'float', 'parkingCountIndoor': 'int',
            'parkingCountOutdoor': 'int', 'hasAirConditioning': 'int',
            'hasArmoredDoor': 'int', 'hasVisiophone': 'int', 'hasOffice': 'int',
            'toiletCount': 'int', 'hasSwimmingPool': 'int', 'hasFireplace': 'int',
            'hasTerrace': 'int', 'terraceSurface': 'float', 'terraceOrientation': 'str',
            'epcScore': 'str', 'facadeCount': 'int'
        }
        # Kept for backward compatibility; nothing in this class assigns it.
        self.kitchenType_mode = None

    def fit(self, X, y=None):
        """No-op; present for scikit-learn pipeline compatibility."""
        return self

    @staticmethod
    def _fill_with_mode(series):
        """Fill NaNs with the most frequent value; no-op if there is none.

        ``Series.mode()`` is empty when every value is missing (typical for a
        single-row prediction request), in which case indexing it would raise.
        """
        modes = series.mode()
        if modes.empty:
            return series
        return series.fillna(modes.iloc[0])

    def transform(self, X):
        """Return a cleaned copy of ``X``.

        Rows without ``habitableSurface`` are dropped, so the result can have
        fewer rows than the input. A ``buildingAge`` column is added, computed
        against the current calendar year.
        """
        df = X.copy()

        df = df.drop(columns=[c for c in self._UNUSED_COLUMNS if c in df.columns])

        for col in self._BINARY_COLUMNS:
            df[col] = df[col].map(self._FLAG_VALUES).fillna(0).astype(int)

        for flag, surface in self._FLAG_SURFACE_PAIRS:
            df[flag] = df[flag].map(self._FLAG_VALUES)
            # Unknown flag: assume the feature exists iff a surface was given.
            df.loc[df[flag].isna(), flag] = df[surface].notnull().astype(int)
            df.loc[df[flag] == 0, surface] = 0

        # NOTE(review): this writes the integer 0 (not a label) as the
        # orientation of terrace-less rows; it is cast to str further down.
        df.loc[df['hasTerrace'] == 0, 'terraceOrientation'] = 0

        # -1 marks "unknown" for the count columns.
        df['facadeCount'] = df['facadeCount'].fillna(-1)
        for col in ('bedroomCount', 'bathroomCount', 'toiletCount'):
            df[col] = df[col].fillna(-1).astype(float)

        df = df.dropna(subset=['habitableSurface'])

        df['buildingCondition'] = df['buildingCondition'].fillna('NOT_MENTIONED')
        df['floodZoneType'] = df['floodZoneType'].fillna('NON_FLOOD_ZONE')
        df['heatingType'] = self._fill_with_mode(df['heatingType'])
        df['kitchenType'] = self._fill_with_mode(df['kitchenType'])
        df['landSurface'] = df['landSurface'].fillna(df['landSurface'].median())
        df['livingRoomSurface'] = df['livingRoomSurface'].fillna(df['livingRoomSurface'].median())

        # Age relative to "now": the value shifts when the calendar year does.
        building_age = datetime.now().year - df['buildingConstructionYear']
        df['buildingAge'] = building_age.fillna(-1)

        has_terrace = df['hasTerrace'] == 1

        # Terrace present but size unknown -> -1; no terrace -> 0.
        df.loc[has_terrace & df['terraceSurface'].isna(), 'terraceSurface'] = -1
        df.loc[~has_terrace & df['terraceSurface'].isna(), 'terraceSurface'] = 0

        df.loc[has_terrace & df['terraceOrientation'].isna(), 'terraceOrientation'] = 'NOT_MENTIONED'
        # NOTE(review): terrace-less rows already received 0 above, so this
        # 'NO_TERRACE' branch appears never to match - confirm which label is
        # the intended one before changing either line.
        df.loc[~has_terrace & df['terraceOrientation'].isna(), 'terraceOrientation'] = 'NO_TERRACE'

        # Final cast; integer columns get remaining NaNs replaced by 0 first
        # because NaN cannot be stored in an integer column.
        for col, dtype in self.col_types.items():
            if col not in df.columns:
                continue
            if pd.api.types.is_integer_dtype(dtype):
                df[col] = df[col].fillna(0).astype(dtype)
            else:
                df[col] = df[col].astype(dtype)

        return df
|
|
|
|
|
class FeatureEngineer(BaseEstimator, TransformerMixin):
    """Derive model features from the cleaned listing DataFrame.

    Adds ``isHouse`` and ``region``, adds ``pricePerM2`` when a ``price``
    column is present, and replaces ``epcScore``, ``buildingCondition`` and
    ``floodZoneType`` with numeric encodings. ``fit`` learns nothing.
    """

    def __init__(self):
        # Numeric value per EPC label, looked up by region then by label.
        self.epc_mapping = {
            'Flanders': {
                'A++': 0, 'A+': 0, 'A': 100, 'B': 200, 'C': 300,
                'D': 400, 'E': 500, 'F': 600, 'G': 700
            },
            'Wallonia': {
                'A++': 0, 'A+': 50, 'A': 90, 'B': 170, 'C': 250,
                'D': 330, 'E': 420, 'F': 510, 'G': 600
            },
            'Bruxelles': {
                'A++': 0, 'A+': 0, 'A': 45, 'B': 95, 'C': 145,
                'D': 210, 'E': 275, 'F': 345, 'G': 450
            }
        }

    def fit(self, X, y=None):
        """No-op; present for scikit-learn pipeline compatibility."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the engineered features.

        When ``price`` is present, rows priced above 1,500,000 (and rows whose
        price is missing, since the comparison is False for NaN) are removed,
        and non-positive prices / habitable surfaces are set to NaN.
        """
        df = X.copy()
        has_price = 'price' in df.columns

        if has_price:
            # .copy() so the assignments below write to an independent frame
            # rather than to a slice of the input copy.
            df = df[df['price'] <= 1500000].copy()

            for col in ('price', 'habitableSurface'):
                non_positive = df[col] <= 0
                if non_positive.any():
                    df.loc[non_positive, col] = np.nan

        df['isHouse'] = (df['type'] == 'HOUSE').astype(int)

        # Region from the post-code range; anything outside the two explicit
        # ranges (including values that are not numbers) counts as Flanders.
        post_code = pd.to_numeric(df['postCode'], errors='coerce')
        in_brussels = post_code.between(1000, 1299)
        in_wallonia = post_code.between(1300, 1499) | post_code.between(4000, 7999)
        df['region'] = np.select(
            [in_brussels, in_wallonia],
            ['Bruxelles', 'Wallonia'],
            default='Flanders',
        )

        if has_price:
            ratio = df['price'] / df['habitableSurface']
            # Division by zero gives +/-inf; treat it like a missing value.
            df['pricePerM2'] = ratio.replace([np.inf, -np.inf], np.nan).fillna(-1)

        # EPC label -> number for the row's region; unknown label/region -> -1.
        # Built positionally (no row-wise DataFrame.apply) so that an empty
        # frame is handled and the result is always float64.
        epc_values = [
            self.epc_mapping.get(region, {}).get(label)
            for region, label in zip(df['region'], df['epcScore'])
        ]
        df['epcScore'] = pd.Series(epc_values, dtype='float64').fillna(-1).to_numpy()

        # NOTE(review): the keys below use spaces; labels spelled with
        # underscores (e.g. 'TO_RENOVATE') would not match and become -1 -
        # verify against the actual data.
        condition_rating = {
            'to restore': 0, 'to renovate': 1, 'to be done up': 2,
            'good': 3, 'just renovated': 4, 'as new': 5
        }
        normalised = df['buildingCondition'].astype(str).str.strip().str.lower()
        df['buildingCondition'] = normalised.map(condition_rating).fillna(-1).astype(int)

        # 1 for anything other than the explicit "no flood zone" label.
        df['floodZoneType'] = (df['floodZoneType'] != 'NON_FLOOD_ZONE').astype(int)

        return df
|
|
|
|
|
class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """One-hot encode the configured categorical columns.

    ``fit`` records which dummy columns each categorical column produces.
    ``transform`` one-hot encodes the batch and then adds, filled with 0, any
    recorded dummy column the batch did not produce, so that data containing
    fewer categories than the fitted data still yields those columns.
    Without a prior ``fit`` the encoder falls back to encoding only the
    categories present in the batch.
    """

    def __init__(self):
        self.categorical_columns = ['province', 'heatingType', 'kitchenType', 'subtype', 'terraceOrientation']

    def fit(self, X, y=None):
        """Record the dummy column names produced by each categorical column.

        Inputs without a ``columns`` attribute are accepted and simply record
        nothing.
        """
        available = getattr(X, 'columns', ())
        self.dummy_columns_ = {
            col: pd.get_dummies(
                X[[col]], columns=[col], prefix=col, dtype=int
            ).columns.tolist()
            for col in self.categorical_columns
            if col in available
        }
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the categorical columns one-hot encoded.

        Categories not seen during ``fit`` keep their own dummy column; only
        missing columns are added, nothing is removed.
        """
        df = X.copy()
        # getattr: instances created (or unpickled) before ``fit`` recorded
        # anything have no ``dummy_columns_`` attribute.
        learned = getattr(self, 'dummy_columns_', {})

        for col in self.categorical_columns:
            if col not in df.columns:
                continue
            df = pd.get_dummies(df, columns=[col], prefix=col, dtype=int)
            for dummy in learned.get(col, ()):
                if dummy not in df.columns:
                    df[dummy] = 0

        return df
|
|
|
|
|
class CoordinateGetter(BaseEstimator, TransformerMixin):
    """Attach ``latitude`` / ``longitude`` columns to each listing.

    When the input has an ``id`` column, coordinates are joined from a CSV
    file keyed by ``propertyId``; otherwise they are looked up per post code
    through pgeocode.

    Parameters
    ----------
    coordinates_path : str, default 'data/Giraffe.csv'
        CSV file providing ``propertyId``, ``latitude`` and ``longitude``.
    """

    def __init__(self, coordinates_path='data/Giraffe.csv'):
        self.coordinates_path = coordinates_path

    def fit(self, X, y=None):
        """No-op; present for scikit-learn pipeline compatibility."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with coordinates merged in.

        The result comes from ``DataFrame.merge``, so the original index is
        not preserved. In the ``id`` branch, rows without coordinates are
        dropped; in the post-code branch they are kept with NaN coordinates
        and ``postCode`` is converted to ``str``.
        """
        df = X.copy()

        if 'id' in df.columns:
            # getattr: instances pickled before the parameter existed have no
            # ``coordinates_path`` attribute.
            path = getattr(self, 'coordinates_path', 'data/Giraffe.csv')
            lookup = (
                pd.read_csv(path)[['propertyId', 'latitude', 'longitude']]
                .rename(columns={'propertyId': 'id'})
                # A left merge repeats a listing once per matching lookup row;
                # keep one coordinate pair per id so the row count is stable.
                .drop_duplicates(subset='id')
            )
            df = df.merge(lookup, on='id', how='left')
            df = df.dropna(subset=['latitude', 'longitude'])
        else:
            print(f"DEBUG: PGEOCODE_CACHE_DIR from os.environ: '{os.environ.get('PGEOCODE_CACHE_DIR')}'")
            print(f"DEBUG: XDG_CACHE_HOME from os.environ: '{os.environ.get('XDG_CACHE_HOME')}'")
            print(f"DEBUG: os.path.expanduser('~'): '{os.path.expanduser('~')}'")
            print(f"DEBUG: Current Working Directory: '{os.getcwd()}'")

            # Best effort: make sure the cache folder exists before pgeocode
            # is asked to load its data.
            temp_cache_dir = "/tmp/pgeocode"
            try:
                os.makedirs(temp_cache_dir, exist_ok=True)
                print(f"DEBUG: Successfully created/ensured existence of '{temp_cache_dir}'.")
            except OSError as e:
                print(f"DEBUG: Failed to create '{temp_cache_dir}': {e}")

            nomi = pgeocode.Nominatim('be')

            # The join key must be a string on both sides of the merge.
            df['postCode'] = df['postCode'].astype(str)
            unique_postcodes = df['postCode'].unique()

            located = nomi.query_postal_code(list(unique_postcodes))
            geo_df = located[['postal_code', 'latitude', 'longitude']].rename(
                columns={'postal_code': 'postCode'}
            )
            geo_df['postCode'] = geo_df['postCode'].astype(str)
            df = df.merge(geo_df, on='postCode', how='left')

        return df
|
|
|
|
|
class KDEKNNFeatureCreator(BaseEstimator, TransformerMixin):
    """Add ``kde_price_per_m2_knn``, a density feature from nearby listings.

    ``fit`` indexes the standardised coordinates of the training rows and
    remembers their ``pricePerM2``. For each row, ``transform`` takes the
    ``pricePerM2`` values of its nearest training rows, fits a Gaussian KDE
    on them and evaluates that KDE at their mean.

    Parameters
    ----------
    k : int, default 20
        Number of neighbours used per row.
    """

    def __init__(self, k=20):
        self.k = k
        self.scaler = StandardScaler()
        self.knn = NearestNeighbors(n_neighbors=k)
        self.train_prices = None

    def fit(self, X, y=None):
        """Fit the scaler and neighbour index on rows with known coordinates.

        If either coordinate column is missing, a warning is printed and
        nothing is fitted. ``X`` must contain ``pricePerM2``.
        """
        if 'latitude' not in X.columns or 'longitude' not in X.columns:
            print("Warning: Missing latitude/longitude columns")
            return self

        coords = X[['latitude', 'longitude']]
        # The neighbour search cannot index NaN coordinates, so such rows are
        # left out of the reference set.
        usable = coords.notna().all(axis=1).to_numpy()

        coords_scaled = self.scaler.fit_transform(coords[usable])
        self.knn.fit(coords_scaled)

        # Positionally aligned with the rows given to ``knn.fit``.
        # NOTE(review): FeatureEngineer fills missing pricePerM2 with -1, so
        # these values may contain that sentinel rather than NaN; the NaN
        # filter in ``_kde_at_mean`` would not remove it - confirm.
        self.train_prices = X['pricePerM2'].values[usable]

        return self

    def _kde_at_mean(self, neighbor_prices, row):
        """Return the KDE of ``neighbor_prices`` evaluated at their mean.

        Returns NaN when fewer than two usable prices remain, when the KDE
        cannot be built (e.g. all prices identical) or when the result is not
        finite. ``row`` is only used in the error message.
        """
        neighbor_prices = neighbor_prices[~np.isnan(neighbor_prices)]
        if len(neighbor_prices) < 2:
            return np.nan

        try:
            score = gaussian_kde(neighbor_prices)(neighbor_prices.mean())[0]
        except Exception as e:
            # Deliberately best-effort: one bad neighbourhood must not abort
            # the whole batch.
            print(f"Error in KDE calculation for row {row}: {str(e)}")
            return np.nan

        return score if np.isfinite(score) else np.nan

    def transform(self, X):
        """Return a copy of ``X`` with the KDE feature added.

        ``latitude`` and ``longitude`` are dropped from the result. Rows for
        which no score can be computed (missing coordinates, too few
        neighbours, KDE failure) receive -1. If a coordinate column is absent
        altogether, the feature is NaN for every row and no column is dropped.
        """
        df = X.copy()

        if 'latitude' not in df.columns or 'longitude' not in df.columns:
            print("Warning: Missing latitude/longitude columns")
            df['kde_price_per_m2_knn'] = np.nan
            return df

        coords = df[['latitude', 'longitude']]
        has_coords = coords.notna().all(axis=1).to_numpy()
        kde_scores = np.full(len(df), np.nan)

        if has_coords.any():
            coords_scaled = self.scaler.transform(coords[has_coords])
            # Never ask for more neighbours than there are training rows.
            n_neighbors = min(self.knn.n_neighbors, len(self.train_prices))
            indices = self.knn.kneighbors(
                coords_scaled, n_neighbors=n_neighbors, return_distance=False
            )

            for position, neighbor_idxs in zip(np.flatnonzero(has_coords), indices):
                kde_scores[position] = self._kde_at_mean(
                    self.train_prices[neighbor_idxs], position
                )

        df['kde_price_per_m2_knn'] = kde_scores
        df['kde_price_per_m2_knn'] = df['kde_price_per_m2_knn'].fillna(-1)

        return df.drop(columns=['latitude', 'longitude'], errors='ignore')
|
|
|
|
|
class ColumnCleaner(BaseEstimator, TransformerMixin):
    """Final tidy-up before modelling.

    Drops helper/identifier columns, integer-encodes any remaining
    object/category column and moves ``price`` (when present) to the last
    position. ``fit`` records the categories of the non-numeric columns so
    that ``transform`` assigns the same code to the same value on later
    batches; values not seen during ``fit`` are encoded as -1. Without a
    prior ``fit`` the categories are inferred from each batch.
    """

    def __init__(self):
        self.columns_to_drop = [
            'id', 'postCode', 'buildingConstructionYear', 'type', 'locality',
            'region', 'latitude', 'longitude',
        ]

    def fit(self, X, y=None):
        """Record categories for the non-numeric columns that will be kept.

        Inputs that are not DataFrames are accepted and record nothing.
        """
        self.categories_ = {}
        if hasattr(X, 'select_dtypes'):
            skipped = set(self.columns_to_drop) | {'pricePerM2', 'price'}
            for col in X.select_dtypes(include=['object', 'category']).columns:
                if col not in skipped:
                    self.categories_[col] = pd.Categorical(X[col]).categories
        return self

    def transform(self, X):
        """Return a cleaned copy of ``X`` with ``price`` as the last column."""
        df = X.copy()

        # pricePerM2 is always removed; dict.fromkeys de-duplicates while
        # keeping order in case ``columns_to_drop`` repeats a name.
        droppable = dict.fromkeys([*self.columns_to_drop, 'pricePerM2'])
        df = df.drop(columns=[col for col in droppable if col in df.columns])

        # getattr: instances created (or unpickled) before ``fit`` recorded
        # anything have no ``categories_`` attribute.
        learned = getattr(self, 'categories_', {})
        for col in df.select_dtypes(include=['object', 'category']).columns:
            if col == 'price':
                continue
            # categories=None lets pandas infer them from the batch.
            df[col] = pd.Categorical(df[col], categories=learned.get(col)).codes

        if 'price' in df.columns:
            ordered = [col for col in df.columns if col != 'price'] + ['price']
            df = df[ordered]

        return df