# -*- coding: utf-8 -*-
import logging
import os
import copy
from datetime import datetime
from buildingmodel import data_path
from pathlib import Path
from shapely.validation import make_valid
import polars as pl
import geopandas as gpd
import numpy as np
import pandas as pd
import shapely
from shapely.geometry.polygon import orient, Polygon
from shapely.geometry import MultiPolygon, GeometryCollection
import logging
logger = logging.getLogger("buildingmodel")
from buildingmodel.exceptions import BuildingGeometryError
geo_levels = ["district", "city", "city_group", "department", "region"]
roof_materials = {
0: "INDETERMINE",
1: "TUILES",
2: "ARDOISES",
3: "ZINC ALUMINIUM",
4: "BETON",
9: "AUTRES",
}
wall_materials = {
0: "INDETERMINE",
1: "PIERRE",
2: "MEULIERE",
3: "BETON",
4: "BRIQUES",
5: "AGGLOMERE",
6: "BOIS",
9: "AUTRES",
}
residential_types = {
1: "house",
2: "apartment",
}
[docs]
def load_data(building_data, parameters):
"""Loads GIS building data
loads a GeoDataframe describing building data, orients the polygons in CCW and checks the validity of the parameters
and geometry
Args:
building_data (GeoDataframe or str): either a GeoDataframe containing building data or a path to a GIS file
parameters (Parameters) : global parameter
Returns:
GeoDataframe : GeoDataframe containing validated geometry and building description data
"""
if isinstance(building_data, gpd.GeoDataFrame):
buildings = building_data
logger.info(f"Data loaded. Contains {buildings.shape[0]} rows.")
elif os.path.exists(building_data):
if building_data.suffix == ".parquet":
buildings = gpd.read_parquet(building_data)
else:
buildings = gpd.read_file(building_data, encoding=parameters.encoding)
logger.info(f"File {building_data} opened. Contains {buildings.shape[0]} rows.")
else:
logger.error(
"building_data argument must be a GeoDataframe or a path to GIS file"
)
if buildings.crs.name != "RGF93_Lambert_93":
buildings = buildings.to_crs("EPSG:2154")
buildings["name"] = buildings[parameters.id_col]
# filtering according to categories in category_filter
for col, value_list in parameters.category_filters.items():
buildings = buildings.loc[buildings[col].isin(value_list), :]
# force geometry, name and id to be kept
for val in ["geometry", "name"]:
if val not in parameters.rename_dict.keys():
parameters.rename_dict[val] = val
if "construction_year_class" in buildings.columns:
parameters.rename_dict["construction_year_class"] = "construction_year_class"
if "residential_type" in buildings.columns:
parameters.rename_dict["residential_type"] = "residential_type"
# renaming entries according to rename_dict to obtain consistent column names
buildings = buildings.rename(columns=parameters.rename_dict)
buildings = buildings.loc[:, list(parameters.rename_dict.values())]
# rename usages and change type of usage to category
buildings.main_usage = buildings.main_usage.replace(parameters.usage_dict).astype(
"category"
)
buildings.secondary_usage = buildings.secondary_usage.replace(
parameters.usage_dict
).astype("category")
# remove incompatible geometries (non-Polygon) and orient Polygons CCW
buildings = clean_geometry(buildings)
# set altitude using altitude_max and altitude_min
buildings = set_altitude(buildings)
# year construction class from construction date
if "construction_year_class" not in buildings.columns:
set_construction_year_class(
buildings, parameters.date_format, parameters.construction_year_class
)
# # simplifying wall and roof material
set_materials(buildings)
# set residential type according to number of dwellings
if "residential_type" not in buildings.columns:
set_residential_type(buildings)
# when height is missing, set it using floor count and vice versa
set_height_floor_count(buildings, parameters.floor_height)
# set floor area
buildings["floor_area"] = buildings.geometry.area * buildings.floor_count
# filtering according to minimal floor area
buildings = buildings.loc[
buildings["floor_area"] >= parameters.minimal_floor_area, :
]
# set residential_only to True for buildings that contain only dwellings
set_residential_only(buildings)
# height filter
buildings = buildings.loc[~buildings["height"].isna(), :]
buildings = buildings.loc[buildings["height"] >= parameters.minimal_height, :]
buildings.loc[buildings["dwelling_count"].isna(), "dwelling_count"] = 0
# set district by spatial join with district boundaries
buildings = set_district(buildings, parameters)
buildings.reset_index(drop=True, inplace=True)
buildings["building_id"] = buildings.index
building_geometries = buildings[["geometry", "building_id", "height", "altitude"]]
buildings = pl.from_pandas(buildings.drop(columns="geometry"))
return buildings, building_geometries
[docs]
def clean_geometry(buildings):
"""
* Ensures that building geometries are Polygons. If they are MultiPolygons, the first Polygon is taken. If they
are neither, an exception is raised
* Removes 3rd coordinate if it is present
* Orients the polygon in counter clockwise order
Args:
buildings:
Returns:
"""
buildings.geometry = buildings.geometry.apply(check_geometry)
buildings = buildings.loc[buildings.geometry.type.isin(["Polygon"]), :].copy()
buildings.geometry = buildings.geometry.apply(shape_to_2d)
buildings.geometry = buildings.geometry.apply(lambda x: orient(x))
return buildings
def check_geometry(shape):
if isinstance(shape, Polygon):
if shape.is_valid:
return shape
else:
return check_geometry(make_valid(shape))
elif isinstance(shape, MultiPolygon):
shape = shape.geoms[0]
if shape.is_valid:
return shape
else:
return check_geometry(make_valid(shape))
elif isinstance(shape, GeometryCollection):
shape = shape.geoms[0]
if shape.is_valid:
return shape
else:
return check_geometry(make_valid(shape))
else:
return shape
def shape_to_2d(shape):
def remove_z(x, y, z=None):
return tuple(filter(None, [x, y]))
return shapely.ops.transform(remove_z, shape)
[docs]
def set_residential_type(buildings):
"""
:param buildings:
:return:
"""
buildings["residential_type"] = None
buildings.loc[(buildings["dwelling_count"] == 1), "residential_type"] = 1
buildings.loc[(buildings["dwelling_count"] >= 2), "residential_type"] = 2
for i in residential_types.keys():
buildings.loc[buildings.residential_type == i, "residential_type"] = (
residential_types[i]
)
buildings.loc[buildings.residential_type.isna(), "residential_type"] = "other"
buildings.residential_type = buildings.residential_type.astype("category")
buildings["multi_dwelling"] = False
buildings.loc[(buildings["dwelling_count"] >= 2), "multi_dwelling"] = True
[docs]
def set_residential_only(buildings):
"""
Use the main_usage and secondary_usage to determine if a building has only a residential usage
Args:
buildings:
Returns:
"""
buildings["residential_only"] = False
buildings.loc[
(buildings.main_usage == "residential") & (buildings.secondary_usage.isna()),
"residential_only",
] = True
buildings.loc[
(buildings.main_usage == "residential")
& (buildings.secondary_usage.isin(["unknown", "annex"])),
"residential_only",
] = True
buildings["has_annex"] = False
buildings.loc[
(buildings.main_usage == "residential")
& (buildings.secondary_usage == "annex"),
"has_annex",
] = True
def set_height_floor_count(buildings, floor_height):
for res_type in ["house", "apartment", "other"]:
default_floor_height = (
floor_height[res_type]["min"] + floor_height[res_type]["max"]
) / 2.0
# if height is nan and floor count is not, floor count is used to obtain height
buildings.loc[
(buildings["height"].isna())
& (~buildings["floor_count"].isna())
& (buildings["residential_type"] == res_type),
"height",
] = (
buildings.loc[
(buildings["height"].isna())
& (~buildings["floor_count"].isna())
& (buildings["residential_type"] == res_type),
"floor_count",
]
* default_floor_height
)
# if floor count is nan and height is not, height is used to obtain floor count
buildings.loc[
(buildings["floor_count"].isna())
& (buildings["residential_type"] == res_type)
& (~buildings["height"].isna()),
"floor_count",
] = (
buildings.loc[
(buildings["floor_count"].isna())
& (~buildings["height"].isna())
& (buildings["residential_type"] == res_type),
"height",
]
/ default_floor_height
).astype(int)
# when height and floor_count are not compatible, floor count is used to adjust height
ids = buildings.loc[
(
(buildings["floor_count"] * floor_height[res_type]["min"])
> (buildings["height"])
)
& (buildings["residential_type"] == res_type)
].index
buildings.loc[ids, "height"] = (
buildings.loc[ids, "floor_count"] * floor_height[res_type]["min"]
)
ids = buildings.loc[
(
(buildings["floor_count"] * floor_height[res_type]["max"])
< (buildings["height"])
)
& (buildings["residential_type"] == res_type)
].index
buildings.loc[ids, "height"] = (
buildings.loc[ids, "floor_count"] * floor_height[res_type]["max"]
)
buildings.loc[buildings.floor_count.isna(), "floor_count"] = 1.0
buildings["height"] = buildings["height"].astype(float).round()
def set_altitude(buildings):
# altitude filtering and simplification
buildings = buildings.loc[~buildings["altitude_min"].isna(), :].copy()
buildings["altitude"] = None
buildings.loc[buildings["altitude_max"].isna(), "altitude"] = buildings[
"altitude_min"
]
buildings.loc[~buildings["altitude_max"].isna(), "altitude"] = (
buildings["altitude_min"] + buildings["altitude_max"]
) / 2.0
return buildings
def set_materials(buildings):
for boundary in ["wall", "roof"]:
buildings[f"{boundary}_class"] = buildings[f"{boundary}_material"]
buildings.loc[buildings[f"{boundary}_class"].isna(), f"{boundary}_class"] = "0"
buildings.loc[buildings[f"{boundary}_class"] == "", f"{boundary}_class"] = "0"
buildings[f"{boundary}_class"] = buildings[f"{boundary}_class"].astype(int)
buildings.loc[buildings[f"{boundary}_class"] >= 10, f"{boundary}_class"] = (
buildings.loc[buildings[f"{boundary}_class"] >= 10, f"{boundary}_class"]
// 10
)
# buildings[f'{boundary}_class'] = f'{boundary}_materials'[buildings[f'{boundary}_class']]
buildings["roof_class"] = buildings["roof_class"].replace(roof_materials)
buildings["wall_class"] = buildings["wall_class"].replace(wall_materials)
buildings.wall_class = buildings.wall_class.astype("category")
buildings.roof_class = buildings.roof_class.astype("category")
def set_construction_year_class(buildings, date_format, construction_year_class):
m = ~(
buildings.construction_date.isnull() | (buildings.construction_date == "None")
)
if m.sum() == 0:
buildings["construction_year"] = None
buildings["construction_year_class"] = None
buildings.construction_year_class = buildings.construction_year_class.astype(
"category"
)
return None
buildings.loc[m, "construction_year"] = buildings.loc[m, "construction_date"].apply(
lambda x: datetime.strptime(x, date_format).year
if isinstance(x, str)
else x.year
)
buildings["construction_year_class"] = None
for key, val in construction_year_class.items():
buildings.loc[
(buildings["construction_year"] >= val[0])
& (buildings["construction_year"] <= val[1]),
"construction_year_class",
] = "[" + str(val[0]) + ", " + str(val[1]) + "]"
buildings.construction_year_class = buildings.construction_year_class.astype(
"category"
)
[docs]
def set_district(buildings, parameters):
"""
Args:
buildings:
Returns:
"""
district_buffer = buildings.geometry.union_all().convex_hull.buffer(50.0e3).bounds
districts = gpd.read_parquet(
Path(data_path["gis"]) / parameters.districts, bbox=district_buffer
)
gas_data = ["has_network_city_level", "has_network_grdf_data"]
districts[gas_data] = districts[gas_data].astype(bool).fillna(True)
building_with_district = gpd.sjoin(
buildings, districts, how="left", predicate="within"
)
district_spatial_index = districts.sindex
for build_idx in building_with_district.loc[
building_with_district["district"].isnull(), :
].index:
building_poly = building_with_district.loc[build_idx, "geometry"]
possible_matches_index = list(
district_spatial_index.intersection(building_poly.bounds)
)
possible_matches = districts.iloc[possible_matches_index]
precise_matches = possible_matches[possible_matches.intersects(building_poly)]
tmp_area = 0.0
geo_levels_code = None
for admin_idx in precise_matches.index:
new_area = (
precise_matches.loc[admin_idx, "geometry"]
.intersection(building_poly)
.area
)
if new_area > tmp_area:
tmp_area = new_area
geo_levels_code = precise_matches.loc[admin_idx, geo_levels]
building_with_district.loc[build_idx, geo_levels] = geo_levels_code
buildings[geo_levels + gas_data] = building_with_district[
geo_levels + gas_data
].values
buildings = buildings.astype({g_l: "category" for g_l in geo_levels})
return buildings