Source code for buildingmodel.io.gis

# -*- coding: utf-8 -*-

import logging
import os
import copy
from datetime import datetime
from buildingmodel import data_path
from pathlib import Path
from shapely.validation import make_valid
import polars as pl
import geopandas as gpd
import numpy as np
import pandas as pd
import shapely
from shapely.geometry.polygon import orient, Polygon
from shapely.geometry import MultiPolygon, GeometryCollection
import logging

logger = logging.getLogger("buildingmodel")
from buildingmodel.exceptions import BuildingGeometryError

geo_levels = ["district", "city", "city_group", "department", "region"]
roof_materials = {
    0: "INDETERMINE",
    1: "TUILES",
    2: "ARDOISES",
    3: "ZINC ALUMINIUM",
    4: "BETON",
    9: "AUTRES",
}

wall_materials = {
    0: "INDETERMINE",
    1: "PIERRE",
    2: "MEULIERE",
    3: "BETON",
    4: "BRIQUES",
    5: "AGGLOMERE",
    6: "BOIS",
    9: "AUTRES",
}

residential_types = {
    1: "house",
    2: "apartment",
}


[docs] def load_data(building_data, parameters): """Loads GIS building data loads a GeoDataframe describing building data, orients the polygons in CCW and checks the validity of the parameters and geometry Args: building_data (GeoDataframe or str): either a GeoDataframe containing building data or a path to a GIS file parameters (Parameters) : global parameter Returns: GeoDataframe : GeoDataframe containing validated geometry and building description data """ if isinstance(building_data, gpd.GeoDataFrame): buildings = building_data logger.info(f"Data loaded. Contains {buildings.shape[0]} rows.") elif os.path.exists(building_data): if building_data.suffix == ".parquet": buildings = gpd.read_parquet(building_data) else: buildings = gpd.read_file(building_data, encoding=parameters.encoding) logger.info(f"File {building_data} opened. Contains {buildings.shape[0]} rows.") else: logger.error( "building_data argument must be a GeoDataframe or a path to GIS file" ) if buildings.crs.name != "RGF93_Lambert_93": buildings = buildings.to_crs("EPSG:2154") buildings["name"] = buildings[parameters.id_col] # filtering according to categories in category_filter for col, value_list in parameters.category_filters.items(): buildings = buildings.loc[buildings[col].isin(value_list), :] # force geometry, name and id to be kept for val in ["geometry", "name"]: if val not in parameters.rename_dict.keys(): parameters.rename_dict[val] = val if "construction_year_class" in buildings.columns: parameters.rename_dict["construction_year_class"] = "construction_year_class" if "residential_type" in buildings.columns: parameters.rename_dict["residential_type"] = "residential_type" # renaming entries according to rename_dict to obtain consistent column names buildings = buildings.rename(columns=parameters.rename_dict) buildings = buildings.loc[:, list(parameters.rename_dict.values())] # rename usages and change type of usage to category buildings.main_usage = buildings.main_usage.replace(parameters.usage_dict).astype( "category" ) buildings.secondary_usage = buildings.secondary_usage.replace( parameters.usage_dict ).astype("category") # remove incompatible geometries (non-Polygon) and orient Polygons CCW buildings = clean_geometry(buildings) # set altitude using altitude_max and altitude_min buildings = set_altitude(buildings) # year construction class from construction date if "construction_year_class" not in buildings.columns: set_construction_year_class( buildings, parameters.date_format, parameters.construction_year_class ) # # simplifying wall and roof material set_materials(buildings) # set residential type according to number of dwellings if "residential_type" not in buildings.columns: set_residential_type(buildings) # when height is missing, set it using floor count and vice versa set_height_floor_count(buildings, parameters.floor_height) # set floor area buildings["floor_area"] = buildings.geometry.area * buildings.floor_count # filtering according to minimal floor area buildings = buildings.loc[ buildings["floor_area"] >= parameters.minimal_floor_area, : ] # set residential_only to True for buildings that contain only dwellings set_residential_only(buildings) # height filter buildings = buildings.loc[~buildings["height"].isna(), :] buildings = buildings.loc[buildings["height"] >= parameters.minimal_height, :] buildings.loc[buildings["dwelling_count"].isna(), "dwelling_count"] = 0 # set district by spatial join with district boundaries buildings = set_district(buildings, parameters) buildings.reset_index(drop=True, inplace=True) buildings["building_id"] = buildings.index building_geometries = buildings[["geometry", "building_id", "height", "altitude"]] buildings = pl.from_pandas(buildings.drop(columns="geometry")) return buildings, building_geometries
[docs] def clean_geometry(buildings): """ * Ensures that building geometries are Polygons. If they are MultiPolygons, the first Polygon is taken. If they are neither, an exception is raised * Removes 3rd coordinate if it is present * Orients the polygon in counter clockwise order Args: buildings: Returns: """ buildings.geometry = buildings.geometry.apply(check_geometry) buildings = buildings.loc[buildings.geometry.type.isin(["Polygon"]), :].copy() buildings.geometry = buildings.geometry.apply(shape_to_2d) buildings.geometry = buildings.geometry.apply(lambda x: orient(x)) return buildings
def check_geometry(shape): if isinstance(shape, Polygon): if shape.is_valid: return shape else: return check_geometry(make_valid(shape)) elif isinstance(shape, MultiPolygon): shape = shape.geoms[0] if shape.is_valid: return shape else: return check_geometry(make_valid(shape)) elif isinstance(shape, GeometryCollection): shape = shape.geoms[0] if shape.is_valid: return shape else: return check_geometry(make_valid(shape)) else: return shape def shape_to_2d(shape): def remove_z(x, y, z=None): return tuple(filter(None, [x, y])) return shapely.ops.transform(remove_z, shape)
[docs] def set_residential_type(buildings): """ :param buildings: :return: """ buildings["residential_type"] = None buildings.loc[(buildings["dwelling_count"] == 1), "residential_type"] = 1 buildings.loc[(buildings["dwelling_count"] >= 2), "residential_type"] = 2 for i in residential_types.keys(): buildings.loc[buildings.residential_type == i, "residential_type"] = ( residential_types[i] ) buildings.loc[buildings.residential_type.isna(), "residential_type"] = "other" buildings.residential_type = buildings.residential_type.astype("category") buildings["multi_dwelling"] = False buildings.loc[(buildings["dwelling_count"] >= 2), "multi_dwelling"] = True
[docs] def set_residential_only(buildings): """ Use the main_usage and secondary_usage to determine if a building has only a residential usage Args: buildings: Returns: """ buildings["residential_only"] = False buildings.loc[ (buildings.main_usage == "residential") & (buildings.secondary_usage.isna()), "residential_only", ] = True buildings.loc[ (buildings.main_usage == "residential") & (buildings.secondary_usage.isin(["unknown", "annex"])), "residential_only", ] = True buildings["has_annex"] = False buildings.loc[ (buildings.main_usage == "residential") & (buildings.secondary_usage == "annex"), "has_annex", ] = True
def set_height_floor_count(buildings, floor_height): for res_type in ["house", "apartment", "other"]: default_floor_height = ( floor_height[res_type]["min"] + floor_height[res_type]["max"] ) / 2.0 # if height is nan and floor count is not, floor count is used to obtain height buildings.loc[ (buildings["height"].isna()) & (~buildings["floor_count"].isna()) & (buildings["residential_type"] == res_type), "height", ] = ( buildings.loc[ (buildings["height"].isna()) & (~buildings["floor_count"].isna()) & (buildings["residential_type"] == res_type), "floor_count", ] * default_floor_height ) # if floor count is nan and height is not, height is used to obtain floor count buildings.loc[ (buildings["floor_count"].isna()) & (buildings["residential_type"] == res_type) & (~buildings["height"].isna()), "floor_count", ] = ( buildings.loc[ (buildings["floor_count"].isna()) & (~buildings["height"].isna()) & (buildings["residential_type"] == res_type), "height", ] / default_floor_height ).astype(int) # when height and floor_count are not compatible, floor count is used to adjust height ids = buildings.loc[ ( (buildings["floor_count"] * floor_height[res_type]["min"]) > (buildings["height"]) ) & (buildings["residential_type"] == res_type) ].index buildings.loc[ids, "height"] = ( buildings.loc[ids, "floor_count"] * floor_height[res_type]["min"] ) ids = buildings.loc[ ( (buildings["floor_count"] * floor_height[res_type]["max"]) < (buildings["height"]) ) & (buildings["residential_type"] == res_type) ].index buildings.loc[ids, "height"] = ( buildings.loc[ids, "floor_count"] * floor_height[res_type]["max"] ) buildings.loc[buildings.floor_count.isna(), "floor_count"] = 1.0 buildings["height"] = buildings["height"].astype(float).round() def set_altitude(buildings): # altitude filtering and simplification buildings = buildings.loc[~buildings["altitude_min"].isna(), :].copy() buildings["altitude"] = None buildings.loc[buildings["altitude_max"].isna(), "altitude"] = buildings[ "altitude_min" ] buildings.loc[~buildings["altitude_max"].isna(), "altitude"] = ( buildings["altitude_min"] + buildings["altitude_max"] ) / 2.0 return buildings def set_materials(buildings): for boundary in ["wall", "roof"]: buildings[f"{boundary}_class"] = buildings[f"{boundary}_material"] buildings.loc[buildings[f"{boundary}_class"].isna(), f"{boundary}_class"] = "0" buildings.loc[buildings[f"{boundary}_class"] == "", f"{boundary}_class"] = "0" buildings[f"{boundary}_class"] = buildings[f"{boundary}_class"].astype(int) buildings.loc[buildings[f"{boundary}_class"] >= 10, f"{boundary}_class"] = ( buildings.loc[buildings[f"{boundary}_class"] >= 10, f"{boundary}_class"] // 10 ) # buildings[f'{boundary}_class'] = f'{boundary}_materials'[buildings[f'{boundary}_class']] buildings["roof_class"] = buildings["roof_class"].replace(roof_materials) buildings["wall_class"] = buildings["wall_class"].replace(wall_materials) buildings.wall_class = buildings.wall_class.astype("category") buildings.roof_class = buildings.roof_class.astype("category") def set_construction_year_class(buildings, date_format, construction_year_class): m = ~( buildings.construction_date.isnull() | (buildings.construction_date == "None") ) if m.sum() == 0: buildings["construction_year"] = None buildings["construction_year_class"] = None buildings.construction_year_class = buildings.construction_year_class.astype( "category" ) return None buildings.loc[m, "construction_year"] = buildings.loc[m, "construction_date"].apply( lambda x: datetime.strptime(x, date_format).year if isinstance(x, str) else x.year ) buildings["construction_year_class"] = None for key, val in construction_year_class.items(): buildings.loc[ (buildings["construction_year"] >= val[0]) & (buildings["construction_year"] <= val[1]), "construction_year_class", ] = "[" + str(val[0]) + ", " + str(val[1]) + "]" buildings.construction_year_class = buildings.construction_year_class.astype( "category" )
[docs] def set_district(buildings, parameters): """ Args: buildings: Returns: """ district_buffer = buildings.geometry.union_all().convex_hull.buffer(50.0e3).bounds districts = gpd.read_parquet( Path(data_path["gis"]) / parameters.districts, bbox=district_buffer ) gas_data = ["has_network_city_level", "has_network_grdf_data"] districts[gas_data] = districts[gas_data].astype(bool).fillna(True) building_with_district = gpd.sjoin( buildings, districts, how="left", predicate="within" ) district_spatial_index = districts.sindex for build_idx in building_with_district.loc[ building_with_district["district"].isnull(), : ].index: building_poly = building_with_district.loc[build_idx, "geometry"] possible_matches_index = list( district_spatial_index.intersection(building_poly.bounds) ) possible_matches = districts.iloc[possible_matches_index] precise_matches = possible_matches[possible_matches.intersects(building_poly)] tmp_area = 0.0 geo_levels_code = None for admin_idx in precise_matches.index: new_area = ( precise_matches.loc[admin_idx, "geometry"] .intersection(building_poly) .area ) if new_area > tmp_area: tmp_area = new_area geo_levels_code = precise_matches.loc[admin_idx, geo_levels] building_with_district.loc[build_idx, geo_levels] = geo_levels_code buildings[geo_levels + gas_data] = building_with_district[ geo_levels + gas_data ].values buildings = buildings.astype({g_l: "category" for g_l in geo_levels}) return buildings