Skip to content

Commit aa06d3f

Browse files
committed
feat: more refactoring
1 parent 7bc1bc3 commit aa06d3f

14 files changed

+317
-239
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,5 +241,5 @@ cython_debug/
241241
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
242242
#.idea/
243243

244-
output/
244+
output*/
245245

.idea/parquet-nuts.iml

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coverages/__init__.py

Whitespace-only changes.

coverages/nuts.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from pathlib import Path
2+
3+
import geopandas as gpd
4+
from pandas import DataFrame, Series
5+
from shapely import box
6+
7+
from datasets.worldcover import (
8+
create_directories,
9+
get_tile_keys,
10+
process,
11+
land_cover_names
12+
)
13+
from inject_metadata import inject_metadata
14+
15+
# FlatGeobuf metadata contract for the NUTS output layers: maps the generic
# metadata roles (identifier/name/level/children) onto NUTS column names and
# exposes the land-cover class columns as attributes.
nuts_worldcover_metadata = dict(
    identifierKey="NUTS_ID",
    nameKey="NUTS_NAME",
    levelKey="LEVL_CODE",
    childrenKey="children",
    attributeKeys=land_cover_names,
)
22+
23+
24+
def nuts_level_func(stats_df: DataFrame, level: int) -> Series:
    """Return a boolean mask selecting rows whose NUTS LEVL_CODE equals *level*."""
    mask = stats_df["LEVL_CODE"].eq(level)
    return Series(mask)
26+
27+
28+
def foo(row, df):
    """Return the comma-separated 'children' value of *row* as a list.

    An empty/falsy 'children' value yields an empty list.
    NOTE(review): *df* is unused and this function appears unreferenced in
    this module — confirm before removing it.
    """
    children = row["children"]
    return children.split(",") if children else []
33+
34+
35+
def nuts_children(df: DataFrame, nuts_id: str) -> Series:
    """Return a boolean mask marking the rows of *df* that are direct children of *nuts_id*.

    Looks up the comma-separated 'children' string of the row whose NUTS_ID
    equals *nuts_id*; raises IndexError if *nuts_id* is not present.
    """
    parent_children = df.loc[df["NUTS_ID"] == nuts_id, "children"].iloc[0]
    child_ids = parent_children.split(",")
    return Series(df["NUTS_ID"].isin(child_ids))
39+
40+
41+
def nuts_intersections(ds_bbox: box, stats_df: DataFrame, level: int):
    """Return leaf regions (empty 'children') whose geometry intersects the raster bbox.

    *level* is unused here but kept to satisfy the IntersectionFunc signature.
    """
    is_leaf = stats_df["children"] == ""
    overlaps_bbox = stats_df.geometry.intersects(ds_bbox)
    return stats_df[overlaps_bbox & is_leaf]
45+
46+
47+
def main():
    """Compute WorldCover land-cover statistics for NUTS regions.

    Reads the NUTS geometries, processes the intersecting WorldCover tiles
    down to level 4, writes per-level FlatGeobuf outputs, and injects the
    NUTS metadata contract into each output file.
    """
    create_directories()

    nuts_df = gpd.read_file("input/NUTS_with_children.fgb", engine="pyogrio")
    tile_keys = get_tile_keys(nuts_df, 0, nuts_level_func)

    output_files = process(
        tile_keys,
        nuts_df,
        bottom_level=4,
        level_fn=nuts_level_func,
        child_fn=nuts_children,
        intersection_fn=nuts_intersections,
        code_column_name="NUTS_ID",
        output_path=Path("output/worldcover-stats-nuts.fgb"),
    )

    for output_file in output_files:
        inject_metadata(output_file, nuts_worldcover_metadata)


if __name__ == "__main__":
    main()

coverages/watersheds.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from pathlib import Path
2+
3+
import geopandas as gpd
4+
from pandas import DataFrame, Series
5+
from shapely import box
6+
7+
from datasets.worldcover import (
8+
create_directories,
9+
get_tile_keys,
10+
process,
11+
land_cover_names,
12+
)
13+
from inject_metadata import inject_metadata
14+
15+
# FlatGeobuf metadata contract for the HydroBASINS output layers. HYBAS_ID
# doubles as both identifier and display name because basins have no names.
watersheds_worldcover_metadata = dict(
    identifierKey="HYBAS_ID",
    nameKey="HYBAS_ID",
    levelKey="level",
    childrenKey="children",
    attributeKeys=land_cover_names,
)
22+
23+
24+
def hydrosheds_level_func(stats_df: DataFrame, level: int) -> Series:
    """Return a mask for basins at Pfafstetter *level*.

    A level-n basin has an n+1 digit PFAF_ID, so the test is on digit count.
    """
    digit_counts = stats_df["PFAF_ID"].astype(str).str.len()
    return Series(digit_counts == level + 1)
26+
27+
28+
def hydrosheds_intersections(ds_bbox: box, stats_df: DataFrame, level: int) -> DataFrame:
    """Return basins at the bottom level (level - 1) whose geometry intersects the raster bbox."""
    at_bottom_level = hydrosheds_level_func(stats_df, level - 1)
    overlaps_bbox = stats_df.geometry.intersects(ds_bbox)
    return stats_df[overlaps_bbox & at_bottom_level]
30+
31+
32+
def hydrosheds_children(stats_df: DataFrame, pfaf_id: int) -> Series:
    """Return a boolean Series marking the direct children of *pfaf_id*.

    A child appends one digit to its parent's PFAF_ID, so children of p are
    exactly the ids in the inclusive range [p*10, p*10 + 9].
    """
    lower_bound = pfaf_id * 10
    return Series(stats_df["PFAF_ID"].between(lower_bound, lower_bound + 9))
40+
41+
42+
def main():
    """Compute WorldCover land-cover statistics for HydroBASINS watersheds.

    Reads the basin geometries, processes the intersecting WorldCover tiles
    down to level 12, writes per-level FlatGeobuf outputs, and injects the
    watershed metadata contract into each output file.
    """
    create_directories()

    basins_df = gpd.read_file("input/hybas_eu_lev01-12_v1c.fgb", engine="pyogrio")
    tile_keys = get_tile_keys(basins_df, 2, hydrosheds_level_func)

    output_files = process(
        tile_keys,
        basins_df,
        bottom_level=12,
        level_fn=hydrosheds_level_func,
        child_fn=hydrosheds_children,
        intersection_fn=hydrosheds_intersections,
        code_column_name="PFAF_ID",
        output_path=Path("output/worldcover-stats-watersheds.fgb"),
    )

    for output_file in output_files:
        inject_metadata(output_file, watersheds_worldcover_metadata)


if __name__ == "__main__":
    main()

datasets/__init__.py

Whitespace-only changes.

worldcover.py renamed to datasets/worldcover.py

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from pathlib import Path
44

55
import geopandas as gpd
6-
import humanize
76
import psutil
87
from pandas import DataFrame, Series
98
from pyproj import Geod
@@ -12,6 +11,12 @@
1211
from rasterio import features, DatasetReader
1312
import numpy as np
1413
from shapely import box
14+
import boto3
15+
import humanize
16+
from botocore import UNSIGNED
17+
from botocore.config import Config
18+
import rasterio
19+
1520

1621
# Land cover classification mapping
1722
LAND_COVER_CLASSES = {
@@ -39,6 +44,7 @@
3944

4045
type LevelFunc = Callable[[DataFrame, int], Series]
4146
type ChildFunc = Callable[[DataFrame, any], Series]
47+
type IntersectionFunc = Callable[[box, DataFrame, int], DataFrame]
4248

4349

4450
def get_process_memory_use():
@@ -47,13 +53,18 @@ def get_process_memory_use():
4753
return mem_info.rss
4854

4955

50-
def output_by_level(max_level: int, stats_df: DataFrame, level_fn: LevelFunc, file_path: Path):
56+
def output_by_level(max_level: int, stats_df: DataFrame, level_fn: LevelFunc, file_path: Path) -> list[Path]:
    """Write one file per level (0 .. max_level-1) and return the paths written.

    Each level's rows are selected with *level_fn* and saved to a sibling of
    *file_path* whose suffix is prefixed with ".levelNN" (e.g. x.level03.fgb).

    :param max_level: number of levels to export (exclusive upper bound).
    :param stats_df: full statistics frame covering all levels.
    :param level_fn: predicate returning a boolean mask for a given level.
    :param file_path: template output path; per-level names derive from it.
    :return: the per-level file paths, in level order.
    """
    output_file_names = []
    suffix = file_path.suffix  # loop-invariant; hoisted out of the loop

    for i in trange(max_level, desc=f"Saving to {str(file_path.parent)}"):
        level = stats_df[level_fn(stats_df, i)]
        file_name = file_path.with_suffix(f".level{i:02}{suffix}")
        output_file_names.append(file_name)
        level.to_file(file_name)

    return output_file_names
67+
5768

5869
def calculate_total_area(statistics: DataFrame):
5970
areas = []
@@ -84,13 +95,12 @@ def calculate_values(source_raster: DatasetReader, intersections: DataFrame) ->
8495
max_memory_usage = max(max_memory_usage, get_process_memory_use())
8596
current_memory_usage = get_process_memory_use()
8697
geometry_progress_bar.set_postfix_str(f"{humanize.naturalsize(current_memory_usage)}")
87-
geometry_progress_bar.set_description_str(f"{src_file_name.name} <- {region["HYBAS_ID"]}")
98+
geometry_progress_bar.set_description_str(f"{src_file_name.name}")
8899
geom = region.geometry
89100

90101
try:
91102
window = features.geometry_window(source_raster, [geom])
92103
except WindowError as e:
93-
print(f"Error: {e} - skipping raster")
94104
continue
95105

96106
window_xform = source_raster.window_transform(window)
@@ -138,7 +148,7 @@ def create_directories() -> tuple[Path, Path]:
138148

139149
def get_tile_keys(geom_df: DataFrame, level: int, level_fn: LevelFunc) -> list[str]:
140150
countries = geom_df[level_fn(geom_df, level)]
141-
tile_index_df = gpd.read_file("data/esa_worldcover_grid.fgb", engine="pyogrio")
151+
tile_index_df = gpd.read_file("input/esa_worldcover_grid.fgb", engine="pyogrio")
142152
intersected_tiles = gpd.sjoin(tile_index_df, countries, how="inner")
143153
unique_tiles = intersected_tiles.drop_duplicates(subset="ll_tile").copy()
144154

@@ -166,3 +176,53 @@ def sum_children(stats_df: DataFrame, bottom_level: int, level_fn: LevelFunc, ch
166176
continue
167177

168178
stats_df.loc[index, land_cover_names] = children[land_cover_names].sum()
179+
180+
181+
def process(tif_file_names: list[str],
            stats_df: DataFrame,
            bottom_level: int,
            level_fn: LevelFunc,
            child_fn: ChildFunc,
            intersection_fn: IntersectionFunc,
            code_column_name: str,
            output_path: Path
            ) -> list[Path]:
    """Accumulate per-region land-cover statistics over a set of WorldCover tiles.

    Downloads each tile from S3 (cached under ./.cache), computes statistics
    for the bottom-level regions it intersects, rolls the results up through
    the hierarchy, and writes one output file per level.

    :param tif_file_names: S3 object keys of the WorldCover GeoTIFF tiles.
    :param stats_df: region geometries; statistic columns are added in place.
    :param bottom_level: deepest hierarchy level to process.
    :param level_fn: predicate selecting rows of a given level.
    :param child_fn: predicate selecting the children of a given region.
    :param intersection_fn: selects regions intersecting a tile's bbox.
    :param code_column_name: column holding the region identifier.
    :param output_path: template path for the per-level output files.
    :return: the per-level output file paths.
    """
    # Initialise the accumulator columns for every land-cover class.
    for name in land_cover_names:
        stats_df[name] = 0.0

    stats_df["Unknown"] = 0.0
    stats_df["total_area"] = 0.0

    # Unsigned config: the WorldCover bucket allows anonymous access.
    s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))

    print("Processing")
    tiff_progress_bar = trange(len(tif_file_names))
    for idx in tiff_progress_bar:
        # NOTE(review): max_memory_usage is a module-level global updated in
        # calculate_values — confirm it is initialised before the first tile.
        tiff_progress_bar.set_postfix_str(f"{humanize.naturalsize(max_memory_usage)}")
        src_file_name = Path("./.cache/" + Path(tif_file_names[idx]).name)

        if not src_file_name.exists():
            s3.download_file(s3_bucket, tif_file_names[idx], str(src_file_name.resolve()))

        # Context manager guarantees the raster is closed even when
        # calculate_values raises (the original leaked the handle on
        # exception) and removes the duplicated manual close() calls.
        with rasterio.open(src_file_name) as ds:
            ds_bbox = box(*ds.bounds)
            intersections = intersection_fn(ds_bbox, stats_df, bottom_level)

            if len(intersections) == 0:
                continue

            results = calculate_values(ds, intersections)
            stats_df.loc[results.index] = results

    sum_children(stats_df, bottom_level, level_fn, child_fn, code_column_name)
    calculate_total_area(stats_df)

    file_names = output_by_level(bottom_level, stats_df, level_fn, output_path)
    print("Done")

    return file_names

datasets/worldsoils.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# https://gui.world-soils.com/mapserver/Europe?SERVICE=WCS&VERSION=2.0.1&REQUEST=GetCoverage&COVERAGEID=soc_0-5cm_mean_europe_2020-2022&FORMAT=image/tiff&OUTPUTCRS=http://www.opengis.net/def/crs/EPSG/0/4326&SUBSET=long(8,10)&SUBSET=lat(50,52)&SUBSETTINGCRS=http://www.opengis.net/def/crs/EPSG/0/4326
2+
from collections.abc import Callable
3+
4+
from pandas import DataFrame, Series
5+
import geopandas as gpd
6+
7+
type LevelFunc = Callable[[DataFrame, int], Series]
8+
9+
10+
def get_tile_keys(geom_df: DataFrame, level: int, level_fn: LevelFunc) -> list[str]:
    """Return the S3 keys of WorldCover tiles intersecting the level-*level* regions.

    Spatially joins the tile grid against the selected regions, deduplicates
    by tile id, writes the selected tiles (with their keys) to
    output/worldcover-tiles.fgb, and returns the list of tile keys.
    """
    regions = geom_df[level_fn(geom_df, level)]

    grid = gpd.read_file("input/esa_worldcover_grid.fgb", engine="pyogrio")
    joined = gpd.sjoin(grid, regions, how="inner")
    tiles = joined.drop_duplicates(subset="ll_tile").copy()

    keys = [
        f"v200/2021/map/ESA_WorldCover_10m_2021_v200_{tile_name.strip()}_Map.tif"
        for tile_name in tiles["ll_tile"].tolist()
    ]
    tiles["tile_key"] = keys

    tiles = tiles[["ll_tile", "tile_key", "geometry"]]
    tiles.to_file("output/worldcover-tiles.fgb")

    return keys

inject_metadata.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import json
2+
from pathlib import Path
3+
4+
from osgeo import ogr
5+
from pydantic import BaseModel, ValidationError, field_serializer
6+
7+
8+
class MissingColumn(Exception):
    """Raised when a required column is absent from a FlatGeobuf layer."""
10+
11+
12+
class MetadataIn(BaseModel):
    """Caller-supplied metadata: the *names* of columns in the FlatGeobuf layer."""
    # Column holding the unique region identifier (e.g. NUTS_ID, HYBAS_ID).
    identifierKey: str
    # Column holding the human-readable name.
    nameKey: str
    # Column holding the hierarchy level.
    levelKey: str
    # Column holding the comma-separated child identifiers.
    childrenKey: str
    # Statistic columns to expose (e.g. the land-cover class names).
    attributeKeys: list[str]
18+
19+
20+
class MetadataOut(BaseModel):
    """Metadata as written to the file: attributeKeys become field *indices*."""
    identifierKey: str
    nameKey: str
    levelKey: str
    childrenKey: str
    # Field indices (resolved from the names in MetadataIn.attributeKeys).
    attributeKeys: list[int]

    # FGB metadata fields can only be strings.
    @field_serializer("attributeKeys")
    def serialize_attribute_keys(self, keys, _):
        # Emit the index list as a single comma-separated string.
        return ",".join([str(v) for v in keys])
31+
32+
33+
def inject_metadata(fgb_file_path: Path, metadata: dict):
    """Validate *metadata* against the layer and write it into the FlatGeobuf file.

    Checks that every referenced column exists, converts the attribute column
    names to field indices, then rewrites the file with the metadata attached.

    :param fgb_file_path: path to an existing .fgb file (overwritten in place).
    :param metadata: dict matching the MetadataIn schema.
    :raises pydantic.ValidationError: if *metadata* does not match MetadataIn.
    :raises FileNotFoundError: if the input file does not exist.
    :raises ValueError: if the input file is not a .fgb file.
    :raises MissingColumn: if any referenced column is absent from the layer.
    """
    # Raises ValidationError on bad input; the original wrapped this in a
    # no-op try/except that only re-raised.
    metadata = MetadataIn.model_validate(metadata)

    if not fgb_file_path.exists():
        raise FileNotFoundError(f"Input file '{fgb_file_path}' not found")
    if fgb_file_path.suffix != ".fgb":
        raise ValueError(f"Input file '{fgb_file_path}' is not a FlatGeobuf file")

    # GDAL's Python bindings expect a string path, not a pathlib.Path.
    src_ds = ogr.Open(str(fgb_file_path))
    src_layer = src_ds.GetLayer()

    # Check columns exist in the input file. The original duplicated this
    # check four times and labelled childrenKey as "Level column".
    defn = src_layer.GetLayerDefn()

    required_columns = {
        "Index": metadata.identifierKey,
        "Name": metadata.nameKey,
        "Level": metadata.levelKey,
        "Children": metadata.childrenKey,
    }
    for label, column in required_columns.items():
        if defn.GetFieldIndex(column) == -1:
            raise MissingColumn(f"{label} column '{column}' not found in {fgb_file_path}")

    attribute_column_indices = []

    for column in metadata.attributeKeys:
        column_index = defn.GetFieldIndex(column)
        if column_index == -1:
            raise MissingColumn(f"Column '{column}' not found in {fgb_file_path}")
        attribute_column_indices.append(column_index)

    metadata_out = MetadataOut(
        identifierKey=metadata.identifierKey,
        nameKey=metadata.nameKey,
        levelKey=metadata.levelKey,
        attributeKeys=attribute_column_indices,
        childrenKey=metadata.childrenKey
    )

    # Overwrite the input file.
    # NOTE(review): this creates a new datasource over the same path while
    # src_ds still holds the original open — confirm GDAL's FlatGeobuf driver
    # tolerates this; copying to a temp file and renaming would be safer.
    driver = ogr.GetDriverByName("FlatGeobuf")
    dst_ds = driver.CreateDataSource(str(fgb_file_path))
    dst_ds.CopyLayer(src_layer, src_layer.GetName())

    print(f"Metadata injected into {fgb_file_path}")
    print(f"{metadata_out}")

    # We serialize the metadata to a string to take advantage of Pydantic's
    # field serializer, then load it back up as a regular Python object so
    # OGR can set the metadata.
    dst_ds.SetMetadata(json.loads(metadata_out.model_dump_json()))

    # Dereference to flush and close the GDAL datasets.
    dst_ds = None
    src_ds = None
Binary file not shown.

0 commit comments

Comments
 (0)