470 lines
14 KiB
Python
470 lines
14 KiB
Python
"""
|
||
geo_tools.core.geometry
|
||
~~~~~~~~~~~~~~~~~~~~~~~
|
||
基于 Shapely 2.x 的几何运算工具函数。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from typing import Literal, Sequence
|
||
|
||
import shapely
|
||
from shapely.geometry import (
|
||
LinearRing,
|
||
LineString,
|
||
MultiLineString,
|
||
MultiPoint,
|
||
MultiPolygon,
|
||
Point,
|
||
Polygon,
|
||
)
|
||
from shapely.geometry.base import BaseGeometry
|
||
|
||
from app.utils.logger import get_logger
|
||
from app.utils.validators import ensure_valid_geometry
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
# ── 几何有效性 ────────────────────────────────────────────────────────────────
|
||
|
||
def is_valid_geometry(geom: BaseGeometry | None) -> bool:
|
||
"""判断几何对象是否有效(非空且通过 Shapely 合法性检查)。
|
||
|
||
Parameters
|
||
----------
|
||
geom:
|
||
输入几何对象,可为 None。
|
||
|
||
Returns
|
||
-------
|
||
bool
|
||
如果几何对象有效且非空,返回 True;否则返回 False。
|
||
"""
|
||
if geom is None:
|
||
return False
|
||
return bool(geom.is_valid and not geom.is_empty)
|
||
|
||
|
||
def fix_geometry(geom: BaseGeometry | None) -> BaseGeometry | None:
|
||
"""尝试修复无效几何。
|
||
|
||
依次尝试:
|
||
1. ``buffer(0)`` — 适合大多数自相交多边形
|
||
2. ``make_valid``(Shapely 2.x)— 覆盖更多情形
|
||
|
||
Parameters
|
||
----------
|
||
geom:
|
||
输入几何对象,可为 None。
|
||
|
||
Returns
|
||
-------
|
||
BaseGeometry | None
|
||
修复后的几何;无法修复时返回 ``None``。
|
||
|
||
Notes
|
||
-----
|
||
对于复杂的无效几何,可能无法完全修复,此时会返回 None。
|
||
"""
|
||
if geom is None:
|
||
return None
|
||
if geom.is_valid:
|
||
return geom
|
||
|
||
# 方法一:buffer(0)
|
||
try:
|
||
fixed = geom.buffer(0)
|
||
if fixed.is_valid and not fixed.is_empty:
|
||
return fixed
|
||
except Exception:
|
||
pass
|
||
|
||
# 方法二:shapely.make_valid(Shapely >= 1.8)
|
||
try:
|
||
fixed = shapely.make_valid(geom)
|
||
if fixed.is_valid and not fixed.is_empty:
|
||
return fixed
|
||
except Exception:
|
||
pass
|
||
|
||
logger.warning("无法修复几何:%r", geom.geom_type)
|
||
return None
|
||
|
||
|
||
def explain_validity(geom: BaseGeometry) -> str:
|
||
"""返回 Shapely 对该几何的有效性说明(英文)。
|
||
|
||
Parameters
|
||
----------
|
||
geom:
|
||
输入几何对象。
|
||
|
||
Returns
|
||
-------
|
||
str
|
||
Shapely 生成的有效性说明字符串。
|
||
"""
|
||
from shapely.validation import explain_validity as _explain
|
||
return _explain(geom)
|
||
|
||
|
||
# ── 基础几何运算 ───────────────────────────────────────────────────────────────
|
||
|
||
def buffer_geometry(
|
||
geom: BaseGeometry,
|
||
distance: float,
|
||
cap_style: Literal["round", "square", "flat"] = "round",
|
||
join_style: Literal["round", "mitre", "bevel"] = "round",
|
||
resolution: int = 16,
|
||
verbose: bool = False,
|
||
) -> BaseGeometry | None:
|
||
"""对几何对象执行缓冲区运算。
|
||
|
||
Parameters
|
||
----------
|
||
geom:
|
||
输入几何。
|
||
distance:
|
||
缓冲距离(单位与 CRS 一致;地理坐标系单位为度)。
|
||
cap_style:
|
||
端头样式:"round"(圆角)、"square"(方角)、"flat"(平角)(仅线要素有效)。
|
||
join_style:
|
||
转角样式:"round"(圆角)、"mitre"(斜角)、"bevel"(尖角)。
|
||
resolution:
|
||
圆弧逼近精度(段数),默认 16。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
BaseGeometry | None
|
||
缓冲区运算后的几何对象;失败时返回 None。
|
||
"""
|
||
try:
|
||
geom = ensure_valid_geometry(geom, verbose)
|
||
return geom.buffer(distance, cap_style=cap_style, join_style=join_style, resolution=resolution)
|
||
except Exception as e:
|
||
logger.warning(f"缓冲区计算失败,已跳过,原因:{str(e)}")
|
||
return None
|
||
|
||
|
||
def centroid(geom: BaseGeometry, verbose: bool = False) -> Point | None:
|
||
"""返回几何的质心点。
|
||
|
||
Parameters
|
||
----------
|
||
geom:
|
||
输入几何。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
Point | None
|
||
几何的质心点;失败时返回 None。
|
||
"""
|
||
try:
|
||
geom = ensure_valid_geometry(geom, verbose)
|
||
return geom.centroid
|
||
except Exception as e:
|
||
logger.warning(f"质心计算失败,已跳过,原因:{str(e)}")
|
||
return None
|
||
|
||
|
||
def bounding_box(geom: BaseGeometry, verbose: bool = False) -> Polygon | None:
|
||
"""返回几何的最小外接矩形(BBOX)为多边形。
|
||
|
||
Parameters
|
||
----------
|
||
geom:
|
||
输入几何。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
Polygon | None
|
||
最小外接矩形多边形;失败时返回 None。
|
||
"""
|
||
try:
|
||
geom = ensure_valid_geometry(geom, verbose)
|
||
from shapely.geometry import box
|
||
return box(*geom.bounds)
|
||
except Exception as e:
|
||
logger.warning(f"最小外接矩形计算失败,已跳过,原因:{str(e)}")
|
||
return None
|
||
|
||
|
||
def convex_hull(geom: BaseGeometry, verbose: bool = False) -> BaseGeometry | None:
|
||
"""返回几何的凸包。
|
||
|
||
Parameters
|
||
----------
|
||
geom:
|
||
输入几何。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
BaseGeometry | None
|
||
几何的凸包;失败时返回 None。
|
||
"""
|
||
try:
|
||
geom = ensure_valid_geometry(geom, verbose)
|
||
return geom.convex_hull
|
||
except Exception as e:
|
||
logger.warning(f"凸包计算失败,已跳过,原因:{str(e)}")
|
||
return None
|
||
|
||
|
||
# ── 集合运算 ──────────────────────────────────────────────────────────────────
|
||
|
||
def intersect(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> BaseGeometry | None:
|
||
"""返回两几何的交集。
|
||
|
||
Parameters
|
||
----------
|
||
geom_a:
|
||
第一个几何。
|
||
geom_b:
|
||
第二个几何。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
BaseGeometry | None
|
||
两几何的交集;失败时返回 None。
|
||
"""
|
||
try:
|
||
geom_a = ensure_valid_geometry(geom_a, verbose)
|
||
geom_b = ensure_valid_geometry(geom_b, verbose)
|
||
return geom_a.intersection(geom_b)
|
||
except Exception as e:
|
||
logger.warning(f"交集计算失败,已跳过,原因:{str(e)}")
|
||
return None
|
||
|
||
|
||
def union(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> BaseGeometry | None:
|
||
"""返回两几何的并集。
|
||
|
||
Parameters
|
||
----------
|
||
geom_a:
|
||
第一个几何。
|
||
geom_b:
|
||
第二个几何。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
BaseGeometry | None
|
||
两几何的并集;失败时返回 None。
|
||
"""
|
||
try:
|
||
geom_a = ensure_valid_geometry(geom_a, verbose)
|
||
geom_b = ensure_valid_geometry(geom_b, verbose)
|
||
return geom_a.union(geom_b)
|
||
except Exception as e:
|
||
logger.warning(f"并集计算失败,已跳过,原因:{str(e)}")
|
||
return None
|
||
|
||
|
||
def difference(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> BaseGeometry | None:
|
||
"""返回 ``geom_a`` 减去 ``geom_b`` 的差集。
|
||
|
||
Parameters
|
||
----------
|
||
geom_a:
|
||
第一个几何。
|
||
geom_b:
|
||
第二个几何。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
BaseGeometry | None
|
||
geom_a 减去 geom_b 的差集;失败时返回 None。
|
||
"""
|
||
try:
|
||
geom_a = ensure_valid_geometry(geom_a, verbose)
|
||
geom_b = ensure_valid_geometry(geom_b, verbose)
|
||
return geom_a.difference(geom_b)
|
||
except Exception as e:
|
||
logger.warning(f"差集计算失败,已跳过,原因:{str(e)}")
|
||
return None
|
||
|
||
|
||
def symmetric_difference(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> BaseGeometry | None:
|
||
"""返回两几何的对称差集(异或)。
|
||
|
||
Parameters
|
||
----------
|
||
geom_a:
|
||
第一个几何。
|
||
geom_b:
|
||
第二个几何。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
BaseGeometry | None
|
||
两几何的对称差集;失败时返回 None。
|
||
"""
|
||
try:
|
||
geom_a = ensure_valid_geometry(geom_a, verbose)
|
||
geom_b = ensure_valid_geometry(geom_b, verbose)
|
||
return geom_a.symmetric_difference(geom_b)
|
||
except Exception as e:
|
||
logger.warning(f"对称差集计算失败,已跳过,原因:{str(e)}")
|
||
return None
|
||
|
||
|
||
def unary_union(geoms: Sequence[BaseGeometry], verbose: bool = False) -> BaseGeometry | None:
|
||
"""将多个几何合并为一个(等同于逐一 union)。
|
||
|
||
Parameters
|
||
----------
|
||
geoms:
|
||
几何对象序列。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
BaseGeometry | None
|
||
合并后的几何对象;失败时返回 None。
|
||
"""
|
||
try:
|
||
# 检查几何有效性并尝试修复
|
||
fixed_geoms = []
|
||
for i, geom in enumerate(geoms):
|
||
try:
|
||
fixed = ensure_valid_geometry(geom, verbose)
|
||
if fixed is not None:
|
||
fixed_geoms.append(fixed)
|
||
else:
|
||
# 无法修复的几何跳过
|
||
if verbose:
|
||
logger.warning(f"几何对象 {i} 无效且无法修复,已跳过")
|
||
except Exception as e:
|
||
logger.warning(f"几何对象 {i} 处理失败,已跳过,原因:{str(e)}")
|
||
if not fixed_geoms:
|
||
logger.warning("没有有效的几何对象可合并")
|
||
return None
|
||
return shapely.unary_union(fixed_geoms)
|
||
except Exception as e:
|
||
logger.warning(f"几何合并失败,已跳过,原因:{str(e)}")
|
||
return None
|
||
|
||
|
||
# ── 空间关系判断 ───────────────────────────────────────────────────────────────
|
||
|
||
def contains(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> bool:
|
||
"""判断 ``geom_a`` 是否完全包含 ``geom_b``。
|
||
|
||
Parameters
|
||
----------
|
||
geom_a:
|
||
第一个几何。
|
||
geom_b:
|
||
第二个几何。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
bool
|
||
如果 geom_a 完全包含 geom_b,返回 True;否则返回 False;失败时返回 False。
|
||
"""
|
||
try:
|
||
geom_a = ensure_valid_geometry(geom_a, verbose)
|
||
geom_b = ensure_valid_geometry(geom_b, verbose)
|
||
return bool(geom_a.contains(geom_b))
|
||
except Exception as e:
|
||
logger.warning(f"包含关系判断失败,已返回 False,原因:{str(e)}")
|
||
return False
|
||
|
||
|
||
def within(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> bool:
|
||
"""判断 ``geom_a`` 是否完全在 ``geom_b`` 内。
|
||
|
||
Parameters
|
||
----------
|
||
geom_a:
|
||
第一个几何。
|
||
geom_b:
|
||
第二个几何。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
bool
|
||
如果 geom_a 完全在 geom_b 内,返回 True;否则返回 False;失败时返回 False。
|
||
"""
|
||
try:
|
||
geom_a = ensure_valid_geometry(geom_a, verbose)
|
||
geom_b = ensure_valid_geometry(geom_b, verbose)
|
||
return bool(geom_a.within(geom_b))
|
||
except Exception as e:
|
||
logger.warning(f"包含于关系判断失败,已返回 False,原因:{str(e)}")
|
||
return False
|
||
|
||
|
||
def intersects(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> bool:
|
||
"""判断两几何是否相交(含边界接触)。
|
||
|
||
Parameters
|
||
----------
|
||
geom_a:
|
||
第一个几何。
|
||
geom_b:
|
||
第二个几何。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
bool
|
||
如果两几何相交,返回 True;否则返回 False;失败时返回 False。
|
||
"""
|
||
try:
|
||
geom_a = ensure_valid_geometry(geom_a, verbose)
|
||
geom_b = ensure_valid_geometry(geom_b, verbose)
|
||
return bool(geom_a.intersects(geom_b))
|
||
except Exception as e:
|
||
logger.warning(f"相交关系判断失败,已返回 False,原因:{str(e)}")
|
||
return False
|
||
|
||
|
||
def distance_between(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> float:
|
||
"""计算两几何间的最小距离(单位与 CRS 一致)。
|
||
|
||
Parameters
|
||
----------
|
||
geom_a:
|
||
第一个几何。
|
||
geom_b:
|
||
第二个几何。
|
||
verbose:
|
||
是否输出修复几何的警告信息,默认 False。
|
||
|
||
Returns
|
||
-------
|
||
float
|
||
两几何间的最小距离;失败时返回无穷大。
|
||
"""
|
||
try:
|
||
geom_a = ensure_valid_geometry(geom_a, verbose)
|
||
geom_b = ensure_valid_geometry(geom_b, verbose)
|
||
return geom_a.distance(geom_b)
|
||
except Exception as e:
|
||
logger.warning(f"距离计算失败,已返回无穷大,原因:{str(e)}")
|
||
return float('inf')
|