Source code for pixcdust.dggs.h3_tools
#
# Copyright (C) 2024 Centre National d'Etudes Spatiales (CNES)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import geopandas as gpd
import h3
from astropy_healpix import HEALPix
from astropy import units as u
from shapely.geometry import Polygon
[docs]
def h3_to_polygon(h3_index):
boundary = h3.api.basic_int.h3_to_geo_boundary(h3_index, geo_json=True)
return Polygon(boundary)
[docs]
def get_h3_res_name(res: int) -> str:
return "h3_" + str(res).zfill(2)
[docs]
def gdf_to_h3_gdf(
gdf: gpd.GeoDataFrame,
resolution: int,
) -> gpd.GeoDataFrame:
"""
Convert a GeoDataFrame with latitude and longitude columns to a GeoDataFrame
where rows are aggregated into H3 hexagons based on a given resolution.
Args:
gdf (gpd.GeoDataFrame): The input GeoDataFrame containing 'latitude' and 'longitude' columns.
resolution (int): The H3 resolution level for hexagon indexing.
Returns:
gpd.GeoDataFrame: A new GeoDataFrame where the rows are grouped by H3 hexagons,
containing the mean of the grouped variables, and a geometry column with polygons
representing the H3 hexagons.
"""
# Get the column name for H3 index based on the resolution
h3_col = get_h3_res_name(resolution)
# Apply the H3 function to each row to calculate the H3 index based on latitude, longitude, and resolution
gdf[h3_col] = gdf.apply(lambda row: h3.api.basic_int.geo_to_h3(row['latitude'], row['longitude'], resolution),
axis=1)
# Drop the latitude, longitude, and geometry columns as they are no longer needed
h3_df = gdf.drop(columns=['latitude', 'longitude', 'geometry'])
# Group by the H3 index column, and compute the mean of all other columns in each group
h3_df = h3_df.groupby(h3_col).mean().reset_index()
# Convert each H3 index into a polygon geometry (hexagon) representing its boundaries
geometry = h3_df[h3_col].apply(h3_to_polygon)
return gpd.GeoDataFrame(data=h3_df, geometry=geometry, crs=4326)
[docs]
def healpix_to_polygon(healpix: HEALPix, pix: int):
# Get the boundaries of the HEALPix pixel
lon, lat = healpix.boundaries_lonlat(pix, step=1)
# Create a polygon using the lat/lon values
return Polygon(zip(lon.deg[0], lat.deg[0]))
[docs]
def get_healpix_res_name(res: int) -> str:
return "healpix_" + str(res).zfill(2)
[docs]
def gdf_to_healpix_gdf(
gdf: gpd.GeoDataFrame,
resolution: int,
) -> gpd.GeoDataFrame:
"""
Convert a GeoDataFrame with latitude and longitude columns to a GeoDataFrame
where rows are aggregated into Healpix geometry based on a given resolution.
Args:
gdf (gpd.GeoDataFrame): The input GeoDataFrame containing 'latitude' and 'longitude' columns.
resolution (int): The Healpix resolution level for indexing.
Returns:
gpd.GeoDataFrame: A new GeoDataFrame where the rows are grouped by H3 hexagons,
containing the mean of the grouped variables, and a geometry column with polygons
representing the Healpix geometry.
"""
nside = 2**resolution
# Init heaplix grid
healpix = HEALPix(nside=nside, order='nested')
# Get the column name for Healpix index based on the resolution
healpix_col = get_healpix_res_name(resolution)
# Apply the Healpix function to each row to calculate the Healpix index based on latitude, longitude, and resolution
gdf[healpix_col] = gdf.apply(
lambda row: healpix.lonlat_to_healpix(row['longitude'] * u.deg, row['latitude'] * u.deg), axis=1
)
# Drop the latitude, longitude, and geometry columns as they are no longer needed
healpix_df = gdf.drop(columns=['latitude', 'longitude', 'geometry'])
# Group by the Healpix index column, and compute the mean of all other columns in each group
healpix_df = healpix_df.groupby(healpix_col).mean().reset_index()
# Convert each Healpix index into a polygon geometry representing its boundaries
geometry = healpix_df[healpix_col].apply(lambda pix: healpix_to_polygon(healpix, pix))
return gpd.GeoDataFrame(data=healpix_df, geometry=geometry, crs=4326)