""" geo_tools.core.geometry ~~~~~~~~~~~~~~~~~~~~~~~ 基于 Shapely 2.x 的几何运算工具函数。 """ from __future__ import annotations from typing import Literal, Sequence import shapely from shapely.geometry import ( LinearRing, LineString, MultiLineString, MultiPoint, MultiPolygon, Point, Polygon, ) from shapely.geometry.base import BaseGeometry from app.utils.logger import get_logger from app.utils.validators import ensure_valid_geometry logger = get_logger(__name__) # ── 几何有效性 ──────────────────────────────────────────────────────────────── def is_valid_geometry(geom: BaseGeometry | None) -> bool: """判断几何对象是否有效(非空且通过 Shapely 合法性检查)。 Parameters ---------- geom: 输入几何对象,可为 None。 Returns ------- bool 如果几何对象有效且非空,返回 True;否则返回 False。 """ if geom is None: return False return bool(geom.is_valid and not geom.is_empty) def fix_geometry(geom: BaseGeometry | None) -> BaseGeometry | None: """尝试修复无效几何。 依次尝试: 1. ``buffer(0)`` — 适合大多数自相交多边形 2. ``make_valid``(Shapely 2.x)— 覆盖更多情形 Parameters ---------- geom: 输入几何对象,可为 None。 Returns ------- BaseGeometry | None 修复后的几何;无法修复时返回 ``None``。 Notes ----- 对于复杂的无效几何,可能无法完全修复,此时会返回 None。 """ if geom is None: return None if geom.is_valid: return geom # 方法一:buffer(0) try: fixed = geom.buffer(0) if fixed.is_valid and not fixed.is_empty: return fixed except Exception: pass # 方法二:shapely.make_valid(Shapely >= 1.8) try: fixed = shapely.make_valid(geom) if fixed.is_valid and not fixed.is_empty: return fixed except Exception: pass logger.warning("无法修复几何:%r", geom.geom_type) return None def explain_validity(geom: BaseGeometry) -> str: """返回 Shapely 对该几何的有效性说明(英文)。 Parameters ---------- geom: 输入几何对象。 Returns ------- str Shapely 生成的有效性说明字符串。 """ from shapely.validation import explain_validity as _explain return _explain(geom) # ── 基础几何运算 ─────────────────────────────────────────────────────────────── def buffer_geometry( geom: BaseGeometry, distance: float, cap_style: Literal["round", "square", "flat"] = "round", join_style: Literal["round", "mitre", "bevel"] = "round", resolution: int = 16, verbose: bool = False, ) -> BaseGeometry | None: """对几何对象执行缓冲区运算。 Parameters ---------- geom: 输入几何。 distance: 缓冲距离(单位与 CRS 一致;地理坐标系单位为度)。 cap_style: 端头样式:"round"(圆角)、"square"(方角)、"flat"(平角)(仅线要素有效)。 join_style: 转角样式:"round"(圆角)、"mitre"(斜角)、"bevel"(尖角)。 resolution: 圆弧逼近精度(段数),默认 16。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- BaseGeometry | None 缓冲区运算后的几何对象;失败时返回 None。 """ try: geom = ensure_valid_geometry(geom, verbose) return geom.buffer(distance, cap_style=cap_style, join_style=join_style, resolution=resolution) except Exception as e: logger.warning(f"缓冲区计算失败,已跳过,原因:{str(e)}") return None def centroid(geom: BaseGeometry, verbose: bool = False) -> Point | None: """返回几何的质心点。 Parameters ---------- geom: 输入几何。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- Point | None 几何的质心点;失败时返回 None。 """ try: geom = ensure_valid_geometry(geom, verbose) return geom.centroid except Exception as e: logger.warning(f"质心计算失败,已跳过,原因:{str(e)}") return None def bounding_box(geom: BaseGeometry, verbose: bool = False) -> Polygon | None: """返回几何的最小外接矩形(BBOX)为多边形。 Parameters ---------- geom: 输入几何。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- Polygon | None 最小外接矩形多边形;失败时返回 None。 """ try: geom = ensure_valid_geometry(geom, verbose) from shapely.geometry import box return box(*geom.bounds) except Exception as e: logger.warning(f"最小外接矩形计算失败,已跳过,原因:{str(e)}") return None def convex_hull(geom: BaseGeometry, verbose: bool = False) -> BaseGeometry | None: """返回几何的凸包。 Parameters ---------- geom: 输入几何。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- BaseGeometry | None 几何的凸包;失败时返回 None。 """ try: geom = ensure_valid_geometry(geom, verbose) return geom.convex_hull except Exception as e: logger.warning(f"凸包计算失败,已跳过,原因:{str(e)}") return None # ── 集合运算 ────────────────────────────────────────────────────────────────── def intersect(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> BaseGeometry | None: """返回两几何的交集。 Parameters ---------- geom_a: 第一个几何。 geom_b: 第二个几何。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- BaseGeometry | None 两几何的交集;失败时返回 None。 """ try: geom_a = ensure_valid_geometry(geom_a, verbose) geom_b = ensure_valid_geometry(geom_b, verbose) return geom_a.intersection(geom_b) except Exception as e: logger.warning(f"交集计算失败,已跳过,原因:{str(e)}") return None def union(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> BaseGeometry | None: """返回两几何的并集。 Parameters ---------- geom_a: 第一个几何。 geom_b: 第二个几何。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- BaseGeometry | None 两几何的并集;失败时返回 None。 """ try: geom_a = ensure_valid_geometry(geom_a, verbose) geom_b = ensure_valid_geometry(geom_b, verbose) return geom_a.union(geom_b) except Exception as e: logger.warning(f"并集计算失败,已跳过,原因:{str(e)}") return None def difference(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> BaseGeometry | None: """返回 ``geom_a`` 减去 ``geom_b`` 的差集。 Parameters ---------- geom_a: 第一个几何。 geom_b: 第二个几何。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- BaseGeometry | None geom_a 减去 geom_b 的差集;失败时返回 None。 """ try: geom_a = ensure_valid_geometry(geom_a, verbose) geom_b = ensure_valid_geometry(geom_b, verbose) return geom_a.difference(geom_b) except Exception as e: logger.warning(f"差集计算失败,已跳过,原因:{str(e)}") return None def symmetric_difference(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> BaseGeometry | None: """返回两几何的对称差集(异或)。 Parameters ---------- geom_a: 第一个几何。 geom_b: 第二个几何。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- BaseGeometry | None 两几何的对称差集;失败时返回 None。 """ try: geom_a = ensure_valid_geometry(geom_a, verbose) geom_b = ensure_valid_geometry(geom_b, verbose) return geom_a.symmetric_difference(geom_b) except Exception as e: logger.warning(f"对称差集计算失败,已跳过,原因:{str(e)}") return None def unary_union(geoms: Sequence[BaseGeometry], verbose: bool = False) -> BaseGeometry | None: """将多个几何合并为一个(等同于逐一 union)。 Parameters ---------- geoms: 几何对象序列。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- BaseGeometry | None 合并后的几何对象;失败时返回 None。 """ try: # 检查几何有效性并尝试修复 fixed_geoms = [] for i, geom in enumerate(geoms): try: fixed = ensure_valid_geometry(geom, verbose) if fixed is not None: fixed_geoms.append(fixed) else: # 无法修复的几何跳过 if verbose: logger.warning(f"几何对象 {i} 无效且无法修复,已跳过") except Exception as e: logger.warning(f"几何对象 {i} 处理失败,已跳过,原因:{str(e)}") if not fixed_geoms: logger.warning("没有有效的几何对象可合并") return None return shapely.unary_union(fixed_geoms) except Exception as e: logger.warning(f"几何合并失败,已跳过,原因:{str(e)}") return None # ── 空间关系判断 ─────────────────────────────────────────────────────────────── def contains(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> bool: """判断 ``geom_a`` 是否完全包含 ``geom_b``。 Parameters ---------- geom_a: 第一个几何。 geom_b: 第二个几何。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- bool 如果 geom_a 完全包含 geom_b,返回 True;否则返回 False;失败时返回 False。 """ try: geom_a = ensure_valid_geometry(geom_a, verbose) geom_b = ensure_valid_geometry(geom_b, verbose) return bool(geom_a.contains(geom_b)) except Exception as e: logger.warning(f"包含关系判断失败,已返回 False,原因:{str(e)}") return False def within(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> bool: """判断 ``geom_a`` 是否完全在 ``geom_b`` 内。 Parameters ---------- geom_a: 第一个几何。 geom_b: 第二个几何。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- bool 如果 geom_a 完全在 geom_b 内,返回 True;否则返回 False;失败时返回 False。 """ try: geom_a = ensure_valid_geometry(geom_a, verbose) geom_b = ensure_valid_geometry(geom_b, verbose) return bool(geom_a.within(geom_b)) except Exception as e: logger.warning(f"包含于关系判断失败,已返回 False,原因:{str(e)}") return False def intersects(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> bool: """判断两几何是否相交(含边界接触)。 Parameters ---------- geom_a: 第一个几何。 geom_b: 第二个几何。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- bool 如果两几何相交,返回 True;否则返回 False;失败时返回 False。 """ try: geom_a = ensure_valid_geometry(geom_a, verbose) geom_b = ensure_valid_geometry(geom_b, verbose) return bool(geom_a.intersects(geom_b)) except Exception as e: logger.warning(f"相交关系判断失败,已返回 False,原因:{str(e)}") return False def distance_between(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> float: """计算两几何间的最小距离(单位与 CRS 一致)。 Parameters ---------- geom_a: 第一个几何。 geom_b: 第二个几何。 verbose: 是否输出修复几何的警告信息,默认 False。 Returns ------- float 两几何间的最小距离;失败时返回无穷大。 """ try: geom_a = ensure_valid_geometry(geom_a, verbose) geom_b = ensure_valid_geometry(geom_b, verbose) return geom_a.distance(geom_b) except Exception as e: logger.warning(f"距离计算失败,已返回无穷大,原因:{str(e)}") return float('inf')