216 lines
6.1 KiB
Python
216 lines
6.1 KiB
Python
"""
|
||
geo_tools.core.vector
|
||
~~~~~~~~~~~~~~~~~~~~~
|
||
基于 geopandas 的矢量要素处理函数。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from typing import Any, Literal
|
||
|
||
import geopandas as gpd
|
||
import pandas as pd
|
||
|
||
from app.utils.logger import get_logger
|
||
from app.utils.validators import validate_geometry
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
def reproject(gdf: gpd.GeoDataFrame, target_crs: str | int) -> gpd.GeoDataFrame:
|
||
"""将 GeoDataFrame 重投影到目标坐标系。
|
||
|
||
Parameters
|
||
----------
|
||
gdf:
|
||
输入 GeoDataFrame,必须已定义 CRS。
|
||
target_crs:
|
||
目标 CRS,如 ``"EPSG:3857"`` 或 ``4490``。
|
||
|
||
Returns
|
||
-------
|
||
gpd.GeoDataFrame
|
||
重投影后的 GeoDataFrame(新对象,原始不变)。
|
||
"""
|
||
if gdf.crs is None:
|
||
raise ValueError("GeoDataFrame 未定义 CRS,请先设置坐标系。")
|
||
if gdf.crs.to_epsg() == (target_crs if isinstance(target_crs, int) else None):
|
||
return gdf # 已经是目标 CRS,跳过
|
||
logger.debug("重投影:%s → %s(共 %d 条)", gdf.crs, target_crs, len(gdf))
|
||
return gdf.to_crs(target_crs)
|
||
|
||
|
||
def set_crs(gdf: gpd.GeoDataFrame, crs: str | int, *, overwrite: bool = False) -> gpd.GeoDataFrame:
|
||
"""为没有 CRS 的 GeoDataFrame 设置坐标系(不重投影)。
|
||
|
||
Parameters
|
||
----------
|
||
gdf:
|
||
输入数据。
|
||
crs:
|
||
目标 CRS。
|
||
overwrite:
|
||
若为 ``True``,即使已有 CRS 也强制覆盖(危险操作,请确认坐标系正确)。
|
||
"""
|
||
if gdf.crs is not None and not overwrite:
|
||
raise ValueError(
|
||
f"GeoDataFrame 已有 CRS:{gdf.crs}。若要覆盖,请传入 overwrite=True。"
|
||
)
|
||
return gdf.set_crs(crs, allow_override=overwrite)
|
||
|
||
|
||
def clip_to_extent(
|
||
gdf: gpd.GeoDataFrame,
|
||
bbox: tuple[float, float, float, float] | gpd.GeoDataFrame,
|
||
) -> gpd.GeoDataFrame:
|
||
"""按矩形范围或另一个 GeoDataFrame 裁切要素。
|
||
|
||
Parameters
|
||
----------
|
||
gdf:
|
||
待裁切的 GeoDataFrame。
|
||
bbox:
|
||
矩形范围 ``(minx, miny, maxx, maxy)`` 或用于裁切的 GeoDataFrame / GeoSeries。
|
||
|
||
Returns
|
||
-------
|
||
gpd.GeoDataFrame
|
||
"""
|
||
if isinstance(bbox, tuple):
|
||
from shapely.geometry import box as shapely_box
|
||
mask = shapely_box(*bbox)
|
||
result = gdf.clip(mask)
|
||
else:
|
||
if bbox.crs != gdf.crs:
|
||
bbox = bbox.to_crs(gdf.crs) # type: ignore
|
||
result = gdf.clip(bbox)
|
||
|
||
logger.debug("裁切完成:%d → %d 条", len(gdf), len(result))
|
||
return result
|
||
|
||
|
||
def dissolve_by(
|
||
gdf: gpd.GeoDataFrame,
|
||
by: str | list[str],
|
||
aggfunc: str | dict[str, Any] = "first",
|
||
) -> gpd.GeoDataFrame:
|
||
"""按属性字段融合(Dissolve)几何要素。
|
||
|
||
Parameters
|
||
----------
|
||
gdf:
|
||
输入 GeoDataFrame。
|
||
by:
|
||
融合字段名或字段列表。
|
||
aggfunc:
|
||
属性聚合函数,参考 ``pd.DataFrame.groupby``。
|
||
|
||
Returns
|
||
-------
|
||
gpd.GeoDataFrame
|
||
融合后的 GeoDataFrame,索引为 ``by`` 字段。
|
||
"""
|
||
logger.debug("按字段 %r 融合要素(%d 条 → ?)", by, len(gdf))
|
||
result = gdf.dissolve(by=by, aggfunc=aggfunc).reset_index()
|
||
logger.debug("融合完成:%d 条", len(result))
|
||
return result
|
||
|
||
|
||
def explode_multipart(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
|
||
"""将多部分几何(MultiPolygon 等)拆分为单部分要素。
|
||
|
||
Parameters
|
||
----------
|
||
gdf:
|
||
输入 GeoDataFrame。
|
||
|
||
Returns
|
||
-------
|
||
gpd.GeoDataFrame
|
||
拆分后索引已 reset。
|
||
"""
|
||
result = gdf.explode(index_parts=False).reset_index(drop=True)
|
||
logger.debug("多部分拆分:%d → %d 条", len(gdf), len(result))
|
||
return result
|
||
|
||
|
||
def drop_invalid_geometries(gdf: gpd.GeoDataFrame, *, fix: bool = False) -> gpd.GeoDataFrame:
|
||
"""删除或修复无效几何。
|
||
|
||
Parameters
|
||
----------
|
||
gdf:
|
||
输入 GeoDataFrame。
|
||
fix:
|
||
若为 ``True``,尝试通过 ``buffer(0)`` 修复无效几何而非删除。
|
||
"""
|
||
stats = validate_geometry(gdf)
|
||
if stats["invalid"] == 0 and stats["null"] == 0:
|
||
return gdf
|
||
|
||
if fix:
|
||
from app.core.geometry import fix_geometry
|
||
gdf = gdf.copy()
|
||
mask = ~gdf.geometry.is_valid | gdf.geometry.isna()
|
||
gdf.loc[mask, "geometry"] = gdf.loc[mask, "geometry"].apply(fix_geometry) # type: ignore
|
||
logger.info("已修复 %d 个无效几何", stats["invalid"])
|
||
else:
|
||
before = len(gdf)
|
||
gdf = gdf[gdf.geometry.is_valid & gdf.geometry.notna()].copy()
|
||
logger.info("已删除 %d 个无效/空几何", before - len(gdf))
|
||
return gdf
|
||
|
||
|
||
def spatial_join(
|
||
left: gpd.GeoDataFrame,
|
||
right: gpd.GeoDataFrame,
|
||
how: Literal["left", "right", "inner"] = "left",
|
||
predicate: str = "intersects",
|
||
**kwargs: Any,
|
||
) -> gpd.GeoDataFrame:
|
||
"""空间连接(封装 geopandas.sjoin)。
|
||
|
||
Parameters
|
||
----------
|
||
left:
|
||
左侧 GeoDataFrame。
|
||
right:
|
||
右侧 GeoDataFrame。
|
||
how:
|
||
连接方式:``"left"``、``"right"``、``"inner"``。
|
||
predicate:
|
||
空间谓词:``"intersects"``、``"contains"``、``"within"``、``"touches"``。
|
||
"""
|
||
if left.crs != right.crs:
|
||
right = right.to_crs(left.crs) # type: ignore
|
||
result = gpd.sjoin(left, right, how=how, predicate=predicate, **kwargs)
|
||
logger.debug("空间连接完成:%d 条结果", len(result))
|
||
return result
|
||
|
||
|
||
def add_area_column(
|
||
gdf: gpd.GeoDataFrame,
|
||
col_name: str = "area_m2",
|
||
projected_crs: str = "EPSG:3857",
|
||
) -> gpd.GeoDataFrame:
|
||
"""添加面积列(单位:平方米)。
|
||
|
||
将数据临时投影到 ``projected_crs``(笛卡尔投影)计算面积后回填到原 GDF。
|
||
|
||
Parameters
|
||
----------
|
||
gdf:
|
||
输入 GeoDataFrame(面要素)。
|
||
col_name:
|
||
新列名。
|
||
projected_crs:
|
||
用于面积计算的投影 CRS(需为平面坐标系)。
|
||
"""
|
||
gdf = gdf.copy()
|
||
if gdf.crs is None or not gdf.crs.is_projected:
|
||
projected = gdf.to_crs(projected_crs)
|
||
else:
|
||
projected = gdf
|
||
gdf[col_name] = projected.geometry.area
|
||
return gdf
|