Source code for pixcdust.tests.test_converters

import random
from pathlib import PosixPath, Path
from typing import List, Union

import fiona
import numpy as np
import geopandas as gpd
import pytest
from shapely.geometry import Polygon
import xarray as xr

from pixcdust.converters.gpkg import Nc2GpkgConverter, GpkgDGGSProjecter
from pixcdust.converters.shapefile import Nc2ShpConverter
from pixcdust.converters.zarr import Nc2ZarrConverter
from pixcdust.readers import GpkgReader
from pixcdust.readers.zarr import ZarrReader
from pixcdust.readers.netcdf import NcSimpleReader

# Closed ring of (longitude, latitude) corners: the first point is repeated
# at the end to close the polygon.
LIM_AREA_POL = Polygon(
    [
        (-1.50580, 43.39543),
        (-1.36597, 43.39543),
        (-1.36597, 43.56471),
        (-1.50580, 43.56471),
        (-1.50580, 43.39543),
    ]
)

# Single-row GeoDataFrame wrapping LIM_AREA_POL, declared in EPSG:4326.
LIM_AREA_GEOM = gpd.GeoDataFrame(
    geometry=[LIM_AREA_POL],
    crs='epsg:4326',
    index=[0],
)
"""Geometry used as area of interest of limited area tests."""
def test_nc_simple_reader_conditions(input_files):
    """Test NcSimpleReader with conditions on variables.

    A reader is built with one filtering condition per variable, the data
    are read, and every condition is then re-checked on ``reader.data``.

    Args:
        input_files: Input netcdf files handed to the reader (pytest fixture).

    Raises:
        AssertionError: If a value of ``reader.data`` violates a condition.
    """
    # One entry per variable only: a dict literal silently keeps the last
    # value of a repeated key, so listing "classification" twice would
    # discard the first condition instead of combining both.
    conditions = {
        "classification": {'operator': "le", 'threshold': 3},  # classification <= 3
        "sig0": {'operator': "gt", 'threshold': 15},  # sig0 > 15
    }
    converted_vars = ['height', 'sig0', 'classification']

    # Instantiate the NcSimpleReader with conditions
    reader = NcSimpleReader(
        input_files,
        variables=converted_vars,
        conditions=conditions,
    )

    # Read the dataset; the conditions are expected to be applied here.
    reader.read()

    # Operator name -> (symbol used in the failure message, comparison).
    comparisons = {
        'ge': (">=", lambda data, threshold: data >= threshold),
        'le': ("<=", lambda data, threshold: data <= threshold),
        'gt': (">", lambda data, threshold: data > threshold),
        'lt': ("<", lambda data, threshold: data < threshold),
    }

    # Validate the data after filtering
    for var, condition in conditions.items():
        op = condition.get("operator")
        val = condition.get("threshold")
        # An unknown operator must fail loudly rather than skip the check.
        assert op in comparisons, f"unsupported operator {op!r} for {var}"
        symbol, compare = comparisons[op]
        assert compare(reader.data[var], val).all(), f"{var} not {symbol} {val}"
def validate_conversion_to_nc(
        read_data: xr.Dataset,
        converted_vars: List[str],
        first_file: Union[str, Path],
) -> None:
    """Check the beginning of a converted database against the first source netcdf file.

    The netcdf file is opened with ``NcSimpleReader`` and the comparison is
    delegated to ``validate_conversion`` with ``is_longer=True``, i.e. the
    converted database is expected to hold more points than that single file.

    Args:
        read_data: Data to validate.
        converted_vars: Names of variables to compare.
        first_file: Netcdf file containing the expected data.

    Raises:
        AssertionError: If the data are too different.
    """
    reference_reader = NcSimpleReader(str(first_file))
    reference_reader.open_dataset()
    validate_conversion(
        read_data=read_data,
        converted_vars=converted_vars,
        expected_data=reference_reader.data,
        is_longer=True,
    )
def validate_conversion(
        read_data: xr.Dataset,
        converted_vars: List[str],
        expected_data: xr.Dataset,
        is_longer: bool,
        len_tol: int = 0,
        sort_var: bool = False,
) -> None:
    """Compare the read data to the expected data.

    For every variable, three windows of at most 30 values are compared with
    ``np.testing.assert_allclose``: the head, the tail and, when no length
    tolerance is requested, one window at a random offset. The lengths of
    the two arrays are then checked.

    Args:
        read_data: Data to validate.
        converted_vars: Names of variables to compare.
        expected_data: Expected data.
        is_longer: Is the expected_data expected to only contain the start
            of the read_data.
        len_tol: Tolerance on the length difference, for points missing
            because of numeric error on lon/lat.
        sort_var: Is the data ordering different and do we need to sort them.

    Raises:
        AssertionError: If the data are too different.
    """
    window = 30
    for var in converted_vars:
        read_var = read_data[var].data
        expected_var = expected_data[var].data
        if sort_var:
            # can't do something like sorting by longitude as minor conversion error in the longitude
            # result in random swaps (then a few massive errors because we mismatch the points).
            read_var = np.sort(read_var)
            expected_var = np.sort(expected_var)

        expected_last = len(expected_var)
        # When read_var is longer, only its leading part matches expected_var.
        last = expected_last if is_longer else len(read_var)

        # Head window.
        np.testing.assert_allclose(
            read_var[:window], expected_var[:window],
            err_msg=f"{var}: head mismatch",
        )

        # Tail window, aligned on the end of each array. The start is clamped
        # at 0 so short arrays do not wrap around through negative indices,
        # and the stop is the array end so the last value is compared too.
        np.testing.assert_allclose(
            read_var[max(last - window, 0):last],
            expected_var[max(expected_last - window, 0):expected_last],
            err_msg=f"{var}: tail mismatch",
        )

        # Random window: only meaningful when both arrays are index-aligned
        # (no length tolerance) and long enough to hold a full window.
        if len_tol == 0 and last > window:
            r = random.randrange(window, last)
            # The offset is reported so a failure can be reproduced.
            np.testing.assert_allclose(
                read_var[r - window:r], expected_var[r - window:r],
                err_msg=f"{var}: mismatch in window [{r - window}:{r}]",
            )

        if is_longer:
            assert len(read_var) > expected_last
        else:
            assert expected_last + len_tol >= len(read_var) >= expected_last - len_tol
def test_convert_zarr_full_area(input_files, first_file, tmp_folder):
    """Convert the input files to zarr with no area_of_interest and check the result.

    The written database is read back with ``ZarrReader`` and its beginning
    is compared to the first input netcdf file.
    """
    variables = ['height', 'sig0', 'classification']
    zarr_path = str(tmp_folder / "zarr_conv_test_full")

    # Write the zarr database.
    Nc2ZarrConverter(input_files, variables=variables).database_from_nc(zarr_path, mode="o")

    # Read it back and compare it to the source data.
    zarr_reader = ZarrReader(zarr_path)
    zarr_reader.read()
    validate_conversion_to_nc(zarr_reader.data, variables, first_file)
@pytest.fixture(scope="session")
def converted_lim_gpkg(input_files, tmp_folder):
    """Session-scoped fixture: geopackage conversion restricted to LIM_AREA_GEOM.

    Returns:
        Path, as a string, given to ``database_from_nc`` as output location.
    """
    gpkg_path = str(tmp_folder / "gpkg_conv_test_lim")
    converter = Nc2GpkgConverter(
        input_files,
        variables=['height', 'sig0', 'classification'],
        area_of_interest=LIM_AREA_GEOM,
    )
    converter.database_from_nc(gpkg_path, mode="o")
    return gpkg_path
def test_convert_gpkg_and_zarr_limited_area(input_files, first_file, tmp_folder, converted_lim_gpkg):
    """Test geopackage and zarr conversion with area_of_interest.

    They are compared to each other.
    Note that they are ordered differently with some missing points due to lon/lat casting.
    """
    # Conversion
    output_zarr = str(tmp_folder / "zarr_conv_test_lim")
    converted_vars = ['height', 'sig0', 'classification']
    output_gpkg = converted_lim_gpkg
    Nc2ZarrConverter(
        input_files,
        variables=converted_vars,
        area_of_interest=LIM_AREA_GEOM,
    ).database_from_nc(output_zarr, mode="o")

    # Validation
    gpkg_read = GpkgReader(output_gpkg)
    gpkg_read.read()
    zarr_read = ZarrReader(output_zarr)
    zarr_read.read()
    # sort_var is a boolean switch: each variable is sorted on its own values,
    # not by longitude, so a plain True is passed rather than a column name.
    validate_conversion(
        gpkg_read.data,
        converted_vars,
        zarr_read.data,
        is_longer=False,
        len_tol=2,
        sort_var=True,
    )
def test_convert_shape_limited_area(input_files, first_file, tmp_folder):
    """Run the shapefile conversion restricted to LIM_AREA_GEOM.

    Only the conversion call itself is exercised; the written data are not
    read back.
    """
    shp_path = str(tmp_folder / "shp_conv_test_full")
    variables = ['height', 'sig0', 'classification']
    converter = Nc2ShpConverter(
        input_files,
        variables=variables,
        area_of_interest=LIM_AREA_GEOM,
    )
    converter.database_from_nc(shp_path, mode="o")
# TODO: read the data back and validate it, along the lines of:
#   pixc_read = PixCZarrReader(output)
#   pixc_read.read()
#   validate_conversion(gpkg_read.data, converted_vars, zarr_read.data, False)


# Test for GpkgDGGSProjecter
def test_gpkg_dggs_projecter(converted_lim_gpkg, tmp_folder):
    """Project the limited-area geopackage with GpkgDGGSProjecter and check the output layers.

    The projecter is first run with ``healpix=False``, then again with
    ``healpix=True``; after each run the expected layer names are looked up
    in the output file.
    """
    input_gpkg = converted_lim_gpkg
    output_path = str(tmp_folder / "gpkg_dggs_output")
    dggs_res = 10
    healpix = False
    dggs_layer_pattern = '_h3'

    # Constructor arguments; each one is expected back as a same-named attribute.
    init_kwargs = {
        "path": input_gpkg,
        "dggs_res": dggs_res,
        "healpix": healpix,
        "dggs_layer_pattern": dggs_layer_pattern,
        "path_out": output_path,
    }
    projecter = GpkgDGGSProjecter(**init_kwargs)

    # Initialization check.
    for attribute, expected in init_kwargs.items():
        assert getattr(projecter, attribute) == expected

    # Every input layer that is not itself a DGGS layer must be loaded.
    for layer in fiona.listlayers(input_gpkg):
        if layer.endswith(dggs_layer_pattern):
            continue
        assert layer in projecter.database.layers

    def assert_output_layers():
        # One output layer is expected per database layer, named from the
        # layer, the resolution and the layer pattern.
        for layer in projecter.database.layers:
            layername_out = f"{layer}_{dggs_res}_{dggs_layer_pattern}"
            assert layername_out in fiona.listlayers(output_path)

    # First run, with healpix=False.
    projecter.compute_layers()
    assert_output_layers()

    # Second run, with the HEALPix projection switched on.
    projecter.healpix = True
    projecter.compute_layers()
    assert_output_layers()